import logging
import numpy as np
from scipy.stats import norm
from scipy.ndimage import uniform_filter1d
from scipy.special import gammaincc
from ppcpy.misc.helper import uniform_filter, moving_average
#import fastrdp
def getVal(array:np.ndarray, indices:list) -> list:
    """Get values of ``array`` at a pair of indices, tolerating NaN indices.

    Parameters
    ----------
    array : ndarray
        Array to be indexed.
    indices : array_like
        Indices into ``array``; may include NaN values.

    Returns
    -------
    out : list
        Values of ``array`` at the given indices.
        Where an index is NaN, the returned value is also NaN.

    Raises
    ------
    ValueError
        If the first index is not smaller than the second. Note that NaN
        comparisons evaluate to False, so NaN indices never trigger this.
    """
    if indices[0] >= indices[1]:
        raise ValueError('Invalid index pair.')
    indices = np.asarray(indices, dtype=float)
    # NaN-aware lookup: fetch values only at finite indices, leave NaN elsewhere.
    out = np.full(indices.shape, np.nan)
    mask = ~np.isnan(indices)
    out[mask] = array[indices[mask].astype(int)]
    return out.tolist()
def rayleighfit(data_cube, collect_debug:bool=False) -> list:
    """Perform Rayleighfit procedure using Douglas Peucker algorithm for each channel and cloud
    free period.

    Parameters
    ----------
    data_cube : object
        Main PicassoProc object.
    collect_debug : bool
        If true, collects debug information.

    Returns
    -------
    refH : list of dicts
        Reference heights per channel per cloud free period.

    Raises
    ------
    ValueError
        If a manual/NR reference height key is missing from the config dict.

    Notes
    -----
    - This function uses the Douglas Peucker algorithm to segmentize the signal inside a search range
      specified by the config variables 'heightFullOverlap' and 'maxDecomHeight'. These segments are
      considered as potential reference height. The final reference height is chosen as the segment
      with the best fit to the molecular signal.
    - Currently, only the FR reference heights are calculated, while the NR reference heights are
      taken from the config files.
    """
    # ..TODO:: is data.distance0 and height the same? https://github.com/PollyNET/Pollynet_Processing_Chain/blob/e413f9254094ff2c0a18fcdac4e9bebb5385d526/lib/preprocess/pollyPreprocess.m#L299
    # --> data.distance0 equals range, not height. However, Picasso uses a constant 7.5 m height resolution while PicassoPy uses the height resolution form the raw data dict ~ 7.475 m
    height = data_cube.retrievals_highres['range']
    logging.warning('rayleighfit seems to use range in matlab, but the met data should be in height >> RECHECK!')
    logging.warning('at 10km height this is a difference of about 4 indices')
    config_dict = data_cube.polly_config_dict
    # BUGFIX: initialize with empty dicts. The manual-RefH branch and the NR section
    # below assign per-channel entries via refH[i][key]; a list of None would raise
    # TypeError ('NoneType' object does not support item assignment) there.
    refH = [{} for _ in range(len(data_cube.clFreeGrps))]
    if not config_dict['flagUseManualRefH']:
        for i, cldFree in enumerate(data_cube.clFreeGrps):
            logging.debug('cloud free group %s: %s', i, cldFree)
            refH_cldFree = {}
            for wv, t, tel in [(532, 'total', 'FR'), (355, 'total', 'FR'), (1064, 'total', 'FR')]:
                if np.any(data_cube.gf(wv, t, tel)):
                    logging.debug('refH for %s', wv)
                    # Extract signal
                    rcs = np.squeeze(data_cube.retrievals_profile['RCS'][i, :, data_cube.gf(wv, t, tel)])
                    sig = np.squeeze(data_cube.retrievals_profile['sigBGCor'][i, :, data_cube.gf(wv, t, tel)])
                    bg = np.squeeze(data_cube.retrievals_profile['BG'][i, data_cube.gf(wv, t, tel)])
                    # Attenuated molecular signal: molecular backscatter times two-way
                    # molecular transmission, integrated over the range bins.
                    mSig = (
                        data_cube.mol_profiles[f'mBsc_{wv}'][i, :] * \
                        np.exp(-2 * np.cumsum(data_cube.mol_profiles[f'mExt_{wv}'][i, :] * np.concatenate(([height[0]], np.diff(height)))))
                    )
                    # ..TODO:: should the molecular signal be calculated with respect to height or range???
                    # mSig = (
                    #     data_cube.mol_profiles[f'mBsc_{wv}'][i, :] * \
                    #     np.exp(-2 * np.cumsum(data_cube.mol_profiles[f'mExt_{wv}'][i, :] *\
                    #         np.concatenate(([data_cube.retrievals_highres['height'][0]], np.diff(data_cube.retrievals_highres['height'])))))
                    # )
                    scaRatio = rcs/mSig
                    # signal decomposition with Douglas-Peucker algorithm
                    DPInd = DouglasPeucker(
                        signal=scaRatio,
                        height=height,
                        epsilon=config_dict[f'minDecomLogDist{wv}'],
                        heightBase=np.array(config_dict['heightFullOverlap'])[data_cube.gf(wv, t, tel)],
                        heightTop=config_dict[f'maxDecomHeight{wv}'],
                        maxHThick=config_dict[f'maxDecomThickness{wv}'],
                        window_size=config_dict[f'decomSmoothWin{wv}'],
                        showDetails=collect_debug
                    )
                    logging.debug('DPInd: %s', DPInd)
                    # Rayleigh fitting
                    refInd = fit_profile(
                        height=height,
                        sig_aer=rcs,
                        pc=sig,
                        bg=bg,
                        sig_mol=mSig,
                        dpIndx=DPInd,
                        layerThickConstrain=config_dict[f'minRefThickness{wv}'],
                        slopeConstrain=config_dict[f'minRefDeltaExt{wv}'],
                        SNRConstrain=config_dict[f'minRefSNR{wv}'],
                        flagShowDetail=collect_debug,
                        flagPicassoComparison=config_dict['flagPicassoComparison']
                    )
                    logging.debug('refInd: %s', refInd)
                    refHeight = getVal(data_cube.retrievals_highres['height'], refInd)
                    refRange = getVal(data_cube.retrievals_highres['range'], refInd)
                    # calculate PCR after averaging or average directly PCR -> PCR from average above was cross checked
                    # with matlab. Above 15km, the relative difference increased to ~ 3%
                    # continue at https://github.com/PollyNET/Pollynet_Processing_Chain/blob/b3b8ec7726b75d9db6287dcba29459587ca34491/lib/interface/picassoProcV3.m#L788
                    refH_cldFree[f"{wv}_{t}_{tel}"] = {
                        'DPInd': DPInd, 'refInd': refInd, 'refHeight': refHeight, 'refRange':refRange
                    }
                else:
                    logging.warning(f"No channel for rayleigh fit {wv}_{t}_{tel}")
            refH[i] = refH_cldFree
    else: # manual RefH
        logging.info("Using manually set reference height")
        for wv, t, tel in [(532, 'total', 'FR'), (355, 'total', 'FR'), (1064, 'total', 'FR')]:
            if not f'refH_{tel}_{wv}' in config_dict:
                raise ValueError('Reference height not correctly set in config file.')
            refInd = (
                np.searchsorted(
                    data_cube.retrievals_highres['height'],
                    config_dict[f'refH_{tel}_{wv}']
                ) - [0, 1]  # - [0, 1] --> make sure all values lay inside the designated range
            ).tolist()
            refHeight = getVal(data_cube.retrievals_highres['height'], refInd)
            refRange = getVal(data_cube.retrievals_highres['range'], refInd)
            for i, cldFree in enumerate(data_cube.clFreeGrps):
                refH[i][f"{wv}_{t}_{tel}"] = {
                    'DPInd': [], 'refInd': refInd, 'refHeight': refHeight, 'refRange': refRange
                }
    # Current methodology is to take NR refH values form the config file.
    logging.info("Using Config values for NR refH.")
    for wv, t, tel in [(532, 'total', 'NR'), (355, 'total', 'NR')]:
        if not f'refH_{tel}_{wv}' in config_dict:
            raise ValueError('NR reference height not correctly set in config file.')
        refInd = (
            np.searchsorted(
                data_cube.retrievals_highres['height'],
                config_dict[f'refH_{tel}_{wv}']
            ) - [0, 1]  # - [0, 1] --> make sure all values lay inside the designated range
        ).tolist()
        refHeight = getVal(data_cube.retrievals_highres['height'], refInd)
        refRange = getVal(data_cube.retrievals_highres['range'], refInd)
        for i in range(len(data_cube.clFreeGrps)):
            refH[i][f'{wv}_{t}_{tel}'] = {
                'DPInd': [], 'refInd': refInd, 'refHeight': refHeight, 'refRange': refRange
            }
    return refH
def smooth_signal(signal:np.ndarray, window_len:int) -> np.ndarray:
    """Uniformly smooth the input signal with a moving average.

    Parameters
    ----------
    signal : ndarray
        Signal to be smoothed.
    window_len : int
        Width of the applied uniform (moving-average) filter window.

    Returns
    -------
    ndarray
        Smoothed signal.

    Notes
    -----
    **History**
    - 2026-02-04: Changed from scipy.ndimage.uniform_filter1d to ppcpy.misc.helper.uniform_filter
    - 2026-04-17: Changed to ppcpy.misc.helper.moving_average to get values at edges.
    """
    # return uniform_filter1d(signal, window_len, mode='nearest')
    return moving_average(signal, window_len)
def DouglasPeucker(signal:np.ndarray, height:np.ndarray, epsilon:float, heightBase:float,
                   heightTop:float, maxHThick:float, window_size:int=1, showDetails:bool=False) -> np.ndarray:
    """Simplify signal according to Douglas-Peucker algorithm.

    Parameters
    ----------
    signal : ndarray
        Molecule corrected signal. [MHz]
    height : ndarray
        Height. [m]
    epsilon : float
        Maximum distance.
    heightBase : float
        Minimum height for the algorithm. [m]
    heightTop : float
        Maximum height for the algorithm. [m]
    maxHThick : float
        Maximum spatial thickness of each segment. [m]
    window_size : int
        Size of the average smooth window. Default is 1.
    showDetails : bool
        If true, print debug information. Default is False.

    Returns
    -------
    sigIndx : array
        Index of the signal that stands for different segments of the signal,
        given on the original (full) height grid. An empty list is returned
        for an invalid search range.

    References
    ----------
    https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm

    Notes
    -----
    **History**
    - 2017-12-29: First edition by Zhenping.
    - 2018-07-29: Add the height range for the searching instead of SNR restriction.
    - 2018-07-31: Add the maxHThick argument to control the maximum thickness of each output segment.
    - 2024-12-20: Direct translated from matlab with ai
    """
    #print('height', height[:30], 'epsilon', epsilon, 'height', heightBase, heightTop, 'maxHThick', maxHThick, 'window_size', window_size)
    # Input check
    if len(signal) != len(height):
        raise ValueError("signal and height must have the same length.")
    # Find the boundary for implementing Douglas-Peucker method.
    # A sign change of (height - limit) between neighbouring bins marks the crossing
    # of the limit; +1 selects the bin just above the crossing.
    hBaseIndx = np.argmax((height[:-1] - heightBase) * (height[1:] - heightBase) <= 0) + 1
    hTopIndx = np.argmax((height[:-1] - heightTop) * (height[1:] - heightTop) <= 0) + 1
    # print('hBaseIndx', hBaseIndx, height[hBaseIndx])
    # print('hTopIndx', hTopIndx, height[hTopIndx])
    # NOTE(review): np.argmax returning 0 can mean either "no crossing found" or a
    # genuine crossing in the very first gate; the two cases are indistinguishable
    # here (direct Matlab translation) -- verify that this is the intended behavior.
    if hBaseIndx == 1: # if np.argmax returns 0
        logging.warning(f'The base region can not be found. Set default value to {height[0]}.')
        hBaseIndx = 0
    if hTopIndx == 1: # if np.argmax returns 0
        logging.warning(f'The top region can not be found. Set default value to {height[-1]}.')
        hTopIndx = len(height)
    if hTopIndx <= hBaseIndx:
        logging.warning('Invalid search range settings.')
        # NOTE(review): returns a plain list here while other paths return ndarrays.
        return []
    # Smooth the signal
    signalTmp = smooth_signal(signal[hBaseIndx:hTopIndx+1], window_size)
    heightTmp = height[hBaseIndx:hTopIndx+1]
    # Keep only positive samples: np.log below requires signal > 0.
    posIndx = np.where(signalTmp > 0)[0]
    signalTmp = signalTmp[posIndx]
    heightTmp = heightTmp[posIndx]
    if len(signalTmp) < 2:
        # Too few usable samples: fall back to the full search range as one segment.
        return np.array([0, hTopIndx - hBaseIndx])
    # Run the recursion on (height, log-signal) pairs.
    pointList = [[heightTmp[i], np.log(signalTmp[i])] for i in range(len(signalTmp))]
    sigIndx = DP_algorithm(pointList, epsilon, maxHThick, showDetails=showDetails)
    # Map the segment indices back onto the original (unfiltered, unsliced) height grid.
    return posIndx[sigIndx] + hBaseIndx
def DP_algorithm(pointList:list, epsilon:float, maxHThick:float, recursive_depth:int=0, showDetails:bool=False) -> list:
    """Recursively simplify a point list with the Douglas-Peucker algorithm.

    Parameters
    ----------
    pointList : list
        List of points [[x1, y1], [x2, y2], ...].
    epsilon : float
        Maximum distance below which intermediate points are dropped.
    maxHThick : float
        Maximum thickness (x-extent) of each returned segment.
    recursive_depth : int
        Current recursion depth; only used for debug output. Default is 0.
    showDetails : bool
        If True, print debug information. Default is False.

    Returns
    -------
    sigIndx : list
        Indices of the simplified points, relative to ``pointList``.
    """
    # Trivial cases: one or two points cannot be simplified any further.
    if len(pointList) == 1:
        if showDetails: print(f'Recursion: {recursive_depth}, len(pointList) == 1, returning [0]')
        return [0]
    elif len(pointList) == 2:
        if showDetails: print(f'Recursion: {recursive_depth}, len(pointList) == 2, returning [0, 1]')
        return [0, 1]
    thickness = pointList[-1][0] - pointList[0][0]
    # Locate the interior point farthest from the chord connecting the first
    # and last point (first occurrence wins on ties, strict comparison).
    dMax = 0
    index = 0
    for pos, point in enumerate(pointList[1:-1], start=1):
        dist = my_dist(point, pointList[0], pointList[-1], full=False)
        if dist > dMax:
            dMax = dist
            index = pos
    if dMax > epsilon:
        # Point lies too far off the chord: split there and recurse on both halves.
        left = DP_algorithm(pointList[:index+1], epsilon, maxHThick, recursive_depth=recursive_depth+1, showDetails=showDetails)
        right = DP_algorithm(pointList[index:], epsilon, maxHThick, recursive_depth=recursive_depth+1, showDetails=showDetails)
        merged = left[:-1] + [x + index for x in right]
        if showDetails:
            print(f'Recursion: {recursive_depth}, spliting at index = {index}, dmax = {dMax}, returning {merged}')
        return merged
    if thickness > maxHThick:
        # Chord fit is acceptable but the segment is too thick: cut at the first
        # point that is at least maxHThick away from the segment start.
        for cut in range(1, len(pointList) - 1):
            if pointList[cut][0] - pointList[0][0] >= maxHThick:
                break
        left = DP_algorithm(pointList[:cut+1], epsilon, maxHThick, recursive_depth=recursive_depth+1, showDetails=showDetails)
        right = DP_algorithm(pointList[cut:], epsilon, maxHThick, recursive_depth=recursive_depth+1, showDetails=showDetails)
        merged = left[:-1] + [x + cut for x in right]
        if showDetails:
            print(f'Recursion: {recursive_depth}, spliting at index = {cut}, returning {merged}')
        return merged
    # Whole segment is representable by its two end points.
    if showDetails: print(f'Recursion: {recursive_depth}, returning (else) values = {[0, len(pointList) - 1]}')
    return [0, len(pointList) - 1]
def my_dist(pointM:list, pointS:list, pointE:list, full:bool=True) -> float:
    """Distance of pointM from the straight line through pointS and pointE.

    Parameters
    ----------
    pointM : list
        Point [x, y] whose distance from the line is evaluated.
    pointS : list
        Start point [x, y] of the line.
    pointE : list
        End point [x, y] of the line.
    full : bool
        If True, return the full perpendicular distance; otherwise return
        only the numerator of the distance formula. Default is True.

    Returns
    -------
    d : float
        Distance (or its numerator when ``full`` is False).

    Notes
    -----
    - For fixed pointS and pointE the denominator is constant, so the
      numerator alone suffices to rank points by distance (``full=False``).
    """
    slope = (pointS[1] - pointE[1]) / (pointS[0] - pointE[0])
    numerator = abs(pointM[1] - pointS[1] + slope * (pointS[0] - pointM[0]))
    if not full:
        return numerator
    return numerator / np.sqrt(1 + slope ** 2)
def chi2fit(x:np.ndarray, y:np.ndarray, measure_error:np.ndarray) -> tuple:
    """Chi-square (error-weighted) linear regression ``y = a + b*x``.

    Translated from the exemplified code in Numerical Recipes in C
    (2nd Edition). Great help comes from Birgit Heese.

    Parameters
    ----------
    x : ndarray
        Abscissa values; length should be larger than 1.
    y : ndarray
        The measured signal.
    measure_error : ndarray
        Measurement errors for the y values. If they sum to <= 0, uniform
        unit errors are assumed instead.

    Returns
    -------
    a : float
        Intercept of the linear regression.
    b : float
        Slope of the linear regression.
    sigmaA : float
        Uncertainty of the intercept.
    sigmaB : float
        Uncertainty of the slope.
    chi2 : float
        Chi-square value.
    Q : float
        Goodness of fit (complemented incomplete gamma function).
        All six values are NaN when fewer than two valid samples remain.

    Raises
    ------
    ValueError
        If the input array lengths disagree.

    Notes
    -----
    **History**
    - 2018-08-03: First edition by Zhenping.

    Examples
    --------
    ``a, b, sigmaA, sigmaB, chi2, Q = chi2fit(x, y, measure_error)``
    """
    if len(x) != len(y):
        raise ValueError("Array lengths of x and y must agree.")
    if len(y) != len(measure_error):
        raise ValueError("Array lengths of y and measure_error must agree.")
    finite = (~np.isnan(y)) & (~np.isnan(x))
    if np.sum(measure_error) > 0:
        # Zero errors would divide by zero below; drop those samples.
        valid = finite & (measure_error != 0)
    else:
        # No usable error information: fall back to unweighted regression.
        measure_error = np.ones_like(measure_error)
        valid = finite
    xN = x[valid]
    yN = y[valid]
    errN = measure_error[valid]
    if xN.size <= 1:
        # Not enough data for a chi-square regression.
        return (np.nan,) * 6
    # Standard weighted-least-squares sums (Numerical Recipes notation).
    w = 1 / errN**2
    S = np.sum(w)
    Sx = np.sum(xN * w)
    Sy = np.sum(yN * w)
    Sxx = np.sum(xN**2 * w)
    Sxy = np.sum(xN * yN * w)
    Delta = S * Sxx - Sx**2
    a = (Sxx * Sy - Sx * Sxy) / Delta
    b = (S * Sxy - Sx * Sy) / Delta
    sigmaA = np.sqrt(Sxx / Delta)
    sigmaB = np.sqrt(S / Delta)
    chi2 = np.sum(((yN - a - b * xN) / errN)**2)
    # Complemented gamma function for goodness of fit.
    Q = gammaincc((len(xN) - 2) / 2, chi2 / 2)
    return a, b, sigmaA, sigmaB, chi2, Q
def fit_profile(height:np.ndarray, sig_aer:np.ndarray, pc:np.ndarray, bg:np.ndarray, sig_mol:np.ndarray,
                dpIndx:np.ndarray, layerThickConstrain:float, slopeConstrain:float, SNRConstrain:float,
                flagShowDetail:bool=False, flagPicassoComparison:bool=False) -> tuple:
    #def rayleighfit(height, sig_aer, snr, sig_mol, dpIndx, layerThickConstrain,
    #                slopeConstrain, SNRConstrain, flagShowDetail=False):
    """Search the clean region with rayleigh fit algorithm.

    Parameters
    ----------
    height : array_like
        height [m]
    sig_aer : array_like
        range corrected signal
    pc : array_like
        photon count signal
    bg : array_like
        background (used as a scalar per segment in the SNR test below)
    sig_mol : array_like
        range corrected molecular signal
    dpIndx : array_like
        index of the region calculated by Douglas-Peucker algorithm
    layerThickConstrain : float
        constrain for the reference layer thickness [m]
    slopeConstrain : float
        constrain for the uncertainty of the regressed extinction coefficient
    SNRConstrain : float
        minimum SNR for the signal at the reference height
    flagShowDetail : bool, optional
        If true, calculation information will be printed. Default is False.
    flagPicassoComparison : bool, optional
        If true, use Picasso values and logic. Default is False.

    Returns
    -------
    tuple
        (hBIndx, hTIndx) - indices of bottom and top of searched region
        Returns (nan, nan) if region not found

    Notes
    -----
    - There are known numerical discrepancies between this and the Picasso versions especially in the
      residual calculations due to the subtraction and mean of very small numbers [-1e-24, 1e-24].
      these numerical discrepancies may cause a different reference height to be chosen compared to
      Picasso.
    """
    # if len([height, sig_aer, pc, bg, sig_mol, dpIndx]) < 6:
    # #if len([height, sig_aer, pc, bg, sig_mol, dpIndx]) < 6:
    #     raise ValueError('Not enough inputs.')
    if not (isinstance(sig_aer, (list, np.ndarray)) and
            isinstance(sig_mol, (list, np.ndarray))):
        raise ValueError('sig_aer and sig_mol must be 1-dimensional array')
    if dpIndx is None or len(dpIndx) == 0:
        print('Warning: dpIndx is empty')
        return np.nan, np.nan
    # parameter initialize
    # Per-candidate statistics, one slot per Douglas-Peucker segment that passes all tests.
    numTest = 0
    hIndxT_Test = np.full(len(dpIndx), np.nan)
    hIndxB_Test = np.full(len(dpIndx), np.nan)
    mean_resid = np.full(len(dpIndx), np.nan)
    std_resid = np.full(len(dpIndx), np.nan)
    slope_resid = np.full(len(dpIndx), np.nan)
    msre_resid = np.full(len(dpIndx), np.nan)
    Astat = np.full(len(dpIndx), np.nan)
    SNR_ref = np.full(len(dpIndx), np.nan)
    # search for the qualified region
    for iIndx in range(len(dpIndx) - 1):
        # test1 is a placeholder and is never set to False below; tests 3-5 only
        # record their result (the immediate 'continue' statements are intentionally
        # disabled) and the combined check at the end decides.
        test1 = test2 = test3 = test4 = test5 = True
        iDpBIndx = dpIndx[iIndx]
        iDpTIndx = dpIndx[iIndx + 1] # + 1 # matlab slicing issue??
        #print(iIndx, iDpBIndx, iDpTIndx)
        # check layer thickness
        if not ((height[iDpTIndx] - height[iDpBIndx]) > layerThickConstrain):
            if flagShowDetail:
                print(f'Region {iIndx}: {height[iDpBIndx]} - {height[iDpTIndx]} '
                      f'is less than {layerThickConstrain:5.1f}m')
            continue
        # normalize the recorded signal to the molecular signal
        if np.sum(sig_aer[iDpBIndx:iDpTIndx+1]) == 0:
            continue
        sig_factor = np.nanmean(sig_mol[iDpBIndx:iDpTIndx+1]) / \
            np.nanmean(sig_aer[iDpBIndx:iDpTIndx+1])
        #print('sig factor ', sig_factor)
        sig_aer_norm = sig_aer * sig_factor
        # Poisson-like uncertainty of the normalized signal (photon counts + background).
        std_aer_norm = sig_aer_norm / np.sqrt(pc + bg)
        #print('sig_aer_norm ', sig_aer_norm.shape, sig_aer_norm[iDpBIndx:iDpTIndx+1])
        #print('std_aer_norm ', std_aer_norm.shape, std_aer_norm[iDpBIndx:iDpTIndx+1])
        # Quality test 2: near and far - range cross criteria
        winLen = int(layerThickConstrain / (height[1] - height[0]))
        if winLen <= 0:
            # print('Warning: layerThickConstrain is too small.')
            winLen = 5
        # Scans the whole Douglas-Peucker search range (not just the current segment)
        # in windows of winLen bins: the normalized signal must not drop below the
        # molecular signal anywhere (within one third of its standard deviation).
        for jIndx in range(dpIndx[0], dpIndx[-1] - winLen, winLen):
            slice_range = slice(jIndx, jIndx + winLen + 1)
            deltaSig_aer = np.nanstd(sig_aer_norm[slice_range])
            meanSig_aer = np.nanmean(sig_aer_norm[slice_range])
            meanSig_mol = np.nanmean(sig_mol[slice_range])
            if not ((meanSig_aer + deltaSig_aer/3) >= meanSig_mol):
                if flagShowDetail:
                    print(f'Region {iIndx}: {height[iDpBIndx]} - '
                          f'{height[iDpTIndx]} fails in near and far-Range '
                          'cross test.')
                test2 = False
                break
        if not test2:
            continue
        # Quality test 3: white-noise criterion
        #print('white noise criterion ', iDpBIndx, iDpTIndx)
        residual = (sig_aer_norm[iDpBIndx:iDpTIndx+1] -
                    sig_mol[iDpBIndx:iDpTIndx+1])
        #print(sig_aer_norm[iDpBIndx], sig_mol[iDpBIndx], sig_aer_norm[iDpBIndx]-sig_mol[iDpBIndx])
        #print(sig_aer_norm[iDpBIndx+1], sig_mol[iDpBIndx+1], sig_aer_norm[iDpBIndx+1]-sig_mol[iDpBIndx+1])
        #print(sig_aer_norm[iDpBIndx+2], sig_mol[iDpBIndx+2], sig_aer_norm[iDpBIndx+2]-sig_mol[iDpBIndx+2])
        #print(sig_aer_norm[iDpBIndx+3], sig_mol[iDpBIndx+3], sig_aer_norm[iDpBIndx+3]-sig_mol[iDpBIndx+3])
        #print('residual ', residual.shape, residual)
        x = height[iDpBIndx:iDpTIndx+1] / 1e3
        if len(residual) <= 10:
            if flagShowDetail:
                print(f'Region {iIndx}: signal is too noisy.')
            test3 = False
            #continue
        # Note: chi2fit implementation needed here
        #print('white noise chi2fit input', np.nanmean(x), np.nanmean(residual), np.nanmean(std_aer_norm[iDpBIndx:iDpTIndx+1]))
        thisIntersect, thisSlope, _, _, _, _ = chi2fit(
            x, residual, std_aer_norm[iDpBIndx:iDpTIndx+1])
        #print('white noise chi2fit ', thisIntersect, thisSlope)
        residual_fit = thisIntersect + thisSlope * x
        et = residual - residual_fit
        # Durbin-Watson-like statistic on the detrended residual; d ~ 2 for white noise,
        # values outside [1, 3] indicate correlated (non-white) residuals.
        d = np.sum(np.diff(et)**2) / np.sum(et**2)
        if not (1 <= d <= 3):
            if flagShowDetail:
                print(f'Region {iIndx}: {height[iDpBIndx]} - '
                      f'{height[iDpTIndx]} fails in white-noise criterion.')
            test3 = False
            #continue
        # Quality test 4: SNR check
        sigsum = np.nansum(pc[dpIndx[iIndx]:dpIndx[iIndx+1]+1])
        # adaption needed for the background not given as a profile
        bgsum = bg * (dpIndx[iIndx + 1] - dpIndx[iIndx])
        SNR = sigsum / np.sqrt(sigsum + 2*bgsum)
        #print('SNR', sigsum, '/', bgsum, '=', SNR)
        if SNR < SNRConstrain:
            if flagShowDetail:
                print(f'Region {iIndx}: {height[iDpBIndx]} - '
                      f'{height[iDpTIndx]} fails in SNR criterion.')
            test4 = False
            #continue
        # Quality test 5: slope check
        # Compare the log-signal slope of the aerosol and the molecular profile;
        # they must agree within the combined fit uncertainties times slopeConstrain.
        x = height[iDpBIndx:iDpTIndx+1]
        yTmp_aer = sig_aer_norm[iDpBIndx:iDpTIndx+1]
        yTmp_mol = sig_mol[iDpBIndx:iDpTIndx+1]
        std_yTmp_aer = std_aer_norm[iDpBIndx:iDpTIndx+1]
        # np.log below requires positive signal; drop non-positive samples.
        mask = yTmp_aer > 0
        y_aer = yTmp_aer[mask]
        y_mol = yTmp_mol[mask]
        std_y_aer = std_yTmp_aer[mask]
        x = x[mask]
        # Relative error propagates unchanged through the logarithm.
        std_y_aer = std_y_aer / y_aer
        y_aer = np.log(y_aer)
        y_mol = np.log(y_mol)
        if len(y_aer) <= 10:
            if flagShowDetail:
                print(f'Region {iIndx}: signal is too noisy.')
            test5 = False
            #continue
        _, aerSlope, _, deltaAerSlope, _, _ = chi2fit(x, y_aer, std_y_aer)
        #print('aer chi2fit', aerSlope, deltaAerSlope)
        _, molSlope, _, deltaMolSlope, _, _ = chi2fit(x, y_mol, np.zeros_like(x))
        #print('mol chi2fit', molSlope, deltaMolSlope)
        slope_condition = (molSlope <= (aerSlope + (deltaAerSlope + deltaMolSlope) *
                                        slopeConstrain) and
                           molSlope >= (aerSlope - (deltaAerSlope + deltaMolSlope) *
                                        slopeConstrain))
        if not slope_condition:
            if flagShowDetail:
                print(f'Slope_aer: {aerSlope}, delta_Slope_aer: {deltaAerSlope}, '
                      f'Slope_mol: {molSlope}')
                print(f'Region {iIndx}: {height[iDpBIndx]} - '
                      f'{height[iDpTIndx]} fails in slope test.')
            test5 = False
            #continue
        # Combined decision over all quality tests for this segment.
        if not (test1 and test2 and test3 and test4 and test5):
            print("one tests failed?")
            continue
        # save statistics
        hIndxB_Test[numTest] = dpIndx[iIndx]
        hIndxT_Test[numTest] = dpIndx[iIndx+1]
        mean_resid[numTest] = np.nanmean(residual)
        std_resid[numTest] = np.nanstd(residual)
        slope_resid[numTest] = thisSlope
        # NOTE(review): despite the name, this is the *sum* of squared detrended
        # residuals, not a mean -- confirm naming against Picasso.
        msre_resid[numTest] = np.sum(et**2)
        SNR_ref[numTest] = SNR
        # Anderson Darling test
        # NOTE(review): the textbook Anderson-Darling statistic uses the normal CDF
        # of the *sorted*, standardized residuals; this uses norm.pdf on unsorted
        # residuals (direct Matlab translation) -- verify against Picasso before changing.
        normP = norm.pdf((residual - mean_resid[numTest]) /
                         std_resid[numTest])
        indices = np.arange(1, len(residual) + 1)
        A = np.sum((2 * indices - 1) * np.log(normP) +
                   (2 * (len(residual) - indices) + 1) * np.log(1 - normP))
        A = -len(residual) - A/len(residual)
        Astat[numTest] = A * (1 + 0.75/len(residual) +
                              2.25/len(residual)**2)
        # Update numTest
        numTest += 1
    if numTest == 0:
        if flagShowDetail:
            print('None clean region found.')
        return np.nan, np.nan
    # search the best fit region
    # Combined figure of merit over all passing segments: smaller is better.
    X_val = (np.abs(mean_resid) * np.abs(std_resid) * np.abs(slope_resid) *
             np.abs(msre_resid) * np.abs(Astat) / SNR_ref)
    # Exact zeros would always win the argmin; mask them out instead.
    X_val[X_val == 0] = np.nan
    if flagPicassoComparison:
        # # Matlab equivalent code:
        if np.isnan(X_val).all():
            indxBest_Int = 0
        else:
            indxBest_Int = np.nanargmin(X_val)
    else:
        # QuicFix for all NaN arrays:
        if np.isnan(X_val).all():
            print("None valid clean region found.")
            return np.nan, np.nan
        indxBest_Int = np.nanargmin(X_val)
    if flagShowDetail:
        print('hIndB_Test', hIndxB_Test)
        print('hIndT_Test', hIndxT_Test)
        print('mean_resid', mean_resid)
        print('std_resid', std_resid)
        print('slope_resid', slope_resid)
        print('msre_resid', msre_resid)
        print('SNR_ref', SNR_ref)
        print('Astat', Astat)
        print('X_val', X_val)
    if flagShowDetail:
        print(f'The best interval is {height[int(hIndxB_Test[indxBest_Int])]} - '
              f'{height[int(hIndxT_Test[indxBest_Int])]}')
    return (int(hIndxB_Test[indxBest_Int]), int(hIndxT_Test[indxBest_Int]))