From 091c6e62a2ff10aaba1155eed59bfdebba20b3dd Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Fri, 21 Aug 2015 16:18:47 +0200 Subject: [PATCH] Moved plotting from library to cli-interface; improved logging ...and colorized logging --- analyze-cli.py | 161 ++++++++++++++++++++++++++++++++++++---- libkinetics/__init__.py | 161 +++++++++++++++++----------------------- 2 files changed, 215 insertions(+), 107 deletions(-) diff --git a/analyze-cli.py b/analyze-cli.py index f66a33c..c3bfe4e 100755 --- a/analyze-cli.py +++ b/analyze-cli.py @@ -4,11 +4,109 @@ import argparse import logging import csv +import matplotlib.pyplot as plt + from pathlib import Path +from colorlog import ColoredFormatter import libkinetics +class ExperimentHelper(): + + def __init__(self, experiment, logger): + self.exp = experiment + self.logger = logger + + def linear_regression_function(self, slope, x, intercept): + y = slope * x + intercept + return y + + def plot_data(self, exp, outpath): + # iterate over all measurements + for m in exp.measurements: + # plot each measurement + fig, ax = plt.subplots() + ax.set_xlabel('Time') + ax.set_ylabel('Raw Signal') + ax_title = 'Linear regression {} {}'.format(m.concentration, + m.concentration_unit) + ax.set_title(ax_title) + + for r in m.replicates: + ax.plot(r.x, r.y, linestyle='None', + marker='o', ms=3, fillstyle='none') + y = self.linear_regression_function(r.fitresult['slope'], + r.x, + r.fitresult['intercept']) + ax.plot(r.x, y, 'k-') + ax.axvspan(m.xlim[0], m.xlim[1], facecolor='0.8', alpha=0.5) + + plt.savefig('{}/fit_{}_{}.png'.format(outpath, + m.concentration, + m.concentration_unit), + bbox_inches='tight') + plt.close(fig) + + def plot_kinetics(self, exp, outpath): + fig, ax = plt.subplots() + ax.set_xlabel('c [mM]') + ax.set_ylabel('dA/dt [Au/s]') + ax.set_title('Kinetics') + + ax.errorbar(exp.raw_kinetic_data['x'], + exp.raw_kinetic_data['y'], + yerr=exp.raw_kinetic_data['yerr'], + fmt='ok', ms=3, fillstyle='none', label="Data with error") + + if exp.mm: + y = exp.mm_kinetics_function(exp.mm['x'], + exp.mm['vmax'], + exp.mm['Km']) + ax.plot(exp.mm['x'], y, 'b-', label="Michaelis-Menten") + if exp.hill: + y = exp.hill_kinetics_function(exp.hill['x'], + exp.hill['vmax'], + exp.hill['Kprime'], + exp.hill['h']) + ax.plot(exp.hill['x'], y, 'g-', label="Hill") + + ax.legend(loc='best', fancybox=True) + plt.savefig('{}/kinetics.png'.format(outpath), bbox_inches='tight') + plt.close(fig) + + def write_data(self, exp, outpath): + + with open('{}/results.csv'.format(outpath), + 'w', + newline='\n') as csvfile: + + writer = csv.writer(csvfile, dialect='excel-tab') + writer.writerow(['# LINEAR FITS']) + writer.writerow([]) + writer.writerow(['# concentration', + 'avg. slope', + 'slope std_err', + 'replicates (slope, intercept and r value)']) + for m in exp.measurements: + row = [m.concentration, m.avg_slope, m.avg_slope_err] + for r in m.replicates: + row.append(r.fitresult['slope']) + row.append(r.fitresult['intercept']) + row.append(r.fitresult['r_value']) + writer.writerow(row) + + writer.writerow([]) + if exp.mm: + writer.writerow(['# MICHAELIS-MENTEN KINETICS']) + writer.writerow(['# vmax', 'Km']) + writer.writerow([exp.mm['vmax'], exp.mm['Km']]) + if exp.hill: + writer.writerow(['# HILL KINETICS']) + writer.writerow(['# vmax', 'Kprime', 'h']) + writer.writerow([exp.hill['vmax'], exp.hill['Kprime'], + exp.hill['h']]) + def parse_arguments(): parser = argparse.ArgumentParser() @@ -16,8 +114,12 @@ def parse_arguments(): '--verbose', action='store_true', help='increase output verbosity') + parser.add_argument('-r', + '--replicates', + action='store_true', + help='fit kinetics to individual replicates') parser.add_argument('-wh', - '--with_hill', + '--hill', action='store_true', help='compute additional kinetics using Hill equation') parser.add_argument('input', @@ -38,11 +140,25 @@ def initialize_logger(): 'fh' which logs to a log file and 'ch' which logs to standard output. :return logger: returns a logger instance """ + fmt = '%(log_color)s%(levelname)-8s%(reset)s %(message)s' + formatter = ColoredFormatter(fmt, + datefmt=None, + reset=True, + log_colors={'DEBUG': 'cyan', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red,bg_white'}, + secondary_log_colors={}, + style='%') + + logging.captureWarnings(True) logger = logging.getLogger('pyKinetics-cli') - logger.setLevel(logging.INFO) + logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() - ch.setLevel(logging.INFO) + ch.setLevel(logging.DEBUG) + ch.setFormatter(formatter) logger.addHandler(ch) try: @@ -51,7 +167,7 @@ def initialize_logger(): fh.setLevel(logging.INFO) logger.addHandler(fh) except IOError as error: - logger.warning('WARNING: Cannot create log file! Run pyKinetics-cli' + logger.warning('Cannot create log file! Run pyKinetics-cli' 'from a directory to which you have write access.') logger.warning(error.msg) pass @@ -65,37 +181,50 @@ def main(): # initialize logger logger = initialize_logger() - if args.with_hill: - do_hill = args.with_hill + if args.hill: + do_hill = args.hill else: do_hill = False + + if args.replicates: + fit_to_replicates = args.replicates + else: + fit_to_replicates = False + try: input_path = Path(args.input).resolve() except FileNotFoundError: - logger.critical('CRITICAL: Path containing input data ' + logger.critical('Path containing input data ' 'not found: {}'.format(args.input)) raise try: output_path = Path(args.output).resolve() except FileNotFoundError: - logger.critical('CRITICAL: Path for writing results ' + logger.critical('Path for writing results ' 'not found: {}'.format(args.output)) raise if output_path.is_dir(): if input_path.is_dir(): - logger.info('INFO: Collecting data files') + logger.info('Collecting data files') data_files = sorted(input_path.glob('**/*.csv')) msg = 'Calculating kinetics' if do_hill: msg = '{} including Hill kinetics'.format(msg) - logger.info('INFO: {}'.format(msg)) - exp = libkinetics.Experiment(data_files, (10, 25), do_hill) - logger.info('INFO: Plotting linear fits to data and kinetics') - exp.plot_data(str(output_path)) - exp.plot_kinetics(str(output_path)) - logger.info('INFO: Writing results to results.csv') - exp.write_data(str(output_path)) + logger.info('{}'.format(msg)) + exp = libkinetics.Experiment(data_files, + (10, 25), + do_hill=do_hill, + logger=logger, + fit_to_replicates=fit_to_replicates) + ehlp = ExperimentHelper(exp, logger) + logger.info('Plotting linear fits to data') + ehlp.plot_data(exp, str(output_path)) + logger.info('Plotting kinetics fit(s)') + ehlp.plot_kinetics(exp, str(output_path)) + logger.info('Writing results to results.csv') + ehlp.write_data(exp, str(output_path)) + logger.info('Finished!') else: msg = '{} is not a directory!'.format(input_path) logger.critical('CRITICAL: '.format(msg)) diff --git a/libkinetics/__init__.py b/libkinetics/__init__.py index edd5ae0..a14dc3b 100644 --- a/libkinetics/__init__.py +++ b/libkinetics/__init__.py @@ -3,14 +3,15 @@ from scipy import stats, optimize import numpy as np -import matplotlib.pyplot as plt -import csv +import logging +import warnings class Replicate(): - def __init__(self, x, y, owner): + def __init__(self, num, x, y, owner): self.logger = owner.logger + self.num = num + 1 self.x = x self.y = y self.owner = owner @@ -23,14 +24,40 @@ class Replicate(): x_for_fit = np.take(self.x, ind_min_max) y_for_fit = np.take(self.y, ind_min_max) + # ignore warnings about invalid values in sqrt during linear fitting + # they occur frequently and will just clutter the cli + warnings.filterwarnings('ignore', + category=RuntimeWarning, + message='invalid value encountered in sqrt') + (slope, intercept, r_value, p_value, std_err) = stats.linregress(x_for_fit, y_for_fit) + r_squared = r_value**2 + conc = '{} {}'.format(self.owner.concentration, + self.owner.concentration_unit) + + self.logger.info('Linear fit for {} #{}:'.format(conc, self.num)) + if r_squared < 0.9: + msg = ' r-squared: {} < 0.9; Check fit manually!' + self.logger.warning(msg.format(round(r_squared, 4))) + else: + msg = ' r-squared: {}' + self.logger.info(msg.format(round(r_squared, 4))) + self.logger.info(' slope: {}'.format(slope)) + if slope < 0: + self.logger.warning('Slope is negative. Will use absolute value ' + 'for further calculations!') + self.logger.info(' intercept: {}'.format(slope)) + + + return {'slope': slope, 'intercept': intercept, 'r_value': r_value, + 'r_squared': r_squared, 'p_value': p_value, 'std_err': std_err} @@ -54,7 +81,10 @@ class Measurement(): length_x, num_replicates = self.y.shape for n in range(num_replicates): - self.replicates.append(Replicate(self.x, self.y[:, n:n+1], self)) + self.replicates.append(Replicate(n, + self.x, + self.y[:, n:n+1], + self)) for r in self.replicates: self.slopes.append(r.fitresult['slope']) @@ -62,25 +92,12 @@ class Measurement(): self.avg_slope = np.average(self.slopes) self.avg_slope_err = np.std(self.slopes) - def plot(self, outpath): - fig, ax = plt.subplots() - ax.set_xlabel('Time [s]') - ax.set_ylabel('Absorption (340 nm) [Au]') - ax.set_title('Linear regression {} {}'.format(self.concentration, - self.concentration_unit)) - - for r in self.replicates: - ax.plot(r.x, r.y, linestyle='None', - marker='o', ms=3, fillstyle='none') - ax.plot(r.x, r.fitresult['slope']*r.x+r.fitresult['intercept'], - 'k-') - ax.axvspan(self.xlim[0], self.xlim[1], facecolor='0.8', alpha=0.5) - - plt.savefig('{}/fit_{}_{}.png'.format(outpath, - self.concentration, - self.concentration_unit), - bbox_inches='tight') - plt.close(fig) + self.logger.info('Average slope: {} ± {}'.format(self.avg_slope, + self.avg_slope_err)) + if self.avg_slope < 0: + self.logger.warning('Avererage slope is negative. Will use ' + 'absolute value for further calculations!') + self.logger.info('-----') def get_results(self): results = [] @@ -92,12 +109,17 @@ class Measurement(): class Experiment(): - def __init__(self, data_files, xlim, do_hill=False, logger=None): + def __init__(self, data_files, xlim, do_hill=False, fit_to_replicates=False, logger=None): - self.logger = logger + if logger: + self.logger = logger + else: + self.logger = logging.getLogger(__name__) # collction of indepentend measurements self.measurements = [] + + self.fit_to_replicates = fit_to_replicates # dictionary to store data for the kinetics calculation self.raw_kinetic_data = {'x': [], 'y': [], @@ -125,11 +147,18 @@ class Experiment(): # iterate over all measurements for m in self.measurements: - # extract relevant data for kinetics calculation (concentration, - # average slope and error) - self.raw_kinetic_data['x'].append(m.concentration) - self.raw_kinetic_data['y'].append(np.absolute(m.avg_slope)) - self.raw_kinetic_data['yerr'].append(m.avg_slope_err) + if self.fit_to_replicates: + for r in m.replicates: + self.raw_kinetic_data['x'].append(m.concentration) + self.raw_kinetic_data['y'].append(np.absolute(r.fitresult['slope'])) + self.raw_kinetic_data['yerr'].append(r.fitresult['std_err']) + + else: + # extract relevant data for kinetics calculation + # (concentration, average slope and error) + self.raw_kinetic_data['x'].append(m.concentration) + self.raw_kinetic_data['y'].append(np.absolute(m.avg_slope)) + self.raw_kinetic_data['yerr'].append(m.avg_slope_err) # calculate kinetics self.mm = self.do_mm_kinetics() @@ -163,6 +192,10 @@ class Experiment(): Km = popt[1] x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001) + self.logger.info('Michaelis-Menten Kinetics:') + self.logger.info(' v_max: {} ± {}'.format(vmax, perr[0])) + self.logger.info(' Km: {} ± {}'.format(Km, perr[1])) + return {'vmax': float(vmax), 'Km': float(Km), 'perr': perr, @@ -170,7 +203,7 @@ class Experiment(): except: msg = 'Calculation of Michaelis-Menten kinetics failed!' if self.logger: - self.logger.error('ERROR: {}'.format(msg)) + self.logger.error('{}'.format(msg)) else: print(msg) return None @@ -188,6 +221,11 @@ class Experiment(): x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001) + self.logger.info('Hill Kinetics:') + self.logger.info(' v_max: {} ± {}'.format(vmax, perr[0])) + self.logger.info(' K_prime: {} ± {}'.format(Kprime, perr[1])) + self.logger.info(' h: {} ± {}'.format(h, perr[2])) + return {'vmax': float(vmax), 'Kprime': float(Kprime), 'perr': perr, @@ -196,66 +234,7 @@ class Experiment(): except: msg = 'Calculation of Hill kinetics failed!' if self.logger: - self.logger.error('ERROR: {}'.format(msg)) + self.logger.error('{}'.format(msg)) else: print(msg) return None - - def plot_kinetics(self, outpath): - fig, ax = plt.subplots() - ax.set_xlabel('c [mM]') - ax.set_ylabel('dA/dt [Au/s]') - ax.set_title('Kinetics') - - ax.errorbar(self.raw_kinetic_data['x'], - self.raw_kinetic_data['y'], - yerr=self.raw_kinetic_data['yerr'], - fmt='ok', ms=3, fillstyle='none', label="Data with error") - - if self.mm: - y = self.mm_kinetics_function(self.mm['x'], - self.mm['vmax'], - self.mm['Km']) - ax.plot(self.mm['x'], y, 'b-', label="Michaelis-Menten") - if self.hill: - y = self.hill_kinetics_function(self.hill['x'], - self.hill['vmax'], - self.hill['Kprime'], - self.hill['h']) - ax.plot(self.hill['x'], y, 'g-', label="Hill") - - ax.legend(loc='best', fancybox=True) - plt.savefig('{}/kinetics.png'.format(outpath), bbox_inches='tight') - plt.close(fig) - - def write_data(self, outpath): - - with open('{}/results.csv'.format(outpath), - 'w', - newline='\n') as csvfile: - - writer = csv.writer(csvfile, dialect='excel-tab') - writer.writerow(['# LINEAR FITS']) - writer.writerow([]) - writer.writerow(['# concentration', - 'avg. slope', - 'slope std_err', - 'replicates (slope, intercept and r value)']) - for m in self.measurements: - row = [m.concentration, m.avg_slope, m.avg_slope_err] - for r in m.replicates: - row.append(r.fitresult['slope']) - row.append(r.fitresult['intercept']) - row.append(r.fitresult['r_value']) - writer.writerow(row) - - writer.writerow([]) - if self.mm: - writer.writerow(['# MICHAELIS-MENTEN KINETICS']) - writer.writerow(['# vmax', 'Km']) - writer.writerow([self.mm['vmax'], self.mm['Km']]) - if self.hill: - writer.writerow(['# HILL KINETICS']) - writer.writerow(['# vmax', 'Kprime', 'h']) - writer.writerow([self.hill['vmax'], self.hill['Kprime'], - self.hill['h']])