完善CassandraDB读取接口,增加了Cassandra数据读取示例。

This commit is contained in:
longtsing 2022-02-24 16:29:14 +08:00
parent 81739fee47
commit 06e0faa41c
5 changed files with 991 additions and 885 deletions

3
.gitignore vendored

@ -104,4 +104,5 @@ venv.bak/
.mypy_cache/
# vscode
.vscode/
.vscode/
JupyterLab.bat

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@ -4,12 +4,15 @@
# Distributed under the terms of the GPL V3 License.
"""
Contact me by e-mail: songofsongs@vip.qq.com
This is the retrieve module which get data from cassandra DB
with Python Cassandra driver API.
Same as the retrieve_micaps_cassandra
The API is the same as that of retrieve_micaps_cassandra
"""
import warnings
from glob import glob
import re
import pickle
import bz2
@ -26,20 +29,29 @@ from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import gzip
# Module-level Cassandra connection settings, built once when this module is
# imported. All values come from the 'Cassandra' section of CONFIG.CONFIG.
_db_client={
# Contact points: the comma-separated address string split into a list.
"ClusterIPAddresses":CONFIG.CONFIG['Cassandra']['ClusterIPAddresses'].split(","),
# Configured port value; stored here but not passed to Cluster() below.
"gdsPort":CONFIG.CONFIG['Cassandra']['ClusterPort'],
# Driver Cluster object constructed from the same address list.
"cluster":Cluster(CONFIG.CONFIG['Cassandra']['ClusterIPAddresses'].split(",")),
}
# Shared session; stays None until _open_DB() assigns it.
_db_session=None
def _open_DB():
    """Open the shared module-level Cassandra session.

    Connects through the cluster object stored in ``_db_client`` and selects
    the keyspace named by ``CONFIG.CONFIG['Cassandra']['KeySpace']``.

    The global ``_db_session`` is assigned only after the keyspace has been
    selected. Callers decide whether to call this function by testing
    ``_db_session`` for ``None``, so publishing the session earlier would let
    a failed keyspace selection leave behind a session that is never
    re-opened and has no keyspace set.

    Raises:
        Whatever ``connect()`` or ``set_keyspace()`` raises; the error is
        propagated unchanged.
    """
    global _db_session
    session = _db_client["cluster"].connect()
    try:
        session.set_keyspace(CONFIG.CONFIG['Cassandra']['KeySpace'])
    except Exception:
        # Release the unusable session before propagating the error.
        session.shutdown()
        raise
    _db_session = session
class CassandraDBServer:
class CassandraDB:
def __init__(self):
# set MICAPS GDS服务器地址
# set MICAPS Cassandra集群地址
# 创建类自动建立与数据库连接,也需要手动关闭连接
self.ClusterIPAddresses = CONFIG.CONFIG['Cassandra']['ClusterIPAddresses'].split(",")
self.gdsPort = CONFIG.CONFIG['Cassandra']['ClusterPort']
self.cluster = Cluster(CONFIG.CONFIG['Cassandra']['ClusterIPAddresses'].split(","))
self.session = self.cluster.connect()
self.session.set_keyspace("micapsdataserver")
if(_db_session==None):
_open_DB()
self.session=_db_session
def getLatestDataName(self, directory, filter):
if(directory[-1]=="/"):
directory=directory[:-1]
response=None
status=200
try:
@ -52,7 +64,8 @@ class CassandraDBServer:
return status,response
def getData(self, directory, fileName):
if(directory[-1]=="/"):
directory=directory[:-1]
table=directory.split("/")[0]
key=directory.replace(table+"/","")
response=None
@ -68,6 +81,8 @@ class CassandraDBServer:
def getFileList(self,directory):
if(directory[-1]=="/"):
directory=directory[:-1]
response=None
status=None
try:
@ -80,12 +95,6 @@ class CassandraDBServer:
return status,response
def close(self):
self.session.shutdown()
def shutdown(self):
self.session.shutdown()
def get_file_list(path, latest=None):
"""return file list of cassandra data servere path
@ -99,7 +108,7 @@ def get_file_list(path, latest=None):
"""
# connect to data service
service = CassandraDBServer()
service = CassandraDB()
# 获得指定目录下的所有文件
status, response = service.getFileList(path)
@ -129,7 +138,7 @@ def get_latest_initTime(directory, suffix="*.006"):
"""
# connect to data service
service = CassandraDBServer()
service = CassandraDB()
# get latest data filename
try:
@ -179,7 +188,7 @@ def get_model_grid(directory, filename=None, suffix="*.024",
if filename is None:
try:
# connect to data service
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getLatestDataName(directory, suffix)
except ValueError:
print('Can not retrieve data from ' + directory)
@ -205,7 +214,7 @@ def get_model_grid(directory, filename=None, suffix="*.024",
file_list = get_file_list(directory)
if filename not in file_list:
return None
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getData(directory, filename)
except ValueError:
print('Can not retrieve data' + filename + ' from ' + directory)
@ -409,7 +418,7 @@ def get_model_grid(directory, filename=None, suffix="*.024",
# add attributes
data.attrs['Conventions'] = "CF-1.6"
data.attrs['Origin'] = 'MICAPS Cassandra Server'
data.attrs['Origin'] = 'MICAPS Cassandra DB'
# sort latitude coordinates
data = data.loc[{'lat':sorted(data.coords['lat'].values)}]
@ -616,7 +625,7 @@ def get_station_data(directory, filename=None, suffix="*.000",
if filename is None:
try:
# connect to data service
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getLatestDataName(directory, suffix)
except ValueError:
print('Can not retrieve data from ' + directory)
@ -640,7 +649,7 @@ def get_station_data(directory, filename=None, suffix="*.000",
# get data contents
try:
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getData(directory, filename)
except ValueError:
print('Can not retrieve data' + filename + ' from ' + directory)
@ -821,7 +830,7 @@ def get_fy_awx(directory, filename=None, suffix="*.AWX", units='', cache=True, c
if filename is None:
try:
# connect to data service
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getLatestDataName(directory, suffix)
except ValueError:
print('Can not retrieve data from ' + directory)
@ -844,7 +853,7 @@ def get_fy_awx(directory, filename=None, suffix="*.AWX", units='', cache=True, c
# get data contents
try:
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getData(directory, filename)
except ValueError:
print('Can not retrieve data' + filename + ' from ' + directory)
@ -1007,7 +1016,7 @@ def get_fy_awx(directory, filename=None, suffix="*.AWX", units='', cache=True, c
# add attributes
data.attrs['Conventions'] = "CF-1.6"
data.attrs['Origin'] = 'MICAPS Cassandra Server'
data.attrs['Origin'] = 'MICAPS Cassandra DB'
# cache data
if cache:
@ -1104,7 +1113,7 @@ def get_radar_mosaic(directory, filename=None, suffix="*.BIN", cache=True, cache
if filename is None:
try:
# connect to data service
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getLatestDataName(directory, suffix)
except ValueError:
print('Can not retrieve data from ' + directory)
@ -1127,7 +1136,7 @@ def get_radar_mosaic(directory, filename=None, suffix="*.BIN", cache=True, cache
# get data contents
try:
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getData(directory, filename)
except ValueError:
print('Can not retrieve data' + filename + ' from ' + directory)
@ -1346,7 +1355,7 @@ def get_radar_mosaic(directory, filename=None, suffix="*.BIN", cache=True, cache
# add attributes
data.attrs['Conventions'] = "CF-1.6"
data.attrs['Origin'] = 'MICAPS Cassandra Server'
data.attrs['Origin'] = 'MICAPS Cassandra DB'
# cache data
if cache:
@ -1412,7 +1421,7 @@ def get_tlogp(directory, filename=None, suffix="*.000",
if filename is None:
try:
# connect to data service
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getLatestDataName(directory, suffix)
except ValueError:
print('Can not retrieve data from ' + directory)
@ -1435,7 +1444,7 @@ def get_tlogp(directory, filename=None, suffix="*.000",
# get data contents
try:
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getData(directory, filename)
except ValueError:
print('Can not retrieve data' + filename + ' from ' + directory)
@ -1574,7 +1583,7 @@ def get_swan_radar(directory, filename=None, suffix="*.000", scale=[0.1, 0],
if filename is None:
try:
# connect to data service
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getLatestDataName(directory, suffix)
except ValueError:
print('Can not retrieve data from ' + directory)
@ -1597,7 +1606,7 @@ def get_swan_radar(directory, filename=None, suffix="*.000", scale=[0.1, 0],
# get data contents
try:
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getData(directory, filename)
except ValueError:
print('Can not retrieve data' + filename + ' from ' + directory)
@ -1708,7 +1717,7 @@ def get_swan_radar(directory, filename=None, suffix="*.000", scale=[0.1, 0],
# add attributes
data.attrs['Conventions'] = "CF-1.6"
data.attrs['Origin'] = 'MICAPS Cassandra Server'
data.attrs['Origin'] = 'MICAPS Cassandra DB'
# cache data
if cache:
@ -1778,7 +1787,7 @@ def get_radar_standard(directory, filename=None, suffix="*.BZ2", cache=True, cac
if filename is None:
try:
# connect to data service
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getLatestDataName(directory, suffix)
except ValueError:
print('Can not retrieve data from ' + directory)
@ -1802,7 +1811,7 @@ def get_radar_standard(directory, filename=None, suffix="*.BZ2", cache=True, cac
if byteArray is None:
# get data contents
try:
service = CassandraDBServer()
service = CassandraDB()
status, response = service.getData(directory, filename)
except ValueError:
print('Can not retrieve data' + filename + ' from ' + directory)