#from ..misc import *
import datetime
import re
import numpy as np
import logging
import ppcpy.misc.pollyChannelTags as pollyChannelTags
import ppcpy.preprocess.pollyPreprocess as pollyPreprocess
import ppcpy.qc.pollySaturationDetect as pollySaturationDetect
import ppcpy.qc.transCor as transCor
import ppcpy.qc.overlapEst as overlapEst
import ppcpy.qc.overlapCor as overlapCor
import ppcpy.qc.qualityMask as qualityMask
import ppcpy.calibration.polarization as polarization
import ppcpy.cloudmask.cloudscreen as cloudscreen
import ppcpy.cloudmask.profilesegment as profilesegment
import ppcpy.preprocess.profiles as preprocprofiles
import ppcpy.io.readMeteo as readMeteo
import ppcpy.misc.molecular as molecular
import ppcpy.calibration.rayleighfit as rayleighfit
import ppcpy.retrievals.klettfernald as klettfernald
import ppcpy.retrievals.raman as raman
import ppcpy.retrievals.depolarization as depolarization
import ppcpy.retrievals.angstroem as angstroem
import ppcpy.calibration.lidarconstant as lidarconstant
import ppcpy.retrievals.highres as highres
import ppcpy.retrievals.quasiV1 as quasiV1
import ppcpy.retrievals.quasiV2 as quasiV2
import ppcpy.retrievals.quasi as quasi
import ppcpy.io.sql_interaction as sql_db
[docs]
class PicassoProc:
    """Processing pipeline state ("data cube") for a single Polly raw file.

    Holds the raw data dict, the polly- and picasso-configuration dicts and
    accumulates retrieval results in ``retrievals_highres`` (2d, time/height)
    and ``retrievals_profile`` (per cloud-free segment). The processing steps
    are implemented as methods that mutate this state.
    """
    # number of live PicassoProc instances; incremented in __init__,
    # decremented in __del__
    counter = 0
def __init__(self, rawdata_dict, polly_config_dict, picasso_config_dict):
"""initialize the data_cube
Parameters
----------
rawdata_dict
the dict returned by readPollyRawData.readPollyRawData(filename=rawfile)
polly_config_dict
the configuration specific to the specific polly loadConfigs.loadPollyConfig(polly_config_file_fullname, polly_default_config_file)
picasso_config_dict
the general picasso config loadConfigs.loadPicassoConfig(args.picasso_config_file,picasso_default_config_file)
Notes
-----
The `polly_default_dict` is not longer available as a separate variable, but is included into the `polly_config_dict`
"""
type(self).counter += 1
self.rawfile = rawdata_dict['filename_path']
self.rawdata_dict = rawdata_dict
self.polly_config_dict = polly_config_dict
self.picasso_config_dict = picasso_config_dict
self.device = self.polly_config_dict['name']
self.location = self.polly_config_dict['site']
self.date = self.mdate_filename()
self.num_of_channels = len(self.rawdata_dict['measurement_shots']['var_data'][0])
self.num_of_profiles = self.rawdata_dict['raw_signal']['var_data'].shape[0]
self.retrievals_highres = {}
self.retrievals_profile = {}
self.retrievals_profile['avail_optical_profiles'] = []
[docs]
def mdate_filename(self):
filename = self.rawdata_dict['filename']
mdate = re.split(r'_',filename)[0:3]
YYYY = mdate[0]
MM = mdate[1]
DD = mdate[2]
return f"{YYYY}{MM}{DD}"
[docs]
def gf(self, wavelength, meth, telescope):
"""get flag shorthand
i.e., the following two calls are equivalent
```
data_cube.flag_532_total_FR
data_cube.gf(532, 'total', 'FR')
```
where the pattern `{wavelength}_{total|cross|parallel|rr}_{NR|FR|DFOV}` from
https://github.com/PollyNET/Pollynet_Processing_Chain/issues/303 is obeyed
Parameters
----------
wavelength
wavelength tag
meth
method
telescope
telescope
Returns
-------
array
with bool flag
"""
return getattr(self, f'flag_{wavelength}_{meth}_{telescope}', False)
# def msite(self):
# #msite = f"measurement site: {self.rawdata_dict['global_attributes']['location']}"
# msite = self.polly_config_dict['site']
# logging.info(f'measurement site: {msite}')
# return msite
#
# def device(self):
# device = self.polly_config_dict['name']
# logging.info(f'measurement device: {device}')
# return device
#
# def mdate(self):
# mdate = self.mdate_filename()
# logging.info(f'measuremnt date: {mdate}')
# return mdate
[docs]
def mdate_infile(self):
mdate_infilename = self.rawdata_dict['measurement_time']['var_data'][0][0]
return f"{mdate_infilename}"
[docs]
def check_for_correct_mshots(self):
laser_rep_rate = self.rawdata_dict['laser_rep_rate']['var_data']
mShotsPerPrf = laser_rep_rate * self.polly_config_dict['deltaT']
mShots = self.rawdata_dict['measurement_shots']['var_data']
# Check for values > 1.1*mShotsPerPrf or <= 0
condition_check_matrix = (mShots > mShotsPerPrf*1.1) | (mShots <= 0)
return condition_check_matrix
[docs]
def filter_or_correct_false_mshots(self):
logging.info(f"flagFilterFalseMShots: {self.polly_config_dict['flagFilterFalseMShots']}")
logging.info(f"flagCorrectFalseMShots: {self.polly_config_dict['flagCorrectFalseMShots']}")
if self.polly_config_dict['flagFilterFalseMShots']:
logging.info('filtering false mshots')
elif self.polly_config_dict['flagCorrectFalseMShots']:
logging.info('correcting false mshots')
condition_check_matrix = self.check_for_correct_mshots()
##TODO
return self
[docs]
def mdate_consistency(self) -> bool:
if self.mdate_filename() == self.mdate_infile():
logging.info('... date in nc-file equals date of filename')
return True
else:
logging.warning('... date in nc-file differs from date of filename')
return False
[docs]
def reset_date_infile(self):
logging.info('date consistency-check... ')
if self.mdate_consistency() == False:
logging.info('date in nc-file will be replaced with date of filename.')
np_array = np.array(self.rawdata_dict['measurement_time']['var_data']) ## converting to numpy-array for easier numpy-operations
mdate = self.mdate_filename()
np_array[:, 0] = mdate ## assign new date value to the whole first column of the 2d-numpy-array
self.rawdata_dict['measurement_time']['var_data'] = np_array
return self
[docs]
def preprocessing(self, collect_debug:bool=False):
    """Run the polly preprocessing chain on the raw data.

    Including the following processes in order:
        Dead time correction
        Background correction
        Range correction
        etc.

    The resulting dict (background corrected signal including the background
    per channel, range corrected signal, etc.) is merged into
    ``self.retrievals_highres``.

    Parameters
    ----------
    collect_debug : bool
        forwarded to pollyPreprocess to collect debug output

    TODO:
        - This is just a first draft for a docstring. Improve it. There are
          more processes and outputs of the function.
    """
    # single call into the preprocessing module; all options are taken from
    # the polly/picasso configuration dicts
    preproc_dict = pollyPreprocess.pollyPreprocess(
        self.rawdata_dict,
        # timing / geometry of the range bins
        deltaT=self.polly_config_dict['deltaT'],
        flagForceMeasTime = self.polly_config_dict['flagForceMeasTime'],
        maxHeightBin = self.polly_config_dict['max_height_bin'],
        firstBinIndex = self.polly_config_dict['first_range_gate_indx'],
        firstBinHeight = self.polly_config_dict['first_range_gate_height'],
        pollyType = self.polly_config_dict['name'],
        # dead time correction settings
        flagDeadTimeCorrection = self.polly_config_dict['flagDTCor'],
        deadtimeCorrectionMode = self.polly_config_dict['dtCorMode'],
        deadtimeParams = self.polly_config_dict['dt'],
        # signal temperature correction
        flagSigTempCor = self.polly_config_dict['flagSigTempCor'],
        tempCorFunc = self.polly_config_dict['tempCorFunc'],
        # meteorological data source (GDAS1 / radiosonde)
        meteorDataSource = self.polly_config_dict['meteorDataSource'],
        gdas1Site = self.polly_config_dict['gdas1Site'],
        gdas1_folder = self.picasso_config_dict['gdas1_folder'],
        radiosondeSitenum = self.polly_config_dict['radiosondeSitenum'],
        radiosondeFolder = self.polly_config_dict['radiosondeFolder'],
        radiosondeType = self.polly_config_dict['radiosondeType'],
        # background correction range (bin indices)
        bgCorrectionIndexLow = self.polly_config_dict['bgCorRangeIndxLow'],
        bgCorrectionIndexHigh = self.polly_config_dict['bgCorRangeIndxHigh'],
        asl = self.polly_config_dict['asl'],
        # polarization calibration angle handling
        initialPolAngle = self.polly_config_dict['init_depAng'],
        maskPolCalAngle = self.polly_config_dict['maskDepCalAng'],
        # signal quality thresholds
        minSNRThresh = self.polly_config_dict['mask_SNRmin'],
        minPC_fog = self.polly_config_dict['minPC_fog'],
        # per-channel flags from the polly config
        flagFarRangeChannel = self.polly_config_dict['isFR'],
        flag532nmChannel = self.polly_config_dict['is532nm'],
        flagTotalChannel = self.polly_config_dict['isTot'],
        flag355nmChannel = self.polly_config_dict['is355nm'],
        flag607nmChannel = self.polly_config_dict['is607nm'],
        flag387nmChannel = self.polly_config_dict['is387nm'],
        flag407nmChannel = self.polly_config_dict['is407nm'],
        # rotational Raman channels: elementwise AND of wavelength flag and isRR
        flag355nmRotRaman = np.bitwise_and(np.array(self.polly_config_dict['is355nm']), np.array(self.polly_config_dict['isRR'])).tolist(),
        flag532nmRotRaman = np.bitwise_and(np.array(self.polly_config_dict['is532nm']), np.array(self.polly_config_dict['isRR'])).tolist(),
        flag1064nmRotRaman = np.bitwise_and(np.array(self.polly_config_dict['is1064nm']), np.array(self.polly_config_dict['isRR'])).tolist(),
        isUseLatestGDAS = self.polly_config_dict['flagUseLatestGDAS'],
        collect_debug=collect_debug,
    )
    # merge the preprocessing products into the highres retrieval store
    self.retrievals_highres.update(preproc_dict)
    return self
[docs]
def SaturationDetect(self):
    """Detect saturated signal regions.

    The saturation threshold is taken from the polly config
    (``saturate_thresh``); the resulting flag is stored in
    ``self.flagSaturation``.
    """
    threshold = self.polly_config_dict['saturate_thresh']
    self.flagSaturation = pollySaturationDetect.pollySaturationDetect(
        data_cube = self,
        sigSaturateThresh = threshold)
    return self
[docs]
def polarizationCaliD90(self):
    """Delta-90 polarization calibration (GHK based).

    Corresponds to the matlab version starting at
    https://github.com/PollyNET/Pollynet_Processing_Chain/blob/5efd7d35596c67ef8672f5948e47d1f9d46ab867/lib/interface/picassoProcV3.m#L442
    """
    # load the GHK parameters first, then run the actual calibration
    polarization.loadGHK(self)
    calibration = polarization.calibrateGHK(self)
    self.pol_cali = calibration
[docs]
def cloudScreen(self):
    """Screen profiles for clouds; stores the cloud-free flag.

    https://github.com/PollyNET/Pollynet_Processing_Chain/blob/b3b8ec7726b75d9db6287dcba29459587ca34491/lib/interface/picassoProcV3.m#L663
    """
    cloud_free = cloudscreen.cloudscreen(self)
    self.flagCloudFree = cloud_free
[docs]
def cloudFreeSeg(self):
    """Segment the cloud-free profiles into contiguous groups.

    https://github.com/PollyNET/Pollynet_Processing_Chain/blob/b3b8ec7726b75d9db6287dcba29459587ca34491/lib/interface/picassoProcV3.m#L707

    Example result::

        data_cube.clFreeGrps = [
            [35, 300],
            [2500, 2800],
        ]
    """
    groups = profilesegment.segment(self)
    self.clFreeGrps = groups
[docs]
def aggregate_profiles(self, var=None):
    """Aggregate highres profiles over the cloud-free segments.

    Parameters
    ----------
    var
        name of a single highres variable to aggregate; when None the
        default set ('RCS', 'sigBGCor', 'BG') is aggregated.

    TODO: Decide on a consistent way for doing the aggregation, do not mix mean and sum
    """
    # idiom fix: was `if var == None:`; also the compute-then-remove logic
    # was duplicated between the two branches
    if var is None:
        # 'RCS' is averaged; the others use the aggregator's default
        specs = [('RCS', {'func': np.nanmean}), ('sigBGCor', {}), ('BG', {})]
    else:
        specs = [(var, {})]
    for name, kwargs in specs:
        self.retrievals_profile[name] = \
            preprocprofiles.aggregate_clFreeGrps(self, name, **kwargs)
        # Remove empty dict keys (temporary solution)
        if self.retrievals_profile[name] is None:
            del self.retrievals_profile[name]
[docs]
def loadMeteo(self):
    """Load meteorological data for the measurement day."""
    cfg = self.polly_config_dict
    self.met = readMeteo.Meteo(
        cfg['meteorDataSource'],
        cfg['meteo_folder'],
        cfg['meteo_file'])
    # load met data for midnight of the measurement date, on the highres height grid
    day_start = datetime.datetime.strptime(self.date, '%Y%m%d')
    self.met.load(
        datetime.datetime.timestamp(day_start),
        self.retrievals_highres['height'])
[docs]
def loadAOD(self):
    """Load AOD reference data (stub).

    TODO: Not yet implemented!
    """
    # raise NotImplementedError
    # deliberately a no-op so the pipeline can run without AOD data
    pass
[docs]
def calcMolecular(self):
    """Calculate the molecular scattering for the cloud free periods.

    Strategy: first average the met data over each cloud-free segment and
    then calculate the Rayleigh scattering from the averaged profiles.
    Stores the result in ``self.mol_profiles``.
    """
    time_slices = [self.retrievals_highres['time64'][grp] for grp in self.clFreeGrps]
    # consistency fix: leftover debug print replaced with logging
    logging.info('time slices of cloud free %s', time_slices)
    mean_profiles = self.met.get_mean_profiles(time_slices)
    self.mol_profiles = molecular.calc_profiles(mean_profiles)
[docs]
def rayleighFit(self):
    """Do the Rayleigh fit to determine the reference height.

    Direct translation from the matlab code. There might be noticeable
    numerical discrepancies (especially in the residual); seemed to work ok
    for 532, 1064, but with issues for 355.

    Returns
    -------
    the reference height result (also stored in ``self.refH``)
    """
    # consistency fix: leftover debug print replaced with logging
    logging.info('Start Rayleigh Fit')
    logging.warning('Potential for differences to matlab code du to numerical issues (subtraction of two small values)')
    self.refH = rayleighfit.rayleighfit(self)
    return self.refH
[docs]
def polarizationCaliMol(self):
"""
"""
logging.warning(f'not checked against the matlab code')
if self.polly_config_dict['flagMolDepolCali']:
self.pol_cali_mol = polarization.calibrateMol(self)
[docs]
def transCor(self):
"""
TODO:
flagTransCor = True:
Fix the GHK - Transmission correction
flagTransCor = False:
Check if it is correct to use the BG corrected signal and find a better solution.
It is a bit confusing to overwrite the signal as it is called sigTCor but actually is sigBGCor
Like storing a dedicated signal dict to be used throuhot the processing, the dictionary could
have elements like signal (sig), background (bg), and name. which we could overwrite each time
a new correction is made. And by checking the name of the signal (TCor, BGCor) you can find out
which signal it is.
"""
if self.polly_config_dict['flagTransCor']:
logging.warning('transmission correction')
self.retrievals_highres['sigTCor'], self.retrievals_highres['BGTCor'] = \
transCor.transCorGHK_cube(self)
else:
logging.warning('NO transmission correction')
self.retrievals_highres['sigTCor'], self.retrievals_highres['BGTCor'] = \
self.retrievals_highres['sigBGCor'], self.retrievals_highres['BG']
[docs]
def retrievalKlett(self, oc=False, nr=False):
"""
"""
retrievalname = 'klett'
kwargs = {}
if oc:
# Check if overlap corrected signal exist
if "sigOLCor" not in self.retrievals_highres: # This should be done more general not only for OLCor but for other signals as well.
return
retrievalname +='_OC'
kwargs['signal'] = 'OLCor'
if nr:
kwargs['nr'] = True
print('retrievalname', retrievalname)
self.retrievals_profile[retrievalname] = klettfernald.run_cldFreeGrps(self, **kwargs)
if retrievalname not in self.retrievals_profile['avail_optical_profiles']:
self.retrievals_profile['avail_optical_profiles'].append(retrievalname)
[docs]
def retrievalRaman(self, oc=False, nr=False, collect_debug=False):
"""
"""
retrievalname = 'raman'
kwargs = {}
if oc:
# Check if overlap corrected signal exist
if "sigOLCor" not in self.retrievals_highres: # This should be done more general not only for OLCor but for other signals as well.
return
retrievalname +='_OC'
kwargs['signal'] = 'OLCor'
# get the full overlap height for the overlap corrected variant
# group by the cloud free groups
kwargs['heightFullOverlap'] = \
[np.mean(self.retrievals_highres['heightFullOverCor'][slice(*cF)], axis=0) for
cF in self.clFreeGrps]
if nr:
kwargs['nr'] = True
kwargs['collect_debug'] = collect_debug
self.retrievals_profile[retrievalname] = raman.run_cldFreeGrps(self, **kwargs)
if retrievalname not in self.retrievals_profile['avail_optical_profiles']:
self.retrievals_profile['avail_optical_profiles'].append(retrievalname)
[docs]
def overlapCalc(self, collect_debug=False):
"""estimate the overlap function
different to the matlab version, where an average over all cloud
free periods is taken, it is done here per cloud free segment
"""
if not self.polly_config_dict["flagOLCor"]:
return
self.retrievals_profile['overlap'] = {}
self.retrievals_profile['overlap']['frnr'] = overlapEst.run_frnr_cldFreeGrps(self, collect_debug=collect_debug)
self.retrievals_profile['overlap']['raman'] = overlapEst.run_raman_cldFreeGrps(self, collect_debug=collect_debug)
[docs]
def overlapFixLowestBins(self):
"""the lowest bins are affected by stange near range effects"""
if not self.polly_config_dict["flagOLCor"]:
return
height = self.retrievals_highres['range']
for k in self.retrievals_profile['overlap']:
print(f"Fixing lower bins for {k} overlap functions")
overlapCor.fixLowest(self.retrievals_profile['overlap'][k], np.where(height > 800)[0][0])
[docs]
def overlapCor(self):
"""
the overlap correction is implemented differently to the matlab version
first a 2d (time, height) correction array is constructed then it is applied.
In future this will allow for time varing overlap functions
"""
if self.polly_config_dict['overlapCorMode'] == 0 or not self.polly_config_dict["flagOLCor"]:
logging.info('no overlap Correction')
return self
logging.info('overlap Correction')
if self.polly_config_dict['overlapCorMode'] == 1:
logging.info('overlapCorMode 1 -> need file for overlapfunction')
if not 'overlap' in self.retrievals_profile:
self.retrievals_profile['overlap'] = {}
self.retrievals_profile['overlap']['file'] = overlapEst.load(self)
self.retrievals_highres['overlap2d'] = overlapCor.spread(self)
ret = overlapCor.apply_cube(self)
self.retrievals_highres['sigOLCor'] = ret[0]
self.retrievals_highres['BGOLCor'] = ret[1]
self.retrievals_highres['heightFullOverCor'] = ret[2]
[docs]
def calcDepol(self):
"""
"""
for ret_prof_name in self.retrievals_profile['avail_optical_profiles']:
print(ret_prof_name)
self.retrievals_profile[ret_prof_name] = depolarization.voldepol_cldFreeGrps(
self, ret_prof_name)
self.retrievals_profile[ret_prof_name] = depolarization.pardepol_cldFreeGrps(
self, ret_prof_name)
[docs]
def estQualityMask(self):
    """Estimate the quality mask and store it in the highres retrievals."""
    mask = qualityMask.qualityMask(self)
    self.retrievals_highres['quality_mask'] = mask
[docs]
def Angstroem(self):
"""
"""
for ret_prof_name in self.retrievals_profile['avail_optical_profiles']:
print(ret_prof_name)
self.retrievals_profile[ret_prof_name] = angstroem.ae_cldFreeGrps(
self, ret_prof_name)
[docs]
def LidarCalibration(self):
    """Lidar constant calibration from the Klett and Raman retrievals.

    TODO: Add option to read constants from database.
    TODO: Find out how we prioritise raman, klett, and database retrieved LC...
    """
    self.LC = {method: lidarconstant.lc_for_cldFreeGrps(self, method)
               for method in ('klett', 'raman')}
    logging.warning('reading calibration constant from database not working yet')
    # Prioritise Raman retrieved LCs but use Klett retrieved ones when no Raman retrieval exists.
    best_klett = lidarconstant.get_best_LC(self.LC['klett'])
    best_raman = lidarconstant.get_best_LC(self.LC['raman'])
    self.LCused = best_klett | best_raman
[docs]
def attBsc_volDepol(self):
    """Highres attenuated backscatter and volume depolarization in 2d.

    For now the results are stored via mutable state on the data cube.
    """
    steps = (('attBsc 2d retrieval', highres.attbsc_2d),
             ('voldepol 2d retrieval', highres.voldepol_2d))
    for message, retrieval in steps:
        logging.info(message)
        retrieval(self)
[docs]
def molecularHighres(self):
    """Molecular scattering on the highres (2d) grid from the met dataset."""
    met_dataset = self.met.ds
    self.mol_2d = molecular.calc_2d(met_dataset)
[docs]
def quasiV1(self):
    """Quasi retrievals (version V1) and target categorisation."""
    quasiV1.quasi_bsc(self)
    # the remaining steps share one implementation parameterized by version
    for step in (quasi.quasi_pdr, quasi.quasi_angstrom, quasi.target_cat):
        step(self, version='V1')
[docs]
def quasiV2(self):
    """Quasi retrievals (version V2) and target categorisation."""
    quasiV2.quasi_bsc(self)
    # the remaining steps share one implementation parameterized by version
    for step in (quasi.quasi_pdr, quasi.quasi_angstrom, quasi.target_cat):
        step(self, version='V2')
[docs]
def write_2_sql_db(self, db_path:str, parameter:str, method:str|None=None):
""" write LC or eta to sqlite db table
parameters:
- parameter (str): can be LC (Lidar-calibration-constant) or DC (Depol-calibration-constant)
- method (str): 'raman' or 'klett'
- db_path (str): location of the sqlite db-file
"""
if parameter == 'LC':
table_name = 'lidar_calibration_constant'
column_names = [
'cali_start_time', 'cali_stop_time', 'liconst', 'uncertainty_liconst', 'used_for_processing',
'wavelength', 'nc_zip_file', 'polly_type', 'cali_method', 'telescope']
data_types = ['text', 'text', 'real', 'real', 'integer', 'text', 'text', 'text', 'text', 'text']
elif parameter == 'DC':
table_name = 'depol_calibration_constant'
column_names = [
'cali_start_time', 'cali_stop_time', 'depol_const', 'uncertainty_depol_const', 'used_for_processing',
'wavelength', 'telescope', 'nc_zip_file', 'polly_type']
data_types = ['text', 'text', 'real', 'real', 'integer', 'text', 'text', 'text', 'text']
assert len(column_names) == len(data_types), 'column names do not match data types'
logging.info(f'writing to sqlite-db: {db_path}')
logging.info(f'writing {parameter} to table: {table_name}')
sql_db.setup_empty(db_path, table_name, column_names, data_types)
rows_to_insert = sql_db.prepare_for_sql_db_writing(self, parameter, method)
sql_db.write_rows_to_sql_db(db_path, table_name, column_names, rows_to_insert)
# def __str__(self):
# return f"{self.rawdata_dict}"
def __del__(self):
    # keep the live-instance counter in sync with the increment in __init__
    # (NOTE: __del__ may run late or not at all during interpreter shutdown)
    type(self).counter -= 1