Select Git revision
test_cool_module.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
mains.py 11.66 KiB
import json
import argparse
from pprint import pprint
from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession
from lofar.common.datetimeutils import parseDatetime
import dateutil.parser
import datetime
def main_get_subtask_parset():
parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", help="The ID of the TMSS subtask to get the parset from")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
print(session.get_subtask_parset(args.subtask_id))
except Exception as e:
print(e)
exit(1)
def main_get_subtask_predecessors():
parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get the predecessors for")
parser.add_argument('-s', '--state', help="only get predecessors with this state")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
pprint(session.get_subtask_predecessors(args.subtask_id, state=args.state))
except Exception as e:
print(e)
exit(1)
def main_get_subtask_successors():
parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get the successors for")
parser.add_argument('-s', '--state', help="only get successors with this state")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
pprint(session.get_subtask_successors(args.subtask_id, state=args.state))
except Exception as e:
print(e)
exit(1)
def main_get_subtask():
parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
pprint(session.get_subtask(args.subtask_id))
except Exception as e:
print(e)
exit(1)
def main_get_subtasks():
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--state', help="only get subtasks with this state")
parser.add_argument('-c', '--cluster', help="only get subtasks for this cluster")
parser.add_argument('--start_time_less_then', help="only get subtasks with a start time less then this timestamp")
parser.add_argument('--start_time_greater_then', help="only get subtasks with a start time greater then this timestamp")
parser.add_argument('--stop_time_less_then', help="only get subtasks with a stop time less then this timestamp")
parser.add_argument('--stop_time_greater_then', help="only get subtasks with a stop time greater then this timestamp")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
result = session.get_subtasks(state=args.state,
cluster=args.cluster,
start_time_less_then=parseDatetime(args.start_time_less_then) if args.start_time_less_then else None,
start_time_greater_then=parseDatetime(args.start_time_greater_then) if args.start_time_greater_then else None,
stop_time_less_then=parseDatetime(args.stop_time_less_then) if args.stop_time_less_then else None,
stop_time_greater_then=parseDatetime(args.stop_time_greater_then) if args.stop_time_greater_then else None)
pprint(result)
except Exception as e:
print(e)
exit(1)
def main_set_subtask_state():
parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to set the status on")
parser.add_argument("state", help="The state to set")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
changed_subtask = session.set_subtask_status(args.subtask_id, args.state)
print("%s now has state %s, see: %s" % (changed_subtask['id'], changed_subtask['state_value'], changed_subtask['url']))
except Exception as e:
print(e)
exit(1)
def main_specify_observation_task():
"""
Ask user for parameter 'taskid' and execute API-call to specify observation
"""
parser = argparse.ArgumentParser()
parser.add_argument("task_id", help="The ID of the TMSS task to specify for observation")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
result = session.specify_observation_task(args.task_id)
except Exception as e:
print(e)
exit(1)
def main_schedule_subtask():
parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be scheduled")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
pprint(session.schedule_subtask(args.subtask_id, retry_count=3))
except Exception as e:
print(e)
exit(1)
def main_unschedule_subtask():
parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be unscheduled")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
pprint(session.unschedule_subtask(args.subtask_id, retry_count=3))
except Exception as e:
print(e)
exit(1)
def main_cancel_subtask():
parser = argparse.ArgumentParser()
parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be cancelled")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
pprint(session.cancel_subtask(args.subtask_id, retry_count=3))
except Exception as e:
print(e)
exit(1)
def main_get_setting():
parser = argparse.ArgumentParser()
parser.add_argument("setting_name", type=str, help="The name of the TMSS setting to get")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
pprint(session.get_setting(args.setting_name))
except Exception as e:
print(e)
exit(1)
def main_set_setting():
parser = argparse.ArgumentParser()
parser.add_argument("setting_name", type=str, help="The name of the TMSS setting to set")
parser.add_argument("setting_value", type=lambda s: s.lower() in ['true', 'True', '1'], # argparse is noot very good at speaking bool...
help="The value to set for the TMSS setting")
args = parser.parse_args()
try:
with TMSSsession.create_from_dbcreds_for_ldap() as session:
pprint(session.set_setting(args.setting_name, args.setting_value))
except Exception as e:
print(e)
exit(1)
def main_trigger_scheduling_unit(scheduling_unit_observing_strategy_template_id: int):
parser = argparse.ArgumentParser()
parser.add_argument("pointing_angle1", type=float, help="First angle of the pointing")
parser.add_argument("pointing_angle2", type=float, help="Second angle of the pointing")
parser.add_argument("--pointing_direction_type", type=str, default="J2000", help="The direction type of the pointing. Defaults to J2000.")
parser.add_argument("--start_at", type=str, default=None, help="The start time of the observation as ISO string. If specified, a constraint will be added so that the observation must start exactly at this time. Defaults to None.")
parser.add_argument("--end_before", type=str, default=None, help="The latest end time of the observation as ISO string. If specified, a constraint will be added so that the observation will run at any time before this time. Defaults to None.")
parser.add_argument("--target_name", type=str, default='target1', help="The name of the observation target. Defaults to 'target1'.")
parser.add_argument("duration", type=int, help="The duration of the observation in seconds.")
parser.add_argument("scheduling_set_id", type=int, help="The ID of the scheduling set the trigger should be placed in. This should've been provided to you.")
parser.add_argument("test", type=bool, default=True, help="Whether this is a test submission or should be actually performed. Set False to submit a real trigger. Defaults to True.")
args = parser.parse_args()
try:
import uuid
with TMSSsession('test', 'test', 'localhost', 8000, TMSSsession.BASICAUTH) as session: # alternatively: TMSSsession.create_from_dbcreds_for_ldap()
strategy_template = session.get_scheduling_unit_observing_strategy_template(scheduling_unit_observing_strategy_template_id)
requirements = strategy_template['template']
def dict_search_and_replace(d, key, value):
if key in d:
d[key] = value
for k in d:
if isinstance(d[k], dict):
dict_search_and_replace(d[k], key, value)
elif isinstance(d[k], list):
for i in d[k]:
if isinstance(i, dict):
dict_search_and_replace(i, key, value)
dict_search_and_replace(requirements['tasks'], 'angle1', args.pointing_angle1)
dict_search_and_replace(requirements['tasks'], 'angle2', args.pointing_angle2)
dict_search_and_replace(requirements['tasks'], 'name', args.target_name)
dict_search_and_replace(requirements['tasks'], 'target', args.target_name)
dict_search_and_replace(requirements['tasks'], 'duration', args.duration)
constraints = {"sky": {
"min_distance": {
"sun": 0.0,
"moon": 0.0,
"jupiter": 0.0
},
"transit_offset": {
"to": 7200,
"from": -7200
},
"min_target_elevation": 0.0
}}
if args.end_before:
constraints.setdefault('time', {})['before'] = args.end_before
if args.start_at:
constraints.setdefault('time', {})['at'] = args.start_at
scheduling_unit_data = {"name": "CLI_trigger_%s" % (uuid.uuid4()),
"description": "Trigger submitted via CLI tools: %s" % strategy_template['name'],
"tags": [],
"requirements_doc": requirements,
"copy_reason": None,
"generator_instance_doc": None,
"copies": None,
"scheduling_set": session.api_url + '/scheduling_set/%s/' % args.scheduling_set_id,
"requirements_template": session.api_url + '/scheduling_unit_template/1/',
"observation_strategy_template": strategy_template['url'],
"scheduling_constraints_template_id": 1,
"scheduling_constraints_doc": constraints,
"interrupts_telescope": True,
"scheduling_unit_blueprints": [],
"task_drafts": []}
pprint(session.trigger_scheduling_unit(json_data=scheduling_unit_data, test=args.test))
except Exception as e:
print(e)
exit(1)