Source code for ppcpy.io.loadConfigs
from . import *
import json
import pandas as pd
import numpy as np
import traceback
[docs]
def loadPicassoConfig(picasso_config_file, picasso_default_config_file):
picasso_default_config_file_path = Path(picasso_default_config_file)
picasso_config_file_path = Path(picasso_config_file)
picasso_config_dict = {}
if picasso_default_config_file_path.is_file():
logging.info(f'picasso_default_config_file: {picasso_default_config_file}')
try:
picasso_default_config_file_json = open(picasso_default_config_file, "r")
picasso_default_config_file_dict = json.load(picasso_default_config_file_json)
except Exception:
logging.warning('picasso_default_config_file: {picasso_default_config_file} can not be read.', exc_info=True)
if picasso_config_file_path.is_file():
logging.info(f'picasso_config_file: {picasso_config_file}')
try:
picasso_config_file_json = open(picasso_config_file, "r")
picasso_config_file_dict = json.load(picasso_config_file_json)
## check if key exists in the picasso_config_file, if yes, take that one instead of the one from the picasso_default_config_file
for key in picasso_default_config_file_dict.keys():
if key in picasso_config_file_dict.keys():
picasso_config_dict[key] = picasso_config_file_dict[key]
else:
picasso_config_dict[key] = picasso_default_config_file_dict[key]
## check if a key in the picasso_config_file exists, but not in picasso_default_config_file
for key in picasso_config_file_dict.keys():
if key not in picasso_default_config_file_dict.keys():
picasso_config_dict[key] = picasso_config_file_dict[key]
return picasso_config_dict
except Exception:
logging.critical('picasso_default_config_file: {picasso_default_config_file} can not be read.', exc_info=True)
else:
logging.warning(f'picasso config file: {picasso_config_file} does not exist')
logging.info(f'picasso_default_config_file: {picasso_default_config_file} will be used')
return picasso_default_config_file_dict
else:
logging.critical(f'picasso_default_config_file: {picasso_default_config_file} can not be found. Aborting')
return None
[docs]
def readPollyNetConfigLinkTable(polly_config_table_file, timestamp, device):
"""
"""
polly_config_table_file_path = Path(polly_config_table_file)
if polly_config_table_file_path.is_file():
logging.info(f'pollynet_config_link_file: {polly_config_table_file}')
excel_file_ds = pd.read_excel(f'{polly_config_table_file}', engine='openpyxl')
## search for timerange for given timestamp
timestamp = str(timestamp)
after_start_date = excel_file_ds['Starttime of config'] <= timestamp
before_end_date = excel_file_ds['Stoptime of config'] >= timestamp
between_two_dates = after_start_date & before_end_date
filtered_result = excel_file_ds.loc[between_two_dates]
# print(filtered_result)
## get config-file for timeperiod and instrument
config_array = filtered_result.loc[(filtered_result['Instrument'] == device)]
if len(config_array) > 0:
#polly_config_file = str(config_array['Config file'].to_string(index=False)).strip() ## get rid of whtiespaces
#return polly_config_file
return config_array
else:
logging.warning(f'no polly-config file could be found for {device}@{timestamp}.')
return pd.DataFrame()
else:
logging.warning(f'polly_config_table_file: {polly_config_table_file} can not be found.')
return pd.DataFrame()
[docs]
def fix_indexing(config_dict, keys=['first_range_gate_indx',]):
"""
"""
if not 'indexing_convention' in config_dict:
logging.warning(f'indexing_convention not given, assuming 1based')
config_dict['indexing_convention'] = '1based'
for k in keys:
if k in config_dict.keys():
if isinstance(config_dict[k], list):
config_dict[k] = (np.array(config_dict[k])-1).tolist()
else:
config_dict[k] = config_dict[k] - 1
return config_dict
[docs]
def loadPollyConfig(polly_config_file, polly_default_config_file):
"""
"""
polly_default_config_file_path = Path(polly_default_config_file)
polly_config_file_path = Path(polly_config_file)
polly_config_dict = {}
if polly_default_config_file_path.is_file():
logging.info(f'polly_default_config_file: {polly_default_config_file}')
try:
polly_default_config_file_json = open(polly_default_config_file, "r")
polly_default_config_file_dict = json.load(polly_default_config_file_json)
except Exception:
logging.critical(f'polly_default_config_file: {polly_default_config_file} can not be read.', exc_info=True)
if polly_config_file_path.is_file():
logging.info(f'polly_config_file: {polly_config_file}')
try:
polly_config_file_json = open(polly_config_file, "r")
polly_config_file_dict = json.load(polly_config_file_json)
except Exception:
logging.warning(f'polly_config_file: {polly_config_file} can not be read.', exc_info=True)
logging.info('keys default/template file, but not in specific file {}'.format(
set(polly_default_config_file_dict.keys()) - set(polly_config_file_dict.keys())))
logging.info('keys specific file, but not in default/template file {}'.format(
set(polly_config_file_dict.keys()) - set(polly_default_config_file_dict.keys())))
try:
## check if key exists in the polly_config_file, if yes, take that one instead of the one from the polly_default_config_file
for key in polly_default_config_file_dict.keys():
if key in polly_config_file_dict.keys():
polly_config_dict[key] = polly_config_file_dict[key]
continue
else:
if key == 'prodSaveList':
polly_config_dict[key] = polly_default_config_file_dict[key]
continue
# check if isFR is there -> config file
# if not -> default file
if 'isFR' in polly_config_file_dict:
channels = len(polly_config_file_dict['isFR']) ## isFR is a key, which has to be in the local-polly-config
if isinstance(polly_default_config_file_dict[key], list) and len(polly_default_config_file_dict[key]) == channels:
polly_config_dict[key] = polly_default_config_file_dict[key]
elif isinstance(polly_default_config_file_dict[key], list) and len(polly_default_config_file_dict[key]) > 4 and len(polly_default_config_file_dict[key]) != channels:
## number of channels from the default-polly-config has to be adapted to the correct number of channels, taken from the local-polly-config
if len(polly_default_config_file_dict[key]) > channels:
polly_config_dict[key] = polly_default_config_file_dict[key][:channels]
elif len(polly_default_config_file_dict[key]) < channels:
polly_config_dict[key] = polly_default_config_file_dict[key]
last_element = polly_default_config_file_dict[key][-1]
extension_length = channels - len(polly_default_config_file_dict[key])
polly_config_dict[key].extend([last_element] * extension_length)
else:
polly_config_dict[key] = polly_default_config_file_dict[key]
else:
polly_config_dict[key] = polly_default_config_file_dict[key]
## check if a key in the polly_config_file exists, but not in polly_default_config_file
for key in polly_config_file_dict.keys():
if key not in polly_default_config_file_dict.keys():
polly_config_dict[key] = polly_config_file_dict[key]
if 'first_range_gate_indx' in polly_default_config_file_dict.keys():
fix_indexing_keys = ['first_range_gate_indx']
elif 'LC' in polly_default_config_file_dict.keys():
fix_indexing_keys = ['LC']
return fix_indexing(polly_config_dict, keys=fix_indexing_keys)
except Exception:
logging.warning(f'polly_config_file: {polly_config_file} can not be processed.', exc_info=True)
else:
logging.warning(f'polly_config_file: {polly_config_file} does not exist')
logging.warning(f'polly_default_config_file: {polly_default_config_file} will be used')
if 'first_range_gate_indx' in polly_default_config_file_dict.keys():
fix_indexing_keys = ['first_range_gate_indx']
elif 'LC' in polly_default_config_file_dict.keys():
fix_indexing_keys = ['LC']
return fix_indexing(polly_default_config_file_dict)
else:
logging.critical(f'polly_default_config_file: {polly_default_config_file} can not be found. Aborting')
return None
[docs]
def checkPollyConfigDict(polly_config_dict:dict) -> dict:
"""
Check and potentially modify polly config dict
Parameters:
- polly_config_dict (dict): polly config dict to be checked
Output:
- new_polly_config_dict (dict): checked (and modified) polly config dict
"""
logging.info(".. checking polly config dict")
new_polly_config_dict = polly_config_dict.copy()
# Checking Background correction values:
variables = ['bgCorRangeIndxLow', 'bgCorRangeIndxHigh']
channels = len(polly_config_dict['isFR'])
for i, var in enumerate(variables):
if var in polly_config_dict.keys():
if isinstance(polly_config_dict[var], list):
# check length, shorten if to long, raise an error if to short
if len(polly_config_dict[var]) > channels:
logging.warning(f'length of {var} exceeds the number of channels ({len(polly_config_dict[var])} vs {channels}). Only the first {channels} values will be considered.', exc_info=True)
new_polly_config_dict[var] = polly_config_dict[var][:channels]
elif len(polly_config_dict[var]) < channels:
logging.critical(f'length of {var} is less than the number of channels ({len(polly_config_dict[var])} vs {channels}).')
raise IndexError(f'length of {var} is less than the number of channels ({len(polly_config_dict[var])} vs {channels}). Check polly config.')
else:
# Raise an error
logging.critical(f'only support {var} of type list not {type(polly_config_dict[var])}.')
raise TypeError(f"only support {var} of type list not {type(polly_config_dict[var])}. Check polly config")
else:
if 'bgCorRangeIndx' in polly_config_dict.keys():
# make an list out of the i'th value of 'bgCorRangeIndx'
logging.warning(f"no {var} was given. Uses the {'second' if i else 'first'} value of 'bgCorRangeIndx' for all channels.", exc_info=True)
new_polly_config_dict[var] = [int(polly_config_dict['bgCorRangeIndx'][i])]*channels
else:
# Raise an error
logging.critical(f"no {var} or 'bgCorRangeIndx' was given.")
raise KeyError(f"no {var} or 'bgCorRangeIndx' was given. Check polly config.")
# Remove bgCorRangeIndx
if 'bgCorRangeIndx' in new_polly_config_dict:
new_polly_config_dict.pop('bgCorRangeIndx')
# Can add additional checks to this function.
return new_polly_config_dict