Select Git revision
tmss_http_rest_client.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
app.py 9.00 KiB
import json
import os
from configparser import ConfigParser
from datetime import datetime
from typing import Union
from fastapi import FastAPI, exceptions, Request
from pymongo import MongoClient, DESCENDING
from pymongo.collection import Collection
from bson.objectid import ObjectId
import urllib.parse as urlparse
from .rebuild_live_db import update_single, TMSSClient, DB as live_db
import orjson
from fastapi.responses import JSONResponse
import typing
class ORJSONResponse(JSONResponse):
    """JSON response whose body is encoded with ``orjson`` instead of ``json``."""

    media_type = "application/json"

    def render(self, content: typing.Any) -> bytes:
        """Return ``content`` serialised to JSON bytes by ``orjson.dumps``."""
        encoded = orjson.dumps(content)
        return encoded
# Location of the INI configuration file, taken from the SW_CONFIG environment
# variable; this is None when the variable is not set.
config_path = os.environ.get('SW_CONFIG')
from glob import glob
def fromisoformat(date_time_str):
    """Parse a ``YYYY-MM-DD HH:MM:SS.ffffff`` string into a ``datetime``.

    Raises:
        ValueError: if the string does not match that exact layout
            (the fractional-seconds part is mandatory).
    """
    return datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S.%f')
def fromisoformat_date(date_time_str):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime`` at midnight.

    Raises:
        ValueError: if the string is not a valid date in that layout.
    """
    parsed = datetime.strptime(date_time_str, '%Y-%m-%d')
    return parsed
def finds_file_from_hard_drive(path, pattern):
    """Return the paths below ``path`` that match the glob ``pattern``.

    The pattern is joined onto ``path`` and expanded with ``recursive=True``,
    so a ``**`` component in ``pattern`` may span nested directories.
    """
    full_pattern = os.path.join(path, pattern)
    return glob(full_pattern, recursive=True)
def f_name_to_surl(f_name):
    """Translate a path under ``/spaceweather`` into its public URL.

    Only the leading ``/spaceweather`` path component is rewritten; a later
    occurrence of that text inside the path is left alone, and the join does
    not produce a doubled slash after the host name.

    Args:
        f_name: file path as found on disk.

    Returns:
        The ``https://spaceweather.astron.nl/...`` URL for paths below
        ``/spaceweather``; any other path is returned unchanged.
    """
    baseurl = 'https://spaceweather.astron.nl/'
    mount_point = '/spaceweather'
    # Require a component boundary so '/spaceweather_old/x' is not rewritten.
    if f_name == mount_point or f_name.startswith(mount_point + '/'):
        relative = f_name[len(mount_point):].lstrip('/')
        return baseurl + relative
    return f_name
def augment_item(item):
    """Prepare a database document for the API response, in place.

    * ``_id`` (when present) is replaced by its string form.
    * When both ``filename`` and ``storage_path`` are present, files in the
      direct sub-directories of ``storage_path`` whose name starts with the
      part of the file's base name before its first dot, followed by a dot,
      are listed under ``additional_files_path``, and their URLs under
      ``additional_files_url``.

    Returns:
        The same ``item`` object that was passed in.
    """
    if "_id" in item:
        item["_id"] = str(item["_id"])

    if not ("filename" in item and "storage_path" in item):
        return item

    original_name = item['filename']
    root = item['storage_path'].rstrip('/')
    # Base name up to its first dot, e.g. 'dir/obs.tar.gz' -> 'obs'.
    stem = original_name.split('/')[-1].split('.')[0]
    matches = finds_file_from_hard_drive(root, "*/" + stem + '.*')
    item['additional_files_path'] = matches
    item['additional_files_url'] = [f_name_to_surl(match) for match in matches]
    return item
class DataProductDB:
    """Access layer over the archive and live-preview MongoDB databases.

    The database names and the live data directory are read from the INI file
    named by the module-level ``config_path``; both databases are opened on
    ``mongodb://localhost`` while the instance is being constructed.
    """

    def __init__(self):
        # Handle to the archive database; assigned by connect_db().
        self.db = None
        self.cache = {}
        # Name of the archive database, filled in by configure().
        self.storage_db = ''
        # Holds the live database *name* after configure() and the opened
        # database handle after connect_db().
        self.live_db = ''
        # Prefix of the 'next' pagination link when a call supplies no url.
        self.base_url = ''
        self.configure()
        self.connect_db()

    def configure(self):
        """Load database names and the live data directory from the config file.

        Raises:
            KeyError: if the ``ARCHIVER`` or ``LIVE`` section, or one of the
                options read from them, is missing.
        """
        parser = ConfigParser()
        parser.read(config_path)
        self.storage_db = parser['ARCHIVER']['storage_db']
        self.live_db = parser['LIVE']['storage_db']
        self.local_dir = parser['LIVE']['local_dir']

    def connect_db(self):
        """Open the archive and live databases on the local MongoDB server."""
        # One client is enough: both databases live on the same server.
        client = MongoClient('mongodb://localhost')
        self.db = client[self.storage_db]
        self.live_db = client[self.live_db]

    def _next_page_url(self, url, query, skip, limit, select=None, sort=None):
        """Build the link to the page that follows ``skip``/``limit``.

        Args:
            url: prefix of the link; ``self.base_url`` is used when it is empty.
            query: filter document, JSON-encoded and URL-quoted into ``filter``.
            skip: offset of the current page.
            limit: page size.
            select: list of selected fields; omitted from the link when ``None``.
            sort: sort parameter exactly as received; omitted when empty.
        """
        prefix = url if url else self.base_url
        quoted_query = urlparse.quote(json.dumps(query))
        next_url = f'{prefix}?filter={quoted_query}&limit={limit}&skip={skip + limit}'
        if select is not None:
            quoted_select = urlparse.quote(json.dumps(select))
            next_url += f'&select={quoted_select}'
        if sort:
            # Without this the following page would silently lose the ordering.
            next_url += f'&sort={urlparse.quote(sort)}'
        return next_url

    def get_live(self, query, skip=0, limit=100, url=None):
        """Return one page of the live ``previews`` collection.

        Documents matching ``query`` are ordered by ``timestamp`` descending.

        Returns:
            dict with ``count`` (total matches), ``results`` (this page) and
            ``next`` (link to the following page).
        """
        previews = self.live_db['previews']
        n_results = previews.count_documents(query)
        cursor = previews.find(query, {}).skip(skip).limit(limit).sort('timestamp', DESCENDING)
        return {
            'count': n_results,
            'results': list(cursor),
            'next': self._next_page_url(url, query, skip, limit),
        }

    def update_live(self, obs):
        """Refresh the live database entry for observation ``obs``.

        Delegates to ``update_single`` with a TMSS client loaded from its
        configuration and a fresh live database object.
        """
        database = live_db()
        tmss = TMSSClient.load_from_config()
        update_single(obs, tmss, self.local_dir, database)

    def get_collections(self):
        """Return the names of the collections in the archive database."""
        return self.db.list_collection_names()

    def does_collection_exist(self, collection_name):
        """Return True when ``collection_name`` is a collection of the archive database."""
        return collection_name in self.get_collections()

    def get_field(self, collection_name, field_name):
        """Return the sorted distinct values of ``field_name`` in the collection."""
        return sorted(self.db[collection_name].distinct(field_name))

    def get_item(self, collection_name, task_id):
        """Fetch a single document by its ``_id``.

        Args:
            collection_name: collection to search.
            task_id: string form of the document's ObjectId.

        Returns:
            The augmented document (see ``augment_item``), or ``None`` when no
            document has that id.
        """
        item = self.db[collection_name].find_one({"_id": ObjectId(task_id)})
        if item:
            item = augment_item(item)
        return item

    def get_distinct_field(self, collection_name, query, distinct=None):
        """Return the distinct values of field ``distinct`` among documents matching ``query``.

        Returns:
            dict with a single ``results`` key holding the list of values.
        """
        # The distinct command only needs the key and the filter; the field
        # name was previously also passed as a projection, which has no
        # bearing on the outcome.
        results = self.db[collection_name].distinct(distinct, query)
        return {'results': results}

    def get_items(self, collection_name, query, skip=0, limit=100, url=None, select=None, sort=None):
        """Return one page of augmented documents from a collection.

        Args:
            collection_name: collection to query.
            query: MongoDB filter document.
            skip: number of matching documents to skip.
            limit: maximum number of documents to return.
            url: prefix for the ``next`` link; ``self.base_url`` when empty.
            select: field names to project; ``None`` is treated as no selection.
            sort: field to sort on, prefixed with ``-`` for descending order;
                no sort is applied when empty (backward compatibility).

        Returns:
            dict with ``count`` (total matches), ``results`` (this page) and
            ``next`` (link to the following page, carrying ``select`` and,
            when given, ``sort``).
        """
        # The declared default (None) is not iterable; normalise it first.
        if select is None:
            select = []

        collection = self.db[collection_name]
        n_results = collection.count_documents(query)

        selected_fields = {}
        if "additional_files_path" in select or "additional_files_url" in select:
            # augment_item derives both of these from filename + storage_path,
            # so those two must be fetched even if they were not requested.
            selected_fields["filename"] = 1
            selected_fields["storage_path"] = 1
        for field in select:
            selected_fields[field] = 1

        cursor = collection.find(query, selected_fields).skip(skip).limit(limit)
        if sort:
            if sort.startswith('-'):
                cursor = cursor.sort(sort.lstrip('-'), -1)
            else:
                cursor = cursor.sort(sort, 1)
        results = [augment_item(item) for item in cursor]

        next_url = self._next_page_url(url, query, skip, limit, select=select, sort=sort)
        return {'count': n_results, 'results': results, 'next': next_url}
# Module-wide database facade. Constructing it reads the configuration file
# and opens the MongoDB handles, so this runs when the module is imported.
DB = DataProductDB()
# The application object; ORJSONResponse is its default response class.
app = FastAPI(default_response_class=ORJSONResponse)
def validate_date(date_str):
    """Check that ``date_str`` consists of exactly three ``-``-separated parts.

    Only the number of parts is checked, not whether they form a real date.

    Raises:
        ValueError: (with an empty message) when the part count is not three.
    """
    if date_str.count('-') != 2:
        raise ValueError('')
def from_urlobj_to_url(url):
    """Render a URL object as ``https://<hostname><path>``.

    Only the ``hostname`` and ``path`` attributes of ``url`` are used: the
    scheme is always ``https`` and port and query string are not included.
    """
    return 'https://{}{}'.format(url.hostname, url.path)
@app.get('/live/')
def get_live_monitor(request: Request, skip: int = 0, limit: int = 100, filter: Union[str, None] = None):
    """Return one page of live preview documents.

    Query parameters:
        skip: number of matching documents to skip.
        limit: maximum number of documents to return.
        filter: JSON-encoded MongoDB filter object; everything matches when omitted.

    Raises:
        HTTPException 422: when ``filter`` is not valid JSON or not a JSON object.
    """
    DB.base_url = from_urlobj_to_url(request.base_url)
    try:
        query = json.loads(filter) if filter else {}
    except ValueError:
        # json.JSONDecodeError is a ValueError; nothing else is expected here.
        raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid') from None
    if not isinstance(query, dict):
        raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid')
    # Pass the request URL so the 'next' link points back at this endpoint
    # rather than at the application root.
    return DB.get_live(query, skip, limit, url=from_urlobj_to_url(request.url))
@app.get('/collections/')
def get_collections(request: Request):
    """Return the names of the collections in the archive database."""
    base = from_urlobj_to_url(request.base_url)
    DB.base_url = base
    collection_names = DB.get_collections()
    return collection_names
@app.get('/{collection_name}/tasks/{field_name}')
def get_tasks(request: Request, collection_name: str, field_name: str):
    """Return the sorted distinct values of ``field_name`` in a collection.

    Raises:
        HTTPException 404: when the collection does not exist.
    """
    if not DB.does_collection_exist(collection_name):
        raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
    return DB.get_field(collection_name, field_name)
@app.get('/live/update/{obs}')
def get_tasks(obs: str, request: Request):
    """Trigger a refresh of the live database for observation ``obs``.

    Returns:
        ``{'updating_db': obs}`` once ``DB.update_live`` has returned.
    """
    current_url = from_urlobj_to_url(request.url)
    DB.base_url = current_url
    DB.update_live(obs)
    return dict(updating_db=obs)
@app.get('/{collection_name}/task/{task_id}')
def get_tasks(collection_name: str, task_id: str, request: Request):
    """Return the single document of a collection identified by ``task_id``.

    Returns:
        The augmented document, or ``null`` when no document has that id.

    Raises:
        HTTPException 404: when the collection does not exist.
        HTTPException 422: when ``task_id`` is not a valid ObjectId string.
    """
    DB.base_url = from_urlobj_to_url(request.url)
    if not DB.does_collection_exist(collection_name):
        raise exceptions.HTTPException(status_code=404, detail=f'Collection {collection_name} does not exist')
    # DB.get_item converts the id with ObjectId(), which raises on malformed
    # input; report that as a client error instead of a server error.
    if not ObjectId.is_valid(task_id):
        raise exceptions.HTTPException(status_code=422, detail=f'task id {task_id} is not a valid ObjectId')
    return DB.get_item(collection_name, task_id)
@app.get('/{collection_name}/distinct/')
def get_tasks_distinct_field(collection_name: str, request: Request, filter: Union[str, None] = None, distinct: str = None):
    """Return the distinct values of one field among the matching documents.

    Query parameters:
        filter: JSON-encoded MongoDB filter object; everything matches when omitted.
        distinct: name of the field whose distinct values are wanted (required).

    Raises:
        HTTPException 422: when ``filter`` is not valid JSON or not a JSON
            object, or when ``distinct`` is missing.
        HTTPException 404: when the collection does not exist.
    """
    DB.base_url = from_urlobj_to_url(request.url)
    try:
        query = json.loads(filter) if filter else {}
    except ValueError:
        # json.JSONDecodeError is a ValueError; nothing else is expected here.
        raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid') from None
    if not isinstance(query, dict):
        raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid')
    if not distinct:
        # Without a field name the database call would fail with a server error.
        raise exceptions.HTTPException(status_code=422, detail='distinct field name is required')
    if not DB.does_collection_exist(collection_name):
        raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
    return DB.get_distinct_field(collection_name, query=query, distinct=distinct)
@app.get('/{collection_name}/tasks/')
def get_tasks(collection_name: str, request: Request, skip: int = 0, limit: int = 100, filter: Union[str, None] = None, select: Union[str, None] = None, sort: str = None):
    """Return one page of documents from a collection.

    Query parameters:
        skip: number of matching documents to skip.
        limit: maximum number of documents to return.
        filter: JSON-encoded MongoDB filter object; everything matches when omitted.
        select: JSON-encoded list of field names to return; no selection when omitted.
        sort: field to sort on, prefixed with ``-`` for descending order.

    Raises:
        HTTPException 422: when ``filter`` is not valid JSON or not a JSON
            object, or when ``select`` is not valid JSON.
        HTTPException 404: when the collection does not exist.
    """
    DB.base_url = from_urlobj_to_url(request.url)
    try:
        query = json.loads(filter) if filter else {}
    except ValueError:
        # json.JSONDecodeError is a ValueError; nothing else is expected here.
        raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid') from None
    if not isinstance(query, dict):
        raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid')
    try:
        select = json.loads(select) if select else []
    except ValueError:
        # Report the parameter that is actually at fault.
        raise exceptions.HTTPException(status_code=422, detail='select syntax invalid') from None

    if not DB.does_collection_exist(collection_name):
        raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
    return DB.get_items(collection_name, query=query, limit=limit, skip=skip, url=from_urlobj_to_url(request.url), select=select, sort=sort)
@app.get('/{collection_name}/task_ids_stored')
def get_tasks(request: Request, collection_name: str):
    """List the entries of the collection's directory under the atdb_process data root.

    Returns:
        The names returned by ``os.listdir`` for
        ``/spaceweather/SolarKSP/data/atdb_process/<collection_name>``.

    Raises:
        HTTPException 404: when the name is ``.``/``..``/empty or no such
            directory exists.
    """
    DB.base_url = from_urlobj_to_url(request.base_url)
    directory_name = collection_name.split('/')[0]
    # The name comes straight from the URL: refuse components that would make
    # the listing escape the data root.
    if directory_name in ('', '.', '..'):
        raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
    stored_dir = os.path.join('/spaceweather/SolarKSP/data/atdb_process/', directory_name)
    try:
        return os.listdir(stored_dir)
    except (FileNotFoundError, NotADirectoryError):
        raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist') from None