import numpy as np
from scipy.ndimage import uniform_filter1d
# Helper functions
[docs]
def smooth_signal(signal, window_len):
""" """
return uniform_filter1d(signal, size=window_len, mode='nearest')
[docs]
def cloudscreen(data_cube):
""" """
config_dict = data_cube.polly_config_dict
print('Starting cloud screen')
print('cloud screen mode', config_dict['cloudScreenMode'])
print('slope_thres', config_dict['maxSigSlope4FilterCloud'])
height = data_cube.retrievals_highres['range']
wv = 532
RCS = np.squeeze(data_cube.retrievals_highres['RCS'][:, :, data_cube.gf(wv, 'total', 'FR')])
bg = np.squeeze(data_cube.retrievals_highres['BG'][:, data_cube.gf(wv, 'total', 'FR')])
hFullOL = np.array(config_dict['heightFullOverlap'])[data_cube.gf(wv, 'total', 'FR')][0]
if config_dict['cloudScreenMode'] == 1:
screenfunc = cloudScreen_MSG
elif config_dict['cloudScreenMode'] == 2:
screenfunc = cloudScreen_Zhao
else:
raise ValueError(f'cloudScreenMode not properly defined')
flagCloudFree, layerStatus = screenfunc(
height, RCS, config_dict['maxSigSlope4FilterCloud'], [hFullOL, 7000])
# and for near range if it exists
if np.any(data_cube.gf(wv, 'total', 'NR')):
RCS = np.squeeze(data_cube.retrievals_highres['RCS'][:, :, data_cube.gf(wv, 'total', 'NR')])
bg = np.squeeze(data_cube.retrievals_highres['BG'][:, data_cube.gf(wv, 'total', 'NR')])
hFullOL = np.array(config_dict['heightFullOverlap'])[data_cube.gf(wv, 'total', 'NR')][0]
if config_dict['cloudScreenMode'] == 1:
flagCloudFree_NR, layerStatus_NR = screenfunc(
height, RCS, config_dict['maxSigSlope4FilterCloud'], [hFullOL, 2000])
flagCloudFree = flagCloudFree & flagCloudFree_NR
return flagCloudFree
[docs]
def cloudScreen_MSG(height, RCS, slope_thres, search_region):
"""
CLOUDSCREEN_MSG cloud screen with maximum signal gradient.
INPUTS:
height: array
Height in meters.
signal: array (time, height) !! this is transposed compared to the original implementation
Photon count rate in MHz.
slope_thres: float
Threshold of the slope to determine whether there is strong backscatter signal. [MHz*m]
search_region: list or array (2 elements)
[baseHeight, topHeight] in meters.
OUTPUTS:
flagCloudFree: boolean array
Indicates whether the profile is cloud free.
layerStatus: matrix (height x time)
Layer status for each bin (0: unknown, 1: cloud, 2: aerosol).
HISTORY:
- 2021-05-18: First edition by Zhenping
- 2025-03-20: Translated into python
"""
if len(search_region) != 2 or search_region[1] <= height[0]:
raise ValueError("Not a valid search_region.")
if search_region[0] < height[0]:
print(f"Warning: Base of search_region is lower than {height[0]}, setting it to {height[0]}")
search_region[0] = height[0]
flagCloudFree = np.zeros(RCS.shape[0], dtype=bool)
layerStatus = np.zeros_like(RCS, dtype=int)
# Find indices corresponding to search_region
search_indx = np.array(((np.array(search_region) - height[0]) / (height[1] - height[0])) + 1, dtype=int)
for indx in range(RCS.shape[0]):
if np.isnan(RCS[indx, 0]):
continue
slope = np.concatenate(([0], np.diff(smooth_signal(RCS[indx, :], 10)))) / (height[1] - height[0])
if not np.any(slope[search_indx[0]:search_indx[1]] >= slope_thres):
flagCloudFree[indx] = True
return flagCloudFree, layerStatus
[docs]
def cloudScreen_Zhao(height, RCS, slope_thres, search_region):
""" """
raise ValueError('not yet implemented')