diff --git a/peakdetect.py b/peakdetect.py deleted file mode 100644 index 5add7e8..0000000 --- a/peakdetect.py +++ /dev/null @@ -1,735 +0,0 @@ -#!/usr/bin/env python2 -import numpy as np -from math import pi, log -import pylab -from scipy import fft, ifft -from scipy.optimize import curve_fit - -i = 10000 -x = np.linspace(0, 3.5 * pi, i) -y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 * - np.random.randn(i)) - - -def _datacheck_peakdetect(x_axis, y_axis): - if x_axis is None: - x_axis = range(len(y_axis)) - - if len(y_axis) != len(x_axis): - raise (ValueError, - 'Input vectors y_axis and x_axis must have same length') - - #needs to be a numpy array - y_axis = np.array(y_axis) - x_axis = np.array(x_axis) - return x_axis, y_axis - -def _peakdetect_parabole_fitter(raw_peaks, x_axis, y_axis, points): - """ - Performs the actual parabole fitting for the peakdetect_parabole function. - - keyword arguments: - raw_peaks -- A list of either the maximium or the minimum peaks, as given - by the peakdetect_zero_crossing function, with index used as x-axis - x_axis -- A numpy list of all the x values - y_axis -- A numpy list of all the y values - points -- How many points around the peak should be used during curve - fitting, must be odd. - - return -- A list giving all the peaks and the fitted waveform, format: - [[x, y, [fitted_x, fitted_y]]] - - """ - func = lambda x, k, tau, m: k * ((x - tau) ** 2) + m - fitted_peaks = [] - for peak in raw_peaks: - index = peak[0] - x_data = x_axis[index - points // 2: index + points // 2 + 1] - y_data = y_axis[index - points // 2: index + points // 2 + 1] - # get a first approximation of tau (peak position in time) - tau = x_axis[index] - # get a first approximation of peak amplitude - m = peak[1] - - # build list of approximations - # k = -m as first approximation? - p0 = (-m, tau, m) - popt, pcov = curve_fit(func, x_data, y_data, p0) - # retrieve tau and m i.e x and y value of peak - x, y = popt[1:3] - - # create a high resolution data set for the fitted waveform - x2 = np.linspace(x_data[0], x_data[-1], points * 10) - y2 = func(x2, *popt) - - fitted_peaks.append([x, y, [x2, y2]]) - - return fitted_peaks - - -def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0): - """ - Converted from/based on a MATLAB script at: - http://billauer.co.il/peakdet.html - - function for detecting local maximas and minmias in a signal. - Discovers peaks by searching for values which are surrounded by lower - or larger values for maximas and minimas respectively - - keyword arguments: - y_axis -- A list containg the signal over which to find peaks - x_axis -- (optional) A x-axis whose values correspond to the y_axis list - and is used in the return to specify the postion of the peaks. If - omitted an index of the y_axis is used. (default: None) - lookahead -- (optional) distance to look ahead from a peak candidate to - determine if it is the actual peak (default: 200) - '(sample / period) / f' where '4 >= f >= 1.25' might be a good value - delta -- (optional) this specifies a minimum difference between a peak and - the following points, before a peak may be considered a peak. Useful - to hinder the function from picking up false peaks towards to end of - the signal. To work well delta should be set to delta >= RMSnoise * 5. - (default: 0) - delta function causes a 20% decrease in speed, when omitted - Correctly used it can double the speed of the function - - return -- two lists [max_peaks, min_peaks] containing the positive and - negative peaks respectively. Each cell of the lists contains a tupple - of: (position, peak_value) - to get the average peak value do: np.mean(max_peaks, 0)[1] on the - results to unpack one of the lists into x, y coordinates do: - x, y = zip(*tab) - """ - max_peaks = [] - min_peaks = [] - dump = [] #Used to pop the first hit which almost always is false - - # check input data - x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) - # store data length for later use - length = len(y_axis) - - - #perform some checks - if lookahead < 1: - raise ValueError, "Lookahead must be '1' or above in value" - if not (np.isscalar(delta) and delta >= 0): - raise ValueError, "delta must be a positive number" - - #maxima and minima candidates are temporarily stored in - #mx and mn respectively - mn, mx = np.Inf, -np.Inf - - #Only detect peak if there is 'lookahead' amount of points after it - for index, (x, y) in enumerate(zip(x_axis[:-lookahead], - y_axis[:-lookahead])): - if y > mx: - mx = y - mxpos = x - if y < mn: - mn = y - mnpos = x - - ####look for max#### - if y < mx-delta and mx != np.Inf: - #Maxima peak candidate found - #look ahead in signal to ensure that this is a peak and not jitter - if y_axis[index:index+lookahead].max() < mx: - max_peaks.append([mxpos, mx]) - dump.append(True) - #set algorithm to only find minima now - mx = np.Inf - mn = np.Inf - if index+lookahead >= length: - #end is within lookahead no more peaks can be found - break - continue - #else: #slows shit down this does - # mx = ahead - # mxpos = x_axis[np.where(y_axis[index:index+lookahead]==mx)] - - ####look for min#### - if y > mn+delta and mn != -np.Inf: - #Minima peak candidate found - #look ahead in signal to ensure that this is a peak and not jitter - if y_axis[index:index+lookahead].min() > mn: - min_peaks.append([mnpos, mn]) - dump.append(False) - #set algorithm to only find maxima now - mn = -np.Inf - mx = -np.Inf - if index+lookahead >= length: - #end is within lookahead no more peaks can be found - break - #else: #slows shit down this does - # mn = ahead - # mnpos = x_axis[np.where(y_axis[index:index+lookahead]==mn)] - - - #Remove the false hit on the first value of the y_axis - try: - if dump[0]: - max_peaks.pop(0) - else: - min_peaks.pop(0) - del dump - except IndexError: - #no peaks were found, should the function return empty lists? - pass - - return [max_peaks, min_peaks] - - -def peakdetect_fft(y_axis, x_axis, pad_len = 5): - """ - Performs a FFT calculation on the data and zero-pads the results to - increase the time domain resolution after performing the inverse fft and - send the data to the 'peakdetect' function for peak - detection. - - Omitting the x_axis is forbidden as it would make the resulting x_axis - value silly if it was returned as the index 50.234 or similar. - - Will find at least 1 less peak then the 'peakdetect_zero_crossing' - function, but should result in a more precise value of the peak as - resolution has been increased. Some peaks are lost in an attempt to - minimize spectral leakage by calculating the fft between two zero - crossings for n amount of signal periods. - - The biggest time eater in this function is the ifft and thereafter it's - the 'peakdetect' function which takes only half the time of the ifft. - Speed improvementd could include to check if 2**n points could be used for - fft and ifft or change the 'peakdetect' to the 'peakdetect_zero_crossing', - which is maybe 10 times faster than 'peakdetct'. The pro of 'peakdetect' - is that it resutls in one less lost peak. It should also be noted that the - time used by the ifft function can change greatly depending on the input. - - keyword arguments: - y_axis -- A list containg the signal over which to find peaks - x_axis -- A x-axis whose values correspond to the y_axis list and is used - in the return to specify the postion of the peaks. - pad_len -- (optional) By how many times the time resolution should be - increased by, e.g. 1 doubles the resolution. The amount is rounded up - to the nearest 2 ** n amount (default: 5) - - return -- two lists [max_peaks, min_peaks] containing the positive and - negative peaks respectively. Each cell of the lists contains a tupple - of: (position, peak_value) - to get the average peak value do: np.mean(max_peaks, 0)[1] on the - results to unpack one of the lists into x, y coordinates do: - x, y = zip(*tab) - """ - # check input data - x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) - zero_indices = zero_crossings(y_axis, window = 11) - #select a n amount of periods - last_indice = - 1 - (1 - len(zero_indices) & 1) - # Calculate the fft between the first and last zero crossing - # this method could be ignored if the begining and the end of the signal - # are discardable as any errors induced from not using whole periods - # should mainly manifest in the beginning and the end of the signal, but - # not in the rest of the signal - fft_data = fft(y_axis[zero_indices[0]:zero_indices[last_indice]]) - padd = lambda x, c: x[:len(x) // 2] + [0] * c + x[len(x) // 2:] - n = lambda x: int(log(x)/log(2)) + 1 - # padds to 2**n amount of samples - fft_padded = padd(list(fft_data), 2 ** - n(len(fft_data) * pad_len) - len(fft_data)) - - # There is amplitude decrease directly proportional to the sample increase - sf = len(fft_padded) / float(len(fft_data)) - # There might be a leakage giving the result an imaginary component - # Return only the real component - y_axis_ifft = ifft(fft_padded).real * sf #(pad_len + 1) - x_axis_ifft = np.linspace( - x_axis[zero_indices[0]], x_axis[zero_indices[last_indice]], - len(y_axis_ifft)) - # get the peaks to the interpolated waveform - max_peaks, min_peaks = peakdetect(y_axis_ifft, x_axis_ifft, 500, - delta = abs(np.diff(y_axis).max() * 2)) - #max_peaks, min_peaks = peakdetect_zero_crossing(y_axis_ifft, x_axis_ifft) - - # store one 20th of a period as waveform data - data_len = int(np.diff(zero_indices).mean()) / 10 - data_len += 1 - data_len & 1 - - - fitted_wave = [] - for peaks in [max_peaks, min_peaks]: - peak_fit_tmp = [] - index = 0 - for peak in peaks: - index = np.where(x_axis_ifft[index:]==peak[0])[0][0] + index - x_fit_lim = x_axis_ifft[index - data_len // 2: - index + data_len // 2 + 1] - y_fit_lim = y_axis_ifft[index - data_len // 2: - index + data_len // 2 + 1] - - peak_fit_tmp.append([x_fit_lim, y_fit_lim]) - fitted_wave.append(peak_fit_tmp) - - #pylab.plot(range(len(fft_data)), fft_data) - #pylab.show() - - #pylab.plot(x_axis, y_axis) - #pylab.hold(True) - #pylab.plot(x_axis_ifft, y_axis_ifft) - #for max_p in max_peaks: - # pylab.plot(max_p[0], max_p[1], 'xr') - #pylab.show() - return [max_peaks, min_peaks] - - -def peakdetect_parabole(y_axis, x_axis, points = 9): - """ - Function for detecting local maximas and minmias in a signal. - Discovers peaks by fitting the model function: y = k (x - tau) ** 2 + m - to the peaks. The amount of points used in the fitting is set by the - points argument. - - Omitting the x_axis is forbidden as it would make the resulting x_axis - value silly if it was returned as index 50.234 or similar. - - will find the same amount of peaks as the 'peakdetect_zero_crossing' - function, but might result in a more precise value of the peak. - - keyword arguments: - y_axis -- A list containg the signal over which to find peaks - x_axis -- A x-axis whose values correspond to the y_axis list and is used - in the return to specify the postion of the peaks. - points -- (optional) How many points around the peak should be used during - curve fitting, must be odd (default: 9) - - return -- two lists [max_peaks, min_peaks] containing the positive and - negative peaks respectively. Each cell of the lists contains a list - of: (position, peak_value) - to get the average peak value do: np.mean(max_peaks, 0)[1] on the - results to unpack one of the lists into x, y coordinates do: - x, y = zip(*max_peaks) - """ - # check input data - x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) - # make the points argument odd - points += 1 - points % 2 - #points += 1 - int(points) & 1 slower when int conversion needed - - # get raw peaks - max_raw, min_raw = peakdetect_zero_crossing(y_axis) - - # define output variable - max_peaks = [] - min_peaks = [] - - max_ = _peakdetect_parabole_fitter(max_raw, x_axis, y_axis, points) - min_ = _peakdetect_parabole_fitter(min_raw, x_axis, y_axis, points) - - max_peaks = map(lambda x: [x[0], x[1]], max_) - max_fitted = map(lambda x: x[-1], max_) - min_peaks = map(lambda x: [x[0], x[1]], min_) - min_fitted = map(lambda x: x[-1], min_) - - - #pylab.plot(x_axis, y_axis) - #pylab.hold(True) - #for max_p, max_f in zip(max_peaks, max_fitted): - # pylab.plot(max_p[0], max_p[1], 'x') - # pylab.plot(max_f[0], max_f[1], 'o', markersize = 2) - #for min_p, min_f in zip(min_peaks, min_fitted): - # pylab.plot(min_p[0], min_p[1], 'x') - # pylab.plot(min_f[0], min_f[1], 'o', markersize = 2) - #pylab.show() - - return [max_peaks, min_peaks] - - -def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False): - """ - Function for detecting local maximas and minmias in a signal. - Discovers peaks by fitting the model function: - y = A * sin(2 * pi * f * x - tau) to the peaks. The amount of points used - in the fitting is set by the points argument. - - Omitting the x_axis is forbidden as it would make the resulting x_axis - value silly if it was returned as index 50.234 or similar. - - will find the same amount of peaks as the 'peakdetect_zero_crossing' - function, but might result in a more precise value of the peak. - - The function might have some problems if the sine wave has a - non-negligible total angle i.e. a k*x component, as this messes with the - internal offset calculation of the peaks, might be fixed by fitting a - k * x + m function to the peaks for offset calculation. - - keyword arguments: - y_axis -- A list containg the signal over which to find peaks - x_axis -- A x-axis whose values correspond to the y_axis list and is used - in the return to specify the postion of the peaks. - points -- (optional) How many points around the peak should be used during - curve fitting, must be odd (default: 9) - lock_frequency -- (optional) Specifies if the frequency argument of the - model function should be locked to the value calculated from the raw - peaks or if optimization process may tinker with it. (default: False) - - return -- two lists [max_peaks, min_peaks] containing the positive and - negative peaks respectively. Each cell of the lists contains a tupple - of: (position, peak_value) - to get the average peak value do: np.mean(max_peaks, 0)[1] on the - results to unpack one of the lists into x, y coordinates do: - x, y = zip(*tab) - """ - # check input data - x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) - # make the points argument odd - points += 1 - points % 2 - #points += 1 - int(points) & 1 slower when int conversion needed - - # get raw peaks - max_raw, min_raw = peakdetect_zero_crossing(y_axis) - - # define output variable - max_peaks = [] - min_peaks = [] - - # get global offset - offset = np.mean([np.mean(max_raw, 0)[1], np.mean(min_raw, 0)[1]]) - # fitting a k * x + m function to the peaks might be better - #offset_func = lambda x, k, m: k * x + m - - # calculate an approximate frequenzy of the signal - Hz = [] - for raw in [max_raw, min_raw]: - if len(raw) > 1: - peak_pos = [x_axis[index] for index in zip(*raw)[0]] - Hz.append(np.mean(np.diff(peak_pos))) - Hz = 1 / np.mean(Hz) - - # model function - # if cosine is used then tau could equal the x position of the peak - # if sine were to be used then tau would be the first zero crossing - if lock_frequency: - func = lambda x, A, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2) - else: - func = lambda x, A, Hz, tau: A * np.sin(2 * pi * Hz * (x - tau) + - pi / 2) - #func = lambda x, A, Hz, tau: A * np.cos(2 * pi * Hz * (x - tau)) - - - #get peaks - fitted_peaks = [] - for raw_peaks in [max_raw, min_raw]: - peak_data = [] - for peak in raw_peaks: - index = peak[0] - x_data = x_axis[index - points // 2: index + points // 2 + 1] - y_data = y_axis[index - points // 2: index + points // 2 + 1] - # get a first approximation of tau (peak position in time) - tau = x_axis[index] - # get a first approximation of peak amplitude - A = peak[1] - - # build list of approximations - if lock_frequency: - p0 = (A, tau) - else: - p0 = (A, Hz, tau) - - # subtract offset from waveshape - y_data -= offset - popt, pcov = curve_fit(func, x_data, y_data, p0) - # retrieve tau and A i.e x and y value of peak - x = popt[-1] - y = popt[0] - - # create a high resolution data set for the fitted waveform - x2 = np.linspace(x_data[0], x_data[-1], points * 10) - y2 = func(x2, *popt) - - # add the offset to the results - y += offset - y2 += offset - y_data += offset - - peak_data.append([x, y, [x2, y2]]) - - fitted_peaks.append(peak_data) - - # structure date for output - max_peaks = map(lambda x: [x[0], x[1]], fitted_peaks[0]) - max_fitted = map(lambda x: x[-1], fitted_peaks[0]) - min_peaks = map(lambda x: [x[0], x[1]], fitted_peaks[1]) - min_fitted = map(lambda x: x[-1], fitted_peaks[1]) - - - #pylab.plot(x_axis, y_axis) - #pylab.hold(True) - #for max_p, max_f in zip(max_peaks, max_fitted): - # pylab.plot(max_p[0], max_p[1], 'x') - # pylab.plot(max_f[0], max_f[1], 'o', markersize = 2) - #for min_p, min_f in zip(min_peaks, min_fitted): - # pylab.plot(min_p[0], min_p[1], 'x') - # pylab.plot(min_f[0], min_f[1], 'o', markersize = 2) - #pylab.show() - - return [max_peaks, min_peaks] - - -def peakdetect_sine_locked(y_axis, x_axis, points = 9): - """ - Convinience function for calling the 'peakdetect_sine' function with - the lock_frequency argument as True. - - keyword arguments: - y_axis -- A list containg the signal over which to find peaks - x_axis -- A x-axis whose values correspond to the y_axis list and is used - in the return to specify the postion of the peaks. - points -- (optional) How many points around the peak should be used during - curve fitting, must be odd (default: 9) - - return -- see 'peakdetect_sine' - """ - return peakdetect_sine(y_axis, x_axis, points, True) - - -def peakdetect_zero_crossing(y_axis, x_axis = None, window = 11): - """ - Function for detecting local maximas and minmias in a signal. - Discovers peaks by dividing the signal into bins and retrieving the - maximum and minimum value of each the even and odd bins respectively. - Division into bins is performed by smoothing the curve and finding the - zero crossings. - - Suitable for repeatable signals, where some noise is tolerated. Excecutes - faster than 'peakdetect', although this function will break if the offset - of the signal is too large. It should also be noted that the first and - last peak will probably not be found, as this function only can find peaks - between the first and last zero crossing. - - keyword arguments: - y_axis -- A list containg the signal over which to find peaks - x_axis -- (optional) A x-axis whose values correspond to the y_axis list - and is used in the return to specify the postion of the peaks. If - omitted an index of the y_axis is used. (default: None) - window -- the dimension of the smoothing window; should be an odd integer - (default: 11) - - return -- two lists [max_peaks, min_peaks] containing the positive and - negative peaks respectively. Each cell of the lists contains a tupple - of: (position, peak_value) - to get the average peak value do: np.mean(max_peaks, 0)[1] on the - results to unpack one of the lists into x, y coordinates do: - x, y = zip(*tab) - """ - # check input data - x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) - - zero_indices = zero_crossings(y_axis, window = window) - period_lengths = np.diff(zero_indices) - - bins_y = [y_axis[index:index + diff] for index, diff in - zip(zero_indices, period_lengths)] - bins_x = [x_axis[index:index + diff] for index, diff in - zip(zero_indices, period_lengths)] - - even_bins_y = bins_y[::2] - odd_bins_y = bins_y[1::2] - even_bins_x = bins_x[::2] - odd_bins_x = bins_x[1::2] - hi_peaks_x = [] - lo_peaks_x = [] - - #check if even bin contains maxima - if abs(even_bins_y[0].max()) > abs(even_bins_y[0].min()): - hi_peaks = [bin.max() for bin in even_bins_y] - lo_peaks = [bin.min() for bin in odd_bins_y] - # get x values for peak - for bin_x, bin_y, peak in zip(even_bins_x, even_bins_y, hi_peaks): - hi_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]]) - for bin_x, bin_y, peak in zip(odd_bins_x, odd_bins_y, lo_peaks): - lo_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]]) - else: - hi_peaks = [bin.max() for bin in odd_bins_y] - lo_peaks = [bin.min() for bin in even_bins_y] - # get x values for peak - for bin_x, bin_y, peak in zip(odd_bins_x, odd_bins_y, hi_peaks): - hi_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]]) - for bin_x, bin_y, peak in zip(even_bins_x, even_bins_y, lo_peaks): - lo_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]]) - - max_peaks = [[x, y] for x,y in zip(hi_peaks_x, hi_peaks)] - min_peaks = [[x, y] for x,y in zip(lo_peaks_x, lo_peaks)] - - return [max_peaks, min_peaks] - - -def _smooth(x, window_len=11, window='hanning'): - """ - smooth the data using a window of the requested size. - - This method is based on the convolution of a scaled window on the signal. - The signal is prepared by introducing reflected copies of the signal - (with the window size) in both ends so that transient parts are minimized - in the begining and end part of the output signal. - - input: - x: the input signal - window_len: the dimension of the smoothing window; should be an odd - integer - window: the type of window from 'flat', 'hanning', 'hamming', - 'bartlett', 'blackman' - flat window will produce a moving average smoothing. - - output: - the smoothed signal - - example: - - t = linspace(-2,2,0.1) - x = sin(t)+randn(len(t))*0.1 - y = _smooth(x) - - see also: - - numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, - numpy.convolve, scipy.signal.lfilter - - TODO: the window parameter could be the window itself if a list instead of - a string - """ - if x.ndim != 1: - raise ValueError, "smooth only accepts 1 dimension arrays." - - if x.size < window_len: - raise ValueError, "Input vector needs to be bigger than window size." - - if window_len<3: - return x - - if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']: - raise(ValueError, - "Window is not one of '{0}', '{1}', '{2}', '{3}', '{4}'".format( - *('flat', 'hanning', 'hamming', 'bartlett', 'blackman'))) - - s = np.r_[x[window_len-1:0:-1], x, x[-1:-window_len:-1]] - #print(len(s)) - if window == 'flat': #moving average - w = np.ones(window_len,'d') - else: - w = eval('np.' + window + '(window_len)') - - y = np.convolve(w / w.sum(), s, mode = 'valid') - return y - - -def zero_crossings(y_axis, window = 11): - """ - Algorithm to find zero crossings. Smoothens the curve and finds the - zero-crossings by looking for a sign change. - - - keyword arguments: - y_axis -- A list containg the signal over which to find zero-crossings - window -- the dimension of the smoothing window; should be an odd integer - (default: 11) - - return -- the index for each zero-crossing - """ - # smooth the curve - length = len(y_axis) - x_axis = np.asarray(range(length), int) - - # discard tail of smoothed signal - y_axis = _smooth(y_axis, window)[:length] - zero_crossings = np.where(np.diff(np.sign(y_axis)))[0] - indices = [x_axis[index] for index in zero_crossings] - - # check if zero-crossings are valid - diff = np.diff(indices) - if diff.std() / diff.mean() > 0.2: - print diff.std() / diff.mean() - print np.diff(indices) - raise(ValueError, - "False zero-crossings found, indicates problem {0} or {1}".format( - "with smoothing window", "problem with offset")) - # check if any zero crossings were found - if len(zero_crossings) < 1: - raise(ValueError, "No zero crossings found") - - return indices - # used this to test the fft function's sensitivity to spectral leakage - #return indices + np.asarray(30 * np.random.randn(len(indices)), int) - -############################Frequency calculation############################# -# diff = np.diff(indices) -# time_p_period = diff.mean() -# -# if diff.std() / time_p_period > 0.1: -# raise ValueError, -# "smoothing window too small, false zero-crossing found" -# -# #return frequency -# return 1.0 / time_p_period -############################################################################## - - - - - -def _test_zero(): - _max, _min = peakdetect_zero_crossing(y,x) -def _test(): - _max, _min = peakdetect(y,x, delta=0.30) - - -def _test_graph(): - i = 10000 - x = np.linspace(0,3.7*pi,i) - y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 * - np.random.randn(i)) - y *= -1 - x = range(i) - - _max, _min = peakdetect(y,x,750, 0.30) - xm = [p[0] for p in _max] - ym = [p[1] for p in _max] - xn = [p[0] for p in _min] - yn = [p[1] for p in _min] - - plot = pylab.plot(x,y) - pylab.hold(True) - pylab.plot(xm, ym, 'r+') - pylab.plot(xn, yn, 'g+') - - _max, _min = peak_det_bad.peakdetect(y, 0.7, x) - xm = [p[0] for p in _max] - ym = [p[1] for p in _max] - xn = [p[0] for p in _min] - yn = [p[1] for p in _min] - pylab.plot(xm, ym, 'y*') - pylab.plot(xn, yn, 'k*') - pylab.show() - - - -if __name__ == "__main__": - from math import pi - import pylab - - i = 10000 - x = np.linspace(0,3.7*pi,i) - y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 * - np.random.randn(i)) - y *= -1 - - #_max, _min = peakdetect(y, x, 750, 0.30) - _max, _min = peakdetect(y, x, lookahead=1000) - xm = [p[0] for p in _max] - ym = [p[1] for p in _max] - xn = [p[0] for p in _min] - yn = [p[1] for p in _min] - - plot = pylab.plot(x, y) - pylab.hold(True) - pylab.plot(xm, ym, 'r+') - pylab.plot(xn, yn, 'g+') - - - pylab.show() diff --git a/pydsf.py b/pydsf.py index a900857..98a51e8 100644 --- a/pydsf.py +++ b/pydsf.py @@ -8,10 +8,16 @@ try: mpl.use('Qt4Agg') import matplotlib.ticker as ticker import matplotlib.pyplot as plt + import matplotlib.patches as mpatches except ImportError: raise ImportError('----- Matplotlib must be installed. -----') -from peakdetect import * +try: + import peakutils +except ImportError: + raise ImportError('----- PeakUtils must be installed. -----') + +#from peakdetect import * try: import numpy as np @@ -35,8 +41,10 @@ class Well: self.splines = {"raw": None, "filtered": None, "derivative1": None} - self.tm = None - self.tm_sd = None + self.tm = np.NaN + self.tm_sd = np.NaN + self.baseline_correction = owner.baseline_correction + self.baseline = None def filter_raw(self): """ @@ -57,38 +65,51 @@ class Well: temp = self.splines[spline].derivatives(t) for i in range(4): self.derivatives[i, t - self.owner.t1] = temp[i] + + def calc_baseline(self, y): + try: + baseline = peakutils.baseline(y) + return baseline + except: + return np.NaN def calc_tm(self): """ - Calculate the Tm of the well. Returns either the Tm or 'None'. + Calculate the Tm of the well. Returns either the Tm or 'np.NaN'. """ - # Check if the well has already been flagged as denatured if self in self.owner.denatured_wells: - return None # Return 'None' if true - + return np.NaN # Return 'NaN' if true + # First assume that the well is denatured self.owner.denatured_wells.append(self) + if self.owner.tm_cutoff_low != self.owner.t1 or self.owner.tm_cutoff_high != self.owner.t1: + x = np.arange(self.owner.tm_cutoff_low, self.owner.tm_cutoff_high + 1, self.owner.dt, dtype=float) + x = self.owner.temprange y = self.derivatives[1] - lookahead = len(x)/4 # Amount of data points to look around potential peaks to verify + + if self.baseline_correction: + y = y - self.baseline + try: - peaks = peakdetect(y, x, lookahead) # Run peak finding algorithm + peak_indexes = peakutils.indexes(y, min_dist=len(x)) # calculate a rough estimate of peaks; set min_dist + # temperature range to only find one/the highest peak + tm = peakutils.interpolate(x, y, ind=peak_indexes)[0] # increase resolution by fitting gaussian function + # to peak except: - return None # In case of error, return no peak + return np.NaN # In case of error, return no peak try: - for i in peaks[0]: # Iterate over all peaks - tm = i[0] # Highest peak found is the first in list # Check if the peak is within cutoff range if tm and tm >= self.owner.tm_cutoff_low and tm <= self.owner.tm_cutoff_high: self.owner.denatured_wells.remove(self) # If everything is fine, remove the denatured flag - return tm # and return the Tm + return tm # and return the Tm else: - return None # otherwise, return no Tm + return np.NaN # otherwise, return NaN except: - return None # In case of error, return nothing + return np.NaN # In case of error, return NaN def is_denatured(self): """ @@ -131,16 +152,19 @@ class Well: self.splines["filtered"] = self.calc_spline(self.filtered) self.calc_derivatives() + if self.baseline_correction: + self.baseline = self.calc_baseline(self.derivatives[1]) if self.is_denatured(): self.owner.denatured_wells.append(self) self.splines["derivative1"] = self.calc_spline(self.derivatives[1]) - + self.tm = self.calc_tm() - #print(self.tm) + if self.tm is None: + self.tm = np.NaN class Experiment: - def __init__(self, type, gui=None, files=None, replicates=None, t1=25, t2=95, dt=1, cols=12, rows=8, cutoff_low=None, cutoff_high=None, signal_threshold=None, color_range=None): + def __init__(self, type, gui=None, files=None, replicates=None, t1=25, t2=95, dt=1, cols=12, rows=8, cutoff_low=None, cutoff_high=None, signal_threshold=None, color_range=None, baseline_correction=False): self.replicates = replicates self.cols = cols self.rows = rows @@ -159,6 +183,7 @@ class Experiment: self.gui=gui self.signal_threshold = signal_threshold self.avg_plate = None + self.baseline_correction=baseline_correction if cutoff_low: self.tm_cutoff_low = cutoff_low else: @@ -176,12 +201,12 @@ class Experiment: i = 1 for file in files: - plate = Plate(type=self.type, filename=file, t1=self.t1, t2=self.t2, dt=self.dt, cols=self.cols, rows=self.rows, cutoff_low=self.tm_cutoff_low, cutoff_high=self.tm_cutoff_high, signal_threshold=self.signal_threshold, color_range=self.color_range) + plate = Plate(type=self.type, owner=self, filename=file, t1=self.t1, t2=self.t2, dt=self.dt, cols=self.cols, rows=self.rows, cutoff_low=self.tm_cutoff_low, cutoff_high=self.tm_cutoff_high, signal_threshold=self.signal_threshold, color_range=self.color_range) plate.id = i self.plates.append(plate) i += 1 if len(files) > 1: - self.avg_plate = Plate(type=self.type, filename=None, t1=self.t1, t2=self.t2, dt=self.dt, cols=self.cols, rows=self.rows, cutoff_low=self.tm_cutoff_low, cutoff_high=self.tm_cutoff_high, signal_threshold=self.signal_threshold, color_range=self.color_range) + self.avg_plate = Plate(type=self.type, owner=self, filename=None, t1=self.t1, t2=self.t2, dt=self.dt, cols=self.cols, rows=self.rows, cutoff_low=self.tm_cutoff_low, cutoff_high=self.tm_cutoff_high, signal_threshold=self.signal_threshold, color_range=self.color_range) self.avg_plate.id = 'average' def analyze(self): @@ -211,12 +236,22 @@ class Experiment: class Plate: - def __init__(self, type, id=None, filename=None, replicates=None, t1=25, t2=95, dt=1, cols=12, rows=8, cutoff_low=None, cutoff_high=None, signal_threshold=None, color_range=None): + def __init__(self, type, owner, id=None, filename=None, replicates=None, t1=None, t2=None, dt=None, cols=12, rows=8, cutoff_low=None, cutoff_high=None, signal_threshold=None, color_range=None): self.cols = cols self.rows = rows - self.t1 = t1 - self.t2 = t2 - self.dt = dt + self.owner = owner + if t1: + self.t1 = t1 + else: + self.t1 = owner.t1 + if t1: + self.t2 = t2 + else: + self.t2 = owner.t2 + if t1: + self.dt = dt + else: + self.dt = owner.dt self.temprange = np.arange(self.t1, self.t2 + 1, self.dt, dtype=float) self.reads = int(round((t2 + 1 - t1) / dt)) self.wellnum = self.cols * self.rows @@ -228,6 +263,7 @@ class Plate: self.replicates = None self.signal_threshold = signal_threshold self.id = id + self.baseline_correction = owner.baseline_correction if cutoff_low: self.tm_cutoff_low = cutoff_low else: @@ -271,7 +307,7 @@ class Plate: self.wells[i].name = row[read] self.wells[i].raw = temp i += 1 - + def analyze(self, gui=None): try: # Try to access data file in the given path @@ -298,10 +334,10 @@ class Plate: if self.replicates: if self.replicates == 'rows': - print "rows" + print("rows") if self.replicates == 'cols': - print "cols" - + print("cols") + #print(self.tms) self.max_tm = max(self.tms) self.min_tm = min(self.tms) @@ -309,7 +345,7 @@ class Plate: with open(filename, 'w') as f: f.write('#{:<4s}{:>13s}\n'.format('ID', '"Tm [°C]"')) for well in self.wells: - if well.tm == None or well in self.denatured_wells: + if np.isnan(well.tm) or well in self.denatured_wells: f.write('{:<5s}{:>12s}\n'.format(well.name, 'NaN')) else: f.write('{:<5s}{:>12s}\n'.format(well.name, str(well.tm))) @@ -318,7 +354,7 @@ class Plate: with open(filename, 'w') as f: f.write('#{:<4s}{:>13s}{:>13s}\n'.format('"ID"', '"Tm [°C]"', '"SD"')) for well in self.wells: - if well.tm == None or well in self.denatured_wells: + if np.isnan(well.tm) or well in self.denatured_wells: f.write('{:<5s}{:>12s}{:>12s}\n'.format(well.name, 'NaN', 'NaN')) else: f.write('{:<5s}{:>12s}{:>12s}\n'.format(well.name, str(well.tm), str(well.tm_sd))) @@ -373,6 +409,11 @@ class Plate: f.write('{:>-15.3f}'.format(float(np.round(d, decimals=3)))) f.write('\n') i += 1 + + # TODO: Implement 'write_baseline_corrected_table() + + def write_baseline_corrected_table(self, filename): + raise NotImplementedError def update_progress_bar(bar, value): @@ -431,6 +472,9 @@ def plot_tm_heatmap_single(plate, gui=None): x_values = [] # Array holding the columns y_values = [] # Array holding the rows c_values = [] # Array holding the color values aka Tm + dx_values = [] + dy_values = [] + dc_values = [] for well in plate.wells: # Iterate over all wells if well not in plate.denatured_wells: # Check if well is denatured (no Tm found) c = well.tm # If not, set color to Tm @@ -440,6 +484,8 @@ def plot_tm_heatmap_single(plate, gui=None): c = plate.tm_cutoff_high # If it is, set color to cutoff else: # If the plate is denatured c = plate.tm_cutoff_low # Set its color to the low cutoff + dx_values.append(x) + dy_values.append(y) x_values.append(x) # Add values to the respective arrays y_values.append(y) c_values.append(c) @@ -448,25 +494,31 @@ def plot_tm_heatmap_single(plate, gui=None): x = 1 # reset column to one y += 1 # and increase row by one - fig1 = plt.figure() # new figure + fig1 = plt.figure() # new figure ax1 = fig1.add_subplot(1, 1, 1) # A single canvas ax1.autoscale(tight=True) # Scale plate size ax1.xaxis.set_major_locator(ticker.MaxNLocator(plate.cols + 1)) # n columns ax1.yaxis.set_major_locator(ticker.MaxNLocator(plate.rows + 1)) # n rows if plate.color_range: - cax = ax1.scatter(x_values, y_values, s=300, c=c_values, marker='s', vmin=plate.color_range[0], vmax=plate.color_range[1]) # plot wells and color using the colormap + cax = ax1.scatter(x_values, y_values, s=305, c=c_values, marker='s', vmin=plate.color_range[0], vmax=plate.color_range[1]) # plot wells and color using the colormap else: - cax = ax1.scatter(x_values, y_values, s=300, c=c_values, marker='s') # plot wells and color using the colormap + cax = ax1.scatter(x_values, y_values, s=305, c=c_values, marker='s') # plot wells and color using the colormap + + cax2 = ax1.scatter(dx_values, dy_values, s=80, c='white', marker='x', linewidths=(1.5,)) ax1.invert_yaxis() # invert y axis to math plate layout cbar = fig1.colorbar(cax) # show colorbar ax1.set_xlabel('Columns') # set axis and colorbar label ax1.set_ylabel('Rows') + if str(plate.id) == 'average': title = '$T_m$ heatmap (average)' else: title = '$T_m$ heatmap (plate #{})'.format(str(plate.id)) ax1.set_title(title) cbar.set_label(u"Temperature [°C]") + + #magenta_patch = mpatches.Patch(color='magenta', label='Denatured') + #fig1.legend([magenta_patch], 'Denatured', loc='lower right', bbox_to_anchor=[0.5, 0.5]) # if gui: # update_progress_bar(gui.pb, 50) @@ -492,14 +544,18 @@ def plot_derivative(plate, gui=None): ax.set_ylabel('dI/dT', size='xx-small') x = plate.temprange # set values for the x axis to the given temperature range - y = well.derivatives[1] # grab y values from the first derivative of the well + if well.baseline_correction: + print(well.baseline) + y = well.derivatives[1] - well.baseline + else: + y = well.derivatives[1] # grab y values from the first derivative of the well ax.xaxis.set_major_locator(ticker.MaxNLocator(4)) # only show three tickmarks on both axes ax.yaxis.set_major_locator(ticker.MaxNLocator(4)) if well not in plate.denatured_wells: # check if well is denatured (without determined Tm) tm = well.tm # if not, grab its Tm else: - tm = None # else set Tm to None + tm = np.NaN # else set Tm to np.NaN if tm: ax.axvline(x=tm) # plot vertical line at the Tm ax.axvspan(plate.t1, plate.tm_cutoff_low, facecolor='0.8', alpha=0.5) # shade lower cutoff area diff --git a/ui/Ui_mainwindow.py b/ui/Ui_mainwindow.py index b9e23dd..314e071 100644 --- a/ui/Ui_mainwindow.py +++ b/ui/Ui_mainwindow.py @@ -258,7 +258,7 @@ class Ui_MainWindow(object): self.actionAbout.setText(QtGui.QApplication.translate("MainWindow", "About", None, QtGui.QApplication.UnicodeUTF8)) self.actionAbout_Qt.setText(QtGui.QApplication.translate("MainWindow", "About Qt", None, QtGui.QApplication.UnicodeUTF8)) -import icons_rc +#import icons_rc if __name__ == "__main__": import sys diff --git a/ui/__pycache__/Ui_mainwindow.cpython-34.pyc b/ui/__pycache__/Ui_mainwindow.cpython-34.pyc new file mode 100644 index 0000000..11ae995 Binary files /dev/null and b/ui/__pycache__/Ui_mainwindow.cpython-34.pyc differ diff --git a/ui/__pycache__/__init__.cpython-34.pyc b/ui/__pycache__/__init__.cpython-34.pyc new file mode 100644 index 0000000..a6bec46 Binary files /dev/null and b/ui/__pycache__/__init__.cpython-34.pyc differ diff --git a/ui/__pycache__/mainwindow.cpython-34.pyc b/ui/__pycache__/mainwindow.cpython-34.pyc new file mode 100644 index 0000000..1169324 Binary files /dev/null and b/ui/__pycache__/mainwindow.cpython-34.pyc differ diff --git a/ui/mainwindow.py b/ui/mainwindow.py index fefe5a3..fde0f24 100644 --- a/ui/mainwindow.py +++ b/ui/mainwindow.py @@ -80,7 +80,7 @@ class MainWindow(QMainWindow, Ui_MainWindow): if self.groupBox_signal_threshold.isChecked(): signal_threshold = self.spinBox_signal_threshold.value() - items = (self.listWidget_data.item(i) for i in xrange(self.listWidget_data.count())) + items = (self.listWidget_data.item(i) for i in range(self.listWidget_data.count())) files = [] for item in items: