Source code for ppcpy.preprocess.pollyPreprocess

import numpy as np
import logging
import datetime
import time
import itertools
from scipy.ndimage import label
from multiprocessing import Pool, cpu_count
#from ppcpy.preprocess.optimized_polyval import process_signal
#from ppcpy.preprocess.compute_pcr import compute_pcr
#import numpy as np
#from multiprocessing import Pool, cpu_count

from ppcpy.retrievals.collection import calc_snr

[docs] def compute_channel_pcr(args): """ Computes PCR for a single channel. Parameters: args: Tuple containing (rawSignal, mShots, scale_factor, channel_index). Returns: np.ndarray: Computed PCR for the given channel. """ rawSignal, mShots, scale_factor, ch = args return (rawSignal[:, :, ch] / mShots[:, np.newaxis, ch]) * scale_factor
[docs] def compute_pcr_parallel(rawSignal, mShots, scale_factor): """ Computes PCR using multiprocessing for channel-wise parallelism. Parameters: rawSignal: 3D input array (shape: [M, N, P]). mShots: 2D multiplicative factors array (shape: [M, P]). scale_factor: Scaling factor for the computation. Returns: np.ndarray: 3D output array (PCR) with the same shape as rawSignal. """ M, N, P = rawSignal.shape PCR = np.zeros((M, N, P), dtype=np.float64) # Prepare arguments for each channel args = [(rawSignal, mShots, scale_factor, ch) for ch in range(P)] # Use a pool of workers to compute each channel in parallel with Pool(processes=min(cpu_count(), P)) as pool: results = pool.map(compute_channel_pcr, args) # Collect the results for ch, result in enumerate(results): PCR[:, :, ch] = result return PCR
[docs] def faster_polyval(a,x): """faster version than np.polyval(), using numba would provide 10% increase, but with different function""" y = a[-1] for ai in a[-2::-1]: y *= x y += ai return y
#@jit(nopython=True) #def faster_polyval(p, x): # y = np.zeros(x.shape, dtype=float) # for i, v in enumerate(p): # y *= x # y += v # return y #@profile
[docs] def pollyDTCor(rawSignal,mShots,hRes, **varargin): ## Defining default values for param keys (key initialization), if not explictly defined when calling the function polly_device = varargin.get('device', False) flagDeadTimeCorrection = varargin.get('flagDeadTimeCorrection', False) DeadTimeCorrectionMode = varargin.get('DeadTimeCorrectionMode', 2) deadtimeParams = varargin.get('deadtimeParams', []) deadtime = varargin.get('deadtime', []) #print('mShots', np.all(mShots[:,0] == mShots[0,0]), mShots[0,0], np.min(mShots[:,0]), np.max(mShots[:,0])) if not np.all(mShots[:,0] == mShots[0,0]): logging.warning(f"... mShots not constant min {np.min(mShots)} max {np.max(mShots)}") mShots_norm = np.repeat(np.mean(mShots, axis=0)[np.newaxis,:], mShots.shape[0], axis=0) #print('mShots_norm', mShots_norm.shape, 'mShots_norm', mShots_norm[0,:]) logging.info(f'... Deadtime-correction (Mode: {DeadTimeCorrectionMode})') Nchannels = mShots.shape[1] scale_factor = 150.0 / hRes ## convert photon counts to Photon-Count-Rate PCR [MHz] # start_time_command1 = time.time() # compute_pcr(rawSignal.astype(np.float64), mShots.astype(np.float64), scale_factor, PCR) # Compute PCR in parallel # end_time_command1 = time.time() # elapsed_time_command1 = end_time_command1 - start_time_command1 # print(f"Time taken: {elapsed_time_command1:.4f} seconds") PCR = rawSignal * (150./hRes) / mShots[:,np.newaxis,:] #PCR_Cor = np.zeros_like(PCR) signalDTCor = np.zeros_like(PCR) ## Deadtime correction if flagDeadTimeCorrection: ## polynomial correction with parameters saved in the level0 netcdf-file under variable 'deadtime_polynomial' if DeadTimeCorrectionMode == 1: for iCh in range(Nchannels): # Extract polynomial coefficients for the channel and reverse their order coeffs = deadtime[:, iCh][::-1] #PCR_Cor[:,:,iCh] = np.polyval(deadtime[:,iCh][::-1], PCR[:,:,iCh]) #PCR_Cor[:,:,iCh] = faster_polyval(deadtime[:,iCh], PCR[:,:,iCh]) #signalDTCor[:,:,iCh] = PCR_Cor[:,:,iCh]*mShots_norm[:,np.newaxis,iCh]/(150./hRes) signalDTCor[:,:,iCh] = faster_polyval(deadtime[:,iCh], PCR[:,:,iCh])*mShots_norm[:,np.newaxis,iCh]/(150./hRes) ## nonparalyzable correction: PCR_cor = PCR / (1 - tau*PCR), with tau beeing the dead-time ## reading from polly-config file under key 'dT' (only the first value from each channel) elif DeadTimeCorrectionMode == 2: for iCh in range(Nchannels): #PCR_Cor[:,:,iCh]= PCR[:, :, iCh] / (1.0 - deadtimeParams[iCh][0] * 10**(-3) * PCR[:, :, iCh]) #signalDTCor[:, :, iCh] = PCR_Cor[:,:,iCh] * mShots_norm[:, np.newaxis, iCh] / scale_factor signalDTCor[:, :, iCh] = PCR[:, :, iCh] / (1.0 - deadtimeParams[iCh][0] * 10**(-3) * PCR[:, :, iCh]) * mShots_norm[:, np.newaxis, iCh] / scale_factor ## user defined deadtime, reading from polly-config file under key 'dT' (the whole matrix, polynome) elif DeadTimeCorrectionMode == 3: if np.array(deadtimeParams).size != 0: # deadtimeParams=np.array(deadtimeParams) # signal_out = np.zeros_like(PCR) # process_signal(PCR, mShots.astype(np.float64), deadtimeParams, Nchannels, scale_factor, signal_out) # PCR_Cor = PCR # Pre-extract polynomial coefficients for all channels and reverse their order coeffs_matrix = np.array([np.array(deadtimeParams[ch][::-1]) for ch in range(Nchannels)]) for iCh in range(Nchannels): # Evaluate the polynomial for the current channel #PCR_Cor[:,:,iCh] = np.polyval(coeffs_matrix[iCh], PCR[:, :, iCh]) #PCR_Cor[:,:,iCh] = faster_polyval(coeffs_matrix[iCh][::-1], PCR[:, :, iCh]) #signalDTCor[:, :, iCh] = PCR_Cor[:,:,iCh] * mShots_norm[:, np.newaxis, iCh] / scale_factor signalDTCor[:, :, iCh] = faster_polyval(coeffs_matrix[iCh][::-1], PCR[:, :, iCh]) * mShots_norm[:, np.newaxis, iCh] / scale_factor else: logging.warning(f'User defined deadtime parameters were not found in polly-config file.') logging.warning(f'In order to continue the current processing, deadtime correction will not be implemented.') ## No deadtime correction elif DeadTimeCorrectionMode == 4: logging.warning(f'Deadtime correction was turned off. Be careful to check the signal strength.') else: logging.error(f'Unknow deadtime correction setting! Please go back to check the configuration.') logging.error(f'For deadtimeCorrectionMode, only 1-4 is allowed.') #return PCR_Cor, signalDTCor return signalDTCor
[docs] def pollyRemoveBG(rawSignal:np.ndarray, bgCorrectionIndexLow:list, bgCorrectionIndexHigh:list, maxHeightBin:int=3000, firstBinIndex:list|None=None) -> tuple[np.ndarray, np.ndarray]: """ Background correction. Remove mean background noise from signal. Parameters: - rawSignal (np.ndarray): Lidar Signal to be processed - bgCorrectionIndexLow (list of int): lower index of background noise per channel - bgCorrectionIndexHigh (list of int): upper index of background noise per channel - maxHeightBin (int): maximum height bin index (default: 3000) - firstBinIndex (list of int): first height bin index per channel (default: 0 per chanel) Output: - signal_out (np.ndarray): Background corrected signal - bg (np.ndarray): Removed background noise """ logging.info(f'... removing background from signal') if firstBinIndex is None: logging.warning('No firstBinIndex value were given, default value 0 is used', exc_info=True) firstBinIndex = [0]*rawSignal.shape[2] # Calculate the mean across the channel specific column range for each row and page mean_matrix = np.empty((rawSignal.shape[0], 1, rawSignal.shape[2]), dtype=rawSignal.dtype) for iCh in range(rawSignal.shape[2]): mean_matrix[:, :, iCh] = np.mean(rawSignal[:, bgCorrectionIndexLow[iCh]:bgCorrectionIndexHigh[iCh], iCh], axis=1, keepdims=True) # Replicate the mean matrix along the second dimension bg = np.tile(mean_matrix, (1, maxHeightBin, 1)) signal_out = slicerange(rawSignal, maxHeightBin, firstBinIndex) - bg return signal_out, bg
[docs] def slicerange(array:np.ndarray, maxHeightBin:int, firstBinIndex:list) -> np.ndarray: """ Slice a given array across the height/range dimension from firstBinIndex to maxHeightBin + firstBinIndex. Parameters: - array (np.ndarray): array to be sliced - maxHeightBin (int): length of slice - firstBinIndex (list of int): start hight/range index of slice per channel Output: - out (np.ndarray): sliced array """ firstBinIndex = np.asarray(firstBinIndex) heightBins = np.arange(maxHeightBin)[:, None] + firstBinIndex[None, :] out = array[:, heightBins, np.arange(array.shape[2])] return out
[docs] def pollyPolCaliTime(depCalAng, mTime, init_depAng, maskDepCalAng): """ """ depCal_P_Ang_time_start = [] depCal_P_Ang_time_end = [] depCal_N_Ang_time_start = [] depCal_N_Ang_time_end = [] maskDepCal = np.zeros(len(mTime), dtype=bool) if len(depCalAng) == 0: ## if depCalAng is empty, which means the polly does not support auto depol calibration return depCal_P_Ang_time_start, depCal_P_Ang_time_end, depCal_N_Ang_time_start, depCal_N_Ang_time_end, maskDepCal if len(maskDepCalAng) == 0: maskDepCalAng = ['none', 'none', 'p', 'p', 'p', 'p', 'p', 'p', 'p', 'p', 'none', 'none', 'n', 'n', 'n', 'n', 'n', 'n', 'n', 'n', 'none'] ## the mask for postive and negative ## calibration angle. 'none' means ## invalid profiles with different ## depol_cal_angle flagPDepCal = np.zeros(len(maskDepCalAng),dtype=bool) flagNDepCal = np.zeros(len(maskDepCalAng),dtype=bool) for iProf in range(0,len(maskDepCalAng)): if maskDepCalAng[iProf] == 'p': flagPDepCal[iProf] = True elif maskDepCalAng[iProf] == 'n': flagNDepCal[iProf] = True flagDepCal = (np.abs(depCalAng - init_depAng) > 0.0) ## the profile will be treated as depol cali profile if it has different ## depol_cal_ang than the init_depAng #print(init_depAng) #print(depCalAng) #print(flagDepCal) maskDepCal = flagDepCal ## search the calibration periods valuesFlagDepCal = flagDepCal.astype(int) ## label connected components in the matrix; 0 will stay 0 ## connected 1s will be numbered consecutively depCalPeriods, nDepCalPeriods = label(valuesFlagDepCal) #print(depCalPeriods) if nDepCalPeriods >= 1: pass else: logging.info(f'No Depolarization Calibration phase found.') return depCal_P_Ang_time_start, depCal_P_Ang_time_end, depCal_N_Ang_time_start, depCal_N_Ang_time_end, maskDepCal for iDepCalPeriod in range(1,nDepCalPeriods+1): #flagIDepCal = (depCalPeriods == iDepCalPeriod) # flag for the ith calibration period. flagIDepCal = depCalPeriods[depCalPeriods == iDepCalPeriod] # flag for the ith calibration period. indices = np.where(depCalPeriods == flagIDepCal[0])[0] #print(flagIDepCal) if len(flagIDepCal) != len(maskDepCalAng): logging.warning(f"Depolarization Calibration from Timestamp " f"{mTime[indices[0]]} - {mTime[indices[-1]]} " f"does not match the maskDepCalAng pattern in the polly-config file.\n" f"This calibration phase will be skipped.") continue else: pass tIDepCal = mTime[indices[0]:indices[-1]+1] t_all_p_depCal = list(itertools.compress(tIDepCal, flagPDepCal)) t_all_n_depCal = list(itertools.compress(tIDepCal, flagNDepCal)) depCal_P_Ang_time_start.append(t_all_p_depCal[0]) depCal_P_Ang_time_end.append(t_all_p_depCal[-1]) depCal_N_Ang_time_start.append(t_all_n_depCal[0]) depCal_N_Ang_time_end.append(t_all_n_depCal[-1]) return depCal_P_Ang_time_start, depCal_P_Ang_time_end, depCal_N_Ang_time_start, depCal_N_Ang_time_end, maskDepCal
[docs] def calculate_rcs(datasignal, ranges): """ Function for calculating RCS. Args: datasignal: signal to range correct ranges: ranges that are squared Returns: np.ndarray: Computed RCS array. """ print(datasignal.shape) ranges_squared = ranges**2 ranges2d = np.repeat(ranges_squared[np.newaxis,:], datasignal.shape[0], axis=0) print('ranges2d', ranges2d.shape) # Perform the computation #RCS = ( # datasignal / mShots_broadcasted * 150 / float(hRes) * height_squared_broadcasted #) RCS = ( datasignal * ranges2d[:,:,np.newaxis] ) return RCS
[docs] def pollyPreprocess(rawdata_dict, collect_debug=False, **param): """ POLLYPREPROCESS Deadtime correction, background correction, first-bin shift, mask for low-SNR and mask for depolarization-calibration process. USAGE: [data] = pollyPreprocess(data) INPUTS: data: struct rawSignal: array signal. [Photon Count] mShots: array number of the laser shots for each profile. mTime: array datetime array for the measurement time of each profile. depCalAng: array angle of the polarizer in the receiving channel. (>0 means calibration process starts) zenithAng: array zenith angle of the laer beam. repRate: float laser pulse repetition rate. [s^-1] hRes: float spatial resolution [m] mSite: string measurement site. KEYWORDS: deltaT: numeric integration time (in seconds) for single profile. (default: 30) flagForceMeasTime: logical flag to control whether to align measurement time with file creation time, instead of taking the measurement time in the data file. (default: false) maxHeightBin: numeric number of range bins to read out from data file. (default: 3000) firstBinIndex: numeric index of first bin to read out. (default: 1) pollyType: char polly version. (default: 'arielle') flagDeadTimeCorrection: logical flag to control whether to apply deadtime correction. (default: false) deadtimeCorrectionMode: numeric deadtime correction mode. (default: 2) 1: polynomial correction with parameters saved in data file. 2: non-paralyzable correction 3: polynomail correction with user defined parameters 4: disable deadtime correction deadtimeParams: numeric deadtime parameters. (default: []) flagSigTempCor: logical flag to implement signal temperature correction. tempCorFunc: cell symbolic function for signal temperature correction. "1": no correction "exp(-0.001*T)": exponential correction function. (Unit: Kelvin) meteorDataSource: str meteorological data type. e.g., 'gdas1'(default), 'standard_atmosphere', 'websonde', 'radiosonde' gdas1Site: str the GDAS1 site for the current campaign. meteo_folder: str the main folder of the GDAS1 profiles. radiosondeSitenum: integer site number, which can be found in doc/radiosonde-station-list.txt. radiosondeFolder: str the folder of the sonding files. radiosondeType: integer file type of the radiosonde file. - 1: radiosonde file for MOSAiC (default) - 2: radiosonde file for MUA bgCorrectionIndexLow: 1-dim. array base indecis of bins for background estimation. (defults: [10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10]) bgCorrectionIndexHigh: 1-dim. array top index of bins for background estimation. (defults: [240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240, 240]) asl: numeric above sea level in meters. (default: 0) initialPolAngle: numeric initial polarization angle of the polarizer for polarization calibration. (default: 0) maskPolCalAngle: cell mask for positive and negative calibration angle of the polarizer, in which 'p' stands for positive angle, while 'n' for negative angle. (default: {}) minSNRThresh: numeric lower bound of signal-noise ratio. minPC_fog: numeric minimun number of photon count after strong attenuation by fog. flagFarRangeChannel: logical flags of far-range channel. flag532nmChannel: logical flags of channels with central wavelength (CW) at 532 nm. flagTotalChannel: logical flags of channels receiving total elastic signal. flag355nmChannel: logical flags of channels with CW at 355 nm. flag607nmChannel: logical flags of channels with CW at 607 nm. flag387nmChannel: logical flags of channels with CW at 387 nm. flag407nmChannel: logical flags of channels with CW at 407 nm. flag532nmRotRaman: logical flags of rotational Raman channels with CW at 532 nm. flag1064nmRotRaman: logical flags of rotational Raman channels with CW at 1064 nm. OUTPUTS: data: struct rawSignal: array signal. [Photon Count] mShots: array number of the laser shots for each profile. mTime: array datetime array for the measurement time of each profile. depCalAng: array angle of the polarizer in the receiving channel. (>0 means calibration process starts) zenithAng: array zenith angle of the laer beam. repRate: float laser pulse repetition rate. [s^-1] hRes: float spatial resolution [m] mSite: string measurement site. deadtime: matrix (channel x polynomial_orders) deadtime correction parameters. signal: array Background removed signal bg: array background height: array height. [m] lowSNRMask: logical If SNR less SNRmin, mask is set true. Otherwise, false depCalMask: logical If polly was doing polarization calibration, depCalMask is set true. Otherwise, false. fogMask: logical If it is foggy which means the signal will be very weak, fogMask will be set true. Otherwise, false mask607Off: logical mask of PMT on/off status at 607 nm channel. mask387Off: logical mask of PMT on/off status at 387 nm channel. mask407Off: logical mask of PMT on/off status at 407 nm channel. mask355RROff: logical mask of PMT on/off status at 355 nm rotational Raman channel. mask532RROff: logical mask of PMT on/off status at 532 nm rotational Raman channel. mask1064RROff: logical mask of PMT on/off status at 1064 nm rotational Raman channel. """ logging.info('starting data preprocessing...') rawSignal = rawdata_dict['raw_signal']['var_data'] mShots = rawdata_dict['measurement_shots']['var_data'] mTime = rawdata_dict['measurement_time']['var_data'] depCalAng = rawdata_dict['depol_cal_angle']['var_data'] zenithAng = rawdata_dict['zenithangle']['var_data'] repRate = rawdata_dict['laser_rep_rate']['var_data'] hRes = rawdata_dict['measurement_height_resolution']['var_data'] mSite = rawdata_dict['global_attributes']['location'] data_dict = {} ## print all of the large arrays to screen, not only starts and ends of an array np.set_printoptions(threshold=np.inf) ## converting raw-mTime format from [YYYYMMDD seconds-of-day] to unixtimestamp-format logging.info(f'... time conversion') date_string = str(mTime[0][0]) seconds_of_day = mTime[:,1] YYYY = int(date_string[:4]) MM = int(date_string[4:6]) DD = int(date_string[6:8]) datetime_obj = datetime.datetime(YYYY, MM, DD) mTime_obj = [ datetime_obj.replace(tzinfo=datetime.timezone.utc) + datetime.timedelta(seconds=int(s)) for s in seconds_of_day] mTime_str = [dt.strftime('%Y%m%d %H:%M:%S') for dt in mTime_obj] # Convert to Unix timestamp mTime_unixtimestamp = [int(datetime.datetime.timestamp(dt)) for dt in mTime_obj] ## Defining default values for param keys (key initialization), if not explictly defined when calling the function deltaT = param.get('deltaT', 30) flagForceMeasTime = param.get('flagForceMeasTime', False) maxHeightBin = param.get('maxHeightBin', 3000) firstBinIndex = param.get('firstBinIndex', False) firstBinHeight = param.get('firstBinHeight', False) pollyType = param.get('pollyType', False) flagDeadTimeCorrection = param.get('flagDeadTimeCorrection', False) deadtimeCorrectionMode = param.get('deadtimeCorrectionMode', 2) deadtimeParams = param.get('deadtimeParams', False) flagSigTempCor = param.get('flagSigTempCor', False) tempCorFunc = param.get('tempCorFunc', False) meteorDataSource = param.get('meteorDataSource', False) gdas1Site = param.get('gdas1Site', False) gdas1_folder = param.get('gdas1_folder', False) radiosondeSitenum = param.get('radiosondeSitenum', False) radiosondeFolder = param.get('radiosondeFolder', False) radiosondeType = param.get('radiosondeType', False) bgCorrectionIndexLow = param.get('bgCorrectionIndexLow', False) bgCorrectionIndexHigh = param.get('bgCorrectionIndexHigh', False) asl = param.get('asl', 10) initialPolAngle = param.get('initialPolAngle', False) maskPolCalAngle = param.get('maskPolCalAngle', False) minSNRThresh = param.get('minSNRThresh', False) minPC_fog = param.get('minPC_fog', False) flagFarRangeChannel = param.get('flagFarRangeChannel', False) flag532nmChannel = param.get('flag532nmChannel', False) flagTotalChannel = param.get('flagTotalChannel', False) flag355nmChannel = param.get('flag355nmChannel', False) flag607nmChannel = param.get('flag607nmChannel', False) flag387nmChannel = param.get('flag387nmChannel', False) flag407nmChannel = param.get('flag407nmChannel', False) flag355nmRotRaman = param.get('flag355nmRotRaman', False) flag532nmRotRaman = param.get('flag532nmRotRaman', False) flag1064nmRotRaman = param.get('flag1064nmRotRaman', False) isUseLatestGDAS = param.get('isUseLatestGDAS', False) # print(flagFarRangeChannel) # print(flag1064nmRotRaman) #%% Determine whether number of range bins is out of range #if (max(config.maxHeightBin + config.firstBinIndex - 1) > size(data.rawSignal, 2)) # warning('maxHeightBin or firstBinIndex is out of range.\nTotal number of range bin is %d.\nmaxHeightBin is # %d\nfirstBinIndex is %d\n', size(data.rawSignal, 2), config.maxHeightBin, config.firstBinIndex); # fprintf('Set maxHeightBin and firstBinIndex to default values.\n'); # config.maxHeightBin = ones(1, size(data.rawSignal, 1)); # config.firstBinIndex = 251; #end # logging.info(f'Total number of range bin is: {len(rawSignal[0])}\nmaxHeightBin is: {maxHeightBin}\nfirstBinIndex is {firstBinIndex}.') #if (maxHeightBin + np.max(firstBinIndex) -1) > len(rawSignal[0]): # logging.warning(f'maxHeightBin or firstBinIndex is out of range. Total number of range bin is: {len(rawSignal[0])}\nmaxHeightBin is: {maxHeightBin}\nfirstBinIndex is {firstBinIndex}.') # logging.info(f'Set maxHeightBin and firstBinIndex to default values.') # maxHeightBin = np.ones(rawSignal.shape[2]) # logging.info(f'maxHeightBin: {maxHeightBin}') # firstBinIndex = 251 mShotsPerPrf = deltaT * repRate # print(mShotsPerPrf) # print(mShots) # print(mTime) # print(deltaT) # print(np.nanmean(np.diff(np.array(mTime[:,1])))) # print(np.array(mTime[:,1])) # print(len(mTime)) # print(np.diff(mTime)) if len(mTime) > 1: #nInt = np.round(deltaT / (np.nanmean(np.diff(np.array(mTime[:,1]))) * 24 * 3600)) ## number of profiles to be integrated. Usually, 600 shots per 30 s nInt = np.round(deltaT / (np.nanmean(np.diff(np.array(mTime[:,1]))))) ## number of profiles to be integrated. Usually, 600 shots per 30 s else: nInt = np.round(mShotsPerPrf / np.nanmean(np.array(mShots[0, :]))) ## Deadtime correction #PCR_Cor, preproSignal = pollyDTCor(rawSignal = rawSignal, preproSignal = pollyDTCor( rawSignal = rawSignal, mShots = mShots, hRes = hRes, polly_device = pollyType, flagDeadTimeCorrection = flagDeadTimeCorrection, DeadTimeCorrectionMode = deadtimeCorrectionMode, deadtimeParams = deadtimeParams, deadtime = rawdata_dict['deadtime_polynomial']['var_data'] ) # most likely the preprocesssed deadtime corrected signal can be omitted if collect_debug: #data_dict['PCR_cor'] = PCR_Cor data_dict['preproSignal'] = preproSignal #data_dict['PCR_slice'] = slicerange(PCR_Cor, maxHeightBin, firstBinIndex) ## Background Substraction sigBGCor, bg = pollyRemoveBG( rawSignal = preproSignal, bgCorrectionIndexLow = bgCorrectionIndexLow, bgCorrectionIndexHigh = bgCorrectionIndexHigh, maxHeightBin = maxHeightBin, firstBinIndex = firstBinIndex ) data_dict['BG'] = bg[:, 1, :] ## reshaping the3-dim. BG-matrix to 2-dim matrix # Store the background corrected signal data_dict['sigBGCor'] = sigBGCor ## Height and first bin height correction logging.info('... height bin calculations') # TODO first bin hight might change for different telescopes... data_dict['range'] = np.arange(0, sigBGCor.shape[1]) * hRes + firstBinHeight[0] data_dict['height'] = data_dict['range'].copy() * np.cos(zenithAng*np.pi/180) correction_firstBinHight = (( (np.arange(0, sigBGCor.shape[1]) * hRes)[:,np.newaxis] + firstBinHeight)**2 / data_dict['range'][:,np.newaxis]**2) data_dict['sigBGCor'] = data_dict['sigBGCor'] * correction_firstBinHight[np.newaxis,:,:] data_dict['alt'] = data_dict['height'] + float(asl) ## geopotential height data_dict['time'] = mTime_unixtimestamp data_dict['time64'] = np.array([np.datetime64(t) for t in mTime_obj]) ## Mask for bins with low SNR logging.info('... mask bins with low SNR') SNR = calc_snr(sigBGCor, bg) data_dict['SNR'] = SNR #print(SNR) ## create mask and mask every entry, where SNR < minSNRThresh #data_dict['lowSNRMask'] = np.ma.array(np.zeros(sigBGCor.shape, dtype=bool), mask=np.ones(sigBGCor.shape, dtype=bool)) # a plain bool mask should be faster. Let's give it a try data_dict['lowSNRMask'] = np.zeros_like(sigBGCor).astype(bool) #print(data_dict['lowSNRMask']) for iCh in range(0, sigBGCor.shape[2]): #data_dict['lowSNRMask'][:,:,iCh].mask = SNR[:,:,iCh].data < minSNRThresh[iCh] #data_dict['lowSNRMask'][:,:,iCh] = np.ma.masked_where(SNR[:,:,iCh].data < minSNRThresh[iCh], SNR[:,:,iCh]) data_dict['lowSNRMask'][:,:,iCh][SNR[:,:,iCh] < minSNRThresh[iCh]] = True # TODO check the low SNR mask # TODO mask for laser shutter? flag532FR = (np.array(flag532nmChannel) & np.array(flagFarRangeChannel) & np.array(flagTotalChannel)).astype(bool) flag355FR = (np.array(flag355nmChannel) & np.array(flagFarRangeChannel) & np.array(flagTotalChannel)).astype(bool) print('flag 532 FR', flag532FR) print('flag 355 FR', flag355FR) if any(flag532FR): data_dict['shutterOnMask'] = any_signal(np.squeeze(data_dict['sigBGCor'][:,:,flag532FR])) elif any(flag355FR): data_dict['shutterOnMask'] = any_signal(np.squeeze(data_dict['sigBGCor'][:,:,flag355FR])) else: raise ValueError('No suitable channel to determine the shutter status') # TODO mask for fog? # the original matlab code raises questions. Why 40:120 and why hard coded? # When sum is used (as in matlab), minPC_fog is range resolution dependent fogsum = np.sum(np.squeeze(data_dict['sigBGCor'][:,39:120,flag532FR]), axis=1) data_dict['fogMask'] = fogsum < minPC_fog # TODO mask for single channels on 607, 387, 407, 355RR 532RR 1064RR flag607FR = (np.array(flag607nmChannel) & np.array(flagFarRangeChannel)).astype(bool) print('flag 607 FR', flag607FR) if any(flag607FR): data_dict['mask607Off'] = any_signal(np.squeeze(data_dict['sigBGCor'][:,:,flag607FR])) flag387FR = (np.array(flag387nmChannel) & np.array(flagFarRangeChannel)).astype(bool) if any(flag387FR): data_dict['mask387Off'] = any_signal(np.squeeze(data_dict['sigBGCor'][:,:,flag387FR])) flag407FR = (np.array(flag407nmChannel) & np.array(flagFarRangeChannel)).astype(bool) if any(flag407FR): data_dict['mask407Off'] = any_signal(np.squeeze(data_dict['sigBGCor'][:,:,flag407FR])) flag355RRFR = (np.array(flag355nmRotRaman) & np.array(flagFarRangeChannel)).astype(bool) if any(flag355RRFR): data_dict['mask355_RROff'] = any_signal(np.squeeze(data_dict['sigBGCor'][:,:,flag355RRFR])) flag532RRFR = (np.array(flag532nmRotRaman) & np.array(flagFarRangeChannel)).astype(bool) if any(flag532RRFR): data_dict['mask532_RROff'] = any_signal(np.squeeze(data_dict['sigBGCor'][:,:,flag532RRFR])) flag1064RRFR = (np.array(flag1064nmRotRaman) & np.array(flagFarRangeChannel)).astype(bool) if any(flag1064RRFR): data_dict['mask1064_RROff'] = any_signal(np.squeeze(data_dict['sigBGCor'][:,:,flag1064RRFR])) ## Mask for polarization calibration logging.info('... mask for polarization calibration') (data_dict['depol_cal_ang_p_time_start'], data_dict['depol_cal_ang_p_time_end'], data_dict['depol_cal_ang_n_time_start'], data_dict['depol_cal_ang_n_time_end'], data_dict['depCalMask']) = pollyPolCaliTime( depCalAng=depCalAng, mTime=mTime_unixtimestamp, init_depAng=initialPolAngle, maskDepCalAng=maskPolCalAngle) # print(data_dict['depol_cal_ang_p_time_start']) # print(data_dict['depol_cal_ang_p_time_end']) # print(data_dict['depol_cal_ang_n_time_start']) # print(data_dict['depol_cal_ang_n_time_end']) # print(data_dict['depCalMask']) #%% Mask for polarization calibration #[data.depol_cal_ang_p_time_start, data.depol_cal_ang_p_time_end, ... # data.depol_cal_ang_n_time_start, data.depol_cal_ang_n_time_end, ... # depCalMask] = pollyPolCaliTime(data.depCalAng, data.mTime, ... # config.initialPolAngle, config.maskPolCalAngle); #data.depCalMask = transpose(depCalMask); ## Range-corrected Signal calculation logging.info('... calculate range-corrected Signal') #mask = data_dict['lowSNRMask'].mask mask = data_dict['lowSNRMask'] # masked arry might be slow #RCS_masked = np.ma.masked_array(sigBGCor+bg,mask=mask) # data_dict['RCS'] = calculate_rcs(datasignal=preproSignal,data_dict=data_dict,mShots=mShots,hRes=hRes) mShots_norm = np.repeat(np.mean(mShots, axis=0)[np.newaxis,:], mShots.shape[0], axis=0) data_dict['PCR_slice'] = data_dict['sigBGCor']*(150/hRes)/mShots_norm[:, np.newaxis, :] data_dict['RCS'] = calculate_rcs(data_dict['PCR_slice'], data_dict['range']) logging.info('finished data preprocessing.') return data_dict
[docs] def any_signal(sig: np.ndarray) -> np.ndarray: """check if there is any signal POLLYISLASERSHUTTERON determine whether the laser shutter is on due to the flying object. INPUTS: sig: np.ndarray BGCor signal with shape [height, time]. OUTPUTS: flag: np.ndarray Boolean array of shape [time,] where True indicates the laser shutter is turned on. HISTORY: - 2021-04-21: first edition by Zhenping - 2025-05-14: translated and generalized pollyIsLaserShutterOn, """ # Mean and standard deviation over the height dimension (axis 0) mean_sig = np.mean(sig, axis=1) std_sig = np.std(sig, axis=1, ddof=0) # Detect when both mean and std dev are below threshold # for some reason had to set the thresholds higher than in matlab version flag = (mean_sig <= 0.02) & (std_sig <= 0.9) return flag
# #%% Temperature effect correction (for Raman signal) #if config.flagSigTempCor # temperature = loadMeteor(mean(data.mTime), data.alt, ... # 'meteorDataSource', config.meteorDataSource, ... # 'gdas1Site', config.gdas1Site, ... # 'meteo_folder', config.meteo_folder, ... # 'radiosondeSitenum', config.radiosondeSitenum, ... # 'radiosondeFolder', config.radiosondeFolder, ... # 'radiosondeType', config.radiosondeType, ... # 'method', 'linear', ... # 'isUseLatestGDAS', config.flagUseLatestGDAS); # absTemp = temperature + 273.17; # # for iCh = 1:size(data.signal, 1) # leadingChar = config.tempCorFunc{iCh}(1); # if (leadingChar == '@') # % valid matlab anonymous function # tempCorFunc = config.tempCorFunc{iCh}; # else # tempCorFunc = vectorize(['@(T) ', '(', config.tempCorFunc{iCh}, ') .* ones(size(T))']); # % fprintf('%s is not a valid matlab anonymous function. Redefine it as %s\n', config.tempCorFunc{iCh}, tempCorFunc); # end # # corFunc = str2func(tempCorFunc); # corFac = corFunc(absTemp); # data.signal(iCh, :, :) = data.signal(iCh, :, :) ./ repmat(reshape(corFac, 1, [], 1), 1, 1, size(data.signal, 3)); # end #end # # #%% Mask for polarization calibration #[data.depol_cal_ang_p_time_start, data.depol_cal_ang_p_time_end, ... # data.depol_cal_ang_n_time_start, data.depol_cal_ang_n_time_end, ... # depCalMask] = pollyPolCaliTime(data.depCalAng, data.mTime, ... # config.initialPolAngle, config.maskPolCalAngle); #data.depCalMask = transpose(depCalMask); # #%% Mask for laser shutter #flagChannel532FR = config.flagFarRangeChannel & config.flag532nmChannel & config.flagTotalChannel; #flagChannel355FR = config.flagFarRangeChannel & config.flag355nmChannel & config.flagTotalChannel; #if any(flagChannel532FR) # data.shutterOnMask = pollyIsLaserShutterOn(... # squeeze(data.signal(flagChannel532FR, :, :))); #elseif any(flagChannel355FR) # data.shutterOnMask = pollyIsLaserShutterOn(... # squeeze(data.signal(flagChannel355FR, :, :))); #else # warning('No suitable channel to determine the shutter status'); # data.shutterOnMask = false(size(data.mTime)); #end # #%% Mask for fog #data.fogMask = false(1, size(data.signal, 3)); #is_channel_532_FR_Tot = config.flagFarRangeChannel & config.flag532nmChannel & config.flagTotalChannel; #data.fogMask(transpose(squeeze(sum(data.signal(is_channel_532_FR_Tot, 40:120, :), 2)) <= config.minPC_fog) & (~ data.shutterOnMask)) = true; # #%% Mask for PMT on/off status of 607 nm channel #flagChannel607 = config.flagFarRangeChannel & config.flag607nmChannel; #data.mask607Off = pollyIs607Off(squeeze(data.signal(flagChannel607, :, :))); # #%% Mask for PMT of 387 nm channel #flagChannel387 = config.flagFarRangeChannel & config.flag387nmChannel; #data.mask387Off = pollyIs387Off(squeeze(data.signal(flagChannel387, :, :))); # #%% Mask for PMT of 407 nm channel #flagChannel407 = config.flagFarRangeChannel & config.flag407nmChannel; #data.mask407Off = pollyIs407Off(squeeze(data.signal(flagChannel407, :, :))); # #%% Mask for PMT of 355 nm rotation Raman channel #flagChannel355RR = config.flagFarRangeChannel & config.flag355nmRotRaman; #data.mask355RROff = pollyIs607Off(squeeze(data.signal(flagChannel355RR, :, :))); # #%% Mask for PMT of 532 nm rotation Raman channel #flagChannel532RR = config.flagFarRangeChannel & config.flag532nmRotRaman; #data.mask532RROff = pollyIs607Off(squeeze(data.signal(flagChannel532RR, :, :))); # #%% Mask for 1064 nm rotation Raman channel #flagChannel1064RR = config.flag1064nmRotRaman; #data.mask1064RROff = pollyIs607Off(squeeze(data.signal(flagChannel1064RR, :, :))); # #end