Source code for ppcpy.calibration.rayleighfit


import logging
import numpy as np
from scipy.ndimage import uniform_filter1d

#import fastrdp

[docs] def rayleighfit(data_cube): """ """ # TODO ist data.distance0 and height the same? https://github.com/PollyNET/Pollynet_Processing_Chain/blob/e413f9254094ff2c0a18fcdac4e9bebb5385d526/lib/preprocess/pollyPreprocess.m#L299 height = data_cube.retrievals_highres['range'] logging.warning(f'rayleighfit seems to use range in matlab, but the met data should be in height >> RECHECK!') logging.warning(f'at 10km height this is a difference of about 4 indices') config_dict = data_cube.polly_config_dict refH = [None for i in range(len(data_cube.clFreeGrps))] if not data_cube.polly_config_dict['flagUseManualRefH']: for i, cldFree in enumerate(data_cube.clFreeGrps): print(i, cldFree) refH_cldFree = {} for wv, t, tel in [(532, 'total', 'FR'), (355, 'total', 'FR'), (1064, 'total', 'FR')]: if np.any(data_cube.gf(wv, 'total', 'FR')): print(f'refH for {wv}') rcs = np.squeeze(data_cube.retrievals_profile['RCS'][i,:,data_cube.gf(wv, t, tel)]) sig = np.squeeze(data_cube.retrievals_profile['sigBGCor'][i,:,data_cube.gf(wv, t, tel)]) bg = np.squeeze(data_cube.retrievals_profile['BG'][i,data_cube.gf(wv, t, tel)]) mSig = ( data_cube.mol_profiles[f'mBsc_{wv}'][i,:] * \ np.exp(-2 * np.cumsum(data_cube.mol_profiles[f'mExt_{wv}'][i,:] * np.concatenate(([height[0]], np.diff(height)))))) scaRatio = rcs/mSig #print(config_dict['minDecomLogDist532'], config_dict['heightFullOverlap'], config_dict['maxDecomHeight532'], # config_dict['maxDecomThickness532'], config_dict['decomSmoothWin532']) # p.Results.minDecomLogDist, p.Results.heightFullOverlap, p.Results.maxDecomHeight, p.Results.maxDecomThickness, p.Results.decomSmWin); DPInd = DouglasPeucker( scaRatio, height, config_dict[f'minDecomLogDist{wv}'], np.array(config_dict['heightFullOverlap'])[data_cube.gf(wv, t, tel)], config_dict[f'maxDecomHeight{wv}'], config_dict[f'maxDecomThickness{wv}'], config_dict[f'decomSmoothWin{wv}'] ) #print('DPInd', DPInd) #DPInd = [133, 332, 333, 533, 675, 875, 1075, 1274, 1275, 1333] #print('!!!! overwrite DPInd !!!!!!!!!!!!!!!!!!!!') #print('DPInd', DPInd) #p.Results.minRefThickness, p.Results.minRefDeltaExt, p.Results.minRefSNR, flagShowDetail refHInd = fit_profile( height, rcs, sig, bg, mSig, DPInd, config_dict[f'minRefThickness{wv}'], config_dict[f'minRefDeltaExt{wv}'], config_dict[f'minRefSNR{wv}'], flagShowDetail=False) print('refHInd', refHInd) # for debugging reasons #return rcs, mSig, scaRatio # calculate PCR after averaging or average directly PCR -> PCR from average above was cross checked # with matlab. Above 15km, the relative difference increased to ~ 3% # continue at https://github.com/PollyNET/Pollynet_Processing_Chain/blob/b3b8ec7726b75d9db6287dcba29459587ca34491/lib/interface/picassoProcV3.m#L788 refH_cldFree[f"{wv}_{t}_{tel}"] = { 'DPInd': DPInd, 'refHInd': refHInd } else: logging.warning(f"No channel for rayleigh fit {wv}_{t}_{tel}") refH[i] = refH_cldFree else: # manual RefH logging.info(f"manual reference height") for i, cldFree in enumerate(data_cube.clFreeGrps): print(i, cldFree) refH_cldFree = {} for wv, t, tel in [(532, 'total', 'FR'), (355, 'total', 'FR'), (1064, 'total', 'FR')]: refBInd = int(np.searchsorted(height, config_dict[f'refH_{tel}_{wv}'][0])) refTInd = int(np.searchsorted(height, config_dict[f'refH_{tel}_{wv}'][1])) print(wv, t, tel, refBInd, refTInd) refH_cldFree[f"{wv}_{t}_{tel}"] = { 'DPInd': [], 'refHInd': [refBInd, refTInd] } refH[i] = refH_cldFree return refH
[docs] def smooth_signal(signal, window_len): return uniform_filter1d(signal, size=window_len, mode='nearest')
[docs] def DouglasPeucker(signal, height, epsilon, heightBase, heightTop, maxHThick, window_size=1): """ Simplify signal according to Douglas-Peucker algorithm. Parameters: signal (array): Molecule corrected signal. [MHz] height (array): Height. [m] epsilon (float): Maximum distance. heightBase (float): Minimum height for the algorithm. [m] heightTop (float): Maximum height for the algorithm. [m] maxHThick (float): Maximum spatial thickness of each segment. [m] window_size (int): Size of the average smooth window. Returns: sigIndx (array): Index of the signal that stands for different segments of the signal. References: https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm History: - 2017-12-29: First edition by Zhenping. - 2018-07-29: Add the height range for the searching instead of SNR restriction. - 2018-07-31: Add the maxHThick argument to control the maximum thickness of each output segment. - 2024-12-20: Direct translated from matlab with ai """ #print('height', height[:30], 'epsilon', epsilon, 'height', heightBase, heightTop, 'maxHThick', maxHThick, 'window_size', window_size) # Input check if len(signal) != len(height): raise ValueError("signal and height must have the same length.") # Find the boundary for implementing Douglas-Peucker method hBaseIndx = np.argmax((height[:-1] - heightBase) * (height[1:] - heightBase) <= 0) + 1 hTopIndx = np.argmax((height[:-1] - heightTop) * (height[1:] - heightTop) <= 0) + 1 #print('hBaseIndx', hBaseIndx, height[hBaseIndx]) #print('hTopIndx', hTopIndx, height[hTopIndx]) if hBaseIndx == 0: hBaseIndx = 1 if hTopIndx == 0: hTopIndx = len(height) if hTopIndx <= hBaseIndx: return [] # Smooth the signal signalTmp = smooth_signal(signal[hBaseIndx:hTopIndx+1], window_size) heightTmp = height[hBaseIndx:hTopIndx+1] posIndx = np.where(signalTmp > 0)[0] signalTmp = signalTmp[posIndx] heightTmp = heightTmp[posIndx] if len(signalTmp) < 2: return np.array([1, hTopIndx - hBaseIndx + 1]) pointList = [[heightTmp[i], np.log(signalTmp[i])] for i in range(len(signalTmp))] #for i in range(len(signalTmp))[:10]: # print(i, heightTmp[i], np.log(signalTmp[i]), signalTmp[i]) # print(pointList[i]) sigIndx = DP_algorithm(pointList, epsilon, maxHThick) return posIndx[sigIndx] + hBaseIndx - 1
[docs] def DP_algorithm(pointList, epsilon, maxHThick): """ Recursive implementation of the Douglas-Peucker algorithm. Parameters: pointList (list): List of points [[x1, y1], [x2, y2], ...]. epsilon (float): Maximum distance. maxHThick (float): Maximum thickness for each segment. Returns: sigIndx (list): Indices of simplified points. """ if len(pointList) == 1: #print('pointList == 1') return [0] elif len(pointList) == 2: #print('pointList == 2') return [0, 1] dMax = 0 index = 0 thickness = pointList[-1][0] - pointList[0][0] #print('thickness', thickness) for i in range(1, len(pointList) - 1): d = my_dist(pointList[i], pointList[0], pointList[-1]) if d > dMax: index = i dMax = d #print('dMax', dMax) if dMax > epsilon: #print('dMax > epsilon') recResult1 = DP_algorithm(pointList[:index + 1], epsilon, maxHThick) recResult2 = DP_algorithm(pointList[index:], epsilon, maxHThick) return recResult1[:-1] + [x + index for x in recResult2] elif thickness > maxHThick: #print('thickness > maxHThick') for i in range(1, len(pointList) - 1): if pointList[i][0] - pointList[0][0] >= maxHThick: break recResult1 = DP_algorithm(pointList[:i + 1], epsilon, maxHThick) recResult2 = DP_algorithm(pointList[i:], epsilon, maxHThick) return recResult1[:-1] + [x + i for x in recResult2] else: return [0, len(pointList) - 1]
[docs] def my_dist(pointM, pointS, pointE): """ Calculate the perpendicular distance between pointM and the line connecting pointS and pointE. Parameters: pointM (list): Middle point [x, y]. pointS (list): Start point [x, y]. pointE (list): End point [x, y]. Returns: d (float): Distance. """ num = abs(pointM[1] - pointS[1] + (pointS[1] - pointE[1]) / (pointS[0] - pointE[0]) * (pointS[0] - pointM[0])) den = np.sqrt(1 + ((pointS[1] - pointE[1]) / (pointS[0] - pointE[0])) ** 2) return num / den
from scipy.special import gammaincc
[docs] def chi2fit(x, y, measure_error): """ CHI2FIT Chi-2 fitting. All the code are translated from the exemplified code in Numerical Recipes in C (2nd Edition). Great help comes from Birgit Heese. USAGE: a, b, sigmaA, sigmaB, chi2, Q = chi2fit(x, y, measure_error) INPUTS: x: array The length of x should be larger than 1. y: array The measured signal. measure_error: array Measurement errors for the y values. OUTPUTS: a: float Intercept of the linear regression. b: float Slope of the linear regression. sigmaA: float Uncertainty of the intercept. sigmaB: float Uncertainty of the slope. chi2: float Chi-square value. Q: float Goodness of fit. HISTORY: - 2018-08-03: First edition by Zhenping. Authors: - zhenping@tropos.de """ if len(x) != len(y): raise ValueError("Array lengths of x and y must agree.") if len(y) != len(measure_error): raise ValueError("Array lengths of y and measure_error must agree.") if np.sum(measure_error) > 0: valid_indices = (~np.isnan(y)) & (~np.isnan(x)) & (measure_error != 0) else: measure_error = np.ones_like(measure_error) valid_indices = (~np.isnan(y)) & (~np.isnan(x)) xN = x[valid_indices] yN = y[valid_indices] measure_errorN = measure_error[valid_indices] # Initialize the outputs a, b, sigmaA, sigmaB, chi2, Q = [np.nan] * 6 if xN.size <= 1: # Not enough data for chi2 regression return a, b, sigmaA, sigmaB, chi2, Q S = np.sum(1 / measure_errorN**2) Sx = np.sum(xN / measure_errorN**2) Sy = np.sum(yN / measure_errorN**2) Sxx = np.sum(xN**2 / measure_errorN**2) Sxy = np.sum(xN * yN / measure_errorN**2) Delta = S * Sxx - Sx**2 a = (Sxx * Sy - Sx * Sxy) / Delta b = (S * Sxy - Sx * Sy) / Delta sigmaA = np.sqrt(Sxx / Delta) sigmaB = np.sqrt(S / Delta) chi2 = np.sum(((yN - a - b * xN) / measure_errorN)**2) Q = gammaincc((len(xN) - 2) / 2, chi2 / 2) # Complemented gamma function for goodness of fit return a, b, sigmaA, sigmaB, chi2, Q
[docs] def fit_profile(height, sig_aer, pc, bg, sig_mol, dpIndx, layerThickConstrain, slopeConstrain, SNRConstrain, flagShowDetail=False): #def rayleighfit(height, sig_aer, snr, sig_mol, dpIndx, layerThickConstrain, # slopeConstrain, SNRConstrain, flagShowDetail=False): """ Search the clean region with rayleigh fit algorithm. Parameters ---------- height : array_like height [m] sig_aer : array_like range corrected signal pc : array_like photon count signal bg : array_like background sig_mol : array_like range corrected molecular signal dpIndx : array_like index of the region calculated by Douglas-Peucker algorithm layerThickConstrain : float constrain for the reference layer thickness [m] slopeConstrain : float constrain for the uncertainty of the regressed extinction coefficient SNRConstrain : float minimum SNR for the signal at the reference height flagShowDetail : bool, optional if True, calculation information will be printed (default: False) Returns ------- tuple (hBIndx, hTIndx) - indices of bottom and top of searched region Returns (nan, nan) if region not found """ import numpy as np from scipy.stats import norm if len([height, sig_aer, pc, bg, sig_mol, dpIndx]) < 6: #if len([height, sig_aer, pc, bg, sig_mol, dpIndx]) < 6: raise ValueError('Not enough inputs.') if not (isinstance(sig_aer, (list, np.ndarray)) and isinstance(sig_mol, (list, np.ndarray))): raise ValueError('sig_aer and sig_mol must be 1-dimensional array') if dpIndx is None or len(dpIndx) == 0: print('Warning: dpIndx is empty') return np.nan, np.nan # parameter initialize numTest = 0 hIndxT_Test = np.full(len(dpIndx), np.nan) hIndxB_Test = np.full(len(dpIndx), np.nan) mean_resid = np.full(len(dpIndx), np.nan) std_resid = np.full(len(dpIndx), np.nan) slope_resid = np.full(len(dpIndx), np.nan) msre_resid = np.full(len(dpIndx), np.nan) Astat = np.full(len(dpIndx), np.nan) SNR_ref = np.full(len(dpIndx), np.nan) # search for the qualified region for iIndx in range(len(dpIndx) - 1): test1 = test2 = test3 = test4 = test5 = True iDpBIndx = dpIndx[iIndx] iDpTIndx = dpIndx[iIndx + 1] + 1 # matlab slicing issue?? #print(iIndx, iDpBIndx, iDpTIndx) # check layer thickness if not ((height[iDpTIndx] - height[iDpBIndx]) > layerThickConstrain): if flagShowDetail: print(f'Region {iIndx}: {height[iDpBIndx]} - {height[iDpTIndx]} ' f'is less than {layerThickConstrain:5.1f}m') continue # normalize the recorded signal to the molecular signal if np.sum(sig_aer[iDpBIndx:iDpTIndx]) == 0: continue sig_factor = np.nanmean(sig_mol[iDpBIndx:iDpTIndx]) / \ np.nanmean(sig_aer[iDpBIndx:iDpTIndx]) #print('sig factor ', sig_factor) sig_aer_norm = sig_aer * sig_factor std_aer_norm = sig_aer_norm / np.sqrt(pc + bg) #print('sig_aer_norm ', sig_aer_norm.shape, sig_aer_norm[iDpBIndx:iDpTIndx]) #print('std_aer_norm ', std_aer_norm.shape, std_aer_norm[iDpBIndx:iDpTIndx]) # Quality test 2: near and far - range cross criteria winLen = int(layerThickConstrain / (height[1] - height[0])) if winLen <= 0: # print('Warning: layerThickConstrain is too small.') winLen = 5 for jIndx in range(dpIndx[0], dpIndx[-1] - winLen, winLen): slice_range = slice(jIndx, jIndx + winLen) deltaSig_aer = np.nanstd(sig_aer_norm[slice_range]) meanSig_aer = np.nanmean(sig_aer_norm[slice_range]) meanSig_mol = np.nanmean(sig_mol[slice_range]) if not ((meanSig_aer + deltaSig_aer/3) >= meanSig_mol): if flagShowDetail: print(f'Region {iIndx}: {height[iDpBIndx]} - ' f'{height[iDpTIndx]} fails in near and far-Range ' 'cross test.') test2 = False break if not test2: continue # Quality test 3: white-noise criterion #print('white noise criterion ', iDpBIndx, iDpTIndx) residual = (sig_aer_norm[iDpBIndx:iDpTIndx] - sig_mol[iDpBIndx:iDpTIndx]) #print(sig_aer_norm[iDpBIndx], sig_mol[iDpBIndx], sig_aer_norm[iDpBIndx]-sig_mol[iDpBIndx]) #print(sig_aer_norm[iDpBIndx+1], sig_mol[iDpBIndx+1], sig_aer_norm[iDpBIndx+1]-sig_mol[iDpBIndx+1]) #print(sig_aer_norm[iDpBIndx+2], sig_mol[iDpBIndx+2], sig_aer_norm[iDpBIndx+2]-sig_mol[iDpBIndx+2]) #print(sig_aer_norm[iDpBIndx+3], sig_mol[iDpBIndx+3], sig_aer_norm[iDpBIndx+3]-sig_mol[iDpBIndx+3]) #print('residual ', residual.shape, residual) x = height[iDpBIndx:iDpTIndx] / 1e3 if len(residual) <= 10: if flagShowDetail: print(f'Region {iIndx}: signal is too noisy.') test3 = False #continue # Note: chi2fit implementation needed here #print('white noise chi2fit input', np.nanmean(x), np.nanmean(residual), np.nanmean(std_aer_norm[iDpBIndx:iDpTIndx])) thisIntersect, thisSlope, _, _, _, _ = chi2fit( x, residual, std_aer_norm[iDpBIndx:iDpTIndx]) #print('white noise chi2fit ', thisIntersect, thisSlope) residual_fit = thisIntersect + thisSlope * x et = residual - residual_fit d = np.sum(np.diff(et)**2) / np.sum(et**2) if not (1 <= d <= 3): if flagShowDetail: print(f'Region {iIndx}: {height[iDpBIndx]} - ' f'{height[iDpTIndx]} fails in white-noise criterion.') test3 = False #continue # Quality test 4: SNR check sigsum = np.nansum(pc[dpIndx[iIndx]:dpIndx[iIndx + 1]]) # adaption needed for the background not given as a profile bgsum = bg * (dpIndx[iIndx + 1] - dpIndx[iIndx]) SNR = sigsum / np.sqrt(sigsum + 2*bgsum) #print('SNR', sigsum, '/', bgsum, '=', SNR) if SNR < SNRConstrain: if flagShowDetail: print(f'Region {iIndx}: {height[iDpBIndx]} - ' f'{height[iDpTIndx]} fails in SNR criterion.') test4 = False #continue # Quality test 5: slope check x = height[iDpBIndx:iDpTIndx] yTmp_aer = sig_aer_norm[iDpBIndx:iDpTIndx] yTmp_mol = sig_mol[iDpBIndx:iDpTIndx] std_yTmp_aer = std_aer_norm[iDpBIndx:iDpTIndx] mask = yTmp_aer > 0 y_aer = yTmp_aer[mask] y_mol = yTmp_mol[mask] std_y_aer = std_yTmp_aer[mask] x = x[mask] std_y_aer = std_y_aer / y_aer y_aer = np.log(y_aer) y_mol = np.log(y_mol) if len(y_aer) <= 10: if flagShowDetail: print(f'Region {iIndx}: signal is too noisy.') test5 = False #continue _, aerSlope, _, deltaAerSlope, _, _ = chi2fit(x, y_aer, std_y_aer) #print('aer chi2fit', aerSlope, deltaAerSlope) _, molSlope, _, deltaMolSlope, _, _ = chi2fit(x, y_mol, np.zeros_like(x)) #print('mol chi2fit', molSlope, deltaMolSlope) slope_condition = (molSlope <= (aerSlope + (deltaAerSlope + deltaMolSlope) * slopeConstrain) and molSlope >= (aerSlope - (deltaAerSlope + deltaMolSlope) * slopeConstrain)) if not slope_condition: if flagShowDetail: print(f'Slope_aer: {aerSlope}, delta_Slope_aer: {deltaAerSlope}, ' f'Slope_mol: {molSlope}') print(f'Region {iIndx}: {height[iDpBIndx]} - ' f'{height[iDpTIndx]} fails in slope test.') test5 = False #continue if not (test1 and test2 and test3 and test4 and test5): print("one tests failed?") continue # save statistics numTest += 1 hIndxB_Test[numTest-1] = dpIndx[iIndx] hIndxT_Test[numTest-1] = dpIndx[iIndx + 1] mean_resid[numTest-1] = np.nanmean(residual) std_resid[numTest-1] = np.nanstd(residual) slope_resid[numTest-1] = thisSlope msre_resid[numTest-1] = np.sum(et**2) SNR_ref[numTest-1] = SNR # Anderson Darling test normP = norm.pdf((residual - mean_resid[numTest-1]) / std_resid[numTest-1]) indices = np.arange(1, len(residual) + 1) A = np.sum((2 * indices - 1) * np.log(normP) + (2 * (len(residual) - indices) + 1) * np.log(1 - normP)) A = -len(residual) - A/len(residual) Astat[numTest-1] = A * (1 + 0.75/len(residual) + 2.25/len(residual)**2) if numTest == 0: if flagShowDetail: print('None clean region is found.') return np.nan, np.nan # search the best fit region X_val = (np.abs(mean_resid) * np.abs(std_resid) * np.abs(slope_resid) * np.abs(msre_resid) * np.abs(Astat) / SNR_ref) X_val[X_val == 0] = np.nan indxBest_Int = np.nanargmin(X_val) # print('hIndB_Test', hIndxB_Test) # print('hIndT_Test', hIndxT_Test) # print('mean_resid', mean_resid) # print('std_resid', std_resid) # print('slope_resid', slope_resid) # print('msre_resid', msre_resid) # print('SNR_ref', SNR_ref) # print('Astat', Astat) # print('X_val', X_val) if flagShowDetail: print(f'The best interval is {height[int(hIndxB_Test[indxBest_Int])]} - ' f'{height[int(hIndxT_Test[indxBest_Int])]}') return (int(hIndxB_Test[indxBest_Int]), int(hIndxT_Test[indxBest_Int]))