diff --git a/bin/define_tasks_from_archive.py b/bin/define_tasks_from_archive.py
new file mode 100644
index 0000000000000000000000000000000000000000..a70c25032fe7234dfdfce6448e7b77c0741df50d
--- /dev/null
+++ b/bin/define_tasks_from_archive.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+from argparse import ArgumentParser
+import pandas
+import re
+
+import requests
+from sqlalchemy import create_engine
+from typing import Dict, Union, List
+import logging
+import os.path
+from configparser import ConfigParser
+from urllib.parse import urlparse, quote_plus
+
+logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.DEBUG)
+
+CONNECTION_STRING = 'oracle+cx_oracle://{user}:{password}@db.lofar.target.rug.nl/?service_name=db.lofar.target.rug.nl'
+DEFAULT_CONFIG_PATHS = ['~/.config/solar_processing/config.ini', '~/.awe/Environment.cfg']
+
+
+def parse_args():
+    parser = ArgumentParser(description='Query the LTA to obtain the list of observations')
+    parser.add_argument('--config', help='path to configuration file', type=str)
+    parser.add_argument('workflow', help='workflow URI', type=str)
+    parser.add_argument('filter', help='filter string', type=str)
+    parser.add_argument('sas_ids', help='list of SAS IDs', nargs='+')
+    return parser.parse_args()
+
+
+logger = logging.getLogger('Solar query LTA')
+
+# Select the dynamic-spectrum file objects (and their SURLs) that belong to one observation.
+QUERY_STR = '''
+SELECT FO.OBJECT_ID AS OBJECT_ID,
+       FO.FILESIZE AS FILESIZE,
+       ST.OBS_ID AS OBS_ID,
+       FO."+PROJECT" AS PROJECT,
+       STR.PRIMARY_URL AS PRIMARY_URL
+FROM AWOPER.FILEOBJECT FO
+JOIN AWOPER.STORAGETICKETRESOURCE STR ON FO.STORAGE_TICKET_RESOURCE = STR.OBJECT_ID
+JOIN AWOPER.STORAGETICKET ST ON ST.OBJECT_ID = STR.TICKET
+WHERE OBS_ID = '{}' AND STR.PRIMARY_URL LIKE '%/LOFAR_DYNSPEC%'
+'''
+
+
+def read_user_credentials(file_paths: Union[str, List[str]]) -> Dict:
+    """Read the LTA database credentials from the configuration files."""
+    file_paths = list(map(os.path.expanduser, map(os.path.expandvars, file_paths)))
+
+    parser = ConfigParser()
+    parser.read(file_paths)
+    credentials = {'user': parser['global']['database_user'],
+                   'password': parser['global']['database_password']}
+    return credentials
+
+
+def read_atdb_parameters(file_paths: Union[str, List[str]]) -> Dict:
+    """Read the ATDB service URL and token from the configuration files."""
+    file_paths = list(map(os.path.expanduser, map(os.path.expandvars, file_paths)))
+
+    parser = ConfigParser()
+    parser.read(file_paths)
+    atdb_parameters = {'url': parser['atdb']['url'], 'token': parser['atdb']['token']}
+    return atdb_parameters
+
+
+def create_db_engine(user: str, password: str):
+    connection_string = CONNECTION_STRING.format(user=quote_plus(user),
+                                                 password=quote_plus(password))
+    return create_engine(connection_string)
+
+
+def find_surl_and_size_per_sasid(engine, sasid):
+    # sasid is interpolated into the query; main() casts it to int first.
+    return pandas.read_sql_query(QUERY_STR.format(sasid), engine)
+
+
+def parse_surl_for_info(surl):
+    """Extract the storage site and the project/SAS ID encoded in a SURL path."""
+    parsed = urlparse(surl)
+    site = parsed.hostname
+    path = parsed.path
+    pattern = r'^.*/projects/(?P<project>.*_\d*)/(?P<sas_id>\w\d*)/'
+    match = re.match(pattern, path)
+    if match is None:
+        raise ValueError(f'cannot parse project and SAS ID from SURL path: {path}')
+    return site, match.groupdict()
+
+
+def create_payload_from_entry(entry, filter_str, workflow, purge_policy='no', predecessor=None):
+    size = entry['filesize']
+    site, payload = parse_surl_for_info(entry['primary_url'])
+    inputs_payload = [
+        {'size': size,
+         'surl': entry['primary_url'],
+         'type': 'File',
+         'location': site}
+    ]
+
+    # default values
+    payload['task_type'] = 'regular'
+    payload['filter'] = filter_str
+    payload['purge_policy'] = purge_policy
+    payload['status'] = 'defined'
+    payload['new_workflow_uri'] = workflow
+
+    # from entry
+    payload['size_to_process'] = size
+    payload['inputs'] = inputs_payload
+    if predecessor:
+        payload['predecessor'] = predecessor
+    return payload
+
+
+class AInterface:
+    """Minimal client for the ATDB REST interface."""
+
+    def __init__(self, url, token):
+        self._url = url.rstrip('/')
+        self._token = token
+        self._session = None
+
+    def open_session(self):
+        self._session = requests.Session()
+        self._session.headers['Authorization'] = f'Token {self._token}'
+
+    def session(self) -> requests.Session:
+        if self._session is None:
+            self.open_session()
+        return self._session
+
+    def submit_task(self, payload):
+        response = self.session().post(f'{self._url}/tasks/', json=payload)
+        if not response.ok:
+            logger.error('Error posting payload - %s - %s : %s', response.status_code,
+                         response.reason, response.content)
+            logger.error('Payload is %s', payload)
+            raise SystemError('failed to submit task to ATDB')
+
+
+def main():
+    args = parse_args()
+    # Honour --config when given, otherwise fall back to the default locations.
+    config_paths = [args.config] if args.config else DEFAULT_CONFIG_PATHS
+    credentials = read_user_credentials(config_paths)
+    atdb_parameters = read_atdb_parameters(config_paths)
+    atdb_interface = AInterface(atdb_parameters['url'], atdb_parameters['token'])
+    engine = create_db_engine(**credentials)
+    for sas_id in args.sas_ids:
+        sas_id = int(sas_id)
+        logger.info('fetching surl for sas_id %s', sas_id)
+
+        table = find_surl_and_size_per_sasid(engine, sas_id)
+        for index, line in table.iterrows():
+            logger.info('creating task for sas_id %s - dataset %s', sas_id, index)
+            payload = create_payload_from_entry(line, args.filter, args.workflow)
+            atdb_interface.submit_task(payload)
+
+
+if __name__ == '__main__':
+    main()
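
For reference, read_user_credentials() and read_atdb_parameters() expect an INI file at one of the
DEFAULT_CONFIG_PATHS (or at the path passed via --config) with a [global] section for the LTA
database credentials and an [atdb] section for the ATDB service. The section and key names below
come straight from the script; the values are placeholders, since the real URL and token depend on
the ATDB deployment:

    [global]
    database_user = AWUSER
    database_password = secret

    [atdb]
    url = https://atdb.example.org/atdb
    token = 0123456789abcdef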
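
A typical invocation then defines one ATDB task per dynamic-spectrum dataset found for each SAS ID.
The workflow URI and filter string shown here are placeholders, not values taken from the patch:

    bin/define_tasks_from_archive.py \
        'http://example.org/workflows/solar_dynspec/1.0' \
        'solar' 123456 123457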