Source code for ppcpy.calibration.lidarconstant

"""
Testtext for documentation


contains :py:func:`get_best_LC`
"""
import numpy as np
import logging
from collections import defaultdict
from ppcpy.misc.helper import mean_stable
from ppcpy.misc.helper import default_to_regular

elastic2raman:dict = {355: 387, 532: 607}

[docs] def lc_for_cldFreeGrps(data_cube, retrieval:str, collect_debug:bool=False) -> list: """Estimate the lidar constant from the optical profiles. Parameters ---------- data_cube : object Main PicassoProc object. retrieval : str Retrieval type. 'klett' or 'raman'. collect_debug : bool If true, collects debug information. Returns ------- LCs : list Lidar constant for retrieval type per channel per cloud free period. Notes ----- - For NR, done directly form the optical profiles, whereas in the matlab version, the ``LC*olAttri387.sigRatio`` is taken. - Through the config variable 'flagUseRetrievedExt4LCCalc', the extinction used to calculate the LCs can be specified. if 'flagUseRetrievedExt4LCCalc' is True the retrieved extinction will be used otherwise the extinction approximated by the backscatter times the assumed lidar constant will be used. - Missing Rotational Raman and Aeronet LC retrieval. .. TODO:: Check if LC's are normalized with respect to the mean of the profiles. .. TODO:: Add option for Aeronet and rotational Raman retrieved LC. **History** xxxx-xx-xx: First edition by ... 2026-03-18: Changed beta_mol for inelastic wavelengths and added the 'flagUseRetrievedExt4LCCalc' variable. """ logging.info(f'LC retrieval: {retrieval} method') height = data_cube.retrievals_highres['range'] hres = data_cube.rawdata_dict['measurement_height_resolution']['var_data'] config_dict = data_cube.polly_config_dict heightFullOverlap = [np.array(config_dict['heightFullOverlap']) for i in data_cube.clFreeGrps] LCs = defaultdict(list) for i, cldFree in enumerate(data_cube.clFreeGrps): cldFreeTime = np.array(data_cube.retrievals_highres['time'])[cldFree] cldFree = cldFree[0], cldFree[1] + 1 profiles = data_cube.retrievals_profile[retrieval][i] for channel in profiles: wv, t, tel = channel.split('_') # Telescope type dependent configurations: if tel == 'NR': key_smooth = f'smoothWin_{retrieval}_NR_' key_LR = 'LR_NR_' else: key_smooth = f'smoothWin_{retrieval}_' key_LR = 'LR' hFullOverlap = heightFullOverlap[i][data_cube.gf(wv, t, tel)][0] hBaseInd = np.argmax( height >= (hFullOverlap + config_dict[f'{key_smooth}{wv}'] / 2 * hres)) # Elastic signal: sig = profiles[channel]['signal'] signal = np.nanmean(np.squeeze( data_cube.retrievals_highres[f'sig{sig}'][slice(*cldFree), :, data_cube.gf(wv, t, tel)]), axis=0) molBsc = data_cube.mol_profiles[f'mBsc_{wv}'][i, :].copy() molExt = data_cube.mol_profiles[f'mExt_{wv}'][i, :].copy() # Check for avaiabel retrievals: if not ('aerExt' in profiles[channel] and 'aerBsc' in profiles[channel]): logging.warning(f'No availabel retrievals, skipping {channel} {cldFree}') continue # Backscatter and extinction retrievals: aerBsc = profiles[channel]['aerBsc'].copy() if config_dict['flagUseRetrievedExt4LCCalc'] & ~config_dict['flagPicassoComparison']: logging.info('Using Retrieved Exticntion') aerExt = profiles[channel]['aerExt'].copy() else: logging.info('Using approximated Extinction') aerBsc[aerBsc <= 0] = np.nan aerExt = aerBsc * config_dict[f'{key_LR}{wv}'] # Interpolate extinction to ground aerExt[:hBaseInd + 1] = aerExt[hBaseInd] ## Optical depth (OD) aerOD = np.nancumsum(aerExt * np.concatenate(([height[0]], np.diff(height)))) molOD = np.nancumsum(molExt * np.concatenate(([height[0]], np.diff(height)))) ## Round trip transmission trans = np.exp(-2 * (aerOD + molOD)) bsc = molBsc + aerBsc ## Lidar calibration constant LC = (signal * height**2) / (bsc * trans) LC[LC <= 0] = np.nan LC_stable, _, LCStd = mean_stable( x=LC, win=config_dict['LCMeanWindow'], minBin=config_dict['LCMeanMinIndx'], maxBin=config_dict['LCMeanMaxIndx'] ) logging.info(f'cldFreGrp {i}, Channel {wv} {t} {tel}, LC_stable {LC_stable}, LCStd {LCStd}') if LC_stable is None: logging.warning(f'Can not find a stable LC value, skipping {wv} nm {t} {tel} channel for cloud free period {cldFree}') continue if collect_debug: LCs[channel].append({ 'LC': LC_stable, 'LCStd': LC_stable * LCStd, 'LC_profile': LC, 'time_start': int(cldFreeTime[0]), 'time_end': int(cldFreeTime[1]) }) else: LCs[channel].append({ 'LC': LC_stable, 'LCStd': LC_stable * LCStd, 'time_start': int(cldFreeTime[0]), 'time_end': int(cldFreeTime[1]) }) # ----------------------------------------------------------------------------------- # LC for raman / inelastic channels # ----------------------------------------------------------------------------------- if retrieval == 'raman' and int(wv) in elastic2raman.keys(): wv_r = elastic2raman[int(wv)] ## Inelastic signal, backscatter and extinction: signal_r = np.nanmean(np.squeeze( data_cube.retrievals_highres[f'sig{sig}'][slice(*cldFree), :, data_cube.gf(wv_r, t, tel)]), axis=0) molBsc_r = data_cube.mol_profiles[f'mBsc_{wv_r}'][i, :].copy() molExt_r = data_cube.mol_profiles[f'mExt_{wv_r}'][i, :].copy() aerExt_r = aerExt * (int(wv)/int(wv_r))**config_dict['angstrexp'] ## Optical depth (OD) aerOD_r = np.nancumsum(aerExt_r * np.concatenate(([height[0]], np.diff(height)))) molOD_r = np.nancumsum(molExt_r * np.concatenate(([height[0]], np.diff(height)))) ## Round-trip transmission trans_r = np.exp(- (aerOD + molOD + aerOD_r + molOD_r)) bsc_r = molBsc_r if config_dict['flagPicassoComparison']: bsc_r = molBsc ## Lidar clibration constant LC_r = (signal_r * height**2) / (bsc_r * trans_r) LC_r[LC_r <= 0] = np.nan LC_r_stable, _, LCStd_r = mean_stable( x=LC_r, win=config_dict['LCMeanWindow'], minBin=config_dict['LCMeanMinIndx'], maxBin=config_dict['LCMeanMaxIndx'] ) logging.info(f'cldFreGrp {i}, Channel {wv_r} {t} {tel}, LC_stable {LC_r_stable}, LCStd {LCStd_r}') if LC_r_stable is None: logging.warning(f'Can not find a stable LC value, skipping {wv_r} nm {t} {tel} channel for cloud free period {cldFree}') continue if collect_debug: LCs[f"{wv_r}_{t}_{tel}"].append({ 'LC': LC_r_stable, 'LCStd': LC_r_stable * LCStd_r, 'LC_profile': LC_r, 'time_start': int(cldFreeTime[0]), 'time_end': int(cldFreeTime[1]) }) else: LCs[f"{wv_r}_{t}_{tel}"].append({ 'LC': LC_r_stable, 'LCStd': LC_r_stable * LCStd_r, 'time_start': int(cldFreeTime[0]), 'time_end': int(cldFreeTime[1]) }) return default_to_regular(LCs)