Source code for ppcpy.misc.helper

import sys
import os
import re
import shutil
import logging
from netCDF4 import Dataset
import platform
import xarray
from zipfile import ZipFile, ZIP_DEFLATED
from pathlib import Path

import numpy as np
from scipy.ndimage import uniform_filter1d

def detect_path_type(fullpath):
    """ Detect the type of path (Windows or Linux) based on the input. """
    def is_windows_path(fullpath):
        if '\\' in str(fullpath):
            return True
        else:
            return False

    def is_linux_path(fullpath):
        if '/' in str(fullpath):
            return True
        else:
            return False

    if is_windows_path(fullpath):
        # Parse the drive letter and path
        drive_letter, path_part = str(fullpath).split(':', 1)
        # Remove leading backslash from path_part
        path_part = path_part.lstrip('\\')
        path_object = Path(drive_letter + ':' + '\\' + path_part)
        return path_object
    elif is_linux_path(fullpath):
        path_object = Path(fullpath)
        return path_object
    else:
        # that happens if no folders are given
        return Path(fullpath)
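For illustration, a minimal sketch (with made-up paths) of the three branches detect_path_type distinguishes; the printed representation depends on the operating system Python runs on:

from ppcpy.misc.helper import detect_path_type

print(detect_path_type(r"C:\polly\data_zip\202401"))    # Windows-style input (contains backslashes)
print(detect_path_type("/data/polly/data_zip/202401"))  # Linux-style input (contains forward slashes)
print(detect_path_type("file.nc"))                      # no folder separators at all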
def os_name():
    return platform.system()
def get_input_path(timestamp, device, raw_folder):
    YYYY = timestamp[0:4]
    MM = timestamp[4:6]
    DD = timestamp[6:8]
    input_path = Path(raw_folder, device, "data_zip", f"{YYYY}{MM}")
    logging.info(f'checking data availability for {device} at {YYYY}-{MM}-{DD}')
    return input_path
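A hypothetical call (device name and raw folder are made up) illustrating the directory layout the function expects, i.e. <raw_folder>/<device>/data_zip/<YYYYMM>:

from ppcpy.misc.helper import get_input_path

p = get_input_path(timestamp="20240117", device="pollyxt_lacros", raw_folder="/data/level0")
print(p)  # /data/level0/pollyxt_lacros/data_zip/202401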
def get_pollyxt_files(timestamp, device, raw_folder, output_path):
    '''
    This function locates multiple pollyxt level0 nc-zip files from one day of
    measurements, unzips the files to output_path and returns a list of files
    to be merged and the title of the new merged nc-file.
    '''
    input_path = get_input_path(timestamp, device, raw_folder)
    path_exist = Path(input_path)
    if path_exist.exists() == True:
        ## set the searchpattern for the zipped nc-files:
        YYYY = timestamp[0:4]
        MM = timestamp[4:6]
        DD = timestamp[6:8]
        zip_searchpattern = str(YYYY)+'_'+str(MM)+'_'+str(DD)+'*_*[0-9].nc.zip'
        polly_files = Path(r'{}'.format(input_path)).glob('{}'.format(zip_searchpattern))
        polly_zip_files_list0 = [x for x in polly_files if x.is_file()]
        ## convert type Path to type string
        polly_zip_files_list = []
        for file in polly_zip_files_list0:
            polly_zip_files_list.append(str(file))
        if len(polly_zip_files_list) < 1:
            print('no files found!')
            sys.exit()

        # Ensure the destination directory exists
        Path(output_path).mkdir(parents=True, exist_ok=True)

        polly_files_list = []
        to_unzip_list = []
        for zip_file in polly_zip_files_list:
            ## check the size of the zip-files to exclude bad measurement files
            ## with a wrong timestamp, e.g. 19700101
            f_size = os.path.getsize(zip_file)
            print(zip_file)
            if f_size > 500000:
                print(f_size)
                print("filesize passes")
            else:
                print(f_size)
                print("filesize too small, file will be skipped!")
                continue  ## go to next file
            unzipped_nc = Path(zip_file).name
            unzipped_nc = Path(unzipped_nc).stem
            unzipped_nc = Path(output_path, unzipped_nc)
            polly_files_list.append(unzipped_nc)
            path = Path(unzipped_nc)
            ## check if the unzipped file already exists in the output folder
            if path.is_file() == False:
                to_unzip_list.append(zip_file)
            if path.is_file() == True:
                os.remove(unzipped_nc)
                to_unzip_list.append(zip_file)

        ## unzipping
        date_pattern = str(YYYY)+'_'+str(MM)+'_'+str(DD)
        if len(to_unzip_list) > 0:
            ## if working remotely on windows, copy zipped files first, then unzip
            if os_name().lower() == 'windows':
                print("\nCopy zipped files to local drive...")
                for zip_file in to_unzip_list:
                    print(zip_file)
                    shutil.copy2(Path(zip_file), Path(output_path) / Path(zip_file).name)
                print("\nUnzipping...")
                for zip_file in Path(output_path).iterdir():
                    if zip_file.is_file() and date_pattern in zip_file.stem and zip_file.suffix == '.zip':
                        with ZipFile(zip_file, 'r') as zip_ref:
                            print("unzipping "+str(zip_file))
                            zip_ref.extractall(output_path)
                        print("Removing .zip file...")
                        os.remove(zip_file)
            else:
                print("\nUnzipping...")
                for zip_file in to_unzip_list:
                    with ZipFile(zip_file, 'r') as zip_ref:
                        print("unzipping "+zip_file)
                        zip_ref.extractall(output_path)

        ## sort lists
        polly_files_list.sort()
        print("\n"+str(len(polly_files_list))+" files found:\n")
        print(polly_files_list)
        print("\n")
    else:
        print("\nNo data was found in {}. Correct path?\n".format(input_path))
        sys.exit()

    return polly_files_list
def get_pollyxt_logbook_files(timestamp, device, raw_folder, output_path):
    '''
    This function locates multiple pollyxt logbook-zip files from one day of
    measurements, unzips the files to output_path and merges them into one file.
    '''
    input_path = get_input_path(timestamp, device, raw_folder)
    path_exist = Path(input_path)
    if path_exist.exists() == True:
        ## set the searchpattern for the zipped laserlogbook-files:
        YYYY = timestamp[0:4]
        MM = timestamp[4:6]
        DD = timestamp[6:8]
        zip_searchpattern = str(YYYY)+'_'+str(MM)+'_'+str(DD)+'*_*laserlogbook*.zip'
        polly_laserlog_files = Path(r'{}'.format(input_path)).glob('{}'.format(zip_searchpattern))
        polly_laserlog_zip_files_list0 = [x for x in polly_laserlog_files if x.is_file()]
        ## convert type Path to type string
        polly_laserlog_zip_files_list = []
        for file in polly_laserlog_zip_files_list0:
            polly_laserlog_zip_files_list.append(str(file))
        if len(polly_laserlog_zip_files_list) < 1:
            print('no laserlogbook-files found!')
            sys.exit()

        polly_laserlog_files_list = []
        to_unzip_list = []
        for zip_file in polly_laserlog_zip_files_list:
            unzipped_logtxt = Path(zip_file).name
            unzipped_logtxt = Path(unzipped_logtxt).stem
            unzipped_logtxt = Path(output_path, unzipped_logtxt)
            polly_laserlog_files_list.append(unzipped_logtxt)
            path = Path(unzipped_logtxt)
            to_unzip_list.append(zip_file)

        ## unzipping
        if len(to_unzip_list) > 0:
            for zip_file in to_unzip_list:
                with ZipFile(zip_file, 'r') as zip_ref:
                    print("unzipping "+zip_file)
                    zip_ref.extractall(output_path)

        ## sort lists
        polly_laserlog_files_list.sort()
        print("\n"+str(len(polly_laserlog_files_list))+" laserlogfiles found:\n")
        print(polly_laserlog_files_list)
        print("\n")

        ## concat the txt files
        result_file = Path(output_path, "result.txt")
        with open(result_file, "wb") as outfile:
            for logf in polly_laserlog_files_list:
                with open(logf, "rb") as infile:
                    outfile.write(infile.read())
                ## delete every single logbook-file from the unzipped-folder
                os.remove(logf)

        laserlog_filename = polly_laserlog_files_list[0]
        laserlog_filename = Path(laserlog_filename).name
        laserlog_filename_left = re.split(r'_[0-9][0-9]_[0-9][0-9]_[0-9][0-9]\.nc', laserlog_filename)[0]
        laserlog_filename = f'{laserlog_filename_left}_00_00_01.nc.laserlogbook.txt'
        destination_file = Path(output_path, laserlog_filename)

        # Open the source file in binary mode and read its content
        with open(result_file, 'rb') as source:
            # Open the destination file in binary mode and write the content
            with open(destination_file, 'wb') as destination:
                destination.write(source.read())
        os.remove(result_file)
    else:
        print("\nNo laserlogbook was found in {}. Correct path?\n".format(input_path))

    return ()
def add_to_list(element, from_list, to_list):
    if from_list[element] in to_list:
        pass
    else:
        to_list.append(from_list[element])
def checking_vars(timestamp, device, raw_folder, output_path):
    ## select only those nc-files where the values of some specific variables haven't changed
    vars_of_interest = [
        'measurement_height_resolution',
        'laser_rep_rate',
        'laser_power',
        # 'laser_flashlamp',
        'location_height',
        'neutral_density_filter',
        # 'location_coordinates',
        'pm_voltage',
        'pinhole',
        'polstate',
        'telescope',
        'deadtime_polynomial',
        'discr_level',
        'if_center',
        'if_fwhm',
        'zenithangle'
    ]

    polly_files_list = get_pollyxt_files(timestamp, device, raw_folder, output_path)

    if len(polly_files_list) == 1:
        return polly_files_list

    polly_file_ds_ls = []
    for files in polly_files_list:
        polly_file_ds = Dataset(files, "r")
        polly_file_ds_ls.append(polly_file_ds)

    selected_var_nc_ls = []
    diff_var = 0
    print('\n')
    print('checking differences in selected variables ...')
    ## note: `force` (merge despite differences) is expected to be defined at module level
    for ds in range(0, len(polly_file_ds_ls)-1):
        # print('\n')
        # print(polly_files_list[ds] + ' vs. ' + polly_files_list[ds+1])
        for var in vars_of_interest:
            if var in polly_file_ds_ls[ds].variables.keys():  ## check if var is available within the polly-datastructure (depending on polly-system)
                var_value_1 = str(polly_file_ds_ls[ds].variables[var][:])
                var_value_2 = str(polly_file_ds_ls[ds+1].variables[var][:])
                # print(var + ": " + var_value_1)
                # print(var + ": " + var_value_2)
                if var_value_1 == var_value_2 and diff_var == 0:
                    # print('no difference found ...')
                    add_to_list(ds, polly_files_list, selected_var_nc_ls)
                elif var_value_1 != var_value_2 and diff_var == 0:
                    print('difference found in var:')
                    print(var)
                    # print(var + ": " + var_value_1)
                    # print(var + ": " + var_value_2)
                    diff_var = 1
                    add_to_list(ds, polly_files_list, selected_var_nc_ls) if force == True else None
                elif var_value_1 == var_value_2 and diff_var != 0:
                    add_to_list(ds, polly_files_list, selected_var_nc_ls) if force == True else None
                elif var_value_1 != var_value_2 and diff_var != 0:
                    diff_var = diff_var + 1
                    print('difference found!')
                    add_to_list(ds, polly_files_list, selected_var_nc_ls) if force == True else None
        polly_file_ds_ls[ds].close()

    if diff_var == 0:
        add_to_list(-1, polly_files_list, selected_var_nc_ls)
        print('\nno differences found in selected variables!\n')
    elif diff_var != 0:
        # add_to_list(-1,polly_files_list,selected_var_nc_ls) if force == True else None
        ## if force==True: merge anyway; if force==False: the whole day is dropped from the list
        if force == True:
            add_to_list(-1, polly_files_list, selected_var_nc_ls)
            print('\ndifferences found in selected variables! But will be force-merged.\n')
        else:
            print('\ndifferences found in selected variables! Selected Date will be skipped.\n')
            for el in polly_files_list:
                os.remove(el)
            sys.exit()

    return selected_var_nc_ls
def checking_attr(timestamp, device, raw_folder, output_path):
    ## select only those nc-files where the global attributes and the var-attributes haven't changed
    selected_var_nc_ls = checking_vars(timestamp, device, raw_folder, output_path)

    if len(selected_var_nc_ls) == 1:
        return selected_var_nc_ls

    polly_file_ds_ls = []
    for files in selected_var_nc_ls:
        print(files)
        polly_file_ds = Dataset(files, "r")
        polly_file_ds_ls.append(polly_file_ds)

    selected_att_nc_ls = []
    diff_att = 0
    diff_var_att = 0
    print('\n')
    print('checking differences in attributes ...')
    for ds in range(0, len(polly_file_ds_ls)-1):
        ## get global attributes as a list of strings
        # print(selected_var_nc_ls[ds] + ' vs. ' + selected_var_nc_ls[ds+1])
        # print('\nglobal attributes:')
        for nc_attr in polly_file_ds_ls[0].ncattrs():
            # att_value=repr(input_nc_file.getncattr(nc_attr))
            att_value_1 = polly_file_ds_ls[ds].getncattr(nc_attr)
            att_value_2 = polly_file_ds_ls[ds+1].getncattr(nc_attr)
            # print(nc_attr)
            # print(" " + att_value_1)
            # print(" " + att_value_2)
            if att_value_1 == att_value_2 and diff_att == 0:
                add_to_list(ds, selected_var_nc_ls, selected_att_nc_ls)
            elif att_value_1 != att_value_2 and diff_att == 0:
                print('difference found!')
                print(nc_attr)
                print(" " + att_value_1)
                print(" " + att_value_2)
                diff_att = 1
                add_to_list(ds, selected_var_nc_ls, selected_att_nc_ls) if force == True else None
            elif att_value_1 == att_value_2 and diff_att != 0:
                add_to_list(ds, selected_var_nc_ls, selected_att_nc_ls) if force == True else None
            elif att_value_1 != att_value_2 and diff_att != 0:
                print('difference found!')
                add_to_list(ds, selected_var_nc_ls, selected_att_nc_ls) if force == True else None

        # print("\nvariable attributes:")
        for var in polly_file_ds_ls[0].variables.keys():
            # print(var)
            for var_att in polly_file_ds_ls[0].variables[var].ncattrs():
                var_att_value_1 = polly_file_ds_ls[ds].variables[var].getncattr(var_att)
                var_att_value_2 = polly_file_ds_ls[ds+1].variables[var].getncattr(var_att)
                # print(" " + var_att)
                # print(" " + var_att_value_1)
                # print(" " + var_att_value_2)
                if var_att_value_1 == var_att_value_2 and diff_var_att == 0:
                    pass
                elif var_att_value_1 != var_att_value_2 and diff_var_att == 0:
                    print('difference found!')
                    print(" " + var_att)
                    print(" " + var_att_value_1)
                    print(" " + var_att_value_2)
                    diff_var_att = 1
                elif var_att_value_1 == var_att_value_2 and diff_var_att != 0:
                    pass
                elif var_att_value_1 != var_att_value_2 and diff_var_att != 0:
                    print('difference found!')

    for ds in range(0, len(polly_file_ds_ls)-1):
        polly_file_ds_ls[ds].close()

    if diff_att == 0:
        add_to_list(-1, selected_var_nc_ls, selected_att_nc_ls)
        print('\nno differences found in global attributes!\n')
    elif diff_att != 0:
        # add_to_list(-1,selected_var_nc_ls,selected_att_nc_ls) if force == True else None
        ## if force==True: merge anyway; if force==False: the whole day is dropped from the list
        if force == True:
            add_to_list(-1, selected_var_nc_ls, selected_att_nc_ls)
            print('\ndifferences found in global attributes! But will be force-merged.\n')
        else:
            print('\ndifferences found in global attributes! Selected Date will be skipped.\n')
            for el in selected_var_nc_ls:  ## fixed: original referenced an undefined `polly_files_list` here
                os.remove(el)
            sys.exit()

    if diff_var_att == 0:
        print('\nno differences found in variable attributes!\n')
    # elif diff_var_att != 0:
    #     ## if force==true, merge, but if force==false: the whole day will not be in list anymore
    #     if force == True:
    #         add_to_list(-1,selected_var_nc_ls,selected_att_nc_ls)
    #         print('\ndifferences found in variable attributes! But will be force-merged.\n')
    #     else:
    #         print('\ndifferences found in variable attributes! Selected Date will be skipped.\n')
    #         sys.exit()

    print(selected_att_nc_ls)
    return selected_att_nc_ls
def checking_timestamp(timestamp, device, raw_folder, output_path):
    selected_timestamp_nc_ls = checking_attr(timestamp, device, raw_folder, output_path)

    if len(selected_timestamp_nc_ls) == 1:
        return selected_timestamp_nc_ls

    selected_cor_timestamp_nc_ls = []
    polly_file_ds_ls = []
    print('checking for correct timestamps...')
    for files in selected_timestamp_nc_ls:
        polly_file_ds = Dataset(files, "r")
        polly_file_ds_ls.append(polly_file_ds)

    for elementNR, ds in enumerate(polly_file_ds_ls):
        # print(selected_timestamp_nc_ls[elementNR])
        timestamp_ds = ds.variables['measurement_time'][:]
        if 19700101 in timestamp_ds.T[0]:
            print(f'The file: {selected_timestamp_nc_ls[elementNR]} contains incorrect timestamps!')
            print('Trying to correct timestamps...')
            ## get the correct timestamp from the filename
            timestamp_filename = selected_timestamp_nc_ls[elementNR]
            timestamp_filename = timestamp_filename.stem
            timestamp_filename = re.split(r'_', str(timestamp_filename))[-3:]
            ## del. nc-file
            # os.remove(selected_timestamp_nc_ls[elementNR])  ### remove unzipped nc-file with incorrect timestamps
            ## calc. the deltaT between measurement datapoints
            laser_rep_rate = float(ds.variables['laser_rep_rate'][0])
            measurement_shots = ds.variables['measurement_shots'][:]
            measurement_shots_nonzero = [elem for row in measurement_shots for elem in row if elem > 0]
            if len(measurement_shots_nonzero) == 0:
                print('length of measurement_shots_nonzero equals 0. file will be removed from merging list.')
                ds.close()
                continue
            else:
                measurement_shots_average = sum(measurement_shots_nonzero) / len(measurement_shots_nonzero)
                deltaT = measurement_shots_average / laser_rep_rate
                deltaT = int(round(deltaT, 0))  ## unit in seconds
                ## calc. the correct seconds of day for this dataset
                start_seconds = int(timestamp_filename[0])*3600 + int(timestamp_filename[1])*60 + int(timestamp_filename[2])
                ## length of measurement_list
                len_measurement_list = len(timestamp_ds)
                ## create new measurement_time list
                seconds_ls = []
                t = start_seconds
                for i in range(1, len_measurement_list+1):
                    seconds_ls.append(t)
                    t = t + deltaT
                ## check that seconds_ls does not contain seconds of day larger than 86400
                ## TODO: if so, remove the file from the list and delete the file
                t_check = any(value > 86400 for value in seconds_ls)
                # t_check = False  ## do not skip files which are longer than 24h, seconds_ls > 86400
                if t_check == True:
                    print('seconds of day exceeds 86400. file will be removed from merging list.')
                    ds.close()
                    continue
                else:
                    seconds_iter = iter(seconds_ls)
                    new_measurement_time_list = [[int(timestamp), next(seconds_iter)] for i in range(1, len_measurement_list+1)]
                    ## create a new netCDF4 file to write the dataset
                    new_dataset = Dataset(f'{selected_timestamp_nc_ls[elementNR]}_dummy', mode='w')
                    print(f'{selected_timestamp_nc_ls[elementNR]}_dummy')
                    ## copy the entire dataset to the new file
                    new_dataset.setncatts(ds.__dict__)
                    for name, dim in ds.dimensions.items():
                        new_dataset.createDimension(name, len(dim) if not dim.isunlimited() else None)
                    for name, var in ds.variables.items():
                        new_var = new_dataset.createVariable(name, var.dtype, var.dimensions)
                        if name == 'measurement_time':
                            new_var[:] = new_measurement_time_list
                        else:
                            new_var[:] = var[:]
                    ds.close()
                    new_dataset.close()
                    os.remove(selected_timestamp_nc_ls[elementNR])  ### remove unzipped nc-file with incorrect timestamps
                    os.rename(f'{selected_timestamp_nc_ls[elementNR]}_dummy', selected_timestamp_nc_ls[elementNR])
                    print('timestamps corrected.')
                    selected_cor_timestamp_nc_ls.append(selected_timestamp_nc_ls[elementNR])
        else:
            print(f'The file: {selected_timestamp_nc_ls[elementNR]} passes timestamp check.')
            selected_cor_timestamp_nc_ls.append(selected_timestamp_nc_ls[elementNR])
        if ds.isopen():
            ds.close()

    print('\nthe following '+str(len(selected_cor_timestamp_nc_ls))+' files can be merged:')
    print(selected_cor_timestamp_nc_ls)

    return selected_cor_timestamp_nc_ls
def concat_files(timestamp, device, raw_folder, output_path):
    ## merge selected files
    # concat='concat'
    sel_polly_files_list = checking_timestamp(timestamp, device, raw_folder, output_path)

    if len(sel_polly_files_list) == 0:
        print('no files found for this day. no merging.')
        return ()

    polly_files_no_path = Path(sel_polly_files_list[0]).name
    filestring_left = str(re.split(r'_[0-9][0-9]_[0-9][0-9]_[0-9][0-9]', polly_files_no_path)[0])
    filestring_dummy = f"{filestring_left}_00_00_01_dummy.nc"
    filestring = f"{filestring_left}_00_00_01.nc"

    if len(sel_polly_files_list) == 1:
        print("\nOnly one file found. Nothing to merge!\n")
        os.rename(sel_polly_files_list[0], Path(output_path, filestring))
        return ()
    else:
        # sel_polly_files_list = [ str(el) for el in sel_polly_files_list]
        ## parameters for controlling the merging process
        compat = 'override'  ## values of the variable "laser_flashlamp" often change, but those files will be merged anyway; this option picks the value from the first dataset
        coords = 'minimal'
        ds = xarray.open_mfdataset(sel_polly_files_list, combine='nested', data_vars="minimal",
                                   concat_dim="time", compat=compat, coords=coords)

        ## save to a single nc-file
        print(f"\nmerged nc-file '{filestring}' will be stored to '{output_path}'")
        print("\nwriting merged file ...")

        ## adding compression-level encoding
        def write_netcdf(ds: xarray.Dataset, out_file: Path) -> None:
            enc = {}
            for k in ds.data_vars:
                if ds[k].ndim < 2:
                    continue
                enc[k] = {
                    "zlib": True,
                    "complevel": 1,
                    # "fletcher32": True,
                    # "chunksizes": tuple(map(lambda x: x//2, ds[k].shape))
                }
            ds.to_netcdf(out_file, format="NETCDF4", engine="netcdf4", encoding=enc)

        write_netcdf(ds=ds, out_file=Path(output_path, filestring_dummy))
        ds.close()

        print("\ndeleting individual .nc files ...")
        for el in sel_polly_files_list:
            print(el)
            os.remove(el)

        destination_file = Path(output_path, filestring)
        if os.path.exists(destination_file):
            os.remove(destination_file)  # remove the existing destination file
        os.rename(Path(output_path, filestring_dummy), destination_file)
        print('done!')

    return destination_file
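A usage sketch of the full merge chain (get_pollyxt_files → checking_vars → checking_attr → checking_timestamp → concat_files). Device name and folders are made up, and the module-level `force` flag is assumed to be configured elsewhere before this is called:

from ppcpy.misc.helper import concat_files

# hypothetical arguments; raw_folder is expected to contain
# <raw_folder>/<device>/data_zip/<YYYYMM>/ with the daily *.nc.zip files
merged_file = concat_files(timestamp="20240117",
                           device="pollyxt_lacros",
                           raw_folder="/data/level0",
                           output_path="/tmp/polly_unzipped")
print(merged_file)  # path of the merged ..._00_00_01.nc file, or () if nothing was merged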
def remove_whitespaces_and_replace_dash_with_underscore(string):
    new_string = string.replace(" ", "").replace("-", "_")
    return new_string
def find_matching_dimension(array, reference_list):
    """
    Finds the dimension of a 3D array that matches the length of the reference list.

    Args:
        array (np.ndarray): The 3D NumPy array to check.
        reference_list (list): The list to compare the dimension lengths with. This can also be a dict.

    Returns:
        int: The index of the matching dimension, or -1 if no match is found.
    """
    list_length = len(reference_list)
    for dim_index, dim_size in enumerate(array.shape):
        if dim_size == list_length:
            return dim_index
    return -1  # No matching dimension found
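A small sketch (made-up array sizes) showing how find_matching_dimension identifies which axis corresponds to the channel list:

import numpy as np
from ppcpy.misc.helper import find_matching_dimension

signal = np.zeros((13, 2500, 4096))                 # e.g. (channel, time, height); sizes are made up
channels = {i: f"channel {i}" for i in range(13)}
print(find_matching_dimension(signal, channels))    # 0  -> the channel dimension
print(find_matching_dimension(signal, [1, 2, 3]))   # -1 -> no dimension of length 3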
def channel_2_variable_mapping(data_retrievals, var, channeltags_dict):
    ## check length of channeltags_dict vs. length of data_retrievals[var].shape
    channel_dim = find_matching_dimension(array=data_retrievals[var], reference_list=channeltags_dict)
    for ch in channeltags_dict.keys():
        ch_mod = remove_whitespaces_and_replace_dash_with_underscore(string=channeltags_dict[ch])
        if len(data_retrievals[var].shape) == 3:
            if channel_dim == 0:
                data_retrievals[f'{var}_{ch_mod}'] = data_retrievals[var][ch,:,:]
            elif channel_dim == 1:
                data_retrievals[f'{var}_{ch_mod}'] = data_retrievals[var][:,ch,:]
            elif channel_dim == 2:
                data_retrievals[f'{var}_{ch_mod}'] = data_retrievals[var][:,:,ch]
        elif len(data_retrievals[var].shape) == 2:
            if channel_dim == 0:
                data_retrievals[f'{var}_{ch_mod}'] = data_retrievals[var][ch,:]
            elif channel_dim == 1:
                data_retrievals[f'{var}_{ch_mod}'] = data_retrievals[var][:,ch]
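A minimal sketch of channel_2_variable_mapping with made-up channel tags and array shapes; the per-channel slices are written back into the same dict under keys derived from the tags:

import numpy as np
from ppcpy.misc.helper import channel_2_variable_mapping

data_retrievals = {"attbsc": np.random.rand(2, 10, 100)}   # (channel, time, height); sizes are made up
channeltags = {0: "532-total-FR", 1: "532-cross-FR"}       # hypothetical channel tags

channel_2_variable_mapping(data_retrievals, "attbsc", channeltags)
print(sorted(data_retrievals))
# ['attbsc', 'attbsc_532_cross_FR', 'attbsc_532_total_FR']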
def mean_stable(x, win, minBin=None, maxBin=None, minRelStd=None):
    """Calculate the mean value of x based on the least fluctuating segment of x.
    The search is based on the std inside each window of x.

    Parameters
    ----------
    x : ndarray
        Signal array.
    win : int
        Window width for calculating the relative standard deviation.
    minBin : int, optional
        The start index for the mean calculation (default: 0).
    maxBin : int, optional
        The end index for the mean calculation (default: length of x).
    minRelStd : float, optional
        Minimum relative standard deviation threshold.

    Returns
    -------
    xStable : float
        Stable mean value.
    xIndx : ndarray
        Index of the elements used to calculate the mean value.
    xRelStd : float
        Relative uncertainty of the sequences used to calculate the mean values.

    History
    -------
    - 2021-05-30: First edition by Zhenping

    Authors
    -------
    - zhenping@tropos.de
    """
    # Default values for minBin and maxBin
    if minBin is None:
        minBin = 0
    if maxBin is None:
        maxBin = len(x)

    # Handle NaN values and smooth the array
    flagNaN = np.isnan(x)
    x_smoothed = uniform_filter1d(x, size=win, mode='nearest')
    x_smoothed[flagNaN] = np.nan

    # If the range between maxBin and minBin is less than or equal to the window size
    if (maxBin - minBin + 1) <= win:
        xIndx = np.arange(minBin, maxBin + 1)
        xStable = np.nanmean(x_smoothed[minBin:maxBin + 1])
        xRelStd = np.nanstd(x_smoothed[minBin:maxBin + 1]) / xStable
        return xStable, xIndx, xRelStd

    # Calculate relative standard deviation for each window
    relStd = []
    for iX in range(minBin, maxBin - win + 1):
        window = x_smoothed[iX:iX + win]
        thisStd = np.nanstd(window)
        thisMean = np.nanmean(window)
        if np.sum(np.isnan(window)) / (win + 1) <= 0.2:
            relStd.append(thisStd / abs(thisMean))
        else:
            relStd.append(np.nan)

    relStd = np.array(relStd)
    if np.all(np.isnan(relStd)):
        return None, None, None

    # Determine index with minimum relative standard deviation
    if minRelStd is None:
        indxTmp = np.nanargmin(relStd)
        indx = indxTmp + minBin
    else:
        thisRelStd = np.nanmin(relStd)
        indxTmp = np.nanargmin(relStd)
        if thisRelStd > minRelStd:
            return None, None, None
        indx = indxTmp + minBin

    # Calculate the stable mean value and its relative standard deviation
    xStable = np.nanmean(x_smoothed[indx:indx + win])
    xIndx = np.arange(indx, indx + win)
    xRelStd = relStd[indxTmp]

    return xStable, xIndx, xRelStd
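A self-contained sketch of mean_stable on a synthetic signal made of a noisy and a quiet segment (values are made up); the returned mean should come from the quiet part:

import numpy as np
from ppcpy.misc.helper import mean_stable

rng = np.random.default_rng(0)
profile = np.concatenate([rng.normal(10.0, 2.0, 200),    # noisy segment
                          rng.normal(1.0, 0.01, 200)])   # quiet segment
xStable, xIndx, xRelStd = mean_stable(profile, win=50)
print(xStable)              # close to 1.0
print(xIndx[0], xIndx[-1])  # window located inside the quiet segment
print(xRelStd)              # small relative standard deviation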
from scipy.sparse import diags
def smooth2a(matrix_in, Nr, Nc=None):
    """
    Smooths 2D array data while ignoring NaNs.

    This function smooths the data in `matrix_in` using a mean filter over a
    rectangle of size (2*Nr+1)-by-(2*Nc+1). Each element is replaced by the mean
    of the surrounding rectangle, ignoring NaN elements. If an element is NaN,
    it remains NaN in the output. At the edges, as much of the rectangle as fits
    is used.

    Parameters:
        matrix_in (ndarray): Original matrix to be smoothed.
        Nr (int): Number of points used to smooth rows.
        Nc (int, optional): Number of points used to smooth columns. If not specified, Nc = Nr.

    Returns:
        ndarray: Smoothed version of the input matrix.

    References:
        - Written by Greg Reeves, March 2009, Division of Biology, Caltech.
        - Inspired by "smooth2" by Kelly Hilands, October 2004, Applied Research Laboratory, Penn State University.
        - Developed from code by Olof Liungman, 1997, Dept. of Oceanography, Earth Sciences Centre, Göteborg University.
    """
    if Nc is None:
        Nc = Nr

    if not isinstance(Nr, (int, np.int64)) or not isinstance(Nc, (int, np.int64)):
        raise ValueError("Nr and Nc must be scalars!")

    # Matrix dimensions
    rows, cols = matrix_in.shape

    # Create smoothing matrices
    eL = diags(np.ones((2 * Nr + 1, rows)), np.arange(-Nr, Nr + 1), shape=(rows, rows)).toarray()
    eR = diags(np.ones((2 * Nc + 1, cols)), np.arange(-Nc, Nc + 1), shape=(cols, cols)).toarray()

    # Replace NaNs with 0 for summation
    mask_nan = np.isnan(matrix_in)
    matrix_in_filled = np.where(mask_nan, 0, matrix_in)

    # Compute normalizing matrix to count non-NaN elements
    normalize = eL @ (~mask_nan) @ eR
    normalize[mask_nan] = np.nan  # Retain NaNs in the final output

    # Compute smoothed matrix
    matrix_out = (eL @ matrix_in_filled @ eR) / normalize

    return matrix_out
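A short sketch of smooth2a on a small made-up matrix containing one NaN, illustrating that NaN positions are ignored in the averaging and preserved in the output:

import numpy as np
from ppcpy.misc.helper import smooth2a

field = np.arange(36, dtype=float).reshape(6, 6)
field[2, 3] = np.nan                 # NaN is excluded from neighbouring means
smoothed = smooth2a(field, Nr=1)     # 3x3 running mean (Nc defaults to Nr)
print(np.isnan(smoothed[2, 3]))      # True - NaN positions stay NaN
print(smoothed.shape)                # (6, 6)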
def get_wv_pol_telescope_from_dictkeyname(keyname):
    """
    Translate `{wavelength}_{total|cross|parallel|rr}_{NR|FR|DFOV}` into
    wavelength, polarisation and telescope separately.

    Parameters:
        keyname (str): e.g. 532_total_FR.

    Output:
        wavelength, polarisation, telescope
    """
    wv = re.split(r'_', keyname)[0]
    pol = re.split(r'_', keyname)[1]
    tel = re.split(r'_', keyname)[2]
    return wv, pol, tel
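A one-line sketch of the key-name split performed by get_wv_pol_telescope_from_dictkeyname:

from ppcpy.misc.helper import get_wv_pol_telescope_from_dictkeyname

wv, pol, tel = get_wv_pol_telescope_from_dictkeyname("532_total_FR")
print(wv, pol, tel)  # 532 total FR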