mirror of
https://github.com/Athemis/PyDSF.git
synced 2025-04-19 03:19:13 +00:00
Migrated to Python3; Switched to Peakutils for peak finding routines; CHanged behaviour of temperature cut-offs
This commit is contained in:
parent
41b252a3c9
commit
ff7f10438b
7 changed files with 93 additions and 772 deletions
126
pydsf.py
126
pydsf.py
|
@ -8,10 +8,16 @@ try:
|
|||
mpl.use('Qt4Agg')
|
||||
import matplotlib.ticker as ticker
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib.patches as mpatches
|
||||
except ImportError:
|
||||
raise ImportError('----- Matplotlib must be installed. -----')
|
||||
|
||||
from peakdetect import *
|
||||
try:
|
||||
import peakutils
|
||||
except ImportError:
|
||||
raise ImportError('----- PeakUtils must be installed. -----')
|
||||
|
||||
#from peakdetect import *
|
||||
|
||||
try:
|
||||
import numpy as np
|
||||
|
@ -35,8 +41,10 @@ class Well:
|
|||
self.splines = {"raw": None,
|
||||
"filtered": None,
|
||||
"derivative1": None}
|
||||
self.tm = None
|
||||
self.tm_sd = None
|
||||
self.tm = np.NaN
|
||||
self.tm_sd = np.NaN
|
||||
self.baseline_correction = owner.baseline_correction
|
||||
self.baseline = None
|
||||
|
||||
def filter_raw(self):
|
||||
"""
|
||||
|
@ -57,38 +65,51 @@ class Well:
|
|||
temp = self.splines[spline].derivatives(t)
|
||||
for i in range(4):
|
||||
self.derivatives[i, t - self.owner.t1] = temp[i]
|
||||
|
||||
def calc_baseline(self, y):
|
||||
try:
|
||||
baseline = peakutils.baseline(y)
|
||||
return baseline
|
||||
except:
|
||||
return np.NaN
|
||||
|
||||
def calc_tm(self):
|
||||
"""
|
||||
Calculate the Tm of the well. Returns either the Tm or 'None'.
|
||||
Calculate the Tm of the well. Returns either the Tm or 'np.NaN'.
|
||||
"""
|
||||
|
||||
# Check if the well has already been flagged as denatured
|
||||
if self in self.owner.denatured_wells:
|
||||
return None # Return 'None' if true
|
||||
|
||||
return np.NaN # Return 'NaN' if true
|
||||
|
||||
# First assume that the well is denatured
|
||||
self.owner.denatured_wells.append(self)
|
||||
|
||||
if self.owner.tm_cutoff_low != self.owner.t1 or self.owner.tm_cutoff_high != self.owner.t1:
|
||||
x = np.arange(self.owner.tm_cutoff_low, self.owner.tm_cutoff_high + 1, self.owner.dt, dtype=float)
|
||||
|
||||
x = self.owner.temprange
|
||||
y = self.derivatives[1]
|
||||
lookahead = len(x)/4 # Amount of data points to look around potential peaks to verify
|
||||
|
||||
if self.baseline_correction:
|
||||
y = y - self.baseline
|
||||
|
||||
try:
|
||||
peaks = peakdetect(y, x, lookahead) # Run peak finding algorithm
|
||||
peak_indexes = peakutils.indexes(y, min_dist=len(x)) # calculate a rough estimate of peaks; set min_dist
|
||||
# temperature range to only find one/the highest peak
|
||||
tm = peakutils.interpolate(x, y, ind=peak_indexes)[0] # increase resolution by fitting gaussian function
|
||||
# to peak
|
||||
except:
|
||||
return None # In case of error, return no peak
|
||||
return np.NaN # In case of error, return no peak
|
||||
|
||||
try:
|
||||
for i in peaks[0]: # Iterate over all peaks
|
||||
tm = i[0] # Highest peak found is the first in list
|
||||
# Check if the peak is within cutoff range
|
||||
if tm and tm >= self.owner.tm_cutoff_low and tm <= self.owner.tm_cutoff_high:
|
||||
self.owner.denatured_wells.remove(self) # If everything is fine, remove the denatured flag
|
||||
return tm # and return the Tm
|
||||
return tm # and return the Tm
|
||||
else:
|
||||
return None # otherwise, return no Tm
|
||||
return np.NaN # otherwise, return NaN
|
||||
except:
|
||||
return None # In case of error, return nothing
|
||||
return np.NaN # In case of error, return NaN
|
||||
|
||||
def is_denatured(self):
|
||||
"""
|
||||
|
@ -131,16 +152,19 @@ class Well:
|
|||
self.splines["filtered"] = self.calc_spline(self.filtered)
|
||||
|
||||
self.calc_derivatives()
|
||||
if self.baseline_correction:
|
||||
self.baseline = self.calc_baseline(self.derivatives[1])
|
||||
if self.is_denatured():
|
||||
self.owner.denatured_wells.append(self)
|
||||
|
||||
self.splines["derivative1"] = self.calc_spline(self.derivatives[1])
|
||||
|
||||
|
||||
self.tm = self.calc_tm()
|
||||
#print(self.tm)
|
||||
if self.tm is None:
|
||||
self.tm = np.NaN
|
||||
|
||||
class Experiment:
|
||||
def __init__(self, type, gui=None, files=None, replicates=None, t1=25, t2=95, dt=1, cols=12, rows=8, cutoff_low=None, cutoff_high=None, signal_threshold=None, color_range=None):
|
||||
def __init__(self, type, gui=None, files=None, replicates=None, t1=25, t2=95, dt=1, cols=12, rows=8, cutoff_low=None, cutoff_high=None, signal_threshold=None, color_range=None, baseline_correction=False):
|
||||
self.replicates = replicates
|
||||
self.cols = cols
|
||||
self.rows = rows
|
||||
|
@ -159,6 +183,7 @@ class Experiment:
|
|||
self.gui=gui
|
||||
self.signal_threshold = signal_threshold
|
||||
self.avg_plate = None
|
||||
self.baseline_correction=baseline_correction
|
||||
if cutoff_low:
|
||||
self.tm_cutoff_low = cutoff_low
|
||||
else:
|
||||
|
@ -176,12 +201,12 @@ class Experiment:
|
|||
|
||||
i = 1
|
||||
for file in files:
|
||||
plate = Plate(type=self.type, filename=file, t1=self.t1, t2=self.t2, dt=self.dt, cols=self.cols, rows=self.rows, cutoff_low=self.tm_cutoff_low, cutoff_high=self.tm_cutoff_high, signal_threshold=self.signal_threshold, color_range=self.color_range)
|
||||
plate = Plate(type=self.type, owner=self, filename=file, t1=self.t1, t2=self.t2, dt=self.dt, cols=self.cols, rows=self.rows, cutoff_low=self.tm_cutoff_low, cutoff_high=self.tm_cutoff_high, signal_threshold=self.signal_threshold, color_range=self.color_range)
|
||||
plate.id = i
|
||||
self.plates.append(plate)
|
||||
i += 1
|
||||
if len(files) > 1:
|
||||
self.avg_plate = Plate(type=self.type, filename=None, t1=self.t1, t2=self.t2, dt=self.dt, cols=self.cols, rows=self.rows, cutoff_low=self.tm_cutoff_low, cutoff_high=self.tm_cutoff_high, signal_threshold=self.signal_threshold, color_range=self.color_range)
|
||||
self.avg_plate = Plate(type=self.type, owner=self, filename=None, t1=self.t1, t2=self.t2, dt=self.dt, cols=self.cols, rows=self.rows, cutoff_low=self.tm_cutoff_low, cutoff_high=self.tm_cutoff_high, signal_threshold=self.signal_threshold, color_range=self.color_range)
|
||||
self.avg_plate.id = 'average'
|
||||
|
||||
def analyze(self):
|
||||
|
@ -211,12 +236,22 @@ class Experiment:
|
|||
|
||||
|
||||
class Plate:
|
||||
def __init__(self, type, id=None, filename=None, replicates=None, t1=25, t2=95, dt=1, cols=12, rows=8, cutoff_low=None, cutoff_high=None, signal_threshold=None, color_range=None):
|
||||
def __init__(self, type, owner, id=None, filename=None, replicates=None, t1=None, t2=None, dt=None, cols=12, rows=8, cutoff_low=None, cutoff_high=None, signal_threshold=None, color_range=None):
|
||||
self.cols = cols
|
||||
self.rows = rows
|
||||
self.t1 = t1
|
||||
self.t2 = t2
|
||||
self.dt = dt
|
||||
self.owner = owner
|
||||
if t1:
|
||||
self.t1 = t1
|
||||
else:
|
||||
self.t1 = owner.t1
|
||||
if t1:
|
||||
self.t2 = t2
|
||||
else:
|
||||
self.t2 = owner.t2
|
||||
if t1:
|
||||
self.dt = dt
|
||||
else:
|
||||
self.dt = owner.dt
|
||||
self.temprange = np.arange(self.t1, self.t2 + 1, self.dt, dtype=float)
|
||||
self.reads = int(round((t2 + 1 - t1) / dt))
|
||||
self.wellnum = self.cols * self.rows
|
||||
|
@ -228,6 +263,7 @@ class Plate:
|
|||
self.replicates = None
|
||||
self.signal_threshold = signal_threshold
|
||||
self.id = id
|
||||
self.baseline_correction = owner.baseline_correction
|
||||
if cutoff_low:
|
||||
self.tm_cutoff_low = cutoff_low
|
||||
else:
|
||||
|
@ -271,7 +307,7 @@ class Plate:
|
|||
self.wells[i].name = row[read]
|
||||
self.wells[i].raw = temp
|
||||
i += 1
|
||||
|
||||
|
||||
def analyze(self, gui=None):
|
||||
try:
|
||||
# Try to access data file in the given path
|
||||
|
@ -298,10 +334,10 @@ class Plate:
|
|||
|
||||
if self.replicates:
|
||||
if self.replicates == 'rows':
|
||||
print "rows"
|
||||
print("rows")
|
||||
if self.replicates == 'cols':
|
||||
print "cols"
|
||||
|
||||
print("cols")
|
||||
#print(self.tms)
|
||||
self.max_tm = max(self.tms)
|
||||
self.min_tm = min(self.tms)
|
||||
|
||||
|
@ -309,7 +345,7 @@ class Plate:
|
|||
with open(filename, 'w') as f:
|
||||
f.write('#{:<4s}{:>13s}\n'.format('ID', '"Tm [°C]"'))
|
||||
for well in self.wells:
|
||||
if well.tm == None or well in self.denatured_wells:
|
||||
if np.isnan(well.tm) or well in self.denatured_wells:
|
||||
f.write('{:<5s}{:>12s}\n'.format(well.name, 'NaN'))
|
||||
else:
|
||||
f.write('{:<5s}{:>12s}\n'.format(well.name, str(well.tm)))
|
||||
|
@ -318,7 +354,7 @@ class Plate:
|
|||
with open(filename, 'w') as f:
|
||||
f.write('#{:<4s}{:>13s}{:>13s}\n'.format('"ID"', '"Tm [°C]"', '"SD"'))
|
||||
for well in self.wells:
|
||||
if well.tm == None or well in self.denatured_wells:
|
||||
if np.isnan(well.tm) or well in self.denatured_wells:
|
||||
f.write('{:<5s}{:>12s}{:>12s}\n'.format(well.name, 'NaN', 'NaN'))
|
||||
else:
|
||||
f.write('{:<5s}{:>12s}{:>12s}\n'.format(well.name, str(well.tm), str(well.tm_sd)))
|
||||
|
@ -373,6 +409,11 @@ class Plate:
|
|||
f.write('{:>-15.3f}'.format(float(np.round(d, decimals=3))))
|
||||
f.write('\n')
|
||||
i += 1
|
||||
|
||||
# TODO: Implement 'write_baseline_corrected_table()
|
||||
|
||||
def write_baseline_corrected_table(self, filename):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def update_progress_bar(bar, value):
|
||||
|
@ -431,6 +472,9 @@ def plot_tm_heatmap_single(plate, gui=None):
|
|||
x_values = [] # Array holding the columns
|
||||
y_values = [] # Array holding the rows
|
||||
c_values = [] # Array holding the color values aka Tm
|
||||
dx_values = []
|
||||
dy_values = []
|
||||
dc_values = []
|
||||
for well in plate.wells: # Iterate over all wells
|
||||
if well not in plate.denatured_wells: # Check if well is denatured (no Tm found)
|
||||
c = well.tm # If not, set color to Tm
|
||||
|
@ -440,6 +484,8 @@ def plot_tm_heatmap_single(plate, gui=None):
|
|||
c = plate.tm_cutoff_high # If it is, set color to cutoff
|
||||
else: # If the plate is denatured
|
||||
c = plate.tm_cutoff_low # Set its color to the low cutoff
|
||||
dx_values.append(x)
|
||||
dy_values.append(y)
|
||||
x_values.append(x) # Add values to the respective arrays
|
||||
y_values.append(y)
|
||||
c_values.append(c)
|
||||
|
@ -448,25 +494,31 @@ def plot_tm_heatmap_single(plate, gui=None):
|
|||
x = 1 # reset column to one
|
||||
y += 1 # and increase row by one
|
||||
|
||||
fig1 = plt.figure() # new figure
|
||||
fig1 = plt.figure() # new figure
|
||||
ax1 = fig1.add_subplot(1, 1, 1) # A single canvas
|
||||
ax1.autoscale(tight=True) # Scale plate size
|
||||
ax1.xaxis.set_major_locator(ticker.MaxNLocator(plate.cols + 1)) # n columns
|
||||
ax1.yaxis.set_major_locator(ticker.MaxNLocator(plate.rows + 1)) # n rows
|
||||
if plate.color_range:
|
||||
cax = ax1.scatter(x_values, y_values, s=300, c=c_values, marker='s', vmin=plate.color_range[0], vmax=plate.color_range[1]) # plot wells and color using the colormap
|
||||
cax = ax1.scatter(x_values, y_values, s=305, c=c_values, marker='s', vmin=plate.color_range[0], vmax=plate.color_range[1]) # plot wells and color using the colormap
|
||||
else:
|
||||
cax = ax1.scatter(x_values, y_values, s=300, c=c_values, marker='s') # plot wells and color using the colormap
|
||||
cax = ax1.scatter(x_values, y_values, s=305, c=c_values, marker='s') # plot wells and color using the colormap
|
||||
|
||||
cax2 = ax1.scatter(dx_values, dy_values, s=80, c='white', marker='x', linewidths=(1.5,))
|
||||
ax1.invert_yaxis() # invert y axis to math plate layout
|
||||
cbar = fig1.colorbar(cax) # show colorbar
|
||||
ax1.set_xlabel('Columns') # set axis and colorbar label
|
||||
ax1.set_ylabel('Rows')
|
||||
|
||||
if str(plate.id) == 'average':
|
||||
title = '$T_m$ heatmap (average)'
|
||||
else:
|
||||
title = '$T_m$ heatmap (plate #{})'.format(str(plate.id))
|
||||
ax1.set_title(title)
|
||||
cbar.set_label(u"Temperature [°C]")
|
||||
|
||||
#magenta_patch = mpatches.Patch(color='magenta', label='Denatured')
|
||||
#fig1.legend([magenta_patch], 'Denatured', loc='lower right', bbox_to_anchor=[0.5, 0.5])
|
||||
|
||||
# if gui:
|
||||
# update_progress_bar(gui.pb, 50)
|
||||
|
@ -492,14 +544,18 @@ def plot_derivative(plate, gui=None):
|
|||
ax.set_ylabel('dI/dT', size='xx-small')
|
||||
|
||||
x = plate.temprange # set values for the x axis to the given temperature range
|
||||
y = well.derivatives[1] # grab y values from the first derivative of the well
|
||||
if well.baseline_correction:
|
||||
print(well.baseline)
|
||||
y = well.derivatives[1] - well.baseline
|
||||
else:
|
||||
y = well.derivatives[1] # grab y values from the first derivative of the well
|
||||
|
||||
ax.xaxis.set_major_locator(ticker.MaxNLocator(4)) # only show three tickmarks on both axes
|
||||
ax.yaxis.set_major_locator(ticker.MaxNLocator(4))
|
||||
if well not in plate.denatured_wells: # check if well is denatured (without determined Tm)
|
||||
tm = well.tm # if not, grab its Tm
|
||||
else:
|
||||
tm = None # else set Tm to None
|
||||
tm = np.NaN # else set Tm to np.NaN
|
||||
if tm:
|
||||
ax.axvline(x=tm) # plot vertical line at the Tm
|
||||
ax.axvspan(plate.t1, plate.tm_cutoff_low, facecolor='0.8', alpha=0.5) # shade lower cutoff area
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue