Skip to content
Snippets Groups Projects
Select Git revision
  • 203db57ef1f297dec12fe69e425fe8d5476e64f1
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

tmss_http_rest_client.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    app.py 9.00 KiB
    import json
    import os
    from configparser import ConfigParser
    from datetime import datetime
    from typing import Union
    
    from fastapi import FastAPI, exceptions, Request
    
    from pymongo import MongoClient, DESCENDING
    from pymongo.collection import Collection
    from bson.objectid import ObjectId
    
    import urllib.parse as urlparse
    from .rebuild_live_db import update_single, TMSSClient, DB as live_db
    import orjson
    from fastapi.responses import JSONResponse
    import typing
    
    class ORJSONResponse(JSONResponse):
        media_type = "application/json"
    
        def render(self, content: typing.Any) -> bytes:
            return orjson.dumps(content)
    
    config_path = os.environ.get('SW_CONFIG')
    from glob import glob
    
    
    def fromisoformat(date_time_str):
        ISOFORMAT = '%Y-%m-%d %H:%M:%S.%f'
        return datetime.strptime(date_time_str, ISOFORMAT)
    
    
    def fromisoformat_date(date_time_str):
        DATE_ISOFORMAT = '%Y-%m-%d'
        return datetime.strptime(date_time_str, DATE_ISOFORMAT)
    
    
    def finds_file_from_hard_drive(path, pattern):
        file_list = glob(os.path.join(path, pattern), recursive=True)
        return file_list
    
    
    def f_name_to_surl(f_name):
        baseurl = 'https://spaceweather.astron.nl/'
        return f_name.replace('/spaceweather', baseurl)
    
    
    def augment_item(item):
        if "_id" in item:
            item["_id"] = str(item["_id"])
    
        if "filename" in item and "storage_path" in item:
            f_name = item['filename']
            storage_path = item['storage_path'].rstrip('/')
            pattern = "*/" + f_name.split('/')[-1].split('.')[0] + '.*'
            f_list = finds_file_from_hard_drive(storage_path, pattern)
            item['additional_files_path'] = f_list
            item['additional_files_url'] = [f_name_to_surl(f) for f in f_list]
        return item
    
    
    class DataProductDB:
        def __init__(self):
            self.db: Union[Collection, None] = None
            self.cache = {}
            self.storage_db = ''
            self.live_db = ''
            self.configure()
            self.connect_db()
            self.base_url = ''
    
        def configure(self):
            parser = ConfigParser()
            parser.read(config_path)
            self.storage_db = parser['ARCHIVER']['storage_db']
            self.live_db = parser['LIVE']['storage_db']
            self.local_dir = parser['LIVE']['local_dir']
    
        def connect_db(self):
            self.db: Collection = MongoClient('mongodb://localhost')[self.storage_db]
            self.live_db: Collection = MongoClient('mongodb://localhost')[self.live_db]
    
        def get_live(self, query, skip=0, limit=100, url=None):
            n_results = self.live_db['previews'].count_documents(query)
            results = [item for item in
                       self.live_db['previews'].find(query, {}).skip(skip).limit(limit).sort('timestamp', DESCENDING)]
            quoted_query = urlparse.quote(json.dumps(query))
            if url:
                next_url = url + f'?filter={quoted_query}&limit={limit}&skip={skip + limit}'
            else:
                next_url = self.base_url + f'?filter={quoted_query}&limit={limit}&skip={skip + limit}'
            return {'count': n_results, 'results': results, 'next': next_url}
    
        def update_live(self, obs):
            database = live_db()
            tmss = TMSSClient.load_from_config()
            
            update_single(obs, tmss, self.local_dir, database)
                
    
        def get_collections(self):
            return self.db.list_collection_names()
    
        def does_collection_exist(self, collection_name):
            if collection_name in self.get_collections():
                return True
            else:
                return False
    
        def get_field(self, collection_name, field_name):
            return sorted(self.db[collection_name].distinct(field_name))
    
        def get_item(self, collection_name, task_id):
            item = self.db[collection_name].find_one({"_id": ObjectId(task_id)})
            if item:
                item = augment_item(item)
            return item
    
        def get_distinct_field(self, collection_name, query, distinct=None):
            results = self.db[collection_name].find(query,distinct).distinct(distinct)
            return {'results': results}
    
    
        def get_items(self, collection_name, query, skip=0, limit=100, url=None, select=None, sort=None):
    
            n_results = self.db[collection_name].count_documents(query)
            selected_fields = {}
            
            if "additional_files_path" in select or "additional_files_url" in select:
                selected_fields["filename"] = 1
                selected_fields["storage_path"] = 1
                
            for item in select:
                selected_fields[item] = 1
    
            # if a &sort=<field> url parameter is given then use sort, otherwise omit it (backward compatibility)
            if sort:
                if sort.startswith('-'):
                    direction = -1
                    sort = sort.lstrip('-')
                else:
                    direction = 1
    
                results = [augment_item(item) for item in
                           self.db[collection_name].find(query, selected_fields).skip(skip).limit(limit).sort(sort,direction)]
            else:
                results = [augment_item(item) for item in
                           self.db[collection_name].find(query, selected_fields).skip(skip).limit(limit)]
    
            quoted_query = urlparse.quote(json.dumps(query))
            quoted_select = urlparse.quote(json.dumps(select))
            if url:
                next_url = url + f'?filter={quoted_query}&limit={limit}&skip={skip + limit}&select={quoted_select}'
            else:
                next_url = self.base_url + f'?filter={quoted_query}&limit={limit}&skip={skip + limit}&select={quoted_select}'
            return {'count': n_results, 'results': results, 'next': next_url}
    
    
    DB = DataProductDB()
    app = FastAPI(default_response_class=ORJSONResponse)
    
    def validate_date(date_str):
        date_items = date_str.split('-')
        if len(date_items) != 3:
            raise ValueError('')
    
    
    def from_urlobj_to_url(url):
        return f'https://{url.hostname}{url.path}'
    
    @app.get('/live/')
    def get_live_monitor(request: Request, skip: int = 0, limit: int = 100, filter: Union[str, None] = None):
        DB.base_url = from_urlobj_to_url(request.base_url)
        try:
            if filter:
                query = json.loads(filter)
            else:
                query = {}
        except:
            raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid')
        return DB.get_live(query, skip, limit)
    
    
    @app.get('/collections/')
    def get_collections(request: Request):
        DB.base_url = from_urlobj_to_url(request.base_url)
        return DB.get_collections()
    
    
    @app.get('/{collection_name}/tasks/{field_name}')
    def get_tasks(request: Request, collection_name: str, field_name: str):
        if DB.does_collection_exist(collection_name):
            return DB.get_field(collection_name, field_name)
        else:
            raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
    
    @app.get('/live/update/{obs}')
    def get_tasks(obs: str, request: Request):
        DB.base_url = from_urlobj_to_url(request.url)
        DB.update_live(obs)
        return {'updating_db': obs}
    
    
    @app.get('/{collection_name}/task/{task_id}')
    def get_tasks(collection_name: str, task_id: str, request: Request):
        DB.base_url = from_urlobj_to_url(request.url)
        if DB.does_collection_exist(collection_name):
            return DB.get_item(collection_name, task_id)
        else:
            raise exceptions.HTTPException(status_code=404, detail=f'Collection {collection_name} does not exist')
    
    @app.get('/{collection_name}/distinct/')
    def get_tasks_distinct_field(collection_name: str, request: Request, filter: Union[str, None] = None, distinct: str = None):
    
        DB.base_url = from_urlobj_to_url(request.url)
        try:
            if filter:
                query = json.loads(filter)
            else:
                query = {}
    
        except:
            raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid')
        if DB.does_collection_exist(collection_name):
            return DB.get_distinct_field(collection_name, query=query, distinct=distinct)
        else:
            raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
    
    @app.get('/{collection_name}/tasks/')
    def get_tasks(collection_name: str, request: Request, skip: int = 0, limit: int = 100, filter: Union[str, None] = None, select: Union[str, None] = None, sort: str = None):
        DB.base_url = from_urlobj_to_url(request.url)
        try:
            if filter:
                query = json.loads(filter)
            else:
                query = {}
            if select:
                select = json.loads(select)
            else:
                select = []
        except:
            raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid')
        if DB.does_collection_exist(collection_name):
            return DB.get_items(collection_name, query=query, limit=limit, skip=skip, url=from_urlobj_to_url(request.url), select=select, sort=sort)
        else:
            raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
    
    
    
    @app.get('/{collection_name}/task_ids_stored')
    def get_tasks(request: Request, collection_name: str):
        DB.base_url = from_urlobj_to_url(request.base_url)
        list_stored_sasids = os.path.join('/spaceweather/SolarKSP/data/atdb_process/', collection_name.split('/')[0])
        return os.listdir(list_stored_sasids)