Source code for ppcpy.cloudmask.cloudscreen

import numpy as np
from scipy.ndimage import label, uniform_filter1d
from ppcpy.misc.helper import uniform_filter, savgol_filter
import matplotlib.pyplot as plt


[docs] def smooth_signal(signal:np.ndarray, window_len:int) -> np.ndarray: """Uniformly smooth the input signal Parameters ---------- singal : ndarray Signal to be smooth window_len : int Width of the applied uniform filter Returns ------- ndarray Smoothed signal Notes ----- **History** - 2026-02-04: Changed from scipy.ndimage.uniform_filter1d to ppcpy.misc.helper.uniform_filter - 2026-03-17: Changed back to scipy.ndimage.uniform_filter1d due to NaN-related issues in Zhao's cloud screening algorithm. """ # return uniform_filter(signal, window_len) return uniform_filter1d(signal, window_len, mode='nearest')
[docs] def cloudscreen(data_cube, wv:int=532, collect_debug:bool=False) -> np.ndarray: """Preform cloud screening. Parameters ---------- data_cube : object Main PicassoProc object. wv : int, optional Wavelength of the channel to perfom cloud screening [nm]. Default is 532. collect_debug : bool, optional If true, collects debug information. Default is False. Returns ------- falgCloudFree : ndarray 1 dimensional temporal boolean array. 0 = cloudy, 1 = cloud free. Notes ----- - The cloud screening is done for both the far range and near range total channels of wavelength wv if the channels exist. **History** - xxxx-xx-xx: First edition by ... - 2026-03-18: Updated to include necessary configurations for cloudScreen_Zhao. - 2026-04-09: Added 'maxCloudSearchHeight' config parameters. """ print('Starting cloud screen') config_dict = data_cube.polly_config_dict height = data_cube.retrievals_highres['range'] bg = np.squeeze(data_cube.retrievals_highres['BG'][:, data_cube.gf(wv, 'total', 'FR')]) hFullOL = np.array(config_dict['heightFullOverlap'])[data_cube.gf(wv, 'total', 'FR')][0] # Cloud screen mode dependent configuration if config_dict['cloudScreenMode'] == 1: print(f"cloud screen mode {config_dict['cloudScreenMode']}: MSG method") screenfunc = cloudScreen_MSG sig = 'RCS' kwargs = { 'slope_thres': config_dict['maxSigSlope4FilterCloud'], } elif config_dict['cloudScreenMode'] == 2: print(f"cloud screen mode {config_dict['cloudScreenMode']}: Zhao's method") screenfunc = cloudScreen_Zhao sig = 'PCR_slice' kwargs = { 'bg': bg, 'minSNR': 2, 'heightFullOverlap': hFullOL, 'showDetails': collect_debug } else: raise ValueError(f"cloudScreenMode {config_dict['cloudScreenMode']} not properly defined.") signal = np.squeeze(data_cube.retrievals_highres[sig][:, :, data_cube.gf(wv, 'total', 'FR')]) flagCloudFree, layerStatus = screenfunc( height=height, signal=signal, search_region=[hFullOL, config_dict['maxCloudSearchHeight']], **kwargs ) # and for near range if it exists if np.any(data_cube.gf(wv, 'total', 'NR')): signal = np.squeeze(data_cube.retrievals_highres[sig][:, :, data_cube.gf(wv, 'total', 'NR')]) bg = np.squeeze(data_cube.retrievals_highres['BG'][:, data_cube.gf(wv, 'total', 'NR')]) hFullOL = np.array(config_dict['heightFullOverlap'])[data_cube.gf(wv, 'total', 'NR')][0] if 'hFullOl' in kwargs and 'bg' in kwargs: kwargs['heightFullOverlap'] = hFullOL kwargs['bg'] = bg flagCloudFree_NR, layerStatus_NR = screenfunc( height=height, signal=signal, search_region=[hFullOL, config_dict['maxCloudSearchHeight_NR']], **kwargs ) flagCloudFree = flagCloudFree & flagCloudFree_NR return flagCloudFree
[docs] def cloudScreen_MSG(height:np.ndarray, signal:np.ndarray, slope_thres:float, search_region:list, smooth_win:int=9) -> float: """Cloud screen with maximum signal gradient. Parameters ---------- height : ndarray Height in meters. signal : ndarray (time, height) Range corrected photon count rate signal [MHz]. slope_thres : float Threshold of the slope to determine whether there is strong backscatter signal. [MHz*m] search_region : list or array (2 elements) [baseHeight, topHeight] in meters. smooth_win : int, optional Width of the applied smoothing filter [bins]. Default is 9. Returns ------- flagCloudFree : ndarray A boolean array indicates whether the profile is cloud free. layerStatus: ndarray (time x height) Layer status for each bin (0: unknown, 1: cloud, 2: aerosol). Notes ----- - This function does not yield any layerStatus information. **History** - 2021-05-18: First edition by Zhenping. - 2025-03-20: Translated into python. """ if len(search_region) != 2 or search_region[1] <= height[0]: raise ValueError("Not a valid search_region.") if search_region[0] < height[0]: print(f"Warning: Base of search_region is lower than {height[0]}, setting it to {height[0]}") search_region[0] = height[0] flagCloudFree = np.zeros(signal.shape[0], dtype=bool) layerStatus = None # Find indices corresponding to search_region search_index = np.searchsorted(height, np.array(search_region)) for indx in range(signal.shape[0]): if np.isnan(signal[indx]).all(): print(f'Skipping timestamp {indx}') continue slope = np.concatenate(([0], np.diff(smooth_signal(signal[indx, :], smooth_win)))) / (height[1] - height[0]) if not np.any(slope[search_index[0]:search_index[1] + 1] >= slope_thres): flagCloudFree[indx] = True return flagCloudFree, layerStatus
[docs] def cloudScreen_Zhao(height:np.ndarray, signal:np.ndarray, bg:np.ndarray, search_region:list=[0, 10000], minDepth:float=100, heightFullOverlap:float=600, smoothWin:int=7, minSNR:float=1, showDetails:bool=False) -> tuple: """Cloud layer detection based on Zhao's algorithm. Parameters ---------- height : ndarray Height above ground [m]. signal : ndarray (time, height) Background corrected photon count rate signal [MHz]. bg : ndarray Background of the photon count rate signal [MHz]. search_region : list Bottom and top height for cloud detection [m]. minDepth : float Minimum layer depth [m]. Default is 100. heightFullOverlap : float Minimum height with full overlap [m]. Default is 600. smoothWin : int Smoothing window width [bins]. Default is 7. minSNR : float Minimum layer mean signal-noise-ratio. Default is 1. Returns ------- flagCloudFree : ndarray A boolean array indicates whether the profile is cloud free. layerStatus: ndarray (time x height) Layer status for each bin (0: unknown, 1: cloud, 2: aerosol). References ---------- Zhao, C., Y. Wang, Q. Wang, Z. Li, Z. Wang, and D. Liu (2014), A new cloud and aerosol layer detection method based on micropulse lidar measurements, Journal of Geophysical Research: Atmospheres, 119(11), 6788-6802. **History** - 2021-05-18: First edition by Zhengping. - 2026-03-11: Translated into python. """ if (signal.shape[1] != height.shape[0] or signal.shape[0] != signal.shape[0] or signal.shape[0] != bg.shape[0]): raise ValueError('Dimensions do not match!') flagCloudFree = np.ones(signal.shape[0], dtype=bool) layerStatus = np.zeros(signal.shape, dtype=int) flagDetectBins = (height >= search_region[0]) & (height <= search_region[1]) for iTime in range(signal.shape[0]): if showDetails: print(f"--------- iTime: {iTime} ---------") if np.isnan(signal[iTime, flagDetectBins]).all(): print(f'Skipping timestamp {iTime}') continue ## layer detection layerInfo, _, _ = VDE_cld( signal=signal[iTime, flagDetectBins], height=height[flagDetectBins] / 1e3, BG=bg[iTime], minLayerDepth=minDepth / 1e3, minHeight=heightFullOverlap / 1e3, smoothWin=smoothWin, minSNR=minSNR, showDetails=showDetails ) for iLayer in range(len(layerInfo)): ## gridding the layers layerIndex = (height >= layerInfo[iLayer]['baseHeight'] * 1e3) & \ (height <= layerInfo[iLayer]['topHeight'] * 1e3) if layerInfo[iLayer]['flagCloud']: ## cloud layer flagCloudFree[iTime] = False layerStatus[iTime, layerIndex] = 1 else: ## aerosol layer layerStatus[iTime, layerIndex] = 2 return flagCloudFree, layerStatus
[docs] def VDE_cld(signal:np.ndarray, height:np.ndarray, BG:float, minLayerDepth:float=0.2, minHeight:float=0.4, smoothWin:int=3, savgolSmoothWin:int=9, minSNR:int=5, showDetails:bool=False) -> tuple: """Cloud layer detection with VDE method. This method only required elstic signal. Parameters ---------- signal : ndarray (height) raw signal without background [photon count]. height : ndarray height above the ground [km]. BG : float background signal [photon count]. minLayerDepth : float minimum layer geometrical depth [km]. Default is 0.2. minHeight : float minimum height to start the searching with [km]. Default is 0.4. smoothWin : int smoothing window bins. Default is 3. savgolSmoothWin : int smoothing window bins of the Savitzky-Golay filter. Default is 9. minSNR : int minimum layer SNR to filter the fake features. Default is 5. Returns ------- layerInfo : list of dicts id : int identity of the layer. baseHeight : float the layer base height [km]. topHeight : float the layer top height [km]. peakHeight : float the layer height with maximum backscatter signal [km]. layerDepth : int geometrical depth of the layer [km]. flagCloud: bool cloud flag. PD : ndarray SDP signal. PN : ndarray VDE signal References ---------- Zhao, C., Y. Wang, Q. Wang, Z. Li, Z. Wang, and D. Liu (2014), A new cloud and aerosol layer detection method based on micropulse lidar measurements, Journal of Geophysical Research: Atmospheres, 119(11), 6788-6802. Notes ----- - Some of the for-loops could be vectorised for enhanced performance. However, not in a pretty way. - Also could consider using numba and its @njit decorator. That would also enhance the performance but the code needs to be rewritten to fit numba's requirements. - This function does not work with NaN values in the signal. **History** - 2021-06-13: First edition by Zhenping. - 2026-03-11: Translated into pyhton. - 2026-04-09: Enhanced robustness against NaN values. """ if np.floor(minLayerDepth / (height[1] - height[0])) < 3: raise ValueError('minLayerDepth must be assured there are at least 3 bins in each layer') layerInfo = [] minIndex = np.where(height >= minHeight)[0][0] P = signal[minIndex:] height = height[minIndex:] # ---------------------------------------------------- # 1. Semi-Discretization Process (SDP) # ---------------------------------------------------- noise = np.sqrt(P + BG)*3 min_noise = np.ones_like(P)*np.sqrt(3)*3 noise_tresh = np.nanmax(np.vstack([noise, min_noise]), axis=0) Ps = smooth_signal(P, smoothWin) ## bottom to top semi-discretization PD1 = Ps.copy() for i in range(1, len(PD1)): if np.abs(PD1[i] - PD1[i - 1]) <= noise_tresh[i]: PD1[i] = PD1[i - 1] ### top to bottom semi-discretizion PD2 = Ps.copy() for i in range(len(PD2) - 2, -1, -1): if np.abs(PD2[i] - PD2[i + 1]) <= noise_tresh[i]: PD2[i] = PD2[i + 1] PD = np.nanmean(np.vstack([PD1, PD2]), axis=0) # ---------------------------------------------------- # 2. Value Distribution Equalization (VDE) Process # ---------------------------------------------------- # PD = PD[~np.isnan(PD)] IS = np.argsort(PD) RS = PD[IS] MA = np.nanmax(RS) MI = np.nanmin(RS) PE = np.arange(1, len(RS)+1) / len(RS) epsilon = 1e-6 for i in range(1, len(RS)): if np.abs(RS[i] - RS[i - 1]) <= epsilon: PE[i] = PE[i - 1] yi = PE * (MA - MI) + MI RIS = np.argsort(IS) PN = yi[RIS] # ---------------------------------------------------- # 3. Layer detection # ---------------------------------------------------- BZ = np.arange(len(PN), 0, -1)/len(PN) * (MA - MI) + MI layerN = 0 L, nLayer = label(PN > BZ) if showDetails: fig, ax = plt.subplots(figsize=(12, 6)) ax.plot(height, P, color="black", alpha=0.3, label="P(R)") ax.plot(height, Ps, color="black", linestyle="--", alpha=0.3, label="Ps(R)") # ax.plot(height, PD, color="tab:green", alpha=0.7, label='PD') ax.plot(height, PN, color="tab:blue", alpha=0.7, label='PN(R)') ax.plot(height, BZ, color="tab:orange", alpha=0.7, label='BZ(R)') ax.set_xlabel("Range [km]") ax.set_ylabel("Photon count") ax.legend() fig.tight_layout() plt.show() for iLayer in range(1, nLayer + 1): baseIndex = np.where(L == iLayer)[0][0] topIndex = np.where(L == iLayer)[0][-1] layerDepth = height[topIndex] - height[baseIndex] layerIndex = (L == iLayer) layerSNR = np.nanmean(P[layerIndex]) / np.sqrt(np.nanmean(P[layerIndex] + BG)) if showDetails: print(f"layer: {iLayer} at height [{height[baseIndex]:.3f}, {height[topIndex]:.3f}] km, Depth: {layerDepth:.3f} >= {minLayerDepth}, SNR: {layerSNR:.3f} >= {minSNR}") if (layerDepth >= minLayerDepth) and (layerSNR >= minSNR): layerN += 1 layerInfo.append({ 'id': layerN, 'baseHeight': height[baseIndex], 'topHeight': height[topIndex], 'layerDepth': layerDepth, 'flagCloud': False }) # ---------------------------------------------------- # 4. Layer classification # ---------------------------------------------------- Ps_1 = savgol_filter(P, savgolSmoothWin, 2) for iLayer in range(len(layerInfo)): mask = (height >= layerInfo[iLayer]['baseHeight']) & (height <= layerInfo[iLayer]['topHeight']) sig = Ps_1[mask] * height[mask]**2 sig[sig <= 0] = np.nan # may cause T and or D to be NaN, thus affecting the determined layer satus # sig[sig <= 0] = epsilon maxIndex = np.nanargmax(sig) # potential crash here if all of sig = NaN maxSig = sig[maxIndex] maxIndex = np.where(mask)[0][0] + maxIndex - 1 # layerIndex + max_Signal_Index T = maxSig / sig[0] D = (np.log(sig[-1] / maxSig)) / (layerInfo[iLayer]['topHeight'] - height[maxIndex]) layerHeight = np.nanmean([layerInfo[iLayer]['baseHeight'], layerInfo[iLayer]['topHeight']]) layerInfo[iLayer]['peekHeight'] = height[maxIndex] ## segmented threshold for the determination of layer status if layerHeight <= 1.5: if T > 6 or D < -15: layerInfo[iLayer]['flagCloud'] = True elif layerHeight > 1.5 and layerHeight <= 5: if T > 4 or D < -9: layerInfo[iLayer]['flagCloud'] = True elif layerHeight > 5: if T > 1.5 or D < -9: layerInfo[iLayer]['flagCloud'] = True else: raise ValueError('Invalid height!') if showDetails and layerInfo[iLayer]['flagCloud']: print(f"Cloud detected at height: {layerHeight:.3f} km T = {T:.1f}, D = {D:.1f}") return layerInfo, PD, PN