Skip to content
Snippets Groups Projects
Select Git revision
  • 1e2a0e3bd3732a309be997e66e3eb3d79a5c7804
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

mains.py

Blame
  • Jörn Künsemöller's avatar
    TMSS-745: typo
    Jörn Künsemöller authored
    1e2a0e3b
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    mains.py 11.66 KiB
    import json
    import argparse
    from pprint import pprint
    from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
    from lofar.common.datetimeutils import parseDatetime
    import dateutil.parser
    import datetime
    
    def main_get_subtask_parset():
        """Command-line entry point: print the parset of one TMSS subtask.

        Reads the positional ``subtask_id`` argument, asks a session obtained
        from ``TMSSsession.create_from_dbcreds_for_ldap()`` for the parset and
        prints it. Any exception is printed and the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", help="The ID of the TMSS subtask to get the parset from")
        cli_args = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                parset = tmss.get_subtask_parset(cli_args.subtask_id)
                print(parset)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_subtask_predecessors():
        """Command-line entry point: pretty-print the predecessors of a TMSS subtask.

        Takes the integer ``subtask_id`` and an optional ``-s/--state`` filter
        from the command line. Any exception is printed and the process exits
        with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get the predecessors for")
        arg_parser.add_argument('-s', '--state', help="only get predecessors with this state")
        cli_args = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                predecessors = tmss.get_subtask_predecessors(cli_args.subtask_id, state=cli_args.state)
                pprint(predecessors)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_subtask_successors():
        """Command-line entry point: pretty-print the successors of a TMSS subtask.

        Takes the integer ``subtask_id`` and an optional ``-s/--state`` filter
        from the command line. Any exception is printed and the process exits
        with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get the successors for")
        arg_parser.add_argument('-s', '--state', help="only get successors with this state")
        cli_args = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                successors = tmss.get_subtask_successors(cli_args.subtask_id, state=cli_args.state)
                pprint(successors)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_subtask():
        """Command-line entry point: pretty-print a single TMSS subtask.

        Takes the integer ``subtask_id`` from the command line. Any exception
        is printed and the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get")
        cli_args = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                subtask = tmss.get_subtask(cli_args.subtask_id)
                pprint(subtask)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_subtasks():
        """Command-line entry point: pretty-print the TMSS subtasks matching optional filters.

        Supported filters are ``-s/--state``, ``-c/--cluster`` and four
        timestamp bounds on start/stop time. A timestamp that is given is
        converted with ``parseDatetime``; one that is omitted is passed on as
        ``None``. Any exception is printed and the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument('-s', '--state', help="only get subtasks with this state")
        arg_parser.add_argument('-c', '--cluster', help="only get subtasks for this cluster")
        arg_parser.add_argument('--start_time_less_then', help="only get subtasks with a start time less then this timestamp")
        arg_parser.add_argument('--start_time_greater_then', help="only get subtasks with a start time greater then this timestamp")
        arg_parser.add_argument('--stop_time_less_then', help="only get subtasks with a stop time less then this timestamp")
        arg_parser.add_argument('--stop_time_greater_then', help="only get subtasks with a stop time greater then this timestamp")
        cli_args = arg_parser.parse_args()

        # Names double as argparse attribute names and as keyword arguments of get_subtasks.
        timestamp_filter_names = ('start_time_less_then',
                                  'start_time_greater_then',
                                  'stop_time_less_then',
                                  'stop_time_greater_then')

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                # Parse inside the session/try so that a malformed timestamp is reported like any other error.
                timestamp_filters = {}
                for filter_name in timestamp_filter_names:
                    raw_value = getattr(cli_args, filter_name)
                    timestamp_filters[filter_name] = parseDatetime(raw_value) if raw_value else None

                subtasks = tmss.get_subtasks(state=cli_args.state,
                                             cluster=cli_args.cluster,
                                             **timestamp_filters)
                pprint(subtasks)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_set_subtask_state():
        """Command-line entry point: set the state of a TMSS subtask and report the outcome.

        Takes the integer ``subtask_id`` and the new ``state`` from the command
        line, then prints the ``id``, ``state_value`` and ``url`` fields of the
        returned subtask. Any exception is printed and the process exits with
        status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to set the status on")
        arg_parser.add_argument("state", help="The state to set")
        cli_args = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                updated = tmss.set_subtask_status(cli_args.subtask_id, cli_args.state)
                summary_fields = (updated['id'], updated['state_value'], updated['url'])
                print("%s now has state %s, see: %s" % summary_fields)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_specify_observation_task():
        """
        Command-line entry point: specify a TMSS task for observation.

        Takes the positional ``task_id`` from the command line, passes it to
        ``session.specify_observation_task`` and pretty-prints the result, as
        the other commands in this module do. Any exception is printed and the
        process exits with status 1.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("task_id", help="The ID of the TMSS task to specify for observation")
        args = parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as session:
                # Previously the result was stored and dropped, so a successful call gave no feedback.
                pprint(session.specify_observation_task(args.task_id))
        except Exception as e:
            print(e)
            exit(1)
    
    
    def main_schedule_subtask():
        """Command-line entry point: schedule a TMSS subtask and pretty-print the response.

        Takes the integer ``subtask_id`` from the command line and calls
        ``schedule_subtask`` with ``retry_count=3``. Any exception is printed
        and the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be scheduled")
        cli_args = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                response = tmss.schedule_subtask(cli_args.subtask_id, retry_count=3)
                pprint(response)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_unschedule_subtask():
        """Command-line entry point: unschedule a TMSS subtask and pretty-print the response.

        Takes the integer ``subtask_id`` from the command line and calls
        ``unschedule_subtask`` with ``retry_count=3``. Any exception is printed
        and the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be unscheduled")
        cli_args = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                response = tmss.unschedule_subtask(cli_args.subtask_id, retry_count=3)
                pprint(response)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_cancel_subtask():
        """Command-line entry point: cancel a TMSS subtask and pretty-print the response.

        Takes the integer ``subtask_id`` from the command line and calls
        ``cancel_subtask`` with ``retry_count=3``. Any exception is printed and
        the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be cancelled")
        cli_args = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                response = tmss.cancel_subtask(cli_args.subtask_id, retry_count=3)
                pprint(response)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_get_setting():
        """Command-line entry point: pretty-print the value of a TMSS setting.

        Takes the ``setting_name`` from the command line. Any exception is
        printed and the process exits with status 1.
        """
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("setting_name", type=str, help="The name of the TMSS setting to get")
        cli_args = arg_parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as tmss:
                setting = tmss.get_setting(cli_args.setting_name)
                pprint(setting)
        except Exception as err:
            print(err)
            exit(1)
    
    
    def main_set_setting():
        """Command-line entry point: set a boolean TMSS setting and pretty-print the response.

        Takes ``setting_name`` and ``setting_value`` from the command line.
        The value is True when the given text is 'true' (in any letter case)
        or '1'; every other text yields False. Any exception is printed and the
        process exits with status 1.
        """
        def parse_flag(text):
            # argparse's type=bool would turn every non-empty string into True,
            # so the text is mapped to a bool explicitly. The comparison runs on
            # the lower-cased text, hence only lower-case spellings are listed.
            return text.lower() in ('true', '1')

        parser = argparse.ArgumentParser()
        parser.add_argument("setting_name", type=str, help="The name of the TMSS setting to set")
        parser.add_argument("setting_value", type=parse_flag,
                            help="The value to set for the TMSS setting")
        args = parser.parse_args()

        try:
            with TMSSsession.create_from_dbcreds_for_ldap() as session:
                pprint(session.set_setting(args.setting_name, args.setting_value))
        except Exception as e:
            print(e)
            exit(1)
    
    
    def main_trigger_scheduling_unit(scheduling_unit_observing_strategy_template_id: int):
        """
        Command-line entry point: build a scheduling unit from an observing strategy template and submit it as a trigger.

        The pointing angles, target name and duration given on the command line are written into every
        matching key found in the template's 'tasks' section. Optional --start_at / --end_before values are
        added as 'time' scheduling constraints. The response of the trigger call is pretty-printed; any
        exception is printed and the process exits with status 1.

        :param scheduling_unit_observing_strategy_template_id: ID of the observing strategy template to fetch
        """
        def parse_bool(text):
            """Convert a command-line string to a bool, rejecting anything unrecognised."""
            # type=bool would map every non-empty string (including "False") to True,
            # which made it impossible to request a real (non-test) trigger.
            lowered = text.strip().lower()
            if lowered in ('true', '1', 'yes'):
                return True
            if lowered in ('false', '0', 'no'):
                return False
            raise argparse.ArgumentTypeError("expected a boolean (True/False), got %r" % text)

        parser = argparse.ArgumentParser()
        parser.add_argument("pointing_angle1", type=float, help="First angle of the pointing")
        parser.add_argument("pointing_angle2", type=float, help="Second angle of the pointing")
        parser.add_argument("--pointing_direction_type", type=str, default="J2000", help="The direction type of the pointing. Defaults to J2000.")
        parser.add_argument("--start_at", type=str, default=None, help="The start time of the observation as ISO string. If specified, a constraint will be added so that the observation must start exactly at this time. Defaults to None.")
        parser.add_argument("--end_before", type=str, default=None, help="The latest end time of the observation as ISO string. If specified, a constraint will be added so that the observation will run at any time before this time. Defaults to None.")
        parser.add_argument("--target_name", type=str, default='target1', help="The name of the observation target. Defaults to 'target1'.")
        parser.add_argument("duration", type=int, help="The duration of the observation in seconds.")
        parser.add_argument("scheduling_set_id", type=int, help="The ID of the scheduling set the trigger should be placed in. This should've been provided to you.")
        # nargs='?' makes the documented default effective; a plain positional is always required and ignores 'default'.
        parser.add_argument("test", type=parse_bool, nargs='?', default=True, help="Whether this is a test submission or should be actually performed. Set False to submit a real trigger. Defaults to True.")
        args = parser.parse_args()

        try:
            import uuid
            # NOTE(review): host and credentials are hard-coded here, presumably for a local test instance - confirm before production use.
            with TMSSsession('test', 'test', 'localhost', 8000, TMSSsession.BASICAUTH) as session:  # alternatively: TMSSsession.create_from_dbcreds_for_ldap()
                strategy_template = session.get_scheduling_unit_observing_strategy_template(scheduling_unit_observing_strategy_template_id)
                requirements = strategy_template['template']

                def dict_search_and_replace(d, key, value):
                    """Set d[key] = value in-place wherever key occurs in d, in nested dicts, and in dicts held directly in lists."""
                    if key in d:
                        d[key] = value
                    for k in d:
                        if isinstance(d[k], dict):
                            dict_search_and_replace(d[k], key, value)
                        elif isinstance(d[k], list):
                            for i in d[k]:
                                if isinstance(i, dict):
                                    dict_search_and_replace(i, key, value)

                dict_search_and_replace(requirements['tasks'], 'angle1', args.pointing_angle1)
                dict_search_and_replace(requirements['tasks'], 'angle2', args.pointing_angle2)
                dict_search_and_replace(requirements['tasks'], 'name', args.target_name)
                dict_search_and_replace(requirements['tasks'], 'target', args.target_name)
                dict_search_and_replace(requirements['tasks'], 'duration', args.duration)

                constraints = {"sky": {
                               "min_distance": {
                                   "sun": 0.0,
                                   "moon": 0.0,
                                   "jupiter": 0.0
                               },
                               "transit_offset": {
                                   "to": 7200,
                                   "from": -7200
                               },
                               "min_target_elevation": 0.0
                               }}

                # The 'time' section is only created when at least one time constraint was given.
                if args.end_before:
                    constraints.setdefault('time', {})['before'] = args.end_before

                if args.start_at:
                    constraints.setdefault('time', {})['at'] = args.start_at

                scheduling_unit_data = {"name": "CLI_trigger_%s" % (uuid.uuid4()),
                                        "description": "Trigger submitted via CLI tools: %s" % strategy_template['name'],
                                        "tags": [],
                                        "requirements_doc": requirements,
                                        "copy_reason": None,
                                        "generator_instance_doc": None,
                                        "copies": None,
                                        "scheduling_set": session.api_url + '/scheduling_set/%s/' % args.scheduling_set_id,
                                        "requirements_template": session.api_url + '/scheduling_unit_template/1/',
                                        "observation_strategy_template": strategy_template['url'],
                                        "scheduling_constraints_template_id": 1,
                                        "scheduling_constraints_doc":  constraints,
                                        "interrupts_telescope": True,
                                        "scheduling_unit_blueprints": [],
                                        "task_drafts": []}
                pprint(session.trigger_scheduling_unit(json_data=scheduling_unit_data, test=args.test))
        except Exception as e:
            print(e)
            exit(1)