Fix read fy awx file problem.

This commit is contained in:
NMC-DAVE 2022-04-01 00:31:08 +08:00
parent bded9aa8d8
commit 51879d4744
5 changed files with 601 additions and 269 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@ -4,4 +4,4 @@ from various data sources.
"""
__author__ = "The R & D Center for Weather Forecasting Technology in NMC, CMA"
__version__ = '0.1.9.1'
__version__ = '0.1.9.2'

@ -7,49 +7,45 @@
Read FY satellite awx format file.
"""
from datetime import datetime, timedelta
import numpy as np
import xarray as xr
def read_awx_cloud(fname):
def resolve_awx_bytearray(btarray, units=''):
"""
Read satellite awx format file.
解析awx文件的字节内容, 返回数据信息
:param fname: file pathname.
:return: data list
:Example:
>>> headinfo, data = read_awx_cloud("./data/ANI_IR1_R04_20191026_2100_FY2G.AWX")
Args:
btarray: byte array.
"""
# the first class file head 一级文件头记录采用定长方式, 共40字节
head1_dtype = [
('SAT96', 'S12'), # SAT96 filename
('byteSequence', 'i2'), # 整型数的字节顺序, 0 低字节在前, 高字节在后; !=0 高字节在前, 低字节在后.
('firstClassHeadLength', 'i2'), # 第一节文件头长度
('secondClassHeadLength', 'i2'), # 第二节文件头长度
('padDataLength', 'i2'), # 填充段数据长度
('recordLength', 'i2'), # 记录长度(字节), 图像产品: 记录长度=图形宽度, 格点场产品: 记录长度=横向格点数x格点数据字长
('headRecordNumber', 'i2'), # 文件头占用记录数, 一级文件头、二填充段扩展以及的所占用总记录个数
('dataRecordNumber', 'i2'), # 产品数据占用记录数
('productCategory', 'i2'), # 产品类别, 1静止, 2极轨, 3格点定量, 4离散, 5: 图形和分析
('compressMethod', 'i2'), # 压缩方式, 0: 未压缩; 1 行程编码压缩; 2 LZW方式压缩; 3 特点方式压缩
('formatString', 'S8'), # 格式说明字符串, 'SAT2004', 'SAT98'为早期版本(不支持)
('qualityFlag', 'i2')] # 产品数据质量标记, 1 完全可靠; 2 基本可靠; 3 有缺值, 可用; 4 不可用
head1_info = np.frombuffer(btarray[0:40], dtype=head1_dtype)
ind = 40
# read part of binary
# refer to
# https://stackoverflow.com/questions/14245094/how-to-read-part-of-binary-file-with-numpy
# open file
with open(fname, 'rb') as fh:
# read file content
ba = bytearray(fh.read())
# define head structure
head_dtype = [
('SAT96', 'S12'), # SAT96 filename
('byteSequence', 'i2'), # integer number byte sequence
('firstClassHeadLength', 'i2'),
('secondClassHeadLength', 'i2'),
('padDataLength', 'i2'),
('recordLength', 'i2'),
('headRecordNumber', 'i2'),
('dataRecordNumber', 'i2'),
('productCategory', 'i2'),
('compressMethod', 'i2'),
('formatString', 'S8'),
('qualityFlag', 'i2'),
('satelliteName', 'S8'),
if head1_info['productCategory'] == 1:
# the second class file head 二级文件头采用不定长方式,内容依据产品的不同而不同.
head2_dtype = [
('satelliteName', 'S8'), # 卫星名
('year', 'i2'), ('month', 'i2'),
('day', 'i2'), ('hour', 'i2'),
('minute', 'i2'),
('channel', 'i2'),
('flagOfProjection', 'i2'),
('channel', 'i2'), # 通道号, 1红外, 2水汽, 3红外分裂, 4可见光, 5中红外, 6备用
('flagOfProjection', 'i2'), # 投影, 0为投影, 1兰勃托, 2麦卡托, 3极射, 4等经纬度, 5等面积
('widthOfImage', 'i2'),
('heightOfImage', 'i2'),
('scanLineNumberOfImageTopLeft', 'i2'),
@ -71,29 +67,152 @@ def read_awx_cloud(fname):
('dataLengthOfCalibration', 'i2'),
('dataLengthOfGeolocation', 'i2'),
('reserved', 'i2')]
head2_info = np.frombuffer(btarray[ind:(ind+64)], dtype=head2_dtype)
ind += 64
head_info = np.frombuffer(ba[0:104], dtype=head_dtype)
ind = 104
# color table
if head2_info['dataLengthOfColorTable'] != 0:
table_R = np.frombuffer(btarray[ind:(ind + 256)], dtype='u1')
ind += 256
table_G = np.frombuffer(btarray[ind:(ind + 256)], dtype='u1')
ind += 256
table_B = np.frombuffer(btarray[ind:(ind + 256)], dtype='u1')
ind += 256
# calibration table 定标数据块, 描述的是图像灰度值与探测物理量之间的关系
# refer to
# http://bbs.06climate.com/forum.php?mod=viewthread&tid=89296
# https://github.com/myyd/China_AWX/blob/master/awx/FY4A_AWX.py
# 2022/3/31日更改, 采用无符号整型'u2'读取, 并且区分了可见光和红外, 水汽的定标表
calibration_table = None
if head2_info['dataLengthOfCalibration'] != 0:
calibration_table = np.frombuffer(btarray[ind:(ind + 2048)], dtype='u2')
if head2_info['channel'] == 4:
# 对于可见光通道图像, 定标数据在0-63的范围内查找
calibration_table = calibration_table * 0.01
calibration_table = calibration_table[1:64]
calibration_table = calibration_table.repeat(4)
else:
# 对于红外,水汽图像, 在全局范围内查找, 但要隔4进行跳点
calibration_table = calibration_table * 0.01
calibration_table = calibration_table[0::4]
ind += 2048
# head rest information
head_rest_len = (head_info['recordLength'][0].astype(np.int) *
head_info['headRecordNumber'][0] - ind)
head_rest = np.frombuffer(
ba[ind:(ind + head_rest_len)],
dtype='u1', count=head_rest_len)
ind += head_rest_len
# geolocation table
if head2_info['dataLengthOfGeolocation'] != 0:
geolocation_dtype = [
('coordinate', 'i2'),
('source', 'i2'),
('delta', 'i2'),
('left_top_lat', 'i2'),
('left_top_lon', 'i2'),
('horizontalNumber', 'i2'),
('verticalNumber', 'i2'),
('reserved', 'i2')]
geolocation_info = np.frombuffer(btarray[ind:(ind+16)], dtype=geolocation_dtype)
ind += 16
geolocation_length = geolocation_info['horizontal_number'][0] * geolocation_info['vertical_number'][0] * 2
geolocation_table = np.frombuffer(btarray[ind:(ind+geolocation_length)], dtype='i2')
ind += geolocation_length
# retrieve data records
data_len = (head_info['dataRecordNumber'][0].astype(np.int) *
head_info['recordLength'][0])
data = np.frombuffer(
ba[ind:(ind + data_len)], dtype='u1',
count=data_len)
data.shape = (head_info['dataRecordNumber'][0],
head_info['recordLength'][0])
# pad field
pad_field = np.frombuffer(btarray[ind:(ind+head1_info['padDataLength'][0])], dtype='u1')
ind += head1_info['padDataLength'][0]
# retrieve data records
data_len = (head1_info['dataRecordNumber'][0].astype(int) *
head1_info['recordLength'][0])
data = np.frombuffer(btarray[ind:(ind + data_len)], dtype='u1', count=data_len)
if calibration_table is not None:
data = calibration_table[data]
data.shape = (head1_info['dataRecordNumber'][0], head1_info['recordLength'][0])
# 由于数据是按照左上角开始放置, 为此需要对纬度顺序进行反转
data = np.flip(data, axis=0)
# construct longitude and latitude coordinates
# if use the verticalResolution and horizontalResolution, lon and lat will not be correct.
#lat = (
# head2_info['latitudeOfNorth'][0]/100. -
# np.arange(head2_info['heightOfImage'][0])*head2_info['verticalResolution'][0]/100.)
#lon = (
# head2_info['longitudeOfWest'][0]/100. +
# np.arange(head2_info['widthOfImage'][0])*head2_info['horizontalResolution'][0]/100.)
lat = np.linspace(
head2_info['latitudeOfSouth'][0]/100., head2_info['latitudeOfNorth'][0]/100.,
num=head2_info['heightOfImage'][0])
lon = np.linspace(
head2_info['longitudeOfWest'][0]/100., head2_info['longitudeOfEast'][0]/100.,
num=head2_info['widthOfImage'][0])
# construct time
time = datetime(
head2_info['year'][0], head2_info['month'][0],
head2_info['day'][0], head2_info['hour'][0], head2_info['minute'][0])
time = np.array([time], dtype='datetime64[ms]')
# define coordinates
time_coord = ('time', time)
lon_coord = ('lon', lon, {
'long_name':'longitude', 'units':'degrees_east',
'_CoordinateAxisType':'Lon', 'axis': "X"})
lat_coord = ('lat', lat, {
'long_name':'latitude', 'units':'degrees_north',
'_CoordinateAxisType':'Lat', 'axis': "Y"})
channel_coord = ('channel', [head2_info['channel'][0]],
{'long_name':'channel', 'units':''})
# create xarray
data = data[np.newaxis, np.newaxis, ...]
varattrs = {
'productCategory': head1_info['productCategory'][0], # 产品类型, 1:静止, 2:极轨, 3:格点, 4:离散, 5:图形和分析
'formatString': head1_info['formatString'][0], # 产品格式名称
'qualityFlag': head1_info['qualityFlag'][0], # 产品质量标识
'satelliteName': head2_info['satelliteName'][0], # 卫星名称
'flagOfProjection': head2_info['flagOfProjection'][0], # 投影方式, 0:未投影, 1:兰勃托, 2:麦卡托, 3:极射, 4:等经纬, 5:等面积
'units': units}
data = xr.Dataset({
'image':(['time', 'channel', 'lat', 'lon'], data, varattrs)},
coords={ 'time':time_coord, 'channel':channel_coord,
'lat':lat_coord, 'lon':lon_coord})
# add attributes
data.attrs['Conventions'] = "CF-1.6"
data.attrs['Origin'] = 'MICAPS Cassandra Server'
# return
return head_info, data
return data
elif head1_info['productCategory'] == 3:
# TODO参考https://github.com/wqshen/AwxReader 读取格点定量数据
print("The productCategory is not supported.")
return None
else:
print("The productCategory is not supported.")
return None
def read_awx_cloud(fname, units=''):
"""
Read satellite awx format file.
:param fname: file pathname.
:return: data list
:Example:
>>> headinfo, data = read_awx_cloud("./data/ANI_IR1_R04_20191026_2100_FY2G.AWX")
"""
# read part of binary
# refer to
# https://stackoverflow.com/questions/14245094/how-to-read-part-of-binary-file-with-numpy
# open file
with open(fname, 'rb') as fh:
# read file content
ba = bytearray(fh.read())
return resolve_awx_bytearray(ba, units=units)
def read_himawari(fname, resolution):

@ -27,6 +27,7 @@ from tqdm import tqdm
from nmc_met_io import DataBlock_pb2
import nmc_met_io.config as CONFIG
from nmc_met_io.read_radar import StandardData
from nmc_met_io.read_satellite import resolve_awx_bytearray
def get_http_result(host, port, url):
@ -868,177 +869,21 @@ def get_fy_awx(directory, filename=None, suffix="*.AWX", units='', cache=True, c
print('There is no data ' + filename + ' in ' + directory)
return None
# the first class file head 一级文件头记录采用定长方式, 共40字节
head1_dtype = [
('SAT96', 'S12'), # SAT96 filename
('byteSequence', 'i2'), # 整型数的字节顺序, 0 低字节在前, 高字节在后; !=0 高字节在前, 低字节在后.
('firstClassHeadLength', 'i2'), # 第一节文件头长度
('secondClassHeadLength', 'i2'), # 第二节文件头长度
('padDataLength', 'i2'), # 填充段数据长度
('recordLength', 'i2'), # 记录长度(字节), 图像产品: 记录长度=图形宽度, 格点场产品: 记录长度=横向格点数x格点数据字长
('headRecordNumber', 'i2'), # 文件头占用记录数, 一级文件头、二填充段扩展以及的所占用总记录个数
('dataRecordNumber', 'i2'), # 产品数据占用记录数
('productCategory', 'i2'), # 产品类别, 1静止, 2极轨, 3格点定量, 4离散, 5: 图形和分析
('compressMethod', 'i2'), # 压缩方式, 0: 未压缩; 1 行程编码压缩; 2 LZW方式压缩; 3 特点方式压缩
('formatString', 'S8'), # 格式说明字符串, 'SAT2004', 'SAT98'为早期版本(不支持)
('qualityFlag', 'i2')] # 产品数据质量标记, 1 完全可靠; 2 基本可靠; 3 有缺值, 可用; 4 不可用
head1_info = np.frombuffer(byteArray[0:40], dtype=head1_dtype)
ind = 40
if head1_info['productCategory'] == 1:
# the second class file head 二级文件头采用不定长方式,内容依据产品的不同而不同.
head2_dtype = [
('satelliteName', 'S8'), # 卫星名
('year', 'i2'), ('month', 'i2'),
('day', 'i2'), ('hour', 'i2'),
('minute', 'i2'),
('channel', 'i2'), # 通道号, 1红外, 2水汽, 3红外分裂, 4可见光, 5中红外, 6备用
('flagOfProjection', 'i2'), # 投影, 0为投影, 1兰勃托, 2麦卡托, 3极射, 4等经纬度, 5等面积
('widthOfImage', 'i2'),
('heightOfImage', 'i2'),
('scanLineNumberOfImageTopLeft', 'i2'),
('pixelNumberOfImageTopLeft', 'i2'),
('sampleRatio', 'i2'),
('latitudeOfNorth', 'i2'),
('latitudeOfSouth', 'i2'),
('longitudeOfWest', 'i2'),
('longitudeOfEast', 'i2'),
('centerLatitudeOfProjection', 'i2'),
('centerLongitudeOfProjection', 'i2'),
('standardLatitude1', 'i2'),
('standardLatitude2', 'i2'),
('horizontalResolution', 'i2'),
('verticalResolution', 'i2'),
('overlapFlagGeoGrid', 'i2'),
('overlapValueGeoGrid', 'i2'),
('dataLengthOfColorTable', 'i2'),
('dataLengthOfCalibration', 'i2'),
('dataLengthOfGeolocation', 'i2'),
('reserved', 'i2')]
head2_info = np.frombuffer(byteArray[ind:(ind+64)], dtype=head2_dtype)
ind += 64
# color table
if head2_info['dataLengthOfColorTable'] != 0:
table_R = np.frombuffer(byteArray[ind:(ind + 256)], dtype='u1')
ind += 256
table_G = np.frombuffer(byteArray[ind:(ind + 256)], dtype='u1')
ind += 256
table_B = np.frombuffer(byteArray[ind:(ind + 256)], dtype='u1')
ind += 256
# calibration table 定标数据块
calibration_table = None
if head2_info['dataLengthOfCalibration'] != 0:
calibration_table = np.frombuffer(byteArray[ind:(ind + 2048)], dtype='u2')
calibration_table = calibration_table * 0.01
#if (np.array_equal(calibration_table[0::4], calibration_table[1::4]) and
# np.array_equal(calibration_table[0::4], calibration_table[2::4]) and
# np.array_equal(calibration_table[0::4], calibration_table[3::4])):
# This is a trick, refer to http://bbs.06climate.com/forum.php?mod=viewthread&tid=89296
# calibration_table = calibration_table[0::4]
calibration_table = calibration_table[0::4]
ind += 2048
# geolocation table
if head2_info['dataLengthOfGeolocation'] != 0:
geolocation_dtype = [
('coordinate', 'i2'),
('source', 'i2'),
('delta', 'i2'),
('left_top_lat', 'i2'),
('left_top_lon', 'i2'),
('horizontalNumber', 'i2'),
('verticalNumber', 'i2'),
('reserved', 'i2')]
geolocation_info = np.frombuffer(byteArray[ind:(ind+16)], dtype=geolocation_dtype)
ind += 16
geolocation_length = geolocation_info['horizontal_number'][0] * geolocation_info['vertical_number'][0] * 2
geolocation_table = np.frombuffer(byteArray[ind:(ind+geolocation_length)], dtype='i2')
ind += geolocation_length
# pad field
pad_field = np.frombuffer(byteArray[ind:(ind+head1_info['padDataLength'][0])], dtype='u1')
ind += head1_info['padDataLength'][0]
# retrieve data records
data_len = (head1_info['dataRecordNumber'][0].astype(int) *
head1_info['recordLength'][0])
data = np.frombuffer(byteArray[ind:(ind + data_len)], dtype='u1', count=data_len)
if calibration_table is not None:
data = calibration_table[data]
data.shape = (head1_info['dataRecordNumber'][0], head1_info['recordLength'][0])
# 由于数据是按照左上角开始放置, 为此需要对纬度顺序进行反转
data = np.flip(data, axis=0)
# construct longitude and latitude coordinates
# if use the verticalResolution and horizontalResolution, lon and lat will not be correct.
#lat = (
# head2_info['latitudeOfNorth'][0]/100. -
# np.arange(head2_info['heightOfImage'][0])*head2_info['verticalResolution'][0]/100.)
#lon = (
# head2_info['longitudeOfWest'][0]/100. +
# np.arange(head2_info['widthOfImage'][0])*head2_info['horizontalResolution'][0]/100.)
lat = np.linspace(
head2_info['latitudeOfSouth'][0]/100., head2_info['latitudeOfNorth'][0]/100.,
num=head2_info['heightOfImage'][0])
lon = np.linspace(
head2_info['longitudeOfWest'][0]/100., head2_info['longitudeOfEast'][0]/100.,
num=head2_info['widthOfImage'][0])
# construct time
time = datetime(
head2_info['year'][0], head2_info['month'][0],
head2_info['day'][0], head2_info['hour'][0], head2_info['minute'][0])
time = np.array([time], dtype='datetime64[ms]')
# define coordinates
time_coord = ('time', time)
lon_coord = ('lon', lon, {
'long_name':'longitude', 'units':'degrees_east',
'_CoordinateAxisType':'Lon', 'axis': "X"})
lat_coord = ('lat', lat, {
'long_name':'latitude', 'units':'degrees_north',
'_CoordinateAxisType':'Lat', 'axis': "Y"})
channel_coord = ('channel', [head2_info['channel'][0]],
{'long_name':'channel', 'units':''})
# create xarray
data = data[np.newaxis, np.newaxis, ...]
varattrs = {
'productCategory': head1_info['productCategory'][0], # 产品类型, 1:静止, 2:极轨, 3:格点, 4:离散, 5:图形和分析
'formatString': head1_info['formatString'][0], # 产品格式名称
'qualityFlag': head1_info['qualityFlag'][0], # 产品质量标识
'satelliteName': head2_info['satelliteName'][0], # 卫星名称
'flagOfProjection': head2_info['flagOfProjection'][0], # 投影方式, 0:未投影, 1:兰勃托, 2:麦卡托, 3:极射, 4:等经纬, 5:等面积
'units': units}
data = xr.Dataset({
'image':(['time', 'channel', 'lat', 'lon'], data, varattrs)},
coords={ 'time':time_coord, 'channel':channel_coord,
'lat':lat_coord, 'lon':lon_coord})
# add attributes
data.attrs['Conventions'] = "CF-1.6"
data.attrs['Origin'] = 'MICAPS Cassandra Server'
# cache data
if cache:
with open(cache_file, 'wb') as f:
pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
# return
return data
else:
print("The productCategory is not supported.")
# 解析字节数据
data = resolve_awx_bytearray(byteArray, units=units)
if data is None:
return None
# cache data
if cache:
with open(cache_file, 'wb') as f:
pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
return data
else:
return None
else:
return None
directory = "SATELLITE/FY2G/L1/IR1/EQUAL/"
data = get_fy_awx(directory, cache=False)
def get_fy_awxs(directory, filenames, allExists=True, pbar=False, **kargs):
"""