1
0
Fork 0
mirror of https://github.com/Athemis/pyKinetics.git synced 2025-04-05 22:56:02 +00:00

Moved plotting from library to cli-interface; improved logging

...and colorized logging
This commit is contained in:
Alexander Minges 2015-08-21 16:18:47 +02:00
parent c90c18ee84
commit 091c6e62a2
2 changed files with 215 additions and 107 deletions

View file

@ -4,11 +4,109 @@
import argparse import argparse
import logging import logging
import csv import csv
import matplotlib.pyplot as plt
from pathlib import Path from pathlib import Path
from colorlog import ColoredFormatter
import libkinetics import libkinetics
class ExperimentHelper():
def __init__(self, experiment, logger):
self.exp = experiment
self.logger = logger
def linear_regression_function(self, slope, x, intercept):
y = slope * x + intercept
return y
def plot_data(self, exp, outpath):
# iterate over all measurements
for m in exp.measurements:
# plot each measurement
fig, ax = plt.subplots()
ax.set_xlabel('Time')
ax.set_ylabel('Raw Signal')
ax_title = 'Linear regression {} {}'.format(m.concentration,
m.concentration_unit)
ax.set_title(ax_title)
for r in m.replicates:
ax.plot(r.x, r.y, linestyle='None',
marker='o', ms=3, fillstyle='none')
y = self.linear_regression_function(r.fitresult['slope'],
r.x,
r.fitresult['intercept'])
ax.plot(r.x, y, 'k-')
ax.axvspan(m.xlim[0], m.xlim[1], facecolor='0.8', alpha=0.5)
plt.savefig('{}/fit_{}_{}.png'.format(outpath,
m.concentration,
m.concentration_unit),
bbox_inches='tight')
plt.close(fig)
def plot_kinetics(self, exp, outpath):
fig, ax = plt.subplots()
ax.set_xlabel('c [mM]')
ax.set_ylabel('dA/dt [Au/s]')
ax.set_title('Kinetics')
ax.errorbar(exp.raw_kinetic_data['x'],
exp.raw_kinetic_data['y'],
yerr=exp.raw_kinetic_data['yerr'],
fmt='ok', ms=3, fillstyle='none', label="Data with error")
if exp.mm:
y = exp.mm_kinetics_function(exp.mm['x'],
exp.mm['vmax'],
exp.mm['Km'])
ax.plot(exp.mm['x'], y, 'b-', label="Michaelis-Menten")
if exp.hill:
y = exp.hill_kinetics_function(exp.hill['x'],
exp.hill['vmax'],
exp.hill['Kprime'],
exp.hill['h'])
ax.plot(exp.hill['x'], y, 'g-', label="Hill")
ax.legend(loc='best', fancybox=True)
plt.savefig('{}/kinetics.png'.format(outpath), bbox_inches='tight')
plt.close(fig)
def write_data(self, exp, outpath):
with open('{}/results.csv'.format(outpath),
'w',
newline='\n') as csvfile:
writer = csv.writer(csvfile, dialect='excel-tab')
writer.writerow(['# LINEAR FITS'])
writer.writerow([])
writer.writerow(['# concentration',
'avg. slope',
'slope std_err',
'replicates (slope, intercept and r value)'])
for m in exp.measurements:
row = [m.concentration, m.avg_slope, m.avg_slope_err]
for r in m.replicates:
row.append(r.fitresult['slope'])
row.append(r.fitresult['intercept'])
row.append(r.fitresult['r_value'])
writer.writerow(row)
writer.writerow([])
if exp.mm:
writer.writerow(['# MICHAELIS-MENTEN KINETICS'])
writer.writerow(['# vmax', 'Km'])
writer.writerow([exp.mm['vmax'], exp.mm['Km']])
if exp.hill:
writer.writerow(['# HILL KINETICS'])
writer.writerow(['# vmax', 'Kprime', 'h'])
writer.writerow([exp.hill['vmax'], exp.hill['Kprime'],
exp.hill['h']])
def parse_arguments(): def parse_arguments():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -16,8 +114,12 @@ def parse_arguments():
'--verbose', '--verbose',
action='store_true', action='store_true',
help='increase output verbosity') help='increase output verbosity')
parser.add_argument('-r',
'--replicates',
action='store_true',
help='fit kinetics to individual replicates')
parser.add_argument('-wh', parser.add_argument('-wh',
'--with_hill', '--hill',
action='store_true', action='store_true',
help='compute additional kinetics using Hill equation') help='compute additional kinetics using Hill equation')
parser.add_argument('input', parser.add_argument('input',
@ -38,11 +140,25 @@ def initialize_logger():
'fh' which logs to a log file and 'ch' which logs to standard output. 'fh' which logs to a log file and 'ch' which logs to standard output.
:return logger: returns a logger instance :return logger: returns a logger instance
""" """
fmt = '%(log_color)s%(levelname)-8s%(reset)s %(message)s'
formatter = ColoredFormatter(fmt,
datefmt=None,
reset=True,
log_colors={'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white'},
secondary_log_colors={},
style='%')
logging.captureWarnings(True)
logger = logging.getLogger('pyKinetics-cli') logger = logging.getLogger('pyKinetics-cli')
logger.setLevel(logging.INFO) logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler() ch = logging.StreamHandler()
ch.setLevel(logging.INFO) ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch) logger.addHandler(ch)
try: try:
@ -51,7 +167,7 @@ def initialize_logger():
fh.setLevel(logging.INFO) fh.setLevel(logging.INFO)
logger.addHandler(fh) logger.addHandler(fh)
except IOError as error: except IOError as error:
logger.warning('WARNING: Cannot create log file! Run pyKinetics-cli' logger.warning('Cannot create log file! Run pyKinetics-cli'
'from a directory to which you have write access.') 'from a directory to which you have write access.')
logger.warning(error.msg) logger.warning(error.msg)
pass pass
@ -65,37 +181,50 @@ def main():
# initialize logger # initialize logger
logger = initialize_logger() logger = initialize_logger()
if args.with_hill: if args.hill:
do_hill = args.with_hill do_hill = args.hill
else: else:
do_hill = False do_hill = False
if args.replicates:
fit_to_replicates = args.replicates
else:
fit_to_replicates = False
try: try:
input_path = Path(args.input).resolve() input_path = Path(args.input).resolve()
except FileNotFoundError: except FileNotFoundError:
logger.critical('CRITICAL: Path containing input data ' logger.critical('Path containing input data '
'not found: {}'.format(args.input)) 'not found: {}'.format(args.input))
raise raise
try: try:
output_path = Path(args.output).resolve() output_path = Path(args.output).resolve()
except FileNotFoundError: except FileNotFoundError:
logger.critical('CRITICAL: Path for writing results ' logger.critical('Path for writing results '
'not found: {}'.format(args.output)) 'not found: {}'.format(args.output))
raise raise
if output_path.is_dir(): if output_path.is_dir():
if input_path.is_dir(): if input_path.is_dir():
logger.info('INFO: Collecting data files') logger.info('Collecting data files')
data_files = sorted(input_path.glob('**/*.csv')) data_files = sorted(input_path.glob('**/*.csv'))
msg = 'Calculating kinetics' msg = 'Calculating kinetics'
if do_hill: if do_hill:
msg = '{} including Hill kinetics'.format(msg) msg = '{} including Hill kinetics'.format(msg)
logger.info('INFO: {}'.format(msg)) logger.info('{}'.format(msg))
exp = libkinetics.Experiment(data_files, (10, 25), do_hill) exp = libkinetics.Experiment(data_files,
logger.info('INFO: Plotting linear fits to data and kinetics') (10, 25),
exp.plot_data(str(output_path)) do_hill=do_hill,
exp.plot_kinetics(str(output_path)) logger=logger,
logger.info('INFO: Writing results to results.csv') fit_to_replicates=fit_to_replicates)
exp.write_data(str(output_path)) ehlp = ExperimentHelper(exp, logger)
logger.info('Plotting linear fits to data')
ehlp.plot_data(exp, str(output_path))
logger.info('Plotting kinetics fit(s)')
ehlp.plot_kinetics(exp, str(output_path))
logger.info('Writing results to results.csv')
ehlp.write_data(exp, str(output_path))
logger.info('Finished!')
else: else:
msg = '{} is not a directory!'.format(input_path) msg = '{} is not a directory!'.format(input_path)
logger.critical('CRITICAL: '.format(msg)) logger.critical('CRITICAL: '.format(msg))

View file

@ -3,14 +3,15 @@
from scipy import stats, optimize from scipy import stats, optimize
import numpy as np import numpy as np
import matplotlib.pyplot as plt import logging
import csv import warnings
class Replicate(): class Replicate():
def __init__(self, x, y, owner): def __init__(self, num, x, y, owner):
self.logger = owner.logger self.logger = owner.logger
self.num = num + 1
self.x = x self.x = x
self.y = y self.y = y
self.owner = owner self.owner = owner
@ -23,14 +24,40 @@ class Replicate():
x_for_fit = np.take(self.x, ind_min_max) x_for_fit = np.take(self.x, ind_min_max)
y_for_fit = np.take(self.y, ind_min_max) y_for_fit = np.take(self.y, ind_min_max)
# ignore warnings about invalid values in sqrt during linear fitting
# they occur frequently and will just clutter the cli
warnings.filterwarnings('ignore',
category=RuntimeWarning,
message='invalid value encountered in sqrt')
(slope, intercept, (slope, intercept,
r_value, r_value,
p_value, p_value,
std_err) = stats.linregress(x_for_fit, y_for_fit) std_err) = stats.linregress(x_for_fit, y_for_fit)
r_squared = r_value**2
conc = '{} {}'.format(self.owner.concentration,
self.owner.concentration_unit)
self.logger.info('Linear fit for {} #{}:'.format(conc, self.num))
if r_squared < 0.9:
msg = ' r-squared: {} < 0.9; Check fit manually!'
self.logger.warning(msg.format(round(r_squared, 4)))
else:
msg = ' r-squared: {}'
self.logger.info(msg.format(round(r_squared, 4)))
self.logger.info(' slope: {}'.format(slope))
if slope < 0:
self.logger.warning('Slope is negative. Will use absolute value '
'for further calculations!')
self.logger.info(' intercept: {}'.format(slope))
return {'slope': slope, return {'slope': slope,
'intercept': intercept, 'intercept': intercept,
'r_value': r_value, 'r_value': r_value,
'r_squared': r_squared,
'p_value': p_value, 'p_value': p_value,
'std_err': std_err} 'std_err': std_err}
@ -54,7 +81,10 @@ class Measurement():
length_x, num_replicates = self.y.shape length_x, num_replicates = self.y.shape
for n in range(num_replicates): for n in range(num_replicates):
self.replicates.append(Replicate(self.x, self.y[:, n:n+1], self)) self.replicates.append(Replicate(n,
self.x,
self.y[:, n:n+1],
self))
for r in self.replicates: for r in self.replicates:
self.slopes.append(r.fitresult['slope']) self.slopes.append(r.fitresult['slope'])
@ -62,25 +92,12 @@ class Measurement():
self.avg_slope = np.average(self.slopes) self.avg_slope = np.average(self.slopes)
self.avg_slope_err = np.std(self.slopes) self.avg_slope_err = np.std(self.slopes)
def plot(self, outpath): self.logger.info('Average slope: {} ± {}'.format(self.avg_slope,
fig, ax = plt.subplots() self.avg_slope_err))
ax.set_xlabel('Time [s]') if self.avg_slope < 0:
ax.set_ylabel('Absorption (340 nm) [Au]') self.logger.warning('Avererage slope is negative. Will use '
ax.set_title('Linear regression {} {}'.format(self.concentration, 'absolute value for further calculations!')
self.concentration_unit)) self.logger.info('-----')
for r in self.replicates:
ax.plot(r.x, r.y, linestyle='None',
marker='o', ms=3, fillstyle='none')
ax.plot(r.x, r.fitresult['slope']*r.x+r.fitresult['intercept'],
'k-')
ax.axvspan(self.xlim[0], self.xlim[1], facecolor='0.8', alpha=0.5)
plt.savefig('{}/fit_{}_{}.png'.format(outpath,
self.concentration,
self.concentration_unit),
bbox_inches='tight')
plt.close(fig)
def get_results(self): def get_results(self):
results = [] results = []
@ -92,12 +109,17 @@ class Measurement():
class Experiment(): class Experiment():
def __init__(self, data_files, xlim, do_hill=False, logger=None): def __init__(self, data_files, xlim, do_hill=False, fit_to_replicates=False, logger=None):
self.logger = logger if logger:
self.logger = logger
else:
self.logger = logging.getLogger(__name__)
# collction of indepentend measurements # collction of indepentend measurements
self.measurements = [] self.measurements = []
self.fit_to_replicates = fit_to_replicates
# dictionary to store data for the kinetics calculation # dictionary to store data for the kinetics calculation
self.raw_kinetic_data = {'x': [], self.raw_kinetic_data = {'x': [],
'y': [], 'y': [],
@ -125,11 +147,18 @@ class Experiment():
# iterate over all measurements # iterate over all measurements
for m in self.measurements: for m in self.measurements:
# extract relevant data for kinetics calculation (concentration, if self.fit_to_replicates:
# average slope and error) for r in m.replicates:
self.raw_kinetic_data['x'].append(m.concentration) self.raw_kinetic_data['x'].append(m.concentration)
self.raw_kinetic_data['y'].append(np.absolute(m.avg_slope)) self.raw_kinetic_data['y'].append(np.absolute(r.fitresult['slope']))
self.raw_kinetic_data['yerr'].append(m.avg_slope_err) self.raw_kinetic_data['yerr'].append(r.fitresult['std_err'])
else:
# extract relevant data for kinetics calculation
# (concentration, average slope and error)
self.raw_kinetic_data['x'].append(m.concentration)
self.raw_kinetic_data['y'].append(np.absolute(m.avg_slope))
self.raw_kinetic_data['yerr'].append(m.avg_slope_err)
# calculate kinetics # calculate kinetics
self.mm = self.do_mm_kinetics() self.mm = self.do_mm_kinetics()
@ -163,6 +192,10 @@ class Experiment():
Km = popt[1] Km = popt[1]
x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001) x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001)
self.logger.info('Michaelis-Menten Kinetics:')
self.logger.info(' v_max: {} ± {}'.format(vmax, perr[0]))
self.logger.info(' Km: {} ± {}'.format(Km, perr[1]))
return {'vmax': float(vmax), return {'vmax': float(vmax),
'Km': float(Km), 'Km': float(Km),
'perr': perr, 'perr': perr,
@ -170,7 +203,7 @@ class Experiment():
except: except:
msg = 'Calculation of Michaelis-Menten kinetics failed!' msg = 'Calculation of Michaelis-Menten kinetics failed!'
if self.logger: if self.logger:
self.logger.error('ERROR: {}'.format(msg)) self.logger.error('{}'.format(msg))
else: else:
print(msg) print(msg)
return None return None
@ -188,6 +221,11 @@ class Experiment():
x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001) x = np.arange(0, max(self.raw_kinetic_data['x']), 0.0001)
self.logger.info('Hill Kinetics:')
self.logger.info(' v_max: {} ± {}'.format(vmax, perr[0]))
self.logger.info(' K_prime: {} ± {}'.format(Kprime, perr[1]))
self.logger.info(' h: {} ± {}'.format(h, perr[2]))
return {'vmax': float(vmax), return {'vmax': float(vmax),
'Kprime': float(Kprime), 'Kprime': float(Kprime),
'perr': perr, 'perr': perr,
@ -196,66 +234,7 @@ class Experiment():
except: except:
msg = 'Calculation of Hill kinetics failed!' msg = 'Calculation of Hill kinetics failed!'
if self.logger: if self.logger:
self.logger.error('ERROR: {}'.format(msg)) self.logger.error('{}'.format(msg))
else: else:
print(msg) print(msg)
return None return None
def plot_kinetics(self, outpath):
fig, ax = plt.subplots()
ax.set_xlabel('c [mM]')
ax.set_ylabel('dA/dt [Au/s]')
ax.set_title('Kinetics')
ax.errorbar(self.raw_kinetic_data['x'],
self.raw_kinetic_data['y'],
yerr=self.raw_kinetic_data['yerr'],
fmt='ok', ms=3, fillstyle='none', label="Data with error")
if self.mm:
y = self.mm_kinetics_function(self.mm['x'],
self.mm['vmax'],
self.mm['Km'])
ax.plot(self.mm['x'], y, 'b-', label="Michaelis-Menten")
if self.hill:
y = self.hill_kinetics_function(self.hill['x'],
self.hill['vmax'],
self.hill['Kprime'],
self.hill['h'])
ax.plot(self.hill['x'], y, 'g-', label="Hill")
ax.legend(loc='best', fancybox=True)
plt.savefig('{}/kinetics.png'.format(outpath), bbox_inches='tight')
plt.close(fig)
def write_data(self, outpath):
with open('{}/results.csv'.format(outpath),
'w',
newline='\n') as csvfile:
writer = csv.writer(csvfile, dialect='excel-tab')
writer.writerow(['# LINEAR FITS'])
writer.writerow([])
writer.writerow(['# concentration',
'avg. slope',
'slope std_err',
'replicates (slope, intercept and r value)'])
for m in self.measurements:
row = [m.concentration, m.avg_slope, m.avg_slope_err]
for r in m.replicates:
row.append(r.fitresult['slope'])
row.append(r.fitresult['intercept'])
row.append(r.fitresult['r_value'])
writer.writerow(row)
writer.writerow([])
if self.mm:
writer.writerow(['# MICHAELIS-MENTEN KINETICS'])
writer.writerow(['# vmax', 'Km'])
writer.writerow([self.mm['vmax'], self.mm['Km']])
if self.hill:
writer.writerow(['# HILL KINETICS'])
writer.writerow(['# vmax', 'Kprime', 'h'])
writer.writerow([self.hill['vmax'], self.hill['Kprime'],
self.hill['h']])