Source code for ppcpy.interface.picassoProc

#from ..misc import *
import datetime
import re
import numpy as np
import logging
import ppcpy.misc.pollyChannelTags as pollyChannelTags
import ppcpy.preprocess.pollyPreprocess as pollyPreprocess
import ppcpy.qc.pollySaturationDetect as pollySaturationDetect
import ppcpy.qc.transCor as transCor
import ppcpy.qc.overlapEst as overlapEst
import ppcpy.qc.overlapCor as overlapCor
import ppcpy.qc.qualityMask as qualityMask


import ppcpy.calibration.select as select
import ppcpy.calibration.polarization as polarization
import ppcpy.cloudmask.cloudscreen as cloudscreen
import ppcpy.cloudmask.profilesegment as profilesegment
import ppcpy.preprocess.profiles as preprocprofiles
import ppcpy.io.readMeteo as readMeteo
import ppcpy.misc.molecular as molecular
import ppcpy.calibration.rayleighfit as rayleighfit
import ppcpy.retrievals.klettfernald as klettfernald
import ppcpy.retrievals.raman as raman
import ppcpy.retrievals.depolarization as depolarization 
import ppcpy.retrievals.angstroem as angstroem 
import ppcpy.calibration.lidarconstant as lidarconstant

import ppcpy.retrievals.highres as highres
import ppcpy.retrievals.quasiV1 as quasiV1
import ppcpy.retrievals.quasiV2 as quasiV2
import ppcpy.retrievals.quasi as quasi

import ppcpy.io.sql_interaction as sql_db

[docs] class PicassoProc: counter = 0 def __init__(self, rawdata_dict, polly_config_dict, picasso_config_dict): """initialize the data_cube Parameters ---------- rawdata_dict the dict returned by readPollyRawData.readPollyRawData(filename=rawfile) polly_config_dict the configuration specific to the specific polly loadConfigs.loadPollyConfig(polly_config_file_fullname, polly_default_config_file) picasso_config_dict the general picasso config loadConfigs.loadPicassoConfig(args.picasso_config_file,picasso_default_config_file) Notes ----- The `polly_default_dict` is not longer available as a separate variable, but is included into the `polly_config_dict` """ type(self).counter += 1 self.rawfile = rawdata_dict['filename_path'] self.rawdata_dict = rawdata_dict self.polly_config_dict = polly_config_dict self.picasso_config_dict = picasso_config_dict self.device = self.polly_config_dict['name'] self.location = self.polly_config_dict['site'] self.date = self.mdate_filename() self.num_of_channels = len(self.rawdata_dict['measurement_shots']['var_data'][0]) self.num_of_profiles = self.rawdata_dict['raw_signal']['var_data'].shape[0] self.retrievals_highres = {} self.retrievals_profile = {} self.retrievals_profile['avail_optical_profiles'] = [] self.pol_cali = {} self.LC = {}
[docs] def mdate_filename(self): """get the date from filename in YYYYMMDD""" filename = self.rawdata_dict['filename'] mdate = re.split(r'_',filename)[0:3] YYYY = mdate[0] MM = mdate[1] DD = mdate[2] return f"{YYYY}{MM}{DD}"
[docs] def gf(self, wavelength, meth, telescope): """get flag shorthand i.e., the following two calls are equivalent ``` data_cube.flag_532_total_FR data_cube.gf(532, 'total', 'FR') ``` where the pattern `{wavelength}_{total|cross|parallel|rr}_{NR|FR|DFOV}` from https://github.com/PollyNET/Pollynet_Processing_Chain/issues/303 is obeyed Parameters ---------- wavelength wavelength tag meth method telescope telescope Returns ------- array with bool flag """ return getattr(self, f'flag_{wavelength}_{meth}_{telescope}', False)
# def msite(self): # #msite = f"measurement site: {self.rawdata_dict['global_attributes']['location']}" # msite = self.polly_config_dict['site'] # logging.info(f'measurement site: {msite}') # return msite # # def device(self): # device = self.polly_config_dict['name'] # logging.info(f'measurement device: {device}') # return device # # def mdate(self): # mdate = self.mdate_filename() # logging.info(f'measuremnt date: {mdate}') # return mdate
[docs] def mdate_infile(self): """first date in file as string""" mdate_infilename = self.rawdata_dict['measurement_time']['var_data'][0][0] return f"{mdate_infilename}"
[docs] def check_for_correct_mshots(self): """check if mshots are more than 1.1 * laser_prf * deltatime or smaller 0""" laser_rep_rate = self.rawdata_dict['laser_rep_rate']['var_data'] mShotsPerPrf = laser_rep_rate * self.polly_config_dict['deltaT'] mShots = self.rawdata_dict['measurement_shots']['var_data'] # Check for values > 1.1*mShotsPerPrf or <= 0 condition_check_matrix = (mShots > mShotsPerPrf*1.1) | (mShots <= 0) return condition_check_matrix
[docs] def filter_or_correct_false_mshots(self): """ filter or correct the mshots (currently only logging) .. TODO:: that might be covered via the mcps conversion """ logging.info(f"flagFilterFalseMShots: {self.polly_config_dict['flagFilterFalseMShots']}") logging.info(f"flagCorrectFalseMShots: {self.polly_config_dict['flagCorrectFalseMShots']}") if self.polly_config_dict['flagFilterFalseMShots']: logging.info('filtering false mshots') elif self.polly_config_dict['flagCorrectFalseMShots']: logging.info('correcting false mshots') condition_check_matrix = self.check_for_correct_mshots() ##TODO return self
[docs] def mdate_consistency(self) -> bool: """check mdate consistency""" if self.mdate_filename() == self.mdate_infile(): logging.info('... date in nc-file equals date of filename') return True else: logging.warning('... date in nc-file differs from date of filename') return False
[docs] def reset_date_infile(self): """correct the date in the file """ logging.info('date consistency-check... ') if self.mdate_consistency() == False: logging.info('date in nc-file will be replaced with date of filename.') np_array = np.array(self.rawdata_dict['measurement_time']['var_data']) ## converting to numpy-array for easier numpy-operations mdate = self.mdate_filename() np_array[:, 0] = mdate ## assign new date value to the whole first column of the 2d-numpy-array self.rawdata_dict['measurement_time']['var_data'] = np_array return self
[docs] def setChannelTags(self): """set the channel tags they are stored as dictionary in ``` data_cube.channel_dict ``` and as list in ``` data_cube.retrievals_highres['channel'] data_cube.polly_config_dict['channelTags'] ``` as an array of boolean flags in ``` data_cube.flags ``` as flags per channel in ``` data_cube.flag_355_total_FR ``` Returns ------- self """ ChannelTags = pollyChannelTags.pollyChannelTags( self.polly_config_dict['channelTag'], # TODO key: channelTags vs channelTag??? flagFarRangeChannel=self.polly_config_dict['isFR'], flagNearRangeChannel=self.polly_config_dict['isNR'], flagRotRamanChannel=self.polly_config_dict['isRR'], flagTotalChannel=self.polly_config_dict['isTot'], flagCrossChannel=self.polly_config_dict['isCross'], flagParallelChannel=self.polly_config_dict['isParallel'], flag355nmChannel=self.polly_config_dict['is355nm'], flag387nmChannel=self.polly_config_dict['is387nm'], flag407nmChannel=self.polly_config_dict['is407nm'], flag532nmChannel=self.polly_config_dict['is532nm'], flag607nmChannel=self.polly_config_dict['is607nm'], flag1064nmChannel=self.polly_config_dict['is1064nm'], flagDFOVChannel=self.polly_config_dict['isDFOV'], ) ChannelTags, self.polly_config_dict = pollyChannelTags.polly_config_channel_corrections(chTagsOut_ls=ChannelTags, polly_config_dict=self.polly_config_dict) self.retrievals_highres['channel'] = ChannelTags self.polly_config_dict['channelTags'] = ChannelTags self.channel_dict = {i: item for i, item in enumerate(ChannelTags)} ChannelFlags = pollyChannelTags.pollyChannelflags( channel_dict_length=len(self.channel_dict), flagFarRangeChannel=self.polly_config_dict['isFR'], flagNearRangeChannel=self.polly_config_dict['isNR'], flagRotRamanChannel=self.polly_config_dict['isRR'], flagTotalChannel=self.polly_config_dict['isTot'], flagCrossChannel=self.polly_config_dict['isCross'], flagParallelChannel=self.polly_config_dict['isParallel'], flag355nmChannel=self.polly_config_dict['is355nm'], flag387nmChannel=self.polly_config_dict['is387nm'], flag407nmChannel=self.polly_config_dict['is407nm'], flag532nmChannel=self.polly_config_dict['is532nm'], flag607nmChannel=self.polly_config_dict['is607nm'], flag1064nmChannel=self.polly_config_dict['is1064nm'], flagDFOVChannel=self.polly_config_dict['isDFOV'], ) self.flags = ChannelFlags self.flag_355_total_FR = ChannelFlags[0] self.flag_355_cross_FR = ChannelFlags[1] self.flag_355_parallel_FR = ChannelFlags[2] self.flag_355_total_NR = ChannelFlags[3] self.flag_387_total_FR = ChannelFlags[4] self.flag_387_total_NR = ChannelFlags[5] self.flag_407_total_FR = ChannelFlags[6] self.flag_407_total_NR = ChannelFlags[7] self.flag_532_total_FR = ChannelFlags[8] self.flag_532_cross_FR = ChannelFlags[9] self.flag_532_parallel_FR = ChannelFlags[10] self.flag_532_total_NR = ChannelFlags[11] self.flag_532_cross_DFOV = ChannelFlags[12] self.flag_532_rr_FR = ChannelFlags[13] self.flag_607_total_FR = ChannelFlags[14] self.flag_607_total_NR = ChannelFlags[15] self.flag_1058_total_FR = ChannelFlags[16] self.flag_1064_total_FR = ChannelFlags[17] self.flag_1064_cross_FR = ChannelFlags[18] self.flag_1064_total_NR = ChannelFlags[19] return self
[docs] def preprocessing(self, collect_debug:bool=False): """ Preprocessing of Lidar data. Including in the followin processes in order: Dead time correction Background correction Range correction etc. Returns: - Background corrected signal including the background per channel. - Range corrected signal. - etc. .. TODO:: This is just a first draft for a docstring. Improve it. There is more processes and outputs of the function. """ preproc_dict = pollyPreprocess.pollyPreprocess( self.rawdata_dict, deltaT=self.polly_config_dict['deltaT'], flagForceMeasTime = self.polly_config_dict['flagForceMeasTime'], maxHeightBin = self.polly_config_dict['max_height_bin'], firstBinIndex = self.polly_config_dict['first_range_gate_indx'], firstBinHeight = self.polly_config_dict['first_range_gate_height'], pollyType = self.polly_config_dict['name'], flagDeadTimeCorrection = self.polly_config_dict['flagDTCor'], deadtimeCorrectionMode = self.polly_config_dict['dtCorMode'], deadtimeParams = self.polly_config_dict['dt'], flagSigTempCor = self.polly_config_dict['flagSigTempCor'], tempCorFunc = self.polly_config_dict['tempCorFunc'], meteorDataSource = self.polly_config_dict['meteorDataSource'], gdas1Site = self.polly_config_dict['gdas1Site'], gdas1_folder = self.picasso_config_dict['gdas1_folder'], radiosondeSitenum = self.polly_config_dict['radiosondeSitenum'], radiosondeFolder = self.polly_config_dict['radiosondeFolder'], radiosondeType = self.polly_config_dict['radiosondeType'], bgCorrectionIndexLow = self.polly_config_dict['bgCorRangeIndxLow'], bgCorrectionIndexHigh = self.polly_config_dict['bgCorRangeIndxHigh'], asl = self.polly_config_dict['asl'], initialPolAngle = self.polly_config_dict['init_depAng'], maskPolCalAngle = self.polly_config_dict['maskDepCalAng'], minSNRThresh = self.polly_config_dict['mask_SNRmin'], minPC_fog = self.polly_config_dict['minPC_fog'], flagFarRangeChannel = self.polly_config_dict['isFR'], flag532nmChannel = self.polly_config_dict['is532nm'], flagTotalChannel = self.polly_config_dict['isTot'], flag355nmChannel = self.polly_config_dict['is355nm'], flag607nmChannel = self.polly_config_dict['is607nm'], flag387nmChannel = self.polly_config_dict['is387nm'], flag407nmChannel = self.polly_config_dict['is407nm'], flag355nmRotRaman = np.bitwise_and(np.array(self.polly_config_dict['is355nm']), np.array(self.polly_config_dict['isRR'])).tolist(), flag532nmRotRaman = np.bitwise_and(np.array(self.polly_config_dict['is532nm']), np.array(self.polly_config_dict['isRR'])).tolist(), flag1064nmRotRaman = np.bitwise_and(np.array(self.polly_config_dict['is1064nm']), np.array(self.polly_config_dict['isRR'])).tolist(), isUseLatestGDAS = self.polly_config_dict['flagUseLatestGDAS'], collect_debug=collect_debug, ) self.retrievals_highres.update(preproc_dict) return self
[docs] def SaturationDetect(self): """Saturation Detection """ self.flagSaturation = pollySaturationDetect.pollySaturationDetect( data_cube = self, sigSaturateThresh = self.polly_config_dict['saturate_thresh']) return self
[docs] def polarizationCaliD90(self): """calibration with the Delta-90 method The stuff that starts here in the matlab version https://github.com/PollyNET/Pollynet_Processing_Chain/blob/5efd7d35596c67ef8672f5948e47d1f9d46ab867/lib/interface/picassoProcV3.m#L442 """ polarization.loadGHK(self) self.pol_cali['D90'] = polarization.calibrateGHK(self) self.etaused = select.single_best(self.pol_cali['D90'], 'eta', 'eta_std')
[docs] def cloudScreen(self): """basic cloud screenting https://github.com/PollyNET/Pollynet_Processing_Chain/blob/b3b8ec7726b75d9db6287dcba29459587ca34491/lib/interface/picassoProcV3.m#L663 """ self.flagCloudFree = cloudscreen.cloudscreen(self)
[docs] def cloudFreeSeg(self): """cloud free profile segmentation https://github.com/PollyNET/Pollynet_Processing_Chain/blob/b3b8ec7726b75d9db6287dcba29459587ca34491/lib/interface/picassoProcV3.m#L707 .. code-block:: python data_cube.clFreGrps = [ [35,300], [2500,2800] ] """ self.clFreeGrps = profilesegment.segment(self)
[docs] def aggregate_profiles(self, var=None): """Aggregate highres profiles over cloud free segments .. TODO:: Decide on a consistent way for doing the aggregation, do not mix mean and sum """ if var == None: self.retrievals_profile['RCS'] = \ preprocprofiles.aggregate_clFreeGrps(self, 'RCS', func=np.nanmean) self.retrievals_profile['sigBGCor'] = \ preprocprofiles.aggregate_clFreeGrps(self, 'sigBGCor') self.retrievals_profile['BG'] = \ preprocprofiles.aggregate_clFreeGrps(self, 'BG') # Remove empty dict keys (temporarly solution) for v in ['RCS', 'sigBGCor', 'BG']: if self.retrievals_profile[v] is None: del self.retrievals_profile[v] else: self.retrievals_profile[var] = \ preprocprofiles.aggregate_clFreeGrps(self, var) # Remove empty dict keys (temporarly solution) if self.retrievals_profile[var] is None: del self.retrievals_profile[var]
[docs] def loadMeteo(self): """ Load meteorological data """ self.met = readMeteo.Meteo( self.polly_config_dict['meteorDataSource'], self.polly_config_dict['meteo_folder'], self.polly_config_dict['meteo_file']) self.met.load( datetime.datetime.timestamp(datetime.datetime.strptime(self.date, '%Y%m%d')), self.retrievals_highres['height'])
[docs] def loadAOD(self): """load the AOD from a co-located fotometer .. TODO:: Not yet implemented! """ # raise NotImplementedError pass
[docs] def calcMolecular(self): """calculate the molecular scattering for the cloud free periods with the strategy of first averaging the met data and then calculating the rayleigh scattering """ time_slices = [self.retrievals_highres['time64'][grp] for grp in self.clFreeGrps] print('time slices of cloud free ', time_slices) mean_profiles = self.met.get_mean_profiles(time_slices) self.mol_profiles = molecular.calc_profiles(mean_profiles)
[docs] def rayleighFit(self): """do the rayleigh fit direct translation from the matlab code. There might be noticeable numerical discrepancies (especially in the residual) seemed to work ok for 532, 1064, but with issues for 355 """ print('Start Rayleigh Fit') logging.warning(f'Potential for differences to matlab code du to numerical issues (subtraction of two small values)') # self.refH = rayleighfit.rayleighfit(self) self.retrievals_profile['refH'] = rayleighfit.rayleighfit(self)
# return self.refH # example # self.retrievals_profile['klett'][cldFreeGrp]['355_total_NR']['refH']
[docs] def polarizationCaliMol(self): """calibration with molecular signal in reference height """ logging.warning(f'not checked against the matlab code') if self.polly_config_dict['flagMolDepolCali']: self.pol_cali['mol'] = polarization.calibrateMol(self) else: logging.warning("'flagMolDepolCali' set to False")
[docs] def transCor(self): """transmission correction .. TODO:: flagTransCor = True: Fix the GHK - Transmission correction flagTransCor = False: Check if it is correct to use the BG corrected signal and find a better solution. It is a bit confusing to overwrite the signal as it is called sigTCor but actually is sigBGCor Like storing a dedicated signal dict to be used throuhot the processing, the dictionary could have elements like signal (sig), background (bg), and name. which we could overwrite each time a new correction is made. And by checking the name of the signal (TCor, BGCor) you can find out which signal it is. """ if self.polly_config_dict['flagTransCor']: logging.warning('transmission correction') self.retrievals_highres['sigTCor'], self.retrievals_highres['BGTCor'] = \ transCor.transCorGHK_cube(self) else: logging.warning('NO transmission correction') self.retrievals_highres['sigTCor'], self.retrievals_highres['BGTCor'] = \ self.retrievals_highres['sigBGCor'], self.retrievals_highres['BG']
[docs] def retrievalKlett(self, oc=False, nr=False): """apply the klett retrieval """ retrievalname = 'klett' kwargs = {} if oc: # Check if overlap corrected signal exist if "sigOLCor" not in self.retrievals_highres: # This should be done more general not only for OLCor but for other signals as well. return retrievalname +='_OC' kwargs['signal'] = 'OLCor' if nr: kwargs['nr'] = True print('retrievalname', retrievalname) self.retrievals_profile[retrievalname] = klettfernald.run_cldFreeGrps(self, **kwargs) if retrievalname not in self.retrievals_profile['avail_optical_profiles']: self.retrievals_profile['avail_optical_profiles'].append(retrievalname)
[docs] def retrievalRaman(self, oc=False, nr=False, collect_debug=False): """apply the raman retrieval (nighttime only) """ retrievalname = 'raman' kwargs = {} if oc: # Check if overlap corrected signal exist if "sigOLCor" not in self.retrievals_highres: # This should be done more general not only for OLCor but for other signals as well. return retrievalname +='_OC' kwargs['signal'] = 'OLCor' # get the full overlap height for the overlap corrected variant # group by the cloud free groups kwargs['heightFullOverlap'] = \ [np.mean(self.retrievals_highres['heightFullOverCor'][slice(*cF)], axis=0) for cF in self.clFreeGrps] if nr: kwargs['nr'] = True kwargs['collect_debug'] = collect_debug self.retrievals_profile[retrievalname] = raman.run_cldFreeGrps(self, **kwargs) if retrievalname not in self.retrievals_profile['avail_optical_profiles']: self.retrievals_profile['avail_optical_profiles'].append(retrievalname)
[docs] def overlapCalc(self, collect_debug=False): """estimate the overlap function different to the matlab version, where an average over all cloud free periods is taken, it is done here per cloud free segment """ if not self.polly_config_dict["flagOLCor"]: return self.retrievals_profile['overlap'] = {} self.retrievals_profile['overlap']['frnr'] = overlapEst.run_frnr_cldFreeGrps(self, collect_debug=collect_debug) self.retrievals_profile['overlap']['raman'] = overlapEst.run_raman_cldFreeGrps(self, collect_debug=collect_debug)
[docs] def overlapFixLowestBins(self): """the lowest bins are affected by stange near range effects""" if not self.polly_config_dict["flagOLCor"]: return height = self.retrievals_highres['range'] for k in self.retrievals_profile['overlap']: print(f"Fixing lower bins for {k} overlap functions") overlapCor.fixLowest(self.retrievals_profile['overlap'][k], np.where(height > 800)[0][0])
[docs] def overlapCor(self): """overlap correction the overlap correction is implemented differently to the matlab version first a 2d (time, height) correction array is constructed then it is applied. In future this will allow for time varing overlap functions """ if self.polly_config_dict['overlapCorMode'] == 0 or not self.polly_config_dict["flagOLCor"]: logging.info('no overlap Correction') return self logging.info('overlap Correction') if self.polly_config_dict['overlapCorMode'] == 1: logging.info('overlapCorMode 1 -> need file for overlapfunction') if not 'overlap' in self.retrievals_profile: self.retrievals_profile['overlap'] = {} self.retrievals_profile['overlap']['file'] = overlapEst.load(self) self.retrievals_highres['overlap2d'] = overlapCor.spread(self) ret = overlapCor.apply_cube(self) self.retrievals_highres['sigOLCor'] = ret[0] self.retrievals_highres['BGOLCor'] = ret[1] self.retrievals_highres['heightFullOverCor'] = ret[2]
[docs] def calcDepol(self): """calculate the volume depol and the particle depol """ for ret_prof_name in self.retrievals_profile['avail_optical_profiles']: print(ret_prof_name) self.retrievals_profile[ret_prof_name] = depolarization.voldepol_cldFreeGrps( self, ret_prof_name) self.retrievals_profile[ret_prof_name] = depolarization.pardepol_cldFreeGrps( self, ret_prof_name)
[docs] def estQualityMask(self): """estimate the quality mask """ self.retrievals_highres['quality_mask'] = qualityMask.qualityMask(self)
[docs] def Angstroem(self): """calculate the angstrom exponent """ for ret_prof_name in self.retrievals_profile['avail_optical_profiles']: print(ret_prof_name) self.retrievals_profile[ret_prof_name] = angstroem.ae_cldFreeGrps( self, ret_prof_name)
[docs] def LidarCalibration(self): """calculate the lidar constant .. TODO:: Find out how we prioritise raman, klett, and database retrieved LC... """ self.LC['klett'] = lidarconstant.lc_for_cldFreeGrps( self, 'klett') self.LC['raman'] = lidarconstant.lc_for_cldFreeGrps( self, 'raman') logging.warning('reading calibration constant from database not working yet') # Prioritise Raman retrieved LCs but use Klett retrieved ones when no Raman retrieval exists. self.LCused = select.single_best(self.LC['klett'], 'LC', 'LCStd') | select.single_best(self.LC['raman'], 'LC', 'LCStd')
[docs] def attBsc_volDepol(self): """highres attBsc and voldepol in 2d """ # for now try with mutable state in data_cube logging.info('attBsc 2d retrieval') highres.attbsc_2d(self) logging.info('voldepol 2d retrieval') highres.voldepol_2d(self)
[docs] def molecularHighres(self): """calculate the molecular signal for the 2d high resolution """ self.mol_2d = molecular.calc_2d( self.met.ds)
[docs] def quasiV1(self): """quasiV1 retrivals and target categorisation. """ quasiV1.quasi_bsc(self) quasi.quasi_pdr(self, version='V1') quasi.quasi_angstrom(self, version='V1') quasi.target_cat(self, version='V1')
[docs] def quasiV2(self): """quasiV2 retrivals and target categorisation. """ quasiV2.quasi_bsc(self) quasi.quasi_pdr(self, version='V2') quasi.quasi_angstrom(self, version='V2') quasi.target_cat(self, version='V2')
[docs] def write_2_sql_db(self, parameter:str, db_path:str|None=None, method:str|None=None): """ write LC or eta to sqlite db table Parameters ---------- parameter : str can be LC (Lidar-calibration-constant) or DC (Depol-calibration-constant) method : str 'raman' or 'klett' db_path : str location of the sqlite db-file Notes ----- The unique columns are needed that new entries overwrite old ones, otherwise they are just added to the table with same timestamps. """ if db_path == None: db_path = self.polly_config_dict['calibrationDB'] logging.info(f"read db_path from polly_config_dict {db_path}") if parameter == 'LC': table_name = 'lidar_calibration_constant' column_names = [ 'cali_start_time', 'cali_stop_time', 'liconst', 'uncertainty_liconst', 'used_for_processing', 'wavelength', 'nc_zip_file', 'polly_type', 'cali_method', 'telescope'] data_types = ['text', 'text', 'real', 'real', 'integer', 'text', 'text', 'text', 'text', 'text'] unique=', UNIQUE(cali_start_time, cali_stop_time, wavelength, polly_type, telescope, cali_method)' elif parameter == 'DC': table_name = 'depol_calibration_constant' column_names = [ 'cali_start_time', 'cali_stop_time', 'depol_const', 'uncertainty_depol_const', 'used_for_processing', 'wavelength', 'telescope', 'nc_zip_file', 'polly_type'] data_types = ['text', 'text', 'real', 'real', 'integer', 'text', 'text', 'text', 'text'] unique=', UNIQUE(cali_start_time, cali_stop_time, wavelength, polly_type, telescope)' assert len(column_names) == len(data_types), 'column names do not match data types' logging.info(f'writing to sqlite-db: {db_path}') logging.info(f'writing {parameter} to table: {table_name}') sql_db.setup_empty(db_path, table_name, column_names, data_types, unique=unique) rows_to_insert = sql_db.prepare_for_sql_db_writing(self, parameter, method) sql_db.write_rows_to_sql_db(db_path, table_name, column_names, rows_to_insert)
[docs] def read_calibration_db(self, db_path:str|None=None): """read the calibration constants from database time interval includes 24h before and after the actual date """ if db_path == None: db_path = self.polly_config_dict['calibrationDB'] logging.info(f"read db_path from polly_config_dict {db_path}") ts_interval = self.retrievals_highres['time'][0], self.retrievals_highres['time'][-1] table_name = 'lidar_calibration_constant' self.LC.update(sql_db.get_from_sql_db(db_path, table_name, ts_interval)) table_name = 'depol_calibration_constant' self.pol_cali.update(sql_db.get_from_sql_db(db_path, table_name, ts_interval))
[docs] def adding_retrieving_infos_2_polly_config_dict(self): """ some infos from the polly_config_dict should have there own keys, e.g. reference_search_range Parameters ---------- self Returns ------- self (with added polly_config dict_keys) """ lower_overlap = np.array(self.polly_config_dict['heightFullOverlap']) reference_search_range_355_total_FR = [int(lower_overlap[self.flag_355_total_FR][0]),self.polly_config_dict['maxDecomHeight355']] reference_search_range_532_total_FR = [int(lower_overlap[self.flag_532_total_FR][0]),self.polly_config_dict['maxDecomHeight532']] reference_search_range_1064_total_FR = [int(lower_overlap[self.flag_1064_total_FR][0]),self.polly_config_dict['maxDecomHeight1064']] self.polly_config_dict['reference_search_range_355_total_FR'] = reference_search_range_355_total_FR self.polly_config_dict['reference_search_range_532_total_FR'] = reference_search_range_532_total_FR self.polly_config_dict['reference_search_range_1064_total_FR'] = reference_search_range_1064_total_FR return self
# def __str__(self): # return f"{self.rawdata_dict}" def __del__(self): type(self).counter -= 1