Skip to content
Snippets Groups Projects
Select Git revision
  • 689e2edbdb953eecaf06859d50ba7ad91388c7d5
  • main default protected
2 results

test_cool_module.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    mains.py 11.66 KiB
    import json
    import argparse
    from pprint import pprint
    from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
    from lofar.common.datetimeutils import parseDatetime
    import dateutil.parser
    import datetime
    
    def main_get_subtask_parset() -> None:
        """Print the parset of the TMSS subtask whose ID is given on the command line.

        Any exception raised while opening the session, fetching the parset or
        printing it is printed, after which the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", help="The ID of the TMSS subtask to get the parset from")
        options = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                parset = tmss.get_subtask_parset(options.subtask_id)
                print(parset)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_subtask_predecessors() -> None:
        """Pretty-print the predecessors of the TMSS subtask given on the command line.

        The optional -s/--state value is forwarded as the ``state`` keyword of
        ``get_subtask_predecessors`` (None when not given). Any exception is
        printed, after which the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get the predecessors for")
        arg_parser.add_argument('-s', '--state', help="only get predecessors with this state")
        options = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                predecessors = tmss.get_subtask_predecessors(options.subtask_id, state=options.state)
                pprint(predecessors)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_subtask_successors() -> None:
        """Pretty-print the successors of the TMSS subtask given on the command line.

        The optional -s/--state value is forwarded as the ``state`` keyword of
        ``get_subtask_successors`` (None when not given). Any exception is
        printed, after which the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get the successors for")
        arg_parser.add_argument('-s', '--state', help="only get successors with this state")
        options = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                successors = tmss.get_subtask_successors(options.subtask_id, state=options.state)
                pprint(successors)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_subtask() -> None:
        """Pretty-print the TMSS subtask whose integer ID is given on the command line.

        Any exception is printed, after which the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get")
        options = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                subtask = tmss.get_subtask(options.subtask_id)
                pprint(subtask)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_subtasks() -> None:
        """Pretty-print the TMSS subtasks selected by the optional command line filters.

        Supported filters are the state, the cluster and four start/stop time
        bounds. Each time bound that is given is converted with ``parseDatetime``;
        bounds that are absent or empty are passed on as None. Any exception
        (including a timestamp that fails to parse) is printed, after which the
        process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument('-s', '--state', help="only get subtasks with this state")
        arg_parser.add_argument('-c', '--cluster', help="only get subtasks for this cluster")
        arg_parser.add_argument('--start_time_less_then', help="only get subtasks with a start time less then this timestamp")
        arg_parser.add_argument('--start_time_greater_then', help="only get subtasks with a start time greater then this timestamp")
        arg_parser.add_argument('--stop_time_less_then', help="only get subtasks with a stop time less then this timestamp")
        arg_parser.add_argument('--stop_time_greater_then', help="only get subtasks with a stop time greater then this timestamp")
        options = arg_parser.parse_args()

        def as_timestamp(raw):
            # Only parse when a non-empty value was supplied; otherwise hand on None.
            if raw:
                return parseDatetime(raw)
            return None

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                # Parsing happens inside the try so a malformed timestamp is
                # reported like any other failure.
                time_bounds = {
                    'start_time_less_then': as_timestamp(options.start_time_less_then),
                    'start_time_greater_then': as_timestamp(options.start_time_greater_then),
                    'stop_time_less_then': as_timestamp(options.stop_time_less_then),
                    'stop_time_greater_then': as_timestamp(options.stop_time_greater_then),
                }
                subtasks = tmss.get_subtasks(state=options.state,
                                             cluster=options.cluster,
                                             **time_bounds)
                pprint(subtasks)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_set_subtask_state() -> None:
        """Set the state of a TMSS subtask and print a one-line confirmation.

        Takes the integer subtask ID and the new state from the command line,
        calls ``set_subtask_status`` and prints the 'id', 'state_value' and 'url'
        entries of the returned subtask. Any exception is printed, after which
        the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to set the status on")
        arg_parser.add_argument("state", help="The state to set")
        options = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                updated = tmss.set_subtask_status(options.subtask_id, options.state)
                summary = (updated['id'], updated['state_value'], updated['url'])
                print("%s now has state %s, see: %s" % summary)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_specify_observation_task() -> None:
        """Specify a TMSS task for observation.

        Reads the positional ``task_id`` argument from the command line and
        passes it to ``specify_observation_task``. Nothing is printed on
        success. Any exception is printed, after which the process exits with
        status 1.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("task_id", help="The ID of the TMSS task to specify for observation")
        args = parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as session:
                # The return value is not used by this command, so it is not bound.
                session.specify_observation_task(args.task_id)
        except Exception as e:
            print(e)
            exit(1)
    
    
    def main_schedule_subtask() -> None:
        """Schedule the TMSS subtask given on the command line and pretty-print the result.

        ``schedule_subtask`` is called with ``retry_count=3``. Any exception is
        printed, after which the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be scheduled")
        options = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                outcome = tmss.schedule_subtask(options.subtask_id, retry_count=3)
                pprint(outcome)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_unschedule_subtask() -> None:
        """Unschedule the TMSS subtask given on the command line and pretty-print the result.

        ``unschedule_subtask`` is called with ``retry_count=3``. Any exception is
        printed, after which the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be unscheduled")
        options = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                outcome = tmss.unschedule_subtask(options.subtask_id, retry_count=3)
                pprint(outcome)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_cancel_subtask() -> None:
        """Cancel the TMSS subtask given on the command line and pretty-print the result.

        ``cancel_subtask`` is called with ``retry_count=3``. Any exception is
        printed, after which the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be cancelled")
        options = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                outcome = tmss.cancel_subtask(options.subtask_id, retry_count=3)
                pprint(outcome)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_setting() -> None:
        """Pretty-print the TMSS setting whose name is given on the command line.

        Any exception is printed, after which the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("setting_name", type=str, help="The name of the TMSS setting to get")
        options = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                setting = tmss.get_setting(options.setting_name)
                pprint(setting)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_set_setting() -> None:
        """Set a boolean TMSS setting and pretty-print the result.

        Takes the setting name and the new value from the command line. The
        value is True when the text is 'true' (in any letter case) or '1'; every
        other text is treated as False. Any exception is printed, after which
        the process exits with status 1.
        """
        def parse_bool(text):
            # argparse is not very good at speaking bool: type=bool would turn
            # every non-empty string (including "False") into True.
            # The text is lower-cased first, so only lower-case spellings need listing.
            return text.lower() in ('true', '1')

        parser = argparse.ArgumentParser()
        parser.add_argument("setting_name", type=str, help="The name of the TMSS setting to set")
        parser.add_argument("setting_value", type=parse_bool,
                            help="The value to set for the TMSS setting")
        args = parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as session:
                pprint(session.set_setting(args.setting_name, args.setting_value))
        except Exception as e:
            print(e)
            exit(1)
    
    
    def main_trigger_scheduling_unit(scheduling_unit_observing_strategy_template_id: int) -> None:
        """Submit a trigger scheduling unit built from an observing strategy template.

        The pointing angles, target name and duration given on the command line
        are written into every 'angle1', 'angle2', 'name', 'target' and
        'duration' key found under the template's 'tasks'. A fixed set of sky
        constraints, plus optional 'at'/'before' time constraints, is attached
        and the resulting document is passed to ``trigger_scheduling_unit``,
        whose return value is pretty-printed.

        Args:
            scheduling_unit_observing_strategy_template_id: ID of the observing
                strategy template the trigger is based on.

        Any exception is printed, after which the process exits with status 1.
        """
        def parse_bool(text):
            """Convert a command line word into a bool.

            ``type=bool`` cannot be used for this: bool("False") is True, which
            made it impossible to ever request a non-test submission.
            """
            lowered = text.strip().lower()
            if lowered in ('true', 't', 'yes', 'y', '1'):
                return True
            if lowered in ('false', 'f', 'no', 'n', '0', ''):
                return False
            raise argparse.ArgumentTypeError("expected a boolean value (True/False), got %r" % text)

        parser = argparse.ArgumentParser()
        parser.add_argument("pointing_angle1", type=float, help="First angle of the pointing")
        parser.add_argument("pointing_angle2", type=float, help="Second angle of the pointing")
        parser.add_argument("--pointing_direction_type", type=str, default="J2000", help="The direction type of the pointing. Defaults to J2000.")
        parser.add_argument("--start_at", type=str, default=None, help="The start time of the observation as ISO string. If specified, a constraint will be added so that the observation must start exactly at this time. Defaults to None.")
        parser.add_argument("--end_before", type=str, default=None, help="The latest end time of the observation as ISO string. If specified, a constraint will be added so that the observation will run at any time before this time. Defaults to None.")
        parser.add_argument("--target_name", type=str, default='target1', help="The name of the observation target. Defaults to 'target1'.")
        parser.add_argument("duration", type=int, help="The duration of the observation in seconds.")
        parser.add_argument("scheduling_set_id", type=int, help="The ID of the scheduling set the trigger should be placed in. This should've been provided to you.")
        # nargs='?' makes the documented default effective: a mandatory
        # positional never falls back to its default.
        parser.add_argument("test", type=parse_bool, nargs='?', default=True, help="Whether this is a test submission or should be actually performed. Set False to submit a real trigger. Defaults to True.")
        args = parser.parse_args()

        try:
            import uuid
            # NOTE(review): credentials and host are hard-coded to what looks like
            # a local test server -- confirm before using this against production.
            with TMSSsession('test', 'test', 'localhost', 8000, TMSSsession.BASICAUTH) as session:  # alternatively: TMSSsession.create_from_dbcreds_for_ldap()
                strategy_template = session.get_scheduling_unit_observing_strategy_template(scheduling_unit_observing_strategy_template_id)
                requirements = strategy_template['template']

                def dict_search_and_replace(d, key, value):
                    """Set d[key] = value in d and in every dict nested below it, in place.

                    Descends into dict values and into dicts that are direct
                    items of list values; keys that do not exist are not added.
                    """
                    if key in d:
                        d[key] = value
                    for k in d:
                        if isinstance(d[k], dict):
                            dict_search_and_replace(d[k], key, value)
                        elif isinstance(d[k], list):
                            for i in d[k]:
                                if isinstance(i, dict):
                                    dict_search_and_replace(i, key, value)

                dict_search_and_replace(requirements['tasks'], 'angle1', args.pointing_angle1)
                dict_search_and_replace(requirements['tasks'], 'angle2', args.pointing_angle2)
                dict_search_and_replace(requirements['tasks'], 'name', args.target_name)
                dict_search_and_replace(requirements['tasks'], 'target', args.target_name)
                dict_search_and_replace(requirements['tasks'], 'duration', args.duration)

                constraints = {"sky": {
                               "min_distance": {
                                   "sun": 0.0,
                                   "moon": 0.0,
                                   "jupiter": 0.0
                               },
                               "transit_offset": {
                                   "to": 7200,
                                   "from": -7200
                               },
                               "min_target_elevation": 0.0
                               }}

                # The 'time' section is only created when at least one time bound is given.
                if args.end_before:
                    constraints.setdefault('time', {})['before'] = args.end_before

                if args.start_at:
                    constraints.setdefault('time', {})['at'] = args.start_at

                scheduling_unit_data = {"name": "CLI_trigger_%s" % (uuid.uuid4()),
                                        "description": "Trigger submitted via CLI tools: %s" % strategy_template['name'],
                                        "tags": [],
                                        "requirements_doc": requirements,
                                        "copy_reason": None,
                                        "generator_instance_doc": None,
                                        "copies": None,
                                        "scheduling_set": session.api_url + '/scheduling_set/%s/' % args.scheduling_set_id,
                                        "requirements_template": session.api_url + '/scheduling_unit_template/1/',
                                        "observation_strategy_template": strategy_template['url'],
                                        "scheduling_constraints_template_id": 1,
                                        "scheduling_constraints_doc":  constraints,
                                        "interrupts_telescope": True,
                                        "scheduling_unit_blueprints": [],
                                        "task_drafts": []}
                pprint(session.trigger_scheduling_unit(json_data=scheduling_unit_data, test=args.test))
        except Exception as e:
            print(e)
            exit(1)