Update get_radar_mosaic for supporting the new V3 format.

This commit is contained in:
NMC-DAVE 2021-05-12 14:09:39 +08:00
parent e108a0c3fa
commit 746c99fb3c
3 changed files with 310 additions and 141 deletions

@ -1,8 +1,6 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 从MICAPS Cassandra Server数据库读取数值模式数据\n",
"\n",
@ -29,7 +27,9 @@
"* https://github.com/nmcdev/nmc_met_io\n",
"* http://www.micaps.cn/MifunForum/topic/list?fId=7\n",
"---"
]
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "markdown",
@ -63,14 +63,14 @@
},
"outputs": [
{
"output_type": "execute_result",
"data": {
"text/plain": [
"<xarray.core.options.set_options at 0x7fc808956990>"
"<xarray.core.options.set_options at 0x7fef3958ab10>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
"execution_count": 2
}
],
"source": [
@ -125,33 +125,20 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"name": "stdout",
"text": [
"<xarray.Dataset>\n",
"Dimensions: (lat: 301, level: 1, lon: 651, time: 1)\n",
"Coordinates:\n",
" * time (time) datetime64[ns] 2021-04-25T08:00:00\n",
" * level (level) float32 850.0\n",
" * lat (lat) float64 6.735e-06 0.2 0.4 ... 59.6 59.8 60.0\n",
" * lon (lon) float64 50.0 50.2 50.4 ... 179.6 179.8 180.0\n",
" forecast_reference_time datetime64[ns] 2021-04-24T08:00:00\n",
" forecast_period (time) float64 24.0\n",
"Data variables:\n",
" data (time, level, lat, lon) float32 18.17 ... -4.896\n",
"Attributes:\n",
" Conventions: CF-1.6\n",
" Origin: MICAPS Cassandra Server\n"
"<xarray.Dataset>\nDimensions: (lat: 301, level: 1, lon: 651, time: 1)\nCoordinates:\n * time (time) datetime64[ns] 2021-05-09T08:00:00\n * level (level) float32 850.0\n * lat (lat) float64 6.735e-06 0.2 0.4 ... 59.6 59.8 60.0\n * lon (lon) float64 50.0 50.2 50.4 ... 179.6 179.8 180.0\n forecast_reference_time datetime64[ns] 2021-05-08T08:00:00\n forecast_period (time) float64 24.0\nData variables:\n data (time, level, lat, lon) float32 19.18 ... -2.792\nAttributes:\n Conventions: CF-1.6\n Origin: MICAPS Cassandra Server\n"
]
}
],
"source": [
"directory = 'ECMWF_HR/TMP/850'\n",
"filename = '21042408.024'\n",
"filename = '21050808.024'\n",
"data = get_model_grid(directory, filename=filename, cache=False)\n",
"if data is not None:\n",
" print(data)\n",
@ -539,9 +526,8 @@
"celltoolbar": "Initialization Cell",
"hide_input": false,
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
"name": "python3710jvsc74a57bd0ce674488fc7389ca027fd99dc2eb5f5e35c93a4190347346ae1bacb3bb4e56a8",
"display_name": "Python 3.7.10 64-bit ('MET': conda)"
},
"language_info": {
"codemirror_mode": {
@ -580,4 +566,4 @@
},
"nbformat": 4,
"nbformat_minor": 4
}
}

File diff suppressed because one or more lines are too long

@ -18,6 +18,7 @@ import http.client
import urllib.parse
import pickle
import bz2
import zlib
from io import BytesIO
from datetime import datetime, timedelta
import numpy as np
@ -1059,7 +1060,34 @@ def get_fy_awxs(directory, filenames, allExists=True, pbar=False, **kargs):
return xr.concat(dataset, dim='time')
def get_radar_mosaic(directory, filename=None, suffix="*.LATLON", cache=True, cache_clear=True):
def _lzw_decompress(compressed):
"""Decompress a list of output ks to a string.
refer to https://stackoverflow.com/questions/6834388/basic-lzw-compression-help-in-python.
"""
# Build the dictionary.
dict_size = 256
dictionary = {chr(i): chr(i) for i in range(dict_size)}
w = result = compressed.pop(0)
for k in compressed:
if k in dictionary:
entry = dictionary[k]
elif k == dict_size:
entry = w + w[0]
else:
raise ValueError('Bad compressed k: %s' % k)
result += entry
# Add w+entry[0] to the dictionary.
dictionary[dict_size] = w + entry[0]
dict_size += 1
w = entry
return result
def get_radar_mosaic(directory, filename=None, suffix="*.BIN", cache=True, cache_clear=True):
"""
该程序主要用于读取和处理中国气象局CRaMS系统的雷达回波全国拼图数据.
@ -1073,7 +1101,7 @@ def get_radar_mosaic(directory, filename=None, suffix="*.LATLON", cache=True, ca
:Example:
>>> data = get_radar_mosaic("RADARMOSAIC/CREF/")
>>> dir_dir = "RADARMOSAIC/CREF/"
>>> filename = "ACHN.CREF000.20201021.000000.LATLON"
>>> filename = "ACHN_CREF_20210413_005000.BIN"
>>> CREF = get_radar_mosaic(dir_dir, filename=filename, cache=False)
>>> print(CREF['time'].values)
"""
@ -1121,94 +1149,196 @@ def get_radar_mosaic(directory, filename=None, suffix="*.LATLON", cache=True, ca
print('There is no data ' + filename + ' in ' + directory)
return None
# define head structure
head_dtype = [
('description', 'S128'),
# product name, QREF=基本反射率, CREF=组合反射率,
# VIL=液态水含量, OHP=一小时降水
('name', 'S32'),
('organization', 'S16'),
('grid_flag', 'u2'), # 经纬网格数据标识固定值19532
('data_byte', 'i2'), # 数据单元字节数固定值2
('slat', 'f4'), # 数据区的南纬(度)
('wlon', 'f4'), # 数据区的西经(度)
('nlat', 'f4'), # 数据区的北纬(度)
('elon', 'f4'), # 数据区的东经(度)
('clat', 'f4'), # 数据区中心纬度(度)
('clon', 'f4'), # 数据区中心经度(度)
('rows', 'i4'), # 数据区的行数
('cols', 'i4'), # 每行数据的列数
('dlat', 'f4'), # 纬向分辨率(度)
('dlon', 'f4'), # 经向分辨率(度)
('nodata', 'f4'), # 无数据区的编码值
('levelbybtes', 'i4'), # 单层数据字节数
('levelnum', 'i2'), # 数据层个数
('amp', 'i2'), # 数值放大系数
('compmode', 'i2'), # 数据压缩存储时为1否则为0
('dates', 'u2'), # 数据观测时间为1970年1月1日以来的天数
('seconds', 'i4'), # 数据观测时间的秒数
('min_value', 'i2'), # 放大后的数据最小取值
('max_value', 'i2'), # 放大后的数据最大取值
('reserved', 'i2', 6) # 保留字节
]
# check data format version
if byteArray[0:3] == b'MOC':
# the new V3 format
head_dtype = [
('label', 'S4'), # 文件固定标识MOC
('Version', 'S4'), # 文件格式版本代码,如: 1.01.1etc
('FileBytes', 'i4'), # 包含头信息在内的文件字节数不超过2M
('MosaicID', 'i2'), # 拼图产品编号
('coordinate', 'i2'), # 坐标类型: 2=笛卡儿坐标,3=等经纬网格坐标
('varname', 'S8'), # 产品代码,如: ET,VIL,CR,CAP,OHP,OHPC
('description', 'S64'), # 产品描述,如Composite Reflectivity mosaic
('BlockPos', 'i4'), # 产品数据起始位置(字节顺序)
('BlockLen', 'i4'), # 产品数据字节数
# read head information
head_info = np.frombuffer(byteArray[0:256], dtype=head_dtype)
ind = 256
('TimeZone', 'i4'), # 数据时钟,0=世界时,28800=北京时
('yr', 'i2'), # 观测时间中的年份
('mon', 'i2'), # 观测时间中的月份112
('day', 'i2'), # 观测时间中的日期131
('hr', 'i2'), # 观测时间中的小时0023
('min', 'i2'), # 观测时间中的分0059
('sec', 'i2'), # 观测时间中的秒0059
('ObsSeconds', 'i4'), # 观测时间的seconds
('ObsDates', 'u2'), # 观测时间中的Julian dates
('GenDates', 'u2'), # 产品处理时间的天数
('GenSeconds', 'i4'), # 产品处理时间的描述
# get data information
varname = head_info['name'][0].decode("utf-8", 'ignore').rsplit('\x00')[0]
longname = {'CREF': 'Composite Reflectivity', 'QREF': 'Basic Reflectivity',
'VIL': 'Vertically Integrated Liquid', 'OHP': 'One Hour Precipitation'}
units = head_info['organization'][0].decode("utf-8", 'ignore').rsplit('\x00')[0]
amp = head_info['amp'][0]
('edge_s', 'i4'), # 数据区的南边界单位1/1000度放大1千倍
('edge_w', 'i4'), # 数据区的西边界单位1/1000度放大1千倍
('edge_n', 'i4'), # 数据区的北边界单位1/1000度放大1千倍
('edge_e', 'i4'), # 数据区的东边界单位1/1000度放大1千倍
('cx', 'i4'), # 数据区中心坐标单位1/1000度放大1千倍
('cy', 'i4'), # 数据区中心坐标单位1/1000度放大1千倍
('nX', 'i4'), # 格点坐标为列数
('nY', 'i4'), # 格点坐标为行数
('dx', 'i4'), # 格点坐标为列分辨率单位1/10000度放大1万倍
('dy', 'i4'), # 格点坐标为行分辨率单位1/10000度放大1万倍
('height', 'i2'), # 雷达高度
('Compress', 'i2'), # 数据压缩标识, 0=无,1=bz2,2=zip,3=lzw
('num_of_radars', 'i4'), # 有多少个雷达进行了拼图
('UnZipBytes', 'i4'), # 数据段压缩前的字节数
('scale', 'i2'),
('unUsed', 'i2'),
('RgnID', 'S8'),
('units', 'S8'),
('reserved', 'S60')
]
# define data variable
rows = head_info['rows'][0]
cols = head_info['cols'][0]
dlat = head_info['dlat'][0]
dlon = head_info['dlon'][0]
data = np.full(rows*cols, -9999, dtype=np.int32)
# read head information
head_info = np.frombuffer(byteArray[0:256], dtype=head_dtype)
ind = 256
# put data into array
while ind < len(byteArray):
irow = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]
ind += 2
icol = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]
ind += 2
if irow == -1 or icol == -1:
break
nrec = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]
ind += 2
recd = np.frombuffer(
byteArray[ind:(ind + 2*nrec)], dtype='i2', count=nrec)
ind += 2*nrec
position = (irow-1)*cols+icol-1
data[position:(position+nrec)] = recd
# get data information
varname = head_info['varname'][0]
longname = head_info['description'][0]
units = head_info['units'][0]
# reshape data
data.shape = (rows, cols)
# define data variable
rows = head_info['nY'][0]
cols = head_info['nX'][0]
dlat = head_info['dx'][0]/10000.
dlon = head_info['dy'][0]/10000.
# deal missing data and restore values
data = data.astype(np.float32)
data[data < 0] = np.nan
data /= amp
# decompress byte Array
# 目前主要支持bz2压缩格式, zip, lzw格式还没有测试过
if head_info['Compress'] == 0: # 无压缩
data = np.frombuffer(byteArray[ind:], 'i2')
elif head_info['Compress'] == 1: #
data = np.frombuffer(bz2.decompress(byteArray[ind:]), 'i2')
elif head_info['Compress'] == 2:
data = np.frombuffer(zlib.decompress(byteArray[ind:]), 'i2')
elif head_info['Compress'] == 3:
data = np.frombuffer(_lzw_decompress(byteArray[ind:]), 'i2')
else:
print('Can not decompress data.')
return None
# reshape data
data.shape = (rows, cols)
# set longitude and latitude coordinates
lat = head_info['nlat'][0] - np.arange(rows)*dlat - dlat/2.0
lon = head_info['wlon'][0] + np.arange(cols)*dlon - dlon/2.0
# deal missing data and restore values
data = data.astype(np.float32)
data[data < 0] = np.nan
data /= head_info['scale'][0]
# reverse latitude axis
data = np.flip(data, 0)
lat = lat[::-1]
# set longitude and latitude coordinates
lat = head_info['edge_n'][0]/1000. - np.arange(rows)*dlat - dlat/2.0
lon = head_info['edge_w'][0]/1000. + np.arange(cols)*dlon - dlon/2.0
# set time coordinates
# 直接使用时间有问题, 需要天数减去1, 秒数加上28800(8小时)
time = datetime(1970, 1, 1) + timedelta(
days=head_info['dates'][0].astype(np.float64)-1,
seconds=head_info['seconds'][0].astype(np.float64)+28800)
time = np.array([time], dtype='datetime64[m]')
data = np.expand_dims(data, axis=0)
# reverse latitude axis
data = np.flip(data, 0)
lat = lat[::-1]
# set time coordinates
# 直接使用时间有问题, 需要天数减去1, 秒数加上28800(8小时)
time = datetime(head_info['yr'][0], head_info['mon'][0], head_info['day'][0],
head_info['hr'][0], head_info['min'][0], head_info['sec'][0])
time = np.array([time], dtype='datetime64[m]')
data = np.expand_dims(data, axis=0)
else:
# the old LONLAT format.
# define head structure
head_dtype = [
('description', 'S128'),
# product name, QREF=基本反射率, CREF=组合反射率,
# VIL=液态水含量, OHP=一小时降水
('name', 'S32'),
('organization', 'S16'),
('grid_flag', 'u2'), # 经纬网格数据标识固定值19532
('data_byte', 'i2'), # 数据单元字节数固定值2
('slat', 'f4'), # 数据区的南纬(度)
('wlon', 'f4'), # 数据区的西经(度)
('nlat', 'f4'), # 数据区的北纬(度)
('elon', 'f4'), # 数据区的东经(度)
('clat', 'f4'), # 数据区中心纬度(度)
('clon', 'f4'), # 数据区中心经度(度)
('rows', 'i4'), # 数据区的行数
('cols', 'i4'), # 每行数据的列数
('dlat', 'f4'), # 纬向分辨率(度)
('dlon', 'f4'), # 经向分辨率(度)
('nodata', 'f4'), # 无数据区的编码值
('levelbybtes', 'i4'), # 单层数据字节数
('levelnum', 'i2'), # 数据层个数
('amp', 'i2'), # 数值放大系数
('compmode', 'i2'), # 数据压缩存储时为1否则为0
('dates', 'u2'), # 数据观测时间为1970年1月1日以来的天数
('seconds', 'i4'), # 数据观测时间的秒数
('min_value', 'i2'), # 放大后的数据最小取值
('max_value', 'i2'), # 放大后的数据最大取值
('reserved', 'i2', 6) # 保留字节
]
# read head information
head_info = np.frombuffer(byteArray[0:256], dtype=head_dtype)
ind = 256
# get data information
varname = head_info['name'][0].decode("utf-8", 'ignore').rsplit('\x00')[0]
longname = {'CREF': 'Composite Reflectivity', 'QREF': 'Basic Reflectivity',
'VIL': 'Vertically Integrated Liquid', 'OHP': 'One Hour Precipitation'}
longname = longname.get(varname, 'radar mosaic')
units = head_info['organization'][0].decode("utf-8", 'ignore').rsplit('\x00')[0]
amp = head_info['amp'][0]
# define data variable
rows = head_info['rows'][0]
cols = head_info['cols'][0]
dlat = head_info['dlat'][0]
dlon = head_info['dlon'][0]
data = np.full(rows*cols, -9999, dtype=np.int32)
# put data into array
while ind < len(byteArray):
irow = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]
ind += 2
icol = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]
ind += 2
if irow == -1 or icol == -1:
break
nrec = np.frombuffer(byteArray[ind:(ind + 2)], dtype='i2')[0]
ind += 2
recd = np.frombuffer(
byteArray[ind:(ind + 2*nrec)], dtype='i2', count=nrec)
ind += 2*nrec
position = (irow-1)*cols+icol-1
data[position:(position+nrec)] = recd
# reshape data
data.shape = (rows, cols)
# deal missing data and restore values
data = data.astype(np.float32)
data[data < 0] = np.nan
data /= amp
# set longitude and latitude coordinates
lat = head_info['nlat'][0] - np.arange(rows)*dlat - dlat/2.0
lon = head_info['wlon'][0] + np.arange(cols)*dlon - dlon/2.0
# reverse latitude axis
data = np.flip(data, 0)
lat = lat[::-1]
# set time coordinates
# 直接使用时间有问题, 需要天数减去1, 秒数加上28800(8小时)
time = datetime(1970, 1, 1) + timedelta(
days=head_info['dates'][0].astype(np.float64)-1,
seconds=head_info['seconds'][0].astype(np.float64)+28800)
time = np.array([time], dtype='datetime64[m]')
data = np.expand_dims(data, axis=0)
# define coordinates
time_coord = ('time', time)
@ -1220,8 +1350,7 @@ def get_radar_mosaic(directory, filename=None, suffix="*.LATLON", cache=True, ca
'_CoordinateAxisType':'Lat', "axis": "Y"})
# create xarray
varattrs = {'long_name': longname.get(varname, 'radar mosaic'),
'short_name': varname, 'units': units}
varattrs = {'long_name': longname, 'short_name': varname, 'units': units}
data = xr.Dataset({'data':(['time', 'lat', 'lon'], data, varattrs)},
coords={'time':time_coord, 'lat':lat_coord, 'lon':lon_coord})