cmadaas_model_grid now supports retrieving ensemble member forecasts, and the file cache now supports a configurable number of cache days.

This commit is contained in:
NMC-DAVE 2022-09-25 12:36:11 +08:00
parent 3d64556c35
commit 56221f11e6
4 changed files with 195 additions and 84 deletions

@ -52,6 +52,14 @@ Using the fellowing command to install packages:
# 若用不到某个服务器, 则不设置或删除该段落即可.
# 注意设置IP地址时, 不要加http等前缀信息.
# 当读取CMADaas, MICAPS Cassandra等数据时, 采取了本地文件缓存机制, 以便加快
# 文件的读取速度和减少数据库访问次数(设置函数参数cache=False可以不用缓存机制).
# CACHE_DIR, 缓存目录, 若没有设置, 默认为 /user_home/.nmcdev/cache
# CACHE_DAYS, 缓存天数, 若没有设置, 默认为7, 即7天之前的缓存数据将被删除
[CACHE]
# CACHE_DIR = ~
CACHE_DAYS = 7
# CMADaaS大数据云平台配置:
# DNS为IP地址, PORT为端口
# USER_ID和PASSWORD分别为用户名和密码
@ -69,9 +77,6 @@ serviceNodeId = NMIC_MUSIC_CMADAAS
[MICAPS]
GDS_IP = xx.xx.xx.xx
GDS_PORT = 8080
# Cached file directory, if not set,
# /user_home/.nmcdev/cache will be used.
# CACHE_DIR = ~
# CIMISS网址及用户ID和PASSWORD, 2021年年底CIMISS停止提供服务
# DNS为IP地址, PORT为端口

@ -7,7 +7,7 @@
Read configure file.
"""
import os
import glob
import datetime
import shutil
import configparser
@ -86,12 +86,21 @@ def get_cache_file(sub_dir, filename, name=None, cache_clear=True):
# 如果设置了清除缓存, 则会将缓存文件按日期存放, 并删除超过缓存天数(CACHE_DAYS)的日期文件夹.
if cache_clear:
# Use the current date (cached_YYYYMMDD) as subdir
cache_subdir1 = cache_dir / datetime.date.today().strftime("%Y%U")
cache_subdir1 = cache_dir / datetime.date.today().strftime("cached_%Y%m%d")
cache_subdir2 = cache_subdir1 / sub_dir
cache_subdir2.mkdir(parents=True, exist_ok=True)
for f in cache_dir.iterdir():
if f != cache_subdir1:
if CONFIG.has_option('CACHE', 'CACHE_DAYS'):
cache_days = int(CONFIG['CACHE']['CACHE_DAYS'])
else:
cache_days = 7
deadline_time = datetime.datetime.now() - datetime.timedelta(days=cache_days)
# 删除cache_days天之前的缓存数据目录
dirs = cache_dir.glob("cached_????????")
for f in dirs:
ftime = datetime.datetime.fromtimestamp(f.stat().st_mtime)
if ftime < deadline_time:
shutil.rmtree(f)
else:
cache_subdir2 = cache_dir / sub_dir

@ -991,7 +991,7 @@ def cimiss_obs_grid_by_time(time_str, limit=None, data_code="SURF_CMPA_FRT_5KM",
directory = os.path.join(data_code, fcst_ele)
filename = time_str
if limit is not None:
filename = filename + '.' + str(limit)
filename = filename + '.' + str(limit).replace(" ","")
cache_file = CONFIG.get_cache_file(directory, filename, name="CIMISS", cache_clear=cache_clear)
if cache_file.is_file():
with open(cache_file, 'rb') as f:
@ -1169,7 +1169,7 @@ def cimiss_analysis_by_time(time_str, limit=None, data_code='NAFP_CLDAS2.0_RT_GR
directory = os.path.join(data_code, fcst_ele, str(fcst_level))
filename = time_str
if limit is not None:
filename = filename + '.' + str(limit)
filename = filename + '.' + str(limit).replace(" ","")
cache_file = CONFIG.get_cache_file(directory, filename, name="CIMISS", cache_clear=cache_clear)
if cache_file.is_file():
with open(cache_file, 'rb') as f:
@ -1332,7 +1332,7 @@ def cimiss_model_grid(data_code, init_time_str, valid_time, fcst_ele, fcst_level
directory = os.path.join(data_code, fcst_ele, str(fcst_level))
filename = init_time_str + '.' + str(valid_time).zfill(3)
if limit is not None:
filename = init_time_str + '_' +str(limit) +'.' + str(valid_time).zfill(3)
filename = init_time_str + '_' +str(limit).replace(" ","") +'.' + str(valid_time).zfill(3)
cache_file = CONFIG.get_cache_file(directory, filename, name="CIMISS", cache_clear=cache_clear)
if cache_file.is_file():
with open(cache_file, 'rb') as f:
@ -1648,7 +1648,7 @@ def cimiss_model_by_time(init_time_str, valid_time=0, limit=None,
directory = os.path.join(data_code, fcst_ele, str(fcst_level))
filename = init_time_str + '.' + str(valid_time).zfill(3)
if limit is not None:
filename = init_time_str + '_' +str(limit) +'.' + str(valid_time).zfill(3)
filename = init_time_str + '_' +str(limit).replace(" ","") +'.' + str(valid_time).zfill(3)
cache_file = CONFIG.get_cache_file(directory, filename, name="CIMISS", cache_clear=cache_clear)
if cache_file.is_file():
with open(cache_file, 'rb') as f:

@ -1096,7 +1096,7 @@ def cmadaas_obs_grid_by_time(time_str, limit=None, data_code="SURF_CMPA_FAST_5KM
directory = os.path.join(data_code, fcst_ele)
filename = time_str
if limit is not None:
filename = filename + '.' + str(limit)
filename = filename + '.' + str(limit).replace(" ","")
cache_file = CONFIG.get_cache_file(directory, filename, name="CMADaaS", cache_clear=cache_clear)
if cache_file.is_file():
with open(cache_file, 'rb') as f:
@ -1911,7 +1911,7 @@ def cmadaas_analysis_by_time(time_str, limit=None, data_code='NAFP_CLDAS2.0_NRT_
directory = os.path.join(data_code, fcst_ele, str(fcst_level))
filename = time_str
if limit is not None:
filename = filename + '.' + str(limit)
filename = filename + '.' + str(limit).replace(" ","")
cache_file = CONFIG.get_cache_file(directory, filename, name="CMADaaS", cache_clear=cache_clear)
if cache_file.is_file():
with open(cache_file, 'rb') as f:
@ -2073,8 +2073,9 @@ def cmadaas_get_model_latest_time(data_code="NAFP_FOR_FTM_HIGH_EC_ANEA", latestT
return time[0]
def cmadaas_model_grid(data_code, init_time, valid_time, fcst_ele, fcst_level, level_type, limit=None,
varname='data', units=None, scale_off=None, cache=True, cache_clear=True,
def cmadaas_model_grid(data_code, init_time, valid_time, fcst_ele, fcst_level, level_type,
fcst_member=None, limit=None, varname='data', units=None,
scale_off=None, cache=True, cache_clear=True,
levattrs={'long_name':'height_above_ground', 'units':'m', '_CoordinateAxisType':'Height'}):
"""
Retrieve model grid data from CMADaaS service.
@ -2089,6 +2090,7 @@ def cmadaas_model_grid(data_code, init_time, valid_time, fcst_ele, fcst_level, l
:param fcst_ele: forecast element, like 2m temperature "TEM"
:param fcst_level: vertical level, like 0
:param level_type: forecast level type, 表示Grib数据中的层次类型, 可在云平台上查询.
:param fcst_member: ensemble forecast member, 集合预报成员标识. 如果数据是集合预报, 该变量必须设置.
:param limit: [min_lat, min_lon, max_lat, max_lon]
:param varname: set variable name, default is 'data'
:param units: forecast element's units, defaults to retrieved units.
@ -2102,8 +2104,13 @@ def cmadaas_model_grid(data_code, init_time, valid_time, fcst_ele, fcst_level, l
:return: xarray dataset.
Examples:
>>> data = cmadaas_model_grid("NAFP_FOR_FTM_HIGH_EC_ANEA", "2021010512", 24, 'TEM', 850, 100, units="C", scale_off=[1.0, -273.15], limit=[10.,100,30,120.],
levattrs={'long_name':'pressure_level', 'units':'hPa', '_CoordinateAxisType':'Pressure'}, cache=True)
>>> data = cmadaas_model_grid("NAFP_FOR_FTM_HIGH_EC_ANEA", "2021010512", 24, 'TEM', 850, 100,
units="C", scale_off=[1.0, -273.15], limit=[10.,100,30,120.],
levattrs={'long_name':'pressure_level', 'units':'hPa', '_CoordinateAxisType':'Pressure'},
cache=True)
>>> data = cmadaas_model_grid("NAFP_GRAPESREPS_FOR_FTM_DIS_CHN", "2022092400", 24, 'TEM', 2, 103,
fcst_member=1, units="C", scale_off=[1.0, -273.15],
limit=[10., 100, 30, 120.], cache=True)
"""
# check initial time
@ -2115,9 +2122,12 @@ def cmadaas_model_grid(data_code, init_time, valid_time, fcst_ele, fcst_level, l
# retrieve data from cached file
if cache:
directory = os.path.join(data_code, fcst_ele, str(fcst_level))
filename = init_time_str + '.' + str(valid_time).zfill(3)
filename = init_time_str
if limit is not None:
filename = init_time_str + '_' +str(limit) +'.' + str(valid_time).zfill(3)
filename = filename + '_' + str(limit).replace(" ","")
if fcst_member is not None:
filename = filename + '_' + str(fcst_member).replace(" ","")
filename = filename + '.' + str(valid_time).zfill(3)
cache_file = CONFIG.get_cache_file(directory, filename, name="CMADaaS", cache_clear=cache_clear)
if cache_file.is_file():
with open(cache_file, 'rb') as f:
@ -2125,26 +2135,50 @@ def cmadaas_model_grid(data_code, init_time, valid_time, fcst_ele, fcst_level, l
return data
# set retrieve parameters
if limit is None:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele}
interface_id = 'getNafpEleGridByTimeAndLevelAndValidtime'
if fcst_member is None:
if limit is None:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele}
interface_id = 'getNafpEleGridByTimeAndLevelAndValidtime'
else:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'minLat': '{:.10f}'.format(limit[0]),
"minLon": '{:.10f}'.format(limit[1]),
"maxLat": '{:.10f}'.format(limit[2]),
"maxLon": '{:.10f}'.format(limit[3]),
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele}
interface_id = 'getNafpEleGridInRectByTimeAndLevelAndValidtime'
else:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'minLat': '{:.10f}'.format(limit[0]),
"minLon": '{:.10f}'.format(limit[1]),
"maxLat": '{:.10f}'.format(limit[2]),
"maxLon": '{:.10f}'.format(limit[3]),
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele}
interface_id = 'getNafpEleGridInRectByTimeAndLevelAndValidtime'
if limit is None:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele,
'fcstMember': '{:d}'.format(fcst_member)}
interface_id = 'getNafpEleGridByTimeAndLevelAndValidtimeAndFcstMember'
else:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'minLat': '{:.10f}'.format(limit[0]),
"minLon": '{:.10f}'.format(limit[1]),
"maxLat": '{:.10f}'.format(limit[2]),
"maxLon": '{:.10f}'.format(limit[3]),
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele,
'fcstMember': '{:d}'.format(fcst_member)}
interface_id = 'getNafpEleGridInRectByTimeAndLevelAndValidtimeAndFcstMember'
# retrieve data contents
contents = get_rest_result(interface_id, params)
@ -2180,22 +2214,39 @@ def cmadaas_model_grid(data_code, init_time, valid_time, fcst_ele, fcst_level, l
'long_name':'latitude', 'units':'degrees_north', '_CoordinateAxisType':'Lat'})
if fcst_level != 0:
level_coord = ('level', np.array([fcst_level]), levattrs)
if fcst_member is not None:
    # Ensemble-member coordinate. The attribute key must be the literal
    # string 'units'; the local variable `units` holds the data variable's
    # units (possibly None) and must not be used as a dictionary key here.
    member_coord = ('realization', np.array([fcst_member]), {
        'long_name': 'realization', 'units': '1'})
varattrs = {'short_name': fcst_ele, 'long_name': name, 'units': units}
# construct xarray
data = np.array(contents['DS'], dtype=np.float32)
if scale_off is not None:
data = data * scale_off[0] + scale_off[1]
if fcst_level == 0:
data = data[np.newaxis, ...]
data = xr.Dataset({
varname:(['time', 'lat', 'lon'], data, varattrs)},
coords={'time':time_coord, 'lat':lat_coord, 'lon':lon_coord})
if fcst_member is None:
if fcst_level == 0:
data = data[np.newaxis, ...]
data = xr.Dataset({
varname:(['time', 'lat', 'lon'], data, varattrs)},
coords={'time':time_coord, 'lat':lat_coord, 'lon':lon_coord})
else:
data = data[np.newaxis, np.newaxis, ...]
data = xr.Dataset({
varname:(['time', 'level', 'lat', 'lon'], data, varattrs)},
coords={'time':time_coord, 'level':level_coord, 'lat':lat_coord, 'lon':lon_coord})
else:
data = data[np.newaxis, np.newaxis, ...]
data = xr.Dataset({
varname:(['time', 'level', 'lat', 'lon'], data, varattrs)},
coords={'time':time_coord, 'level':level_coord, 'lat':lat_coord, 'lon':lon_coord})
if fcst_level == 0:
data = data[np.newaxis, np.newaxis, ...]
data = xr.Dataset({
varname:(['realization', 'time', 'lat', 'lon'], data, varattrs)},
coords={'realization':member_coord, 'time':time_coord,
'lat':lat_coord, 'lon':lon_coord})
else:
data = data[np.newaxis, np.newaxis, np.newaxis, ...]
data = xr.Dataset({
varname:(['realization', 'time', 'level', 'lat', 'lon'], data, varattrs)},
coords={'realization':member_coord, 'time':time_coord, 'level':level_coord,
'lat':lat_coord, 'lon':lon_coord})
# add time coordinates
data.coords['forecast_reference_time'] = init_time[0]
@ -2411,17 +2462,19 @@ def cmadaas_model_profiles(data_code, init_time, valid_times, fcst_ele, fcst_lev
return None
def cmadaas_model_by_time(init_time, valid_time=0, limit=None,
def cmadaas_model_by_time(init_time, valid_time=0, limit=None, fcst_member=None,
data_code='NAFP_FOR_FTM_HIGH_EC_GLB', fcst_level=0, level_type=1,
levattrs={'long_name':'pressure_level', 'units':'hPa', '_CoordinateAxisType':'Pressure'},
fcst_ele="TEM", varname='data', units=None, scale_off=None, cache=True, cache_clear=True):
"""
Retrieve grid data from CMADaaS service.
与cmadass_model_grid功能相似, 只是接口方式不同.
与cmadaas_model_grid功能相似, 只是接口方式不同(多了一些默认参数的设置),
建议优先使用cmadaas_model_grid.
:param init_time: model run time, like "2016081712", or datetime object.
:param valid_time: forecast hour, default is 0
:param limit: [min_lat, min_lon, max_lat, max_lon]
:param fcst_member: ensemble forecast member, 集合预报成员标识. 如果数据是集合预报, 该变量必须设置.
:param varname: set variable name, default is 'data'
:param data_code: MUSIC data code, default is "NAFP_FOR_FTM_HIGH_EC_GLB", 即EC高分辨率全球地面预报数据.
:param fcst_level: vertical level, default is 0, 表示地面数据, 可在云平台上查询.
@ -2447,9 +2500,12 @@ def cmadaas_model_by_time(init_time, valid_time=0, limit=None,
# retrieve data from cached file
if cache:
directory = os.path.join(data_code, fcst_ele, str(fcst_level))
filename = init_time_str + '.' + str(valid_time).zfill(3)
filename = init_time_str
if limit is not None:
filename = init_time_str + '_' +str(limit) +'.' + str(valid_time).zfill(3)
filename = filename + '_' + str(limit).replace(" ","")
if fcst_member is not None:
filename = filename + '_' + str(fcst_member).replace(" ","")
filename = filename + '.' + str(valid_time).zfill(3)
cache_file = CONFIG.get_cache_file(directory, filename, name="CMADaaS", cache_clear=cache_clear)
if cache_file.is_file():
with open(cache_file, 'rb') as f:
@ -2457,27 +2513,52 @@ def cmadaas_model_by_time(init_time, valid_time=0, limit=None,
return data
# set retrieve parameters
if limit is None:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele}
interface_id = 'getNafpEleGridByTimeAndLevelAndValidtime'
if fcst_member is None:
if limit is None:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele}
interface_id = 'getNafpEleGridByTimeAndLevelAndValidtime'
else:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'minLat': '{:.10f}'.format(limit[0]),
"minLon": '{:.10f}'.format(limit[1]),
"maxLat": '{:.10f}'.format(limit[2]),
"maxLon": '{:.10f}'.format(limit[3]),
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele}
interface_id = 'getNafpEleGridInRectByTimeAndLevelAndValidtime'
else:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'minLat': '{:.10f}'.format(limit[0]),
"minLon": '{:.10f}'.format(limit[1]),
"maxLat": '{:.10f}'.format(limit[2]),
"maxLon": '{:.10f}'.format(limit[3]),
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele}
interface_id = 'getNafpEleGridInRectByTimeAndLevelAndValidtime'
if limit is None:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele,
'fcstMember': '{:d}'.format(fcst_member)}
interface_id = 'getNafpEleGridByTimeAndLevelAndValidtimeAndFcstMember'
else:
params = {'dataCode': data_code,
'time': init_time_str + '0000',
'minLat': '{:.10f}'.format(limit[0]),
"minLon": '{:.10f}'.format(limit[1]),
"maxLat": '{:.10f}'.format(limit[2]),
"maxLon": '{:.10f}'.format(limit[3]),
'fcstLevel': '{:d}'.format(fcst_level),
'levelType': level_type if type(level_type) == str else '{:d}'.format(level_type),
'validTime': '{:d}'.format(valid_time),
'fcstEle': fcst_ele,
'fcstMember': '{:d}'.format(fcst_member)}
interface_id = 'getNafpEleGridInRectByTimeAndLevelAndValidtimeAndFcstMember'
# retrieve data contents
contents = get_rest_result(interface_id, params)
contents = _load_contents(contents)
@ -2513,22 +2594,39 @@ def cmadaas_model_by_time(init_time, valid_time=0, limit=None,
'long_name':'latitude', 'units':'degrees_north', '_CoordinateAxisType':'Lat'})
if fcst_level != 0:
level_coord = ('level', np.array([fcst_level]), levattrs)
if fcst_member is not None:
    # Ensemble-member coordinate. The attribute key must be the literal
    # string 'units'; the local variable `units` holds the data variable's
    # units (possibly None) and must not be used as a dictionary key here.
    member_coord = ('realization', np.array([fcst_member]), {
        'long_name': 'realization', 'units': '1'})
varattrs = {'long_name': name, 'units': units}
# construct xarray
data = np.array(contents['DS'], dtype=np.float32)
if scale_off is not None:
data = data * scale_off[0] + scale_off[1]
if fcst_level == 0:
data = data[np.newaxis, ...]
data = xr.Dataset({
varname:(['time', 'lat', 'lon'], data, varattrs)},
coords={'time':time_coord, 'lat':lat_coord, 'lon':lon_coord})
if fcst_member is None:
if fcst_level == 0:
data = data[np.newaxis, ...]
data = xr.Dataset({
varname:(['time', 'lat', 'lon'], data, varattrs)},
coords={'time':time_coord, 'lat':lat_coord, 'lon':lon_coord})
else:
data = data[np.newaxis, np.newaxis, ...]
data = xr.Dataset({
varname:(['time', 'level', 'lat', 'lon'], data, varattrs)},
coords={'time':time_coord, 'level':level_coord, 'lat':lat_coord, 'lon':lon_coord})
else:
data = data[np.newaxis, np.newaxis, ...]
data = xr.Dataset({
varname:(['time', 'level', 'lat', 'lon'], data, varattrs)},
coords={'time':time_coord, 'level':level_coord, 'lat':lat_coord, 'lon':lon_coord})
if fcst_level == 0:
data = data[np.newaxis, np.newaxis, ...]
data = xr.Dataset({
varname:(['realization', 'time', 'lat', 'lon'], data, varattrs)},
coords={'realization':member_coord, 'time':time_coord,
'lat':lat_coord, 'lon':lon_coord})
else:
data = data[np.newaxis, np.newaxis, np.newaxis, ...]
data = xr.Dataset({
varname:(['realization', 'time', 'level', 'lat', 'lon'], data, varattrs)},
coords={'realization':member_coord, 'time':time_coord, 'level':level_coord,
'lat':lat_coord, 'lon':lon_coord})
# add time coordinates
data.coords['forecast_reference_time'] = init_time[0]
@ -2777,7 +2875,9 @@ def cmadaas_get_model_file_with_filter(
the list of download files path.
Examples:
>>> out_files = cmadaas_get_model_file('20210113000000', just_url=True)
>>> out_files = cmadaas_get_model_file_with_filter(
'20220920000000', data_code="NAFP_GRAPESREPS_FOR_FTM_DIS_CHN",
filter="*_TEM_103_*_4_4.*", just_url=True)
"""
# check initial time
@ -2832,6 +2932,3 @@ def cmadaas_get_model_file_with_filter(
return out_files
out_files = cmadaas_get_model_file_with_filter(
'20220920000000', data_code="NAFP_GRAPESREPS_FOR_FTM_DIS_CHN",
filter="*_TEM_103_*_4_4.*", just_url=True)