Commit 580da8f5 authored by Mattia Mancini
Add utility script to define tasks on atdb

parent 45a544b3
Pipeline #33327 passed
#!/usr/bin/env python
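"""Utility script to define processing tasks on ATDB.

For every SAS ID given on the command line, the script queries the LTA
(AWOPER schema) for the matching dynamic-spectrum file objects, builds a
task payload per file and posts it to the ATDB REST interface.

A sketch of an invocation (the script name and all values are
placeholders, not taken from the repository):

    python define_tasks.py <workflow_uri> <filter_string> 123456 123457
"""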
import logging
import os.path
import re
from argparse import ArgumentParser
from configparser import ConfigParser
from typing import Dict, List, Union
from urllib.parse import quote_plus, urlparse

import pandas
import requests
from sqlalchemy import create_engine

logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.DEBUG)

CONNECTION_STRING = 'oracle+cx_oracle://{user}:{password}@db.lofar.target.rug.nl/?service_name=db.lofar.target.rug.nl'
DEFAULT_CONFIG_PATHS = ['~/.config/solar_processing/config.ini', '~/.awe/Environment.cfg']
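
# The configuration files above are expected to provide the sections read
# below; this is a sketch inferred from the code, not a verified template
# of the deployed files:
#
#   [global]
#   database_user = <LTA database user>
#   database_password = <LTA database password>
#
#   [atdb]
#   url = <ATDB REST base URL>
#   token = <ATDB API token>
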
def parse_args():
    parser = ArgumentParser(description='Query the LTA to obtain the list of observations')
    parser.add_argument('--config', help='path to configuration file', type=str)
    parser.add_argument('workflow', help='workflow URI', type=str)
    parser.add_argument('filter', help='filter string', type=str)
    parser.add_argument('sas_ids', help='list of SAS IDs', nargs='+')
    return parser.parse_args()


logger = logging.getLogger('Solar query LTA')

QUERY_STR = '''
SELECT FO.OBJECT_ID AS OBJECT_ID,
FO.FILESIZE AS FILESIZE,
ST.OBS_ID as OBS_ID,
FO."+PROJECT" AS PROJECT,
STR.PRIMARY_URL AS PRIMARY_URL FROM AWOPER.FILEOBJECT FO JOIN
AWOPER.STORAGETICKETRESOURCE STR ON FO.STORAGE_TICKET_RESOURCE=STR.OBJECT_ID JOIN
AWOPER.STORAGETICKET ST ON ST.OBJECT_ID=STR.TICKET
WHERE OBS_ID = '{}' AND STR.PRIMARY_URL LIKE '%/LOFAR_DYNSPEC%'
'''
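# The query resolves, for a single observation (OBS_ID equal to the SAS ID),
# every dynamic-spectrum file object together with its size, project and
# primary storage URL, by joining file objects to their storage tickets.
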
def read_user_credentials(file_paths: Union[str, List[str]]) -> Dict:
    file_paths = list(map(os.path.expanduser, map(os.path.expandvars, file_paths)))
    parser = ConfigParser()
    parser.read(file_paths)
    credentials = {'user': parser['global']['database_user'], 'password': parser['global']['database_password']}
    return credentials

def read_atdb_parameters(file_paths: Union[str, List[str]]) -> Dict:
    file_paths = list(map(os.path.expanduser, map(os.path.expandvars, file_paths)))
    parser = ConfigParser()
    parser.read(file_paths)
    atdb_parameters = {'url': parser['atdb']['url'], 'token': parser['atdb']['token']}
    return atdb_parameters

def create_db_engine(user: str, password: str):
    connection_string: str = CONNECTION_STRING.format(user=quote_plus(user),
                                                      password=quote_plus(password))
    return create_engine(connection_string)

def find_surl_and_size_per_sasid(engine, sasid):
    result_df = pandas.read_sql_query(QUERY_STR.format(sasid), engine)
    return result_df

def parse_surl_for_info(surl):
    parsed = urlparse(surl)
    site = parsed.hostname
    path = parsed.path
    pattern = r'^.*/projects/(?P<project>.*_\d*)/(?P<sas_id>\w\d*)/'
    match = re.match(pattern, path)
    if match is None:
        raise ValueError(f'cannot extract project and SAS ID from surl {surl}')
    return site, match.groupdict()

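# For illustration only: a SURL of the hypothetical form
#   srm://srm.example.nl/pnfs/projects/LC12_027/L123456/..._LOFAR_DYNSPEC_...
# would yield site 'srm.example.nl' and
# {'project': 'LC12_027', 'sas_id': 'L123456'}.
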
def create_payload_from_entry(entry, filter_str, workflow, purge_policy='no', predecessor=None):
    size = entry['filesize']
    site, payload = parse_surl_for_info(entry['primary_url'])
    inputs_payload = [
        {'size': size,
         'surl': entry['primary_url'],
         'type': 'File',
         'location': site}
    ]
    # default values
    payload['task_type'] = 'regular'
    payload['filter'] = filter_str
    payload['purge_policy'] = purge_policy
    payload['status'] = 'defined'
    payload['new_workflow_uri'] = workflow
    # from entry
    payload['size_to_process'] = size
    payload['inputs'] = inputs_payload
    if predecessor:
        payload['predecessor'] = predecessor
    return payload

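# For illustration only, a resulting payload looks roughly like
#   {'project': 'LC12_027', 'sas_id': 'L123456',
#    'task_type': 'regular', 'filter': <filter_str>, 'purge_policy': 'no',
#    'status': 'defined', 'new_workflow_uri': <workflow>,
#    'size_to_process': <filesize>,
#    'inputs': [{'size': <filesize>, 'surl': <primary_url>,
#                'type': 'File', 'location': <site>}]}
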
class AInterface:
    def __init__(self, url, token):
        self._url = url.rstrip('/')
        self._token = token
        self._session = None

    def open_session(self):
        self._session = requests.Session()
        self._session.headers['Authorization'] = f'Token {self._token}'

    def session(self) -> requests.Session:
        if self._session is None:
            self.open_session()
        return self._session

    def submit_task(self, payload):
        response = self.session().post(f'{self._url}/tasks/', json=payload)
        if not response.ok:
            logger.error('Error posting payload - %s - %s : %s', response.status_code,
                         response.reason, response.content)
            logger.error('Payload is %s', payload)
            raise RuntimeError(f'task submission failed: {response.status_code} {response.reason}')

def main():
    args = parse_args()
    sas_ids = args.sas_ids
    # honour --config when given, otherwise fall back to the default locations
    config_paths = [args.config] if args.config else DEFAULT_CONFIG_PATHS
    credentials = read_user_credentials(config_paths)
    atdb_parameters = read_atdb_parameters(config_paths)
    atdb_interface = AInterface(atdb_parameters['url'], atdb_parameters['token'])
    engine = create_db_engine(**credentials)
    for sas_id in sas_ids:
        sas_id = int(sas_id)
        logger.info('fetching surl for sas_id %s', sas_id)
        table = find_surl_and_size_per_sasid(engine, sas_id)
        for index, line in table.iterrows():
            logger.info('creating task for sas_id %s - dataset %s', sas_id, index)
            payload = create_payload_from_entry(line, args.filter, args.workflow)
            atdb_interface.submit_task(payload)

if __name__ == '__main__':
    main()