diff --git a/analyze-cli.py b/analyze-cli.py index d7e5148..9f811dd 100755 --- a/analyze-cli.py +++ b/analyze-cli.py @@ -36,6 +36,11 @@ except ImportError: class ExperimentHelper(): + """ + Helper class for dealing with results of libkinetics. + + Provides plotting and data output functionality for libkinetics.Experiment. + """ def __init__(self, experiment, logger): self.exp = experiment self.logger = logger @@ -44,9 +49,9 @@ class ExperimentHelper(): y = slope * x + intercept return y - def plot_data(self, exp, outpath): + def plot_data(self, outpath): # iterate over all measurements - for m in exp.measurements: + for m in self.exp.measurements: # plot each measurement fig, ax = plt.subplots() ax.set_xlabel('Time') @@ -74,7 +79,8 @@ class ExperimentHelper(): bbox_inches='tight') plt.close(fig) - def plot_kinetics(self, exp, outpath): + def plot_kinetics(self, outpath): + exp = self.exp fig, ax = plt.subplots() ax.set_xlabel('c [mM]') ax.set_ylabel('dA/dt [Au/s]') @@ -101,35 +107,49 @@ class ExperimentHelper(): plt.savefig('{}/kinetics.png'.format(outpath), bbox_inches='tight') plt.close(fig) - def write_data(self, exp, outpath): + def write_data(self, outpath): + + exp = self.exp with open('{}/results.csv'.format(outpath), 'w', newline='\n') as csvfile: writer = csv.writer(csvfile, dialect='excel-tab') - writer.writerow(['# LINEAR FITS']) + writer.writerow(['LINEAR FITS']) writer.writerow([]) - writer.writerow(['# concentration', 'avg. slope', 'slope std_err', - 'replicates (slope, intercept and r value)']) + writer.writerow(['concentration', + 'avg. slope', + 'slope std_err', + 'replicates (slope, intercept and r_squared)']) for m in exp.measurements: row = [m.concentration, m.avg_slope, m.avg_slope_err] for r in m.replicates: row.append(r.fitresult['slope']) row.append(r.fitresult['intercept']) - row.append(r.fitresult['r_value']) + row.append(r.fitresult['r_squared']) writer.writerow(row) writer.writerow([]) if exp.mm: - writer.writerow(['# MICHAELIS-MENTEN KINETICS']) - writer.writerow(['# vmax', 'Km']) - writer.writerow([exp.mm['vmax'], exp.mm['Km']]) + self.logger.debug(' Writing Michaelis-Menten results.') + writer.writerow(['MICHAELIS-MENTEN KINETICS']) + writer.writerow(['', 'value', 'std']) + writer.writerow(['vmax', exp.mm['vmax'], + exp.mm['vmax_err']]) + writer.writerow(['Kprime', exp.mm['Km'], + exp.mm['Km_err']]) if exp.hill: - writer.writerow(['# HILL KINETICS']) - writer.writerow(['# vmax', 'Kprime', 'h']) - writer.writerow([exp.hill['vmax'], exp.hill['Kprime'], - exp.hill['h']]) + self.logger.debug(' Writing Hill results.') + writer.writerow([]) + writer.writerow(['HILL KINETICS']) + writer.writerow(['', 'value', 'std']) + writer.writerow(['vmax', exp.hill['vmax'], + exp.hill['vmax_err']]) + writer.writerow(['Kprime', exp.hill['Kprime'], + exp.hill['Kprime_err']]) + writer.writerow(['h', exp.hill['h'], + exp.hill['h_err']]) def parse_arguments(): @@ -165,9 +185,14 @@ def parse_arguments(): def initialize_logger(): """ - Initialization of logging subsystem. Two logging handlers are brought up: - 'fh' which logs to a log file and 'ch' which logs to standard output. - :return logger: returns a logger instance + Initialization of logging subsystem. + + Two logging handlers are brought up: 'fh' which logs to a log file and + 'ch' which logs to standard output. If colorlog is installed, logging + to console will be colored. + + Returns: + logger: logging.Logger instance """ if COLORED_LOG: fmt = '%(log_color)s%(levelname)-8s%(reset)s %(message)s' @@ -211,6 +236,11 @@ def initialize_logger(): def main(): + """ + Main method of pyKinetics. + + Will be executed when analyze-cli.py is started from the command line. + """ # parse command line arguments args = parse_arguments() # initialize logger @@ -256,11 +286,11 @@ def main(): fit_to_replicates=fit_to_replicates) ehlp = ExperimentHelper(exp, logger) logger.info('Plotting linear fits') - ehlp.plot_data(exp, str(output_path)) + ehlp.plot_data(str(output_path)) logger.info('Plotting kinetic fit(s)') - ehlp.plot_kinetics(exp, str(output_path)) + ehlp.plot_kinetics(str(output_path)) logger.info('Writing results to results.csv') - ehlp.write_data(exp, str(output_path)) + ehlp.write_data(str(output_path)) logger.info('Finished!') else: msg = '{} is not a directory!'.format(input_path) diff --git a/libkinetics/__init__.py b/libkinetics/__init__.py index 541c9c7..e103a65 100644 --- a/libkinetics/__init__.py +++ b/libkinetics/__init__.py @@ -1,15 +1,51 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -from scipy import stats, optimize -import numpy as np import logging import warnings +import operator + +try: + import numpy as np +except ImportError: + print('----- NumPy must be installed! -----') + +try: + from scipy import stats, optimize +except ImportError: + print('----- SciPy must be installed! -----') + + +class Error(Exception): + """ + Main Error class derived from Exception. + """ + pass + + +class FitError(Error): + """ + Exception raised if fitting fails. + """ + pass class Replicate(): """ - Represents a single replicate within a measurement + Represents a single replicate within a measurement. + + Linear regression for v0 is executed at this level. + + Attributes: + logger: logging.Logger instance inherited by the owning Measurement. + num: float + identification number of replicate within the experiment + x, y: float + time and signal to be analysed. + owner: object + measurement this measurement belongs to + xlim: + fitresult: """ def __init__(self, num, xy, owner): @@ -67,6 +103,26 @@ class Replicate(): class Measurement(): """ Represents a single measurement within an experiment. + + Each measurement consists of one ore more replicates and is associated + with a specific substrate concentration. + + Attributes: + logger: logging.Logger instance inherited by the owning Experiment. + concentration: float + substrate concentration associated with the measurement. + concentration_unit: string + unit of the concentration; only used for labeling plots/tables. + x, y: float + time and signal to be analysed. + replicates: array_like + list of individual replicates of the measurement. + owner: object + experiment this measurement belongs to + xlim: array_like + lower and upper bounds for calculating the v0 linear fit. + avg_slope, avg_slope_err: float + average slope (v0) of the measurement and its standard deviation """ def __init__(self, xy, conc, conc_unit, owner): @@ -102,6 +158,15 @@ class Measurement(): self.logger.info('-----') def get_results(self): + """ + Collect results of the individual replicates. + + Collects results of each replicate and append to results list. + + Returns: + results: array_like + list of results from each replicate of the measurement. + """ results = [] for r in self.replicates: results.append(r.fitresult) @@ -113,22 +178,48 @@ class Experiment(): """ Represents the actual experiment. - Consists of several Measurement objects. + Representation of a kinetics experiment. It consists of multiple + objects of type Measurement. - Args: - data_files: list containing csv-formatted data files - xlim: tuple of float values defining the lower and upper bound for - linear fitting of v0 - do_hill: boolean to define whether to fit Hill-type kinetics in - addition to Michaelis-Menten kinetics. Defaults to False - fit_to_replicates: boolean to define wheter to fit to individual - replicates instead of the avarage slope. Defaults to False - logger: logging.Logger instance. If not given, a new logger is created + Attributes: + logger: logging.Logger instance that is used for logging to console + and log file. + measurements: array_like + list of individual measurements of the experiment. + Individual measurements are sorted by their substrate + concentration. + fit_to_replicates: boolean + whether to fit to individual replicates instead to the average of + each measurement. + raw_kinetic_data: dictionary + storing x, y and std_err of each measurement for fitting kinetic + curves. + xlim: array_like + lower and upper bounds for calculating the v0 linear fit. """ def __init__(self, data_files, xlim, do_hill=False, fit_to_replicates=False, logger=None): + """ + Inits Experiment class with experimental parameters + This is the only class you should have to use directly in your program. + Instances of Measurement and Replicate objects are created + automatically using the provided data files. + + Args: + data_files: list containing csv-formatted data files + xlim: tuple of float values defining the lower and upper bound for + linear fitting of v0 + do_hill: boolean to define whether to fit Hill-type kinetics in + addition to Michaelis-Menten kinetics. Defaults to False + fit_to_replicates: boolean to define wheter to fit to individual + replicates instead of the avarage slope. Defaults to False + logger: logging.Logger instance. If not given, a new logger is + created + """ + + # check if a logger was handed over; if not, create a new instance if logger: self.logger = logger else: @@ -144,12 +235,22 @@ class Experiment(): # parse data files and generate measurements for csvfile in data_files: - tmp = np.genfromtxt(str(csvfile), comments='#') - with open(str(csvfile)) as datafile: - head = [next(datafile) for x in range(2)] + try: + tmp = np.genfromtxt(str(csvfile), comments='#') + with open(str(csvfile)) as datafile: + head = [next(datafile) for x in range(2)] + except OSError: + msg = "Failed reading file {}".format(str(csvfile)) + self.logger.error(msg) + raise + # extract concentration and unit from header # TODO: move unit to parameter - # TODO: More error-proof header detection + # check header for correct number of items + if len(head) < 2 or len(head) > 2: + msg = 'Parsing header of data files failed! Wrong format?' + self.logger.error(msg) + raise Error(msg) conc = head[0].strip('#').strip() unit = head[1].strip('#').strip() # split x and y data apart @@ -161,6 +262,10 @@ class Experiment(): measurement = Measurement((x, y), conc, unit, self) self.measurements.append(measurement) + # sort measurements by concentration + self.measurements = sorted(self.measurements, + key=operator.attrgetter('concentration')) + # iterate over all measurements for m in self.measurements: if self.fit_to_replicates: @@ -185,73 +290,140 @@ class Experiment(): else: self.hill = None - def plot_data(self, outpath): - # iterate over all measurements - for m in self.measurements: - # plot each measurement - m.plot(outpath) - def mm_kinetics_function(self, x, vmax, Km): + """ + Michaelis-Menten function. + + Classical Michaelis-Menten enzyme kinetics function. + + Args: + x: float + concentration at velocity v + vmax: float + maximum velocity + Km: float + Michaelis constant + + Returns: + v: float + velocity at given concentration x + """ v = (vmax * x) / (Km + x) return v def hill_kinetics_function(self, x, vmax, Kprime, h): + """ + Hill function. + + Hill function for enzyme kinetics with cooperativity. + + Args: + x: float + concentration at velocity v. + vmax: float + maximum velocity. + Kprime: float + kinetics constant related to Michaelis constant. + h: float + hill slope; if h=1, Hill function is identical to + Michaelis-Menten function. + + Returns: + v: float + velocity at given concentration x + """ v = (vmax * (x ** h)) / (Kprime + (x ** h)) return v def do_mm_kinetics(self): + """ + Calculates Michaelis-Menten kinetics. + + Returns: + On success, returns a dictionary containing the kinetic parameters + and their errors: + + {'vmax': float, + 'Km': float, + 'vmax_err': float, + 'Km_err': float, + 'x': array_like} + + Raises: + FitError if fitting fails. + """ try: popt, pconv = optimize.curve_fit(self.mm_kinetics_function, self.raw_kinetic_data['x'], self.raw_kinetic_data['y']) - - perr = np.sqrt(np.diag(pconv)) - vmax = popt[0] - Km = popt[1] - x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001) - - self.logger.info('Michaelis-Menten Kinetics:') - self.logger.info(' v_max: {} ± {}'.format(vmax, perr[0])) - self.logger.info(' Km: {} ± {}'.format(Km, perr[1])) - - return {'vmax': float(vmax), 'Km': float(Km), 'perr': perr, 'x': x} - except: - msg = 'Calculation of Michaelis-Menten kinetics failed!' + except ValueError: + msg = ('Calculation of Michaelis-Menten kinetics failed! Your ' + 'input data (either x or y) contain empty (NaN) values!') if self.logger: self.logger.error('{}'.format(msg)) - else: - print(msg) - return None + raise FitError(msg) + + perr = np.sqrt(np.diag(pconv)) + vmax = popt[0] + Km = popt[1] + x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001) + + self.logger.info('Michaelis-Menten Kinetics:') + self.logger.info(' v_max: {} ± {}'.format(vmax, perr[0])) + self.logger.info(' Km: {} ± {}'.format(Km, perr[1])) + + return {'vmax': np.float(vmax), + 'Km': np.float(Km), + 'vmax_err': np.float(perr[0]), + 'Km_err': np.float(perr[1]), + 'x': x} def do_hill_kinetics(self): + """ + Calculates Hill kinetics. + + Returns: + On success, returns a dictionary containing the kinetic parameters + their errors: + + {'vmax': float, + 'Kprime': float, + 'vmax_err': float, + 'Km_prime': float, + 'h_err': float, + 'h': float, + 'x': array_like} + + Raises: + FitError if fitting fails. + """ try: popt, pconv = optimize.curve_fit(self.hill_kinetics_function, self.raw_kinetic_data['x'], self.raw_kinetic_data['y']) - - perr = np.sqrt(np.diag(pconv)) - vmax = popt[0] - Kprime = popt[1] - h = popt[2] - - x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001) - - self.logger.info('Hill Kinetics:') - self.logger.info(' v_max: {} ± {}'.format(vmax, perr[0])) - self.logger.info(' K_prime: {} ± {}'.format(Kprime, perr[1])) - self.logger.info(' h: {} ± {}'.format(h, perr[2])) - - return { - 'vmax': float(vmax), - 'Kprime': float(Kprime), - 'perr': perr, - 'h': h, - 'x': x - } - except: - msg = 'Calculation of Hill kinetics failed!' + except ValueError: + msg = ('Calculation of Hill kinetics failed! Your input data ' + '(either x or y) contains empty (NaN) values!') if self.logger: self.logger.error('{}'.format(msg)) - else: - print(msg) - return None + raise FitError(msg) + + perr = np.sqrt(np.diag(pconv)) + vmax = popt[0] + Kprime = popt[1] + h = popt[2] + + x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001) + + self.logger.info('Hill Kinetics:') + self.logger.info(' v_max: {} ± {}'.format(vmax, perr[0])) + self.logger.info(' K_prime: {} ± {}'.format(Kprime, perr[1])) + self.logger.info(' h: {} ± {}'.format(h, perr[2])) + + return {'vmax': np.float(vmax), + 'Kprime': np.float(Kprime), + 'vmax_err': np.float(perr[0]), + 'Kprime_err': np.float(perr[1]), + 'h_err': np.float(perr[2]), + 'h': np.float(h), + 'x': x}