diff --git a/pydsf.py b/pydsf.py index e92deac..4583c8b 100644 --- a/pydsf.py +++ b/pydsf.py @@ -1,7 +1,11 @@ #! /usr/bin/env python # -*- coding: utf-8 -*- + +# Import csv library; Part of python standard libs import csv +# Import 3rd party packages. Check if installed and die with error message +# if not. try: import matplotlib as mpl import mpl_toolkits.axes_grid1 @@ -30,10 +34,13 @@ except ImportError: class Well: - - def __init__(self, owner): + """ + Represents a well in a microtiter plate. + Owned by an object of type 'Plate'. + """ + def __init__(self, owner, name=None): self.owner = owner - self.name = None + self.name = name self.raw = np.zeros(self.owner.reads, dtype=np.float) self.filtered = np.zeros(self.owner.reads, dtype=np.float) self.derivatives = np.zeros((4, self.owner.reads)) @@ -49,25 +56,42 @@ class Well: """ Apply a filter to the raw data """ - b, a = butter(3, 0.3) - self.filtered = filtfilt(b, a, self.raw) + try: + b, a = butter(3, 0.3) + self.filtered = filtfilt(b, a, self.raw) + except Exception as err: + print('Filtering of raw data failed!', err) def calc_spline(self, y): """ Calculate a spline that represents the smoothed data points """ - t_range = self.owner.temprange - spline = interpolate.InterpolatedUnivariateSpline(t_range, y) - return spline + try: + t_range = self.owner.temprange + spline = interpolate.InterpolatedUnivariateSpline(t_range, y) + return spline + except Exception as err: + print('Calculation of spline failed! ', err) def calc_derivatives(self, spline='filtered'): - for t in self.owner.temprange: - temp = self.splines[spline].derivatives(t) - for i in range(4): - self.derivatives[i, t - self.owner.t1] = temp[i] + """ + Calculates derivatives of a function, representing the raw data. + Defaults to using the filtered raw data. + """ + try: + # iterate over temperature range (x values) + for t in self.owner.temprange: + temp = self.splines[spline].derivatives(t) + for i in range(4): + self.derivatives[i, t - self.owner.t1] = temp[i] + except Exception as err: + print('Calculation of derivatives failed!', err) @staticmethod def calc_baseline(y): + """ + Calculate baseline of the well. Used for peak identification. + """ try: baseline = peakutils.baseline(y) return baseline @@ -85,36 +109,49 @@ class Well: # First assume that the well is denatured self.owner.denatured_wells.append(self) + # If a temperatur cutoff has been set and the minimum/maximum + # temperature values of the provided data is not within that range, + # cut data accordingly. if (self.owner.tm_cutoff_low != self.owner.t1 or self.owner.tm_cutoff_high != self.owner.t1): x = np.arange(self.owner.tm_cutoff_low, self.owner.tm_cutoff_high + 1, self.owner.dt, dtype=np.dtype(np.float)) + # Otherwise use the whole temperature range of the data + else: + x = self.owner.temprange - x = self.owner.temprange + # Use second derivative as y y = self.derivatives[1] + # Substract baseline from y if self.baseline_correction: y = y - self.baseline + # Run peak finding; return NaN in case of error try: peak_indexes = peakutils.indexes(y, thres=0.3) # loop over results to find maximum value for peak candidates - max_y = None - max_i = None + max_y = None # current maximum + max_i = None # index of current maximum for peak in peak_indexes: - if not max_y or y[peak] > max_y: + # if current peak is larger than old maximum and its + # second derivative is positive, replace maximum with + # current peak + if (not max_y or y[peak] > max_y) and y[peak] > 0: max_y = y[peak] max_i = peak - if y[max_i] > 0: - # if value of second derivative is positive, choose identified - # position as peak candidate + # If a global maximum is identified, return use its x value as + # melting temperature + if max_y and max_i: tm = x[max_i] + # if no maximum is found, return NaN else: - return np.NaN # else discard + return np.NaN + except: return np.NaN # In case of error, return no peak @@ -175,19 +212,29 @@ class Well: return denatured def analyze(self): + """ + Analyse data of the well. Takes care of the calculation of derivatives, + fitting of splines to derivatives and calculation of melting point. + """ + # apply signal filter to raw data to filter out some noise self.filter_raw() + # fit a spline to unfiltered and filtered raw data self.splines["raw"] = self.calc_spline(self.raw) self.splines["filtered"] = self.calc_spline(self.filtered) - + # calculate derivatives of filtered data self.calc_derivatives() + # if baseline correction is requested, calculate baseline if self.baseline_correction: self.baseline = self.calc_baseline(self.derivatives[1]) + # do an initial check if data suggest denaturation if self.is_denatured(): + # if appropriate, append well to denatured wells of the plate self.owner.denatured_wells.append(self) - + # fit a spline to the first derivative self.splines["derivative1"] = self.calc_spline(self.derivatives[1]) - + # calculate and set melting point self.tm = self.calc_tm() + # fallback: set melting point to NaN if self.tm is None: self.tm = np.NaN @@ -216,6 +263,7 @@ class Experiment: self.signal_threshold = signal_threshold self.avg_plate = None self.baseline_correction = baseline_correction + # use cuttoff if provided, otherwise cut at edges if cutoff_low: self.tm_cutoff_low = cutoff_low else: @@ -224,13 +272,16 @@ class Experiment: self.tm_cutoff_high = cutoff_high else: self.tm_cutoff_high = self.t2 + # use a specified color range, if provided, otherwise set to None if color_range: self.color_range = color_range else: self.color_range = None - + # Initialize self.plates as empty list. New plates will be added to + # this list self.plates = [] + # populate self.plates with data in provided files list i = 1 for file in files: plate = Plate(type=self.type, owner=self, filename=file, @@ -242,6 +293,8 @@ class Experiment: plate.id = i self.plates.append(plate) i += 1 + # if more than one file is provied, assume that those are replicates + # and add a special plate representing the average results if len(files) > 1: self.avg_plate = Plate(type=self.type, owner=self, filename=None, t1=self.t1, t2=self.t2, dt=self.dt, @@ -253,22 +306,32 @@ class Experiment: self.avg_plate.id = 'average' def analyze(self): + """ + Triggers analyzation of plates. + """ for plate in self.plates: plate.analyze(gui=self.gui) - + # if more than one plate is present, calculate average values for the + # merged average plate if len(self.plates) > 1: - + # iterate over all wells in a plate for i in range(self.wellnum): tmp = [] + # iterate over all plates for plate in self.plates: tm = plate.wells[i].tm self.avg_plate.wells[i].name = plate.wells[i].name if plate.wells[i] not in plate.denatured_wells: + # if well is not denatured, add to collection of tm + # values tmp.append(tm) if len(tmp) > 0: + # if at least one tm is added, calculate average + # and standard deviation self.avg_plate.wells[i].tm = np.mean(tmp) self.avg_plate.wells[i].tm_sd = np.std(tmp) else: + # otherwise add to denatured wells append_well = self.avg_plate.wells[i] self.avg_plate.denatured_wells.append(append_well)