"""
Testtext for documentation
contains :py:func:`get_best_LC`
"""
import numpy as np
import logging
from collections import defaultdict
from ppcpy.misc.helper import mean_stable
from ppcpy.misc.helper import default_to_regular
elastic2raman:dict = {355: 387, 532: 607}
[docs]
def lc_for_cldFreeGrps(data_cube, retrieval:str, collect_debug:bool=False) -> list:
"""Estimate the lidar constant from the optical profiles.
Parameters
----------
data_cube : object
Main PicassoProc object.
retrieval : str
Retrieval type. 'klett' or 'raman'.
collect_debug : bool
If true, collects debug information.
Returns
-------
LCs : list
Lidar constant for retrieval type per channel per cloud free period.
Notes
-----
- For NR, done directly form the optical profiles, whereas in the matlab version, the ``LC*olAttri387.sigRatio`` is taken.
- Through the config variable 'flagUseRetrievedExt4LCCalc', the extinction used to calculate the LCs can be specified.
if 'flagUseRetrievedExt4LCCalc' is True the retrieved extinction will be used otherwise the extinction approximated by
the backscatter times the assumed lidar constant will be used.
- Missing Rotational Raman and Aeronet LC retrieval.
.. TODO:: Check if LC's are normalized with respect to the mean of the profiles.
.. TODO:: Add option for Aeronet and rotational Raman retrieved LC.
**History**
xxxx-xx-xx: First edition by ...
2026-03-18: Changed beta_mol for inelastic wavelengths and added the 'flagUseRetrievedExt4LCCalc' variable.
"""
logging.info(f'LC retrieval: {retrieval} method')
height = data_cube.retrievals_highres['range']
hres = data_cube.rawdata_dict['measurement_height_resolution']['var_data']
config_dict = data_cube.polly_config_dict
heightFullOverlap = [np.array(config_dict['heightFullOverlap']) for i in data_cube.clFreeGrps]
LCs = defaultdict(list)
for i, cldFree in enumerate(data_cube.clFreeGrps):
cldFreeTime = np.array(data_cube.retrievals_highres['time'])[cldFree]
cldFree = cldFree[0], cldFree[1] + 1
profiles = data_cube.retrievals_profile[retrieval][i]
for channel in profiles:
wv, t, tel = channel.split('_')
# Telescope type dependent configurations:
if tel == 'NR':
key_smooth = f'smoothWin_{retrieval}_NR_'
key_LR = 'LR_NR_'
else:
key_smooth = f'smoothWin_{retrieval}_'
key_LR = 'LR'
hFullOverlap = heightFullOverlap[i][data_cube.gf(wv, t, tel)][0]
hBaseInd = np.argmax(
height >= (hFullOverlap + config_dict[f'{key_smooth}{wv}'] / 2 * hres))
# Elastic signal:
sig = profiles[channel]['signal']
signal = np.nanmean(np.squeeze(
data_cube.retrievals_highres[f'sig{sig}'][slice(*cldFree), :, data_cube.gf(wv, t, tel)]), axis=0)
molBsc = data_cube.mol_profiles[f'mBsc_{wv}'][i, :].copy()
molExt = data_cube.mol_profiles[f'mExt_{wv}'][i, :].copy()
# Check for avaiabel retrievals:
if not ('aerExt' in profiles[channel] and 'aerBsc' in profiles[channel]):
logging.warning(f'No availabel retrievals, skipping {channel} {cldFree}')
continue
# Backscatter and extinction retrievals:
aerBsc = profiles[channel]['aerBsc'].copy()
if config_dict['flagUseRetrievedExt4LCCalc'] & ~config_dict['flagPicassoComparison']:
logging.info('Using Retrieved Exticntion')
aerExt = profiles[channel]['aerExt'].copy()
else:
logging.info('Using approximated Extinction')
aerBsc[aerBsc <= 0] = np.nan
aerExt = aerBsc * config_dict[f'{key_LR}{wv}']
# Interpolate extinction to ground
aerExt[:hBaseInd + 1] = aerExt[hBaseInd]
## Optical depth (OD)
aerOD = np.nancumsum(aerExt * np.concatenate(([height[0]], np.diff(height))))
molOD = np.nancumsum(molExt * np.concatenate(([height[0]], np.diff(height))))
## Round trip transmission
trans = np.exp(-2 * (aerOD + molOD))
bsc = molBsc + aerBsc
## Lidar calibration constant
LC = (signal * height**2) / (bsc * trans)
LC[LC <= 0] = np.nan
LC_stable, _, LCStd = mean_stable(
x=LC,
win=config_dict['LCMeanWindow'],
minBin=config_dict['LCMeanMinIndx'],
maxBin=config_dict['LCMeanMaxIndx']
)
logging.info(f'cldFreGrp {i}, Channel {wv} {t} {tel}, LC_stable {LC_stable}, LCStd {LCStd}')
if LC_stable is None:
logging.warning(f'Can not find a stable LC value, skipping {wv} nm {t} {tel} channel for cloud free period {cldFree}')
continue
if collect_debug:
LCs[channel].append({
'LC': LC_stable, 'LCStd': LC_stable * LCStd, 'LC_profile': LC,
'time_start': int(cldFreeTime[0]), 'time_end': int(cldFreeTime[1])
})
else:
LCs[channel].append({
'LC': LC_stable, 'LCStd': LC_stable * LCStd,
'time_start': int(cldFreeTime[0]), 'time_end': int(cldFreeTime[1])
})
# -----------------------------------------------------------------------------------
# LC for raman / inelastic channels
# -----------------------------------------------------------------------------------
if retrieval == 'raman' and int(wv) in elastic2raman.keys():
wv_r = elastic2raman[int(wv)]
## Inelastic signal, backscatter and extinction:
signal_r = np.nanmean(np.squeeze(
data_cube.retrievals_highres[f'sig{sig}'][slice(*cldFree), :, data_cube.gf(wv_r, t, tel)]), axis=0)
molBsc_r = data_cube.mol_profiles[f'mBsc_{wv_r}'][i, :].copy()
molExt_r = data_cube.mol_profiles[f'mExt_{wv_r}'][i, :].copy()
aerExt_r = aerExt * (int(wv)/int(wv_r))**config_dict['angstrexp']
## Optical depth (OD)
aerOD_r = np.nancumsum(aerExt_r * np.concatenate(([height[0]], np.diff(height))))
molOD_r = np.nancumsum(molExt_r * np.concatenate(([height[0]], np.diff(height))))
## Round-trip transmission
trans_r = np.exp(- (aerOD + molOD + aerOD_r + molOD_r))
bsc_r = molBsc_r
if config_dict['flagPicassoComparison']:
bsc_r = molBsc
## Lidar clibration constant
LC_r = (signal_r * height**2) / (bsc_r * trans_r)
LC_r[LC_r <= 0] = np.nan
LC_r_stable, _, LCStd_r = mean_stable(
x=LC_r,
win=config_dict['LCMeanWindow'],
minBin=config_dict['LCMeanMinIndx'],
maxBin=config_dict['LCMeanMaxIndx']
)
logging.info(f'cldFreGrp {i}, Channel {wv_r} {t} {tel}, LC_stable {LC_r_stable}, LCStd {LCStd_r}')
if LC_r_stable is None:
logging.warning(f'Can not find a stable LC value, skipping {wv_r} nm {t} {tel} channel for cloud free period {cldFree}')
continue
if collect_debug:
LCs[f"{wv_r}_{t}_{tel}"].append({
'LC': LC_r_stable, 'LCStd': LC_r_stable * LCStd_r, 'LC_profile': LC_r,
'time_start': int(cldFreeTime[0]), 'time_end': int(cldFreeTime[1])
})
else:
LCs[f"{wv_r}_{t}_{tel}"].append({
'LC': LC_r_stable, 'LCStd': LC_r_stable * LCStd_r,
'time_start': int(cldFreeTime[0]), 'time_end': int(cldFreeTime[1])
})
return default_to_regular(LCs)