Skip to content
Snippets Groups Projects
Select Git revision
  • db6ae1f8e72d940521358b1d2d32f1d5649dab93
  • master default protected
  • MAM-56-prepare-update-for-sip-version-3
  • TMSS-1777
  • SDC-545_update_SIP
  • lofar_repo
  • 2.7.1
  • 2.8.0
8 results

station_coordinates.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    app.py 9.00 KiB
    import json
    import os
    from configparser import ConfigParser
    from datetime import datetime
    from typing import Union
    
    from fastapi import FastAPI, exceptions, Request
    
    from pymongo import MongoClient, DESCENDING
    from pymongo.collection import Collection
    from bson.objectid import ObjectId
    
    import urllib.parse as urlparse
    from .rebuild_live_db import update_single, TMSSClient, DB as live_db
    import orjson
    from fastapi.responses import JSONResponse
    import typing
    
    class ORJSONResponse(JSONResponse):
        """JSON response whose body is serialized with ``orjson``."""

        media_type = "application/json"

        def render(self, content: typing.Any) -> bytes:
            """Return ``content`` encoded to bytes by ``orjson.dumps``."""
            encoded = orjson.dumps(content)
            return encoded
    
    # Path of the configuration file, taken from the SW_CONFIG environment
    # variable; None when the variable is not set.
    config_path = os.environ.get('SW_CONFIG')
    from glob import glob
    
    
    def fromisoformat(date_time_str):
        """Parse a ``YYYY-MM-DD HH:MM:SS.ffffff`` string into a naive datetime.

        Raises:
            ValueError: if ``date_time_str`` does not match that layout.
        """
        return datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S.%f')
    
    
    def fromisoformat_date(date_time_str):
        """Parse a ``YYYY-MM-DD`` string into a naive datetime at midnight.

        Raises:
            ValueError: if ``date_time_str`` does not match that layout.
        """
        return datetime.strptime(date_time_str, '%Y-%m-%d')
    
    
    def finds_file_from_hard_drive(path, pattern):
        """Return the list of paths under ``path`` that match the glob ``pattern``.

        The search is run with ``recursive=True``, so ``**`` in ``pattern``
        spans directory levels.
        """
        search_expr = os.path.join(path, pattern)
        return glob(search_expr, recursive=True)
    
    
    def f_name_to_surl(f_name):
        """Turn a local file path into its public URL.

        The first ``/spaceweather`` in ``f_name`` is replaced by the site base
        URL. Only the first occurrence is rewritten so that a later path
        component that also starts with ``spaceweather`` is left intact.

        NOTE(review): the base URL ends with ``/`` while the replaced text does
        not include the following ``/``, so the result contains ``//`` after the
        host. That is kept unchanged because existing clients may already
        depend on the emitted URLs.
        """
        baseurl = 'https://spaceweather.astron.nl/'
        return f_name.replace('/spaceweather', baseurl, 1)
    
    
    def augment_item(item):
        """Make a database document ready for the API response (in place).

        * ``_id`` is converted to its string form when present.
        * When both ``filename`` and ``storage_path`` are present, files located
          one directory below ``storage_path`` whose name starts with the
          filename's stem (the part before the first dot) are listed in
          ``additional_files_path``; their URLs go in ``additional_files_url``.

        Returns the same ``item`` object.
        """
        if "_id" in item:
            item["_id"] = str(item["_id"])

        has_file_info = "filename" in item and "storage_path" in item
        if not has_file_info:
            return item

        file_name = item['filename']
        base_dir = item['storage_path'].rstrip('/')
        # Stem = last path component, cut at its first dot.
        stem = file_name.rsplit('/', 1)[-1].split('.', 1)[0]
        matches = finds_file_from_hard_drive(base_dir, "*/" + stem + '.*')
        item['additional_files_path'] = matches
        item['additional_files_url'] = list(map(f_name_to_surl, matches))
        return item
    
    
    class DataProductDB:
        """Access layer over the archive ("storage") and live MongoDB databases.

        The database names and the live local directory are read from the file
        referenced by the module-level ``config_path``; both databases are opened
        on ``mongodb://localhost``.
        """

        def __init__(self):
            # NOTE: despite the annotation, after connect_db() this holds a
            # database handle: it is indexed by collection name and asked for
            # list_collection_names() below.
            self.db: Union[Collection, None] = None
            self.cache = {}
            self.storage_db = ''
            self.live_db = ''
            # Prefix used for generated "next" links when no explicit url is given.
            self.base_url = ''
            self.configure()
            self.connect_db()

        def configure(self):
            """Read database names and the live local directory from the config file."""
            parser = ConfigParser()
            parser.read(config_path)
            self.storage_db = parser['ARCHIVER']['storage_db']
            self.live_db = parser['LIVE']['storage_db']
            self.local_dir = parser['LIVE']['local_dir']

        def connect_db(self):
            """Open the storage and live databases on the local MongoDB server.

            ``self.live_db`` holds the database *name* before this call and the
            database handle afterwards.
            """
            # A single client (and connection pool) serves both databases.
            client = MongoClient('mongodb://localhost')
            self.db = client[self.storage_db]
            self.live_db = client[self.live_db]

        def _next_url(self, url, query, skip, limit, extra=''):
            """Build the link to the next page; ``extra`` is appended verbatim."""
            quoted_query = urlparse.quote(json.dumps(query))
            base = url if url else self.base_url
            return base + f'?filter={quoted_query}&limit={limit}&skip={skip + limit}' + extra

        def get_live(self, query, skip=0, limit=100, url=None):
            """Return one page of ``previews`` documents sorted by descending timestamp.

            Returns a dict with ``count`` (all documents matching ``query``),
            ``results`` (the requested page) and ``next`` (URL of the next page).
            """
            previews = self.live_db['previews']
            n_results = previews.count_documents(query)
            cursor = previews.find(query, {}).skip(skip).limit(limit).sort('timestamp', DESCENDING)
            results = list(cursor)
            return {'count': n_results, 'results': results,
                    'next': self._next_url(url, query, skip, limit)}

        def update_live(self, obs):
            """Refresh the live database entry of observation ``obs``."""
            database = live_db()
            tmss = TMSSClient.load_from_config()

            update_single(obs, tmss, self.local_dir, database)

        def get_collections(self):
            """Return the names of the collections in the storage database."""
            return self.db.list_collection_names()

        def does_collection_exist(self, collection_name):
            """Return True when ``collection_name`` is a collection of the storage database."""
            return collection_name in self.get_collections()

        def get_field(self, collection_name, field_name):
            """Return the sorted distinct values of ``field_name`` in the collection."""
            return sorted(self.db[collection_name].distinct(field_name))

        def get_item(self, collection_name, task_id):
            """Return the augmented document with the given id, or None if absent.

            ``task_id`` is converted with ``ObjectId``; the conversion raises when
            the string is not a valid object id.
            """
            item = self.db[collection_name].find_one({"_id": ObjectId(task_id)})
            if item:
                item = augment_item(item)
            return item

        def get_distinct_field(self, collection_name, query, distinct=None):
            """Return ``{'results': [...]}`` with the distinct values of field
            ``distinct`` among the documents matching ``query``.
            """
            # Ask the collection directly instead of building a cursor whose
            # projection argument was the field name string.
            results = self.db[collection_name].distinct(distinct, query)
            return {'results': results}

        def get_items(self, collection_name, query, skip=0, limit=100, url=None, select=None, sort=None):
            """Return one page of augmented documents matching ``query``.

            Args:
                collection_name: collection of the storage database to search.
                query: MongoDB filter document.
                skip: number of matching documents to skip.
                limit: maximum number of documents to return.
                url: base of the ``next`` link; ``self.base_url`` when empty.
                select: names of the fields to return; ``None`` is treated as an
                    empty selection.
                sort: field to sort on, prefixed with ``-`` for descending order;
                    no sort is applied when empty.

            Returns:
                dict with ``count`` (all matches), ``results`` (the page) and
                ``next`` (URL of the next page, carrying ``sort`` when given).
            """
            if select is None:
                select = []

            collection = self.db[collection_name]
            n_results = collection.count_documents(query)

            selected_fields = {}
            # The derived file listings are computed from these two stored
            # fields, so they must be fetched whenever the listings are requested.
            if "additional_files_path" in select or "additional_files_url" in select:
                selected_fields["filename"] = 1
                selected_fields["storage_path"] = 1
            for item in select:
                selected_fields[item] = 1

            cursor = collection.find(query, selected_fields).skip(skip).limit(limit)
            quoted_select = urlparse.quote(json.dumps(select))
            extra = f'&select={quoted_select}'

            # if a &sort=<field> url parameter is given then use sort, otherwise omit it (backward compatibility)
            if sort:
                direction = -1 if sort.startswith('-') else 1
                cursor = cursor.sort(sort.lstrip('-'), direction)
                # Keep the ordering on the following pages as well.
                extra += f'&sort={urlparse.quote(sort)}'

            results = [augment_item(item) for item in cursor]
            return {'count': n_results, 'results': results,
                    'next': self._next_url(url, query, skip, limit, extra)}
    
    
    # Module-level database facade used by the request handlers below; creating
    # it reads the configuration and connects to MongoDB at import time.
    DB = DataProductDB()
    # ORJSONResponse is the default response class of the application.
    app = FastAPI(default_response_class=ORJSONResponse)
    
    def validate_date(date_str):
        """Check that ``date_str`` consists of three ``-`` separated parts.

        Only the number of parts is checked, not their content.

        Raises:
            ValueError: if ``date_str`` does not split into exactly three parts.
        """
        date_items = date_str.split('-')
        if len(date_items) != 3:
            raise ValueError(f"expected a date with three '-' separated parts, got {date_str!r}")
    
    
    def from_urlobj_to_url(url):
        """Return ``https://<hostname><path>`` for a URL object.

        Only the ``hostname`` and ``path`` attributes of ``url`` are used; the
        scheme is always ``https``.
        """
        return 'https://{}{}'.format(url.hostname, url.path)
    
    @app.get('/live/')
    def get_live_monitor(request: Request, skip: int = 0, limit: int = 100, filter: Union[str, None] = None):
        """Return one page of live preview documents.

        Query parameters:
            skip / limit: paging window.
            filter: JSON object used as the MongoDB query; everything matches
                when omitted.

        Raises:
            HTTPException(422): ``filter`` is not valid JSON or not a JSON object.
        """
        DB.base_url = from_urlobj_to_url(request.base_url)
        query = {}
        if filter:
            try:
                query = json.loads(filter)
            except (ValueError, RecursionError) as err:
                raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid') from err
        # Valid JSON that is not an object (null, a list, a number, ...) cannot
        # be used as a query document.
        if not isinstance(query, dict):
            raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid')
        # NOTE(review): the client-supplied filter is handed to MongoDB as is;
        # confirm that exposing arbitrary query operators is intended.
        return DB.get_live(query, skip, limit)
    
    
    @app.get('/collections/')
    def get_collections(request: Request):
        """Return the collection names reported by ``DB.get_collections()``."""
        base = from_urlobj_to_url(request.base_url)
        DB.base_url = base
        return DB.get_collections()
    
    
    @app.get('/{collection_name}/tasks/{field_name}')
    def get_tasks(request: Request, collection_name: str, field_name: str):
        """Return the values ``DB.get_field`` yields for ``field_name``.

        Raises:
            HTTPException(404): the collection does not exist.
        """
        if not DB.does_collection_exist(collection_name):
            raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
        return DB.get_field(collection_name, field_name)
    
    @app.get('/live/update/{obs}')
    def get_tasks(obs: str, request: Request):
        """Trigger ``DB.update_live`` for ``obs`` and echo the id back."""
        current_url = from_urlobj_to_url(request.url)
        DB.base_url = current_url
        DB.update_live(obs)
        return dict(updating_db=obs)
    
    
    @app.get('/{collection_name}/task/{task_id}')
    def get_tasks(collection_name: str, task_id: str, request: Request):
        """Return the document ``task_id`` of ``collection_name``.

        The response body is ``null`` when no document has that id.

        Raises:
            HTTPException(404): the collection does not exist.
            HTTPException(422): ``task_id`` is not a valid object id.
        """
        DB.base_url = from_urlobj_to_url(request.url)
        if not DB.does_collection_exist(collection_name):
            raise exceptions.HTTPException(status_code=404, detail=f'Collection {collection_name} does not exist')
        # DB.get_item converts the id with ObjectId(), which raises on malformed
        # input; reject such ids here with a client error instead.
        if not ObjectId.is_valid(task_id):
            raise exceptions.HTTPException(status_code=422, detail=f'task id {task_id} is not valid')
        return DB.get_item(collection_name, task_id)
    
    @app.get('/{collection_name}/distinct/')
    def get_tasks_distinct_field(collection_name: str, request: Request, filter: Union[str, None] = None, distinct: str = None):
        """Return the distinct values of one field among the matching documents.

        Query parameters:
            filter: JSON object used as the MongoDB query; everything matches
                when omitted.
            distinct: name of the field whose distinct values are returned.

        Raises:
            HTTPException(422): ``filter`` is not valid JSON or not a JSON
                object, or ``distinct`` is missing.
            HTTPException(404): the collection does not exist.
        """
        DB.base_url = from_urlobj_to_url(request.url)
        query = {}
        if filter:
            try:
                query = json.loads(filter)
            except (ValueError, RecursionError) as err:
                raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid') from err
        # Valid JSON that is not an object cannot be used as a query document.
        if not isinstance(query, dict):
            raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid')
        # Without a field name the database call cannot succeed.
        if not distinct:
            raise exceptions.HTTPException(status_code=422, detail='distinct field name is required')
        if not DB.does_collection_exist(collection_name):
            raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
        return DB.get_distinct_field(collection_name, query=query, distinct=distinct)
    
    @app.get('/{collection_name}/tasks/')
    def get_tasks(collection_name: str, request: Request, skip: int = 0, limit: int = 100, filter: Union[str, None] = None, select: Union[str, None] = None, sort: str = None):
        """Return one page of documents of ``collection_name``.

        Query parameters:
            skip / limit: paging window.
            filter: JSON object used as the MongoDB query; everything matches
                when omitted.
            select: JSON list of field names to return; empty when omitted.
            sort: field to sort on, prefixed with ``-`` for descending order.

        Raises:
            HTTPException(422): ``filter`` or ``select`` is not valid JSON, or
                has an unusable JSON type.
            HTTPException(404): the collection does not exist.
        """
        DB.base_url = from_urlobj_to_url(request.url)
        try:
            query = json.loads(filter) if filter else {}
            select = json.loads(select) if select else []
        except (ValueError, RecursionError) as err:
            raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid') from err
        # The query must be a JSON object. The selection is iterated for field
        # names, so it must be a list (an object is tolerated: its keys are used).
        if not isinstance(query, dict) or not isinstance(select, (list, dict)):
            raise exceptions.HTTPException(status_code=422, detail='filter syntax invalid')
        if not DB.does_collection_exist(collection_name):
            raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
        return DB.get_items(collection_name, query=query, limit=limit, skip=skip,
                            url=from_urlobj_to_url(request.url), select=select, sort=sort)
    
    
    
    @app.get('/{collection_name}/task_ids_stored')
    def get_tasks(request: Request, collection_name: str):
        """List the entries of the collection's directory under
        ``/spaceweather/SolarKSP/data/atdb_process/``.

        Raises:
            HTTPException(404): the name is empty, ``.`` or ``..``, or no such
                directory exists.
        """
        DB.base_url = from_urlobj_to_url(request.base_url)
        sub_dir = collection_name.split('/')[0]
        # '', '.' and '..' would list the root directory itself or its parent
        # rather than the directory of a collection.
        if sub_dir in ('', '.', '..'):
            raise exceptions.HTTPException(status_code=404, detail=f'collection {collection_name} does not exist')
        stored_dir = os.path.join('/spaceweather/SolarKSP/data/atdb_process/', sub_dir)
        try:
            return os.listdir(stored_dir)
        except (FileNotFoundError, NotADirectoryError) as err:
            raise exceptions.HTTPException(
                status_code=404, detail=f'no stored task ids for collection {collection_name}') from err