1
0
Fork 0
mirror of https://github.com/Athemis/PyDSF.git synced 2025-04-05 14:46:03 +00:00

Adding comments; Proper error handling

This commit is contained in:
Alexander Minges 2015-07-07 14:12:06 +02:00
parent a5eb072d4a
commit a28142fa1e

115
pydsf.py
View file

@ -1,7 +1,11 @@
#! /usr/bin/env python #! /usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Import csv library; Part of python standard libs
import csv import csv
# Import 3rd party packages. Check if installed and die with error message
# if not.
try: try:
import matplotlib as mpl import matplotlib as mpl
import mpl_toolkits.axes_grid1 import mpl_toolkits.axes_grid1
@ -30,10 +34,13 @@ except ImportError:
class Well: class Well:
"""
def __init__(self, owner): Represents a well in a microtiter plate.
Owned by an object of type 'Plate'.
"""
def __init__(self, owner, name=None):
self.owner = owner self.owner = owner
self.name = None self.name = name
self.raw = np.zeros(self.owner.reads, dtype=np.float) self.raw = np.zeros(self.owner.reads, dtype=np.float)
self.filtered = np.zeros(self.owner.reads, dtype=np.float) self.filtered = np.zeros(self.owner.reads, dtype=np.float)
self.derivatives = np.zeros((4, self.owner.reads)) self.derivatives = np.zeros((4, self.owner.reads))
@ -49,25 +56,42 @@ class Well:
""" """
Apply a filter to the raw data Apply a filter to the raw data
""" """
b, a = butter(3, 0.3) try:
self.filtered = filtfilt(b, a, self.raw) b, a = butter(3, 0.3)
self.filtered = filtfilt(b, a, self.raw)
except Exception as err:
print('Filtering of raw data failed!', err)
def calc_spline(self, y): def calc_spline(self, y):
""" """
Calculate a spline that represents the smoothed data points Calculate a spline that represents the smoothed data points
""" """
t_range = self.owner.temprange try:
spline = interpolate.InterpolatedUnivariateSpline(t_range, y) t_range = self.owner.temprange
return spline spline = interpolate.InterpolatedUnivariateSpline(t_range, y)
return spline
except Exception as err:
print('Calculation of spline failed! ', err)
def calc_derivatives(self, spline='filtered'): def calc_derivatives(self, spline='filtered'):
for t in self.owner.temprange: """
temp = self.splines[spline].derivatives(t) Calculates derivatives of a function, representing the raw data.
for i in range(4): Defaults to using the filtered raw data.
self.derivatives[i, t - self.owner.t1] = temp[i] """
try:
# iterate over temperature range (x values)
for t in self.owner.temprange:
temp = self.splines[spline].derivatives(t)
for i in range(4):
self.derivatives[i, t - self.owner.t1] = temp[i]
except Exception as err:
print('Calculation of derivatives failed!', err)
@staticmethod @staticmethod
def calc_baseline(y): def calc_baseline(y):
"""
Calculate baseline of the well. Used for peak identification.
"""
try: try:
baseline = peakutils.baseline(y) baseline = peakutils.baseline(y)
return baseline return baseline
@ -85,36 +109,49 @@ class Well:
# First assume that the well is denatured # First assume that the well is denatured
self.owner.denatured_wells.append(self) self.owner.denatured_wells.append(self)
# If a temperatur cutoff has been set and the minimum/maximum
# temperature values of the provided data is not within that range,
# cut data accordingly.
if (self.owner.tm_cutoff_low != self.owner.t1 or if (self.owner.tm_cutoff_low != self.owner.t1 or
self.owner.tm_cutoff_high != self.owner.t1): self.owner.tm_cutoff_high != self.owner.t1):
x = np.arange(self.owner.tm_cutoff_low, x = np.arange(self.owner.tm_cutoff_low,
self.owner.tm_cutoff_high + 1, self.owner.tm_cutoff_high + 1,
self.owner.dt, self.owner.dt,
dtype=np.dtype(np.float)) dtype=np.dtype(np.float))
# Otherwise use the whole temperature range of the data
else:
x = self.owner.temprange
x = self.owner.temprange # Use second derivative as y
y = self.derivatives[1] y = self.derivatives[1]
# Substract baseline from y
if self.baseline_correction: if self.baseline_correction:
y = y - self.baseline y = y - self.baseline
# Run peak finding; return NaN in case of error
try: try:
peak_indexes = peakutils.indexes(y, thres=0.3) peak_indexes = peakutils.indexes(y, thres=0.3)
# loop over results to find maximum value for peak candidates # loop over results to find maximum value for peak candidates
max_y = None max_y = None # current maximum
max_i = None max_i = None # index of current maximum
for peak in peak_indexes: for peak in peak_indexes:
if not max_y or y[peak] > max_y: # if current peak is larger than old maximum and its
# second derivative is positive, replace maximum with
# current peak
if (not max_y or y[peak] > max_y) and y[peak] > 0:
max_y = y[peak] max_y = y[peak]
max_i = peak max_i = peak
if y[max_i] > 0: # If a global maximum is identified, return use its x value as
# if value of second derivative is positive, choose identified # melting temperature
# position as peak candidate if max_y and max_i:
tm = x[max_i] tm = x[max_i]
# if no maximum is found, return NaN
else: else:
return np.NaN # else discard return np.NaN
except: except:
return np.NaN # In case of error, return no peak return np.NaN # In case of error, return no peak
@ -175,19 +212,29 @@ class Well:
return denatured return denatured
def analyze(self): def analyze(self):
"""
Analyse data of the well. Takes care of the calculation of derivatives,
fitting of splines to derivatives and calculation of melting point.
"""
# apply signal filter to raw data to filter out some noise
self.filter_raw() self.filter_raw()
# fit a spline to unfiltered and filtered raw data
self.splines["raw"] = self.calc_spline(self.raw) self.splines["raw"] = self.calc_spline(self.raw)
self.splines["filtered"] = self.calc_spline(self.filtered) self.splines["filtered"] = self.calc_spline(self.filtered)
# calculate derivatives of filtered data
self.calc_derivatives() self.calc_derivatives()
# if baseline correction is requested, calculate baseline
if self.baseline_correction: if self.baseline_correction:
self.baseline = self.calc_baseline(self.derivatives[1]) self.baseline = self.calc_baseline(self.derivatives[1])
# do an initial check if data suggest denaturation
if self.is_denatured(): if self.is_denatured():
# if appropriate, append well to denatured wells of the plate
self.owner.denatured_wells.append(self) self.owner.denatured_wells.append(self)
# fit a spline to the first derivative
self.splines["derivative1"] = self.calc_spline(self.derivatives[1]) self.splines["derivative1"] = self.calc_spline(self.derivatives[1])
# calculate and set melting point
self.tm = self.calc_tm() self.tm = self.calc_tm()
# fallback: set melting point to NaN
if self.tm is None: if self.tm is None:
self.tm = np.NaN self.tm = np.NaN
@ -216,6 +263,7 @@ class Experiment:
self.signal_threshold = signal_threshold self.signal_threshold = signal_threshold
self.avg_plate = None self.avg_plate = None
self.baseline_correction = baseline_correction self.baseline_correction = baseline_correction
# use cuttoff if provided, otherwise cut at edges
if cutoff_low: if cutoff_low:
self.tm_cutoff_low = cutoff_low self.tm_cutoff_low = cutoff_low
else: else:
@ -224,13 +272,16 @@ class Experiment:
self.tm_cutoff_high = cutoff_high self.tm_cutoff_high = cutoff_high
else: else:
self.tm_cutoff_high = self.t2 self.tm_cutoff_high = self.t2
# use a specified color range, if provided, otherwise set to None
if color_range: if color_range:
self.color_range = color_range self.color_range = color_range
else: else:
self.color_range = None self.color_range = None
# Initialize self.plates as empty list. New plates will be added to
# this list
self.plates = [] self.plates = []
# populate self.plates with data in provided files list
i = 1 i = 1
for file in files: for file in files:
plate = Plate(type=self.type, owner=self, filename=file, plate = Plate(type=self.type, owner=self, filename=file,
@ -242,6 +293,8 @@ class Experiment:
plate.id = i plate.id = i
self.plates.append(plate) self.plates.append(plate)
i += 1 i += 1
# if more than one file is provied, assume that those are replicates
# and add a special plate representing the average results
if len(files) > 1: if len(files) > 1:
self.avg_plate = Plate(type=self.type, owner=self, filename=None, self.avg_plate = Plate(type=self.type, owner=self, filename=None,
t1=self.t1, t2=self.t2, dt=self.dt, t1=self.t1, t2=self.t2, dt=self.dt,
@ -253,22 +306,32 @@ class Experiment:
self.avg_plate.id = 'average' self.avg_plate.id = 'average'
def analyze(self): def analyze(self):
"""
Triggers analyzation of plates.
"""
for plate in self.plates: for plate in self.plates:
plate.analyze(gui=self.gui) plate.analyze(gui=self.gui)
# if more than one plate is present, calculate average values for the
# merged average plate
if len(self.plates) > 1: if len(self.plates) > 1:
# iterate over all wells in a plate
for i in range(self.wellnum): for i in range(self.wellnum):
tmp = [] tmp = []
# iterate over all plates
for plate in self.plates: for plate in self.plates:
tm = plate.wells[i].tm tm = plate.wells[i].tm
self.avg_plate.wells[i].name = plate.wells[i].name self.avg_plate.wells[i].name = plate.wells[i].name
if plate.wells[i] not in plate.denatured_wells: if plate.wells[i] not in plate.denatured_wells:
# if well is not denatured, add to collection of tm
# values
tmp.append(tm) tmp.append(tm)
if len(tmp) > 0: if len(tmp) > 0:
# if at least one tm is added, calculate average
# and standard deviation
self.avg_plate.wells[i].tm = np.mean(tmp) self.avg_plate.wells[i].tm = np.mean(tmp)
self.avg_plate.wells[i].tm_sd = np.std(tmp) self.avg_plate.wells[i].tm_sd = np.std(tmp)
else: else:
# otherwise add to denatured wells
append_well = self.avg_plate.wells[i] append_well = self.avg_plate.wells[i]
self.avg_plate.denatured_wells.append(append_well) self.avg_plate.denatured_wells.append(append_well)