fix get_fy_awx errors in retrieve_cassandraDB.py

use resolve_awx_bytearray from nmc_met_io.read_satellite
xufanglu 2022-06-08 18:46:37 +08:00 committed by GitHub
parent f4500f167f
commit 0fe92f7609

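The hunks below replace the inline AWX decoding in get_fy_awx with the shared helper from nmc_met_io.read_satellite. As a minimal sketch of the helper's expected contract, inferred only from the call added in the second hunk and from the inline code it replaces (the local file name is hypothetical and the standalone-file use is illustrative, not taken from the actual implementation):

```python
# Assumed contract: resolve_awx_bytearray takes the (already decompressed) AWX
# byte string and a units string, and returns an xarray.Dataset, or None when
# the product category is unsupported, matching what the removed inline code built.
from nmc_met_io.read_satellite import resolve_awx_bytearray

with open("ANI_IR1_R04_20220608_1800_FY2G.AWX", "rb") as f:  # hypothetical local AWX file
    byte_array = f.read()

data = resolve_awx_bytearray(byte_array, "")
if data is not None:
    print(data["image"].dims)  # expected ('time', 'channel', 'lat', 'lon')
```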
@@ -29,6 +29,7 @@ import pandas as pd
 from tqdm import tqdm
 import nmc_met_io.config as CONFIG
 from nmc_met_io.read_radar import StandardData
+from nmc_met_io.read_satellite import resolve_awx_bytearray
 try:
     from cassandra.cluster import Cluster
@@ -867,178 +868,22 @@ def get_fy_awx(directory, filename=None, suffix="*.AWX", units='', cache=True, c
     except ValueError:
         print('Can not retrieve data' + filename + ' from ' + directory)
         return None
-    if status == 200:
-        if response is not None:
-            byteArray = gzip.decompress(response)
-            if byteArray == '':
-                print('There is no data ' + filename + ' in ' + directory)
-                return None
-            # the first class file head: fixed-length record, 40 bytes in total
-            head1_dtype = [
-                ('SAT96', 'S12'),                # SAT96 filename
-                ('byteSequence', 'i2'),          # byte order of integers: 0 low byte first; != 0 high byte first
-                ('firstClassHeadLength', 'i2'),  # length of the first class file head
-                ('secondClassHeadLength', 'i2'), # length of the second class file head
-                ('padDataLength', 'i2'),         # length of the pad data segment
-                ('recordLength', 'i2'),          # record length (bytes); image products: record length = image width; grid products: record length = grid points per row x grid data word length
-                ('headRecordNumber', 'i2'),      # number of records occupied by the file head (first class head, second class head, pad segment and extensions)
-                ('dataRecordNumber', 'i2'),      # number of records occupied by the product data
-                ('productCategory', 'i2'),       # product category: 1 geostationary, 2 polar-orbiting, 3 quantitative grid, 4 discrete, 5 graphics and analysis
-                ('compressMethod', 'i2'),        # compression method: 0 uncompressed; 1 run-length encoding; 2 LZW; 3 special method
-                ('formatString', 'S8'),          # format description string, 'SAT2004'
-                ('qualityFlag', 'i2')]           # product data quality flag: 1 fully reliable; 2 basically reliable; 3 missing values but usable; 4 unusable
-            head1_info = np.frombuffer(byteArray[0:40], dtype=head1_dtype)
-            ind = 40
-            if head1_info['productCategory']:
-                # the second class file head: variable length, contents depend on the product
-                head2_dtype = [
-                    ('satelliteName', 'S8'),      # satellite name
-                    ('year', 'i2'), ('month', 'i2'),
-                    ('day', 'i2'), ('hour', 'i2'),
-                    ('minute', 'i2'),
-                    ('channel', 'i2'),            # channel number: 1 infrared, 2 water vapor, 3 split-window infrared, 4 visible, 5 mid-infrared, 6 reserved
-                    ('flagOfProjection', 'i2'),   # projection: 0 none, 1 Lambert, 2 Mercator, 3 polar stereographic, 4 equal lat-lon, 5 equal area
-                    ('widthOfImage', 'i2'),
-                    ('heightOfImage', 'i2'),
-                    ('scanLineNumberOfImageTopLeft', 'i2'),
-                    ('pixelNumberOfImageTopLeft', 'i2'),
-                    ('sampleRatio', 'i2'),
-                    ('latitudeOfNorth', 'i2'),
-                    ('latitudeOfSouth', 'i2'),
-                    ('longitudeOfWest', 'i2'),
-                    ('longitudeOfEast', 'i2'),
-                    ('centerLatitudeOfProjection', 'i2'),
-                    ('centerLongitudeOfProjection', 'i2'),
-                    ('standardLatitude1', 'i2'),
-                    ('standardLatitude2', 'i2'),
-                    ('horizontalResolution', 'i2'),
-                    ('verticalResolution', 'i2'),
-                    ('overlapFlagGeoGrid', 'i2'),
-                    ('overlapValueGeoGrid', 'i2'),
-                    ('dataLengthOfColorTable', 'i2'),
-                    ('dataLengthOfCalibration', 'i2'),
-                    ('dataLengthOfGeolocation', 'i2'),
-                    ('reserved', 'i2')]
-                head2_info = np.frombuffer(byteArray[ind:(ind+64)], dtype=head2_dtype)
-                ind += 64
-                # color table
-                if head2_info['dataLengthOfColorTable'] != 0:
-                    table_R = np.frombuffer(byteArray[ind:(ind + 256)], dtype='u1')
-                    ind += 256
-                    table_G = np.frombuffer(byteArray[ind:(ind + 256)], dtype='u1')
-                    ind += 256
-                    table_B = np.frombuffer(byteArray[ind:(ind + 256)], dtype='u1')
-                    ind += 256
-                # calibration table
-                calibration_table = None
-                if head2_info['dataLengthOfCalibration'] != 0:
-                    calibration_table = np.frombuffer(byteArray[ind:(ind + 2048)], dtype='i2')
-                    calibration_table = calibration_table * 0.01
-                    if (np.array_equal(calibration_table[0::4], calibration_table[1::4]) and
-                            np.array_equal(calibration_table[0::4], calibration_table[2::4]) and
-                            np.array_equal(calibration_table[0::4], calibration_table[3::4])):
-                        # This is a trick, refer to http://bbs.06climate.com/forum.php?mod=viewthread&tid=89296
-                        calibration_table = calibration_table[0::4]
-                    ind += 2048
-                # geolocation table
-                if head2_info['dataLengthOfGeolocation'] != 0:
-                    geolocation_dtype = [
-                        ('coordinate', 'i2'),
-                        ('source', 'i2'),
-                        ('delta', 'i2'),
-                        ('left_top_lat', 'i2'),
-                        ('left_top_lon', 'i2'),
-                        ('horizontalNumber', 'i2'),
-                        ('verticalNumber', 'i2'),
-                        ('reserved', 'i2')]
-                    geolocation_info = np.frombuffer(byteArray[ind:(ind+16)], dtype=geolocation_dtype)
-                    ind += 16
-                    geolocation_length = geolocation_info['horizontal_number'][0] * geolocation_info['vertical_number'][0] * 2
-                    geolocation_table = np.frombuffer(byteArray[ind:(ind+geolocation_length)], dtype='i2')
-                    ind += geolocation_length
-                # pad field
-                pad_field = np.frombuffer(byteArray[ind:(ind+head1_info['padDataLength'][0])], dtype='u1')
-                ind += head1_info['padDataLength'][0]
-                # retrieve data records
-                data_len = (head1_info['dataRecordNumber'][0].astype(int) *
-                            head1_info['recordLength'][0])
-                data = np.frombuffer(byteArray[ind:(ind + data_len)], dtype='u1', count=data_len)
-                if calibration_table is not None:
-                    data = calibration_table[data]
-                data.shape = (head1_info['dataRecordNumber'][0], head1_info['recordLength'][0])
-                # the data is stored starting from the top-left corner, so the latitude order has to be flipped
-                data = np.flip(data, axis=0)
-                # construct longitude and latitude coordinates
-                # if use the verticalResolution and horizontalResolution, lon and lat will not be correct.
-                #lat = (
-                #    head2_info['latitudeOfNorth'][0]/100. -
-                #    np.arange(head2_info['heightOfImage'][0])*head2_info['verticalResolution'][0]/100.)
-                #lon = (
-                #    head2_info['longitudeOfWest'][0]/100. +
-                #    np.arange(head2_info['widthOfImage'][0])*head2_info['horizontalResolution'][0]/100.)
-                lat = np.linspace(
-                    head2_info['latitudeOfSouth'][0]/100., head2_info['latitudeOfNorth'][0]/100.,
-                    num=head2_info['heightOfImage'][0])
-                lon = np.linspace(
-                    head2_info['longitudeOfWest'][0]/100., head2_info['longitudeOfEast'][0]/100.,
-                    num=head2_info['widthOfImage'][0])
-                # construct time
-                time = datetime(
-                    head2_info['year'][0], head2_info['month'][0],
-                    head2_info['day'][0], head2_info['hour'][0], head2_info['minute'][0])
-                time = np.array([time], dtype='datetime64[ms]')
-                # define coordinates
-                time_coord = ('time', time)
-                lon_coord = ('lon', lon, {
-                    'long_name':'longitude', 'units':'degrees_east',
-                    '_CoordinateAxisType':'Lon', 'axis': "X"})
-                lat_coord = ('lat', lat, {
-                    'long_name':'latitude', 'units':'degrees_north',
-                    '_CoordinateAxisType':'Lat', 'axis': "Y"})
-                channel_coord = ('channel', [head2_info['channel'][0]],
-                                 {'long_name':'channel', 'units':''})
-                # create xarray
-                data = data[np.newaxis, np.newaxis, ...]
-                varattrs = {
-                    'productCategory': head1_info['productCategory'][0],    # product category: 1 geostationary, 2 polar-orbiting, 3 grid, 4 discrete, 5 graphics and analysis
-                    'formatString': head1_info['formatString'][0],          # product format name
-                    'qualityFlag': head1_info['qualityFlag'][0],            # product quality flag
-                    'satelliteName': head2_info['satelliteName'][0],        # satellite name
-                    'flagOfProjection': head2_info['flagOfProjection'][0],  # projection: 0 none, 1 Lambert, 2 Mercator, 3 polar stereographic, 4 equal lat-lon, 5 equal area
-                    'units': units}
-                data = xr.Dataset({
-                    'image':(['time', 'channel', 'lat', 'lon'], data, varattrs)},
-                    coords={ 'time':time_coord, 'channel':channel_coord,
-                             'lat':lat_coord, 'lon':lon_coord})
-                # add attributes
-                data.attrs['Conventions'] = "CF-1.6"
-                data.attrs['Origin'] = 'MICAPS Cassandra DB'
-                # cache data
-                if cache:
-                    with open(cache_file, 'wb') as f:
-                        pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
-                # return
-                return data
-            else:
-                print("The productCategory is not supported.")
-                return None
-        else:
-            return None
+    if status == 200 and response is not None:
+        byteArray = gzip.decompress(response)
+        if byteArray == '':
+            print('There is no data ' + filename + ' in ' + directory)
+            return None
+        data = resolve_awx_bytearray(byteArray, units)
+        # cache data
+        if data is not None and cache:
+            with open(cache_file, 'wb') as f:
+                pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
+        # return
+        return data
     else:
         return None
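For reference, a usage sketch of the fixed function (the Cassandra directory is hypothetical; the returned Dataset layout follows what the removed inline code produced, assuming resolve_awx_bytearray keeps the same structure):

```python
from nmc_met_io.retrieve_cassandraDB import get_fy_awx

# hypothetical MICAPS Cassandra directory; point it at an AWX product on your server
data = get_fy_awx("SATELLITE/FY2/L1/IR1/EQUAL", filename=None, units="")
if data is not None:
    print(data)                  # xarray.Dataset with an 'image' variable
    print(data["image"].coords)  # time, channel, lat, lon coordinates
```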