diff --git a/CMake/LofarPackageList.cmake b/CMake/LofarPackageList.cmake index 10ed0938ba2b2cf57be7480a23b4a2158643d183..740c52181b38c49d3b3a57866e5423582e3d096d 100644 --- a/CMake/LofarPackageList.cmake +++ b/CMake/LofarPackageList.cmake @@ -1,7 +1,7 @@ # - Create for each LOFAR package a variable containing the absolute path to # its source directory. # -# Generated by gen_LofarPackageList_cmake.sh at ma 4 mei 2020 12:07:08 CEST +# Generated by gen_LofarPackageList_cmake.sh at do 28 mei 2020 11:22:44 CEST # # ---- DO NOT EDIT ---- # @@ -207,6 +207,7 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(TaskPrescheduler_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/TaskPrescheduler) set(RACommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/Common) set(TMSSClient_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/client) + set(TMSSSubtaskSchedulingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/subtask_scheduling) set(TriggerEmailServiceCommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Common) set(TriggerEmailServiceServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Server) set(CCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/CCU_MAC) diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index c5273956ec6e68066192054c9a9261b00b74188e..65d72b100967e758a65a5c734063a05e7069c64c 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -1417,10 +1417,11 @@ class BusListener: sanitized_routing_key = self.routing_key.replace(".#","").replace(".*","").replace("#","").replace("*","") if not sanitized_routing_key: sanitized_routing_key = "all" - return "%s.queue.for.%s.%s.on.%s" % (self.exchange, - program_name(include_extension=False), - self.__class__.__name__, - sanitized_routing_key) + return "%s.queue.for.%s.%s.%s.on.%s" % (self.exchange, + program_name(include_extension=False), + self.__class__.__name__, + self._handler_type.__name__, + sanitized_routing_key) def is_running(self) -> bool: """Is this listener running its background listen/handle loops?""" diff --git a/SAS/TMSS/CMakeLists.txt b/SAS/TMSS/CMakeLists.txt index afc70e4bdb9d18cdc4bedface086d071fb769db6..8c0d7575c2b5ab1194e3115d8faca0debdc0586a 100644 --- a/SAS/TMSS/CMakeLists.txt +++ b/SAS/TMSS/CMakeLists.txt @@ -1,12 +1,14 @@ lofar_package(TMSS 0.1 DEPENDS PyCommon pyparameterset PyMessaging) +lofar_add_package(TMSSClient client) add_subdirectory(src) add_subdirectory(bin) add_subdirectory(test) add_subdirectory(frontend) +add_subdirectory(services) lofar_add_docker_files(docker-compose-tmss.yml) -lofar_add_package(TMSSClient client) + diff --git a/SAS/TMSS/client/bin/CMakeLists.txt b/SAS/TMSS/client/bin/CMakeLists.txt index 501bc8f8b66f37c17a4b74a29fd3f29478bd9287..a7142728b7503dd6acabf1ac85e4a611c7e8b7c7 100644 --- a/SAS/TMSS/client/bin/CMakeLists.txt +++ b/SAS/TMSS/client/bin/CMakeLists.txt @@ -2,3 +2,6 @@ lofar_add_bin_scripts(tmss_set_subtask_state) lofar_add_bin_scripts(tmss_get_subtask_parset) lofar_add_bin_scripts(tmss_get_subtask) lofar_add_bin_scripts(tmss_get_subtasks) +lofar_add_bin_scripts(tmss_get_subtask_predecessors) +lofar_add_bin_scripts(tmss_get_subtask_successors) +lofar_add_bin_scripts(tmss_schedule_subtask) diff --git a/SAS/TMSS/client/bin/tmss_get_subtask_predecessors b/SAS/TMSS/client/bin/tmss_get_subtask_predecessors new file mode 100755 index 0000000000000000000000000000000000000000..7944ab528d3929dd4587f072ab7ef839b20fc033 --- /dev/null +++ b/SAS/TMSS/client/bin/tmss_get_subtask_predecessors @@ -0,0 +1,23 @@ +#!/usr/bin/python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +from lofar.sas.tmss.client.mains import main_get_subtask_predecessors + +if __name__ == "__main__": + main_get_subtask_predecessors() diff --git a/SAS/TMSS/client/bin/tmss_get_subtask_successors b/SAS/TMSS/client/bin/tmss_get_subtask_successors new file mode 100755 index 0000000000000000000000000000000000000000..ed50a3edf182e71dd716233a460b23189e23a39d --- /dev/null +++ b/SAS/TMSS/client/bin/tmss_get_subtask_successors @@ -0,0 +1,23 @@ +#!/usr/bin/python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +from lofar.sas.tmss.client.mains import main_get_subtask_successors + +if __name__ == "__main__": + main_get_subtask_successors() diff --git a/SAS/TMSS/client/bin/tmss_schedule_subtask b/SAS/TMSS/client/bin/tmss_schedule_subtask new file mode 100755 index 0000000000000000000000000000000000000000..ec0a2df943bbf8e7181913fc7267da0bce316f2b --- /dev/null +++ b/SAS/TMSS/client/bin/tmss_schedule_subtask @@ -0,0 +1,23 @@ +#!/usr/bin/python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +from lofar.sas.tmss.client.mains import main_schedule_subtask + +if __name__ == "__main__": + main_schedule_subtask() diff --git a/SAS/TMSS/client/lib/mains.py b/SAS/TMSS/client/lib/mains.py index 57cb619cb412ab18d840c7f91a622b9fe66564bf..f645b9643a195d213ee4411a7cd3e964419afcbe 100644 --- a/SAS/TMSS/client/lib/mains.py +++ b/SAS/TMSS/client/lib/mains.py @@ -10,8 +10,40 @@ def main_get_subtask_parset(): parser.add_argument("subtask_id", help="The ID of the TMSS subtask to get the parset from") args = parser.parse_args() - with TMSSsession.create_from_dbcreds_for_ldap() as session: - print(session.get_subtask_parset(args.subtask_id)) + try: + with TMSSsession.create_from_dbcreds_for_ldap() as session: + print(session.get_subtask_parset(args.subtask_id)) + except Exception as e: + print(e) + exit(1) + + +def main_get_subtask_predecessors(): + parser = argparse.ArgumentParser() + parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get the predecessors for") + parser.add_argument('-s', '--state', help="only get predecessors with this state") + args = parser.parse_args() + + try: + with TMSSsession.create_from_dbcreds_for_ldap() as session: + pprint(session.get_subtask_predecessors(args.subtask_id, state=args.state)) + except Exception as e: + print(e) + exit(1) + + +def main_get_subtask_successors(): + parser = argparse.ArgumentParser() + parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get the successors for") + parser.add_argument('-s', '--state', help="only get successors with this state") + args = parser.parse_args() + + try: + with TMSSsession.create_from_dbcreds_for_ldap() as session: + pprint(session.get_subtask_successors(args.subtask_id, state=args.state)) + except Exception as e: + print(e) + exit(1) def main_get_subtask(): @@ -19,8 +51,12 @@ def main_get_subtask(): parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to get") args = parser.parse_args() - with TMSSsession.create_from_dbcreds_for_ldap() as session: - pprint(session.get_subtask(args.subtask_id)) + try: + with TMSSsession.create_from_dbcreds_for_ldap() as session: + pprint(session.get_subtask(args.subtask_id)) + except Exception as e: + print(e) + exit(1) def main_get_subtasks(): @@ -33,14 +69,18 @@ def main_get_subtasks(): parser.add_argument('--stop_time_greater_then', help="only get subtasks with a stop time greater then this timestamp") args = parser.parse_args() - with TMSSsession.create_from_dbcreds_for_ldap() as session: - result = session.get_subtasks(state=args.state, - cluster=args.cluster, - start_time_less_then=parseDatetime(args.start_time_less_then) if args.start_time_less_then else None, - start_time_greater_then=parseDatetime(args.start_time_greater_then) if args.start_time_greater_then else None, - stop_time_less_then=parseDatetime(args.stop_time_less_then) if args.stop_time_less_then else None, - stop_time_greater_then=parseDatetime(args.stop_time_greater_then) if args.stop_time_greater_then else None) - pprint(result['results']) + try: + with TMSSsession.create_from_dbcreds_for_ldap() as session: + result = session.get_subtasks(state=args.state, + cluster=args.cluster, + start_time_less_then=parseDatetime(args.start_time_less_then) if args.start_time_less_then else None, + start_time_greater_then=parseDatetime(args.start_time_greater_then) if args.start_time_greater_then else None, + stop_time_less_then=parseDatetime(args.stop_time_less_then) if args.stop_time_less_then else None, + stop_time_greater_then=parseDatetime(args.stop_time_greater_then) if args.stop_time_greater_then else None) + pprint(result) + except Exception as e: + print(e) + exit(1) def main_set_subtask_state(): @@ -49,10 +89,13 @@ def main_set_subtask_state(): parser.add_argument("state", help="The state to set") args = parser.parse_args() - with TMSSsession.create_from_dbcreds_for_ldap() as session: - result = session.set_subtask_status(args.subtask_id, args.state) - result_obj = json.loads(result.content.decode('utf-8')) - print("%s now has state %s" % (result_obj['url'], result_obj['state'])) + try: + with TMSSsession.create_from_dbcreds_for_ldap() as session: + changed_subtask = session.set_subtask_status(args.subtask_id, args.state) + print("%s now has state %s" % (changed_subtask['url'], changed_subtask['state'])) + except Exception as e: + print(e) + exit(1) def main_specify_observation_task(): @@ -62,5 +105,23 @@ def main_specify_observation_task(): parser = argparse.ArgumentParser() parser.add_argument("task_id", help="The ID of the TMSS task to specify for observation") args = parser.parse_args() - with TMSSsession.create_from_dbcreds_for_ldap() as session: - result = session.specify_observation_task(args.task_id) + + try: + with TMSSsession.create_from_dbcreds_for_ldap() as session: + result = session.specify_observation_task(args.task_id) + except Exception as e: + print(e) + exit(1) + + +def main_schedule_subtask(): + parser = argparse.ArgumentParser() + parser.add_argument("subtask_id", type=int, help="The ID of the TMSS subtask to be scheduled") + args = parser.parse_args() + + try: + with TMSSsession.create_from_dbcreds_for_ldap() as session: + pprint(session.schedule_subtask(args.subtask_id)) + except Exception as e: + print(e) + exit(1) diff --git a/SAS/TMSS/client/lib/tmss_http_rest_client.py b/SAS/TMSS/client/lib/tmss_http_rest_client.py index a4092d4bddb6a5a93674682d7bc8f9bf8e27197f..867a29c05bc2c371bed2c51dfeb88d690f230bd4 100644 --- a/SAS/TMSS/client/lib/tmss_http_rest_client.py +++ b/SAS/TMSS/client/lib/tmss_http_rest_client.py @@ -2,6 +2,7 @@ import logging logger = logging.getLogger(__file__) import requests +from http.client import responses import os import json from datetime import datetime @@ -90,11 +91,17 @@ class TMSSsession(object): except: pass - def set_subtask_status(self, subtask_id: int, status: str) -> requests.Response: - '''set the status for the given subtask''' - result = self.session.patch(url='%s/subtask/%s/' % (self.base_url, subtask_id), - json={'state': "%s/subtask_state/%s/" % (self.base_url, status)}) - return result + def set_subtask_status(self, subtask_id: int, status: str) -> {}: + '''set the status for the given subtask, and return the subtask with its new state, or raise on error''' + response = self.session.patch(url='%s/subtask/%s/' % (self.base_url, subtask_id), + json={'state': "%s/subtask_state/%s/" % (self.base_url, status)}, + params={'format':'json'}) + + if response.status_code >= 200 and response.status_code < 300: + return json.loads(response.content.decode('utf-8')) + + content = response.content.decode('utf-8') + raise Exception("Could not set status with url %s - %s %s - %s" % (response.request.url, response.status_code, responses.get(response.status_code), content)) def get_subtask_parset(self, subtask_id) -> str: '''get the lofar parameterset (as text) for the given subtask''' @@ -103,6 +110,25 @@ class TMSSsession(object): return result.content.decode('utf-8') raise Exception("Could not get parameterset for subtask %s.\nResponse: %s" % (subtask_id, result)) + def get_subtask_predecessors(self, subtask_id: int, state: str=None) -> list: + '''get the subtask's predecessors as list of dict for the given subtask''' + + clauses = {} + if state is not None: + clauses["state__value"] = state + + path = 'subtask/%s/predecessors' % (subtask_id,) + return self.get_path_as_json_object(path, clauses) + + def get_subtask_successors(self, subtask_id: int, state: str=None) -> list: + '''get the subtask's successors as list of dict for the given subtask''' + clauses = {} + if state is not None: + clauses["state__value"] = state + + path = 'subtask/%s/successors' % (subtask_id,) + return self.get_path_as_json_object(path, clauses) + def get_subtask(self, subtask_id: int) -> dict: '''get the subtask as dict for the given subtask''' path = 'subtask/%s' % (subtask_id,) @@ -129,20 +155,38 @@ class TMSSsession(object): return self.get_path_as_json_object("subtask", clauses) - def get_path_as_json_object(self, path: str, params={}) -> dict: - '''get resource at the given path, interpret it as json, and return it as as native object''' + def get_path_as_json_object(self, path: str, params={}) -> object: + '''get resource at the given path, interpret it as json, and return it as as native object (usually a dict or a list of dicts)''' full_url = '%s/%s/' % (self.base_url, path) return self.get_url_as_json_object(full_url, params=params) - def get_url_as_json_object(self, full_url: str, params={}) -> dict: - '''get resource at the given full url (including http://<base_url>, interpret it as json, and return it as as native object''' + def get_url_as_json_object(self, full_url: str, params={}) -> object: + '''get resource at the given full url (including http://<base_url>, interpret it as json, and return it as as native object (usually a dict or a list of dicts)''' if "format=json" not in full_url or params.get("format") != "json": params['format'] ='json' - result = self.session.get(url=full_url, params=params) - if result.status_code >= 200 and result.status_code < 300: - return json.loads(result.content.decode('utf-8')) - raise Exception("Could not get %s.\nResponse: %s" % (full_url, result)) + response = self.session.get(url=full_url, params=params) + logger.info("[%s] %s %s on %s", response.request.method.upper(), response.status_code, responses.get(response.status_code), response.request.url) + + if response.status_code >= 200 and response.status_code < 300: + result = json.loads(response.content.decode('utf-8')) + if isinstance(result, dict): + result_object = result.get('results', result) # return the 'results' list if any, or lese just the object itself + + if result.get('next'): + # recurse, get the 'next' url, and return a concatenation of the results + return result_object + self.get_url_as_json_object(result['next']) + return result_object + return result + + # ugly error message parsing + content = response.content.decode('utf-8') + try: + error_msg = content.split('\n')[1] # magic! error message is at 2nd line of response... + except: + error_msg= content + + raise Exception("Could not get %s - %s %s - %s" % (full_url, response.status_code, responses.get(response.status_code), error_msg)) def get_subtask_template(self, name: str, version: str=None) -> dict: '''get the subtask_template as dict for the given name (and version)''' @@ -152,11 +196,13 @@ class TMSSsession(object): if version is not None: clauses["version"] = version result = self.get_path_as_json_object('subtask_template', clauses) - if result['count'] > 1: - raise ValueError("Found more then one SubtaskTemplate for clauses: %s" % (clauses,)) - elif result['count'] == 1: - return result['results'][0] - return None + if isinstance(result, list): + if len(result) > 1: + raise ValueError("Found more then one SubtaskTemplate for clauses: %s" % (clauses,)) + elif len(result) == 1: + return result[0] + return None + return result def specify_observation_task(self, task_id: int) -> requests.Response: """specify observation for the given draft task by just doing a REST API call """ @@ -164,3 +210,9 @@ class TMSSsession(object): if result.status_code >= 200 and result.status_code < 300: return result.content.decode('utf-8') raise Exception("Could not specify observation for task %s.\nResponse: %s" % (task_id, result)) + + def schedule_subtask(self, subtask_id: int) -> {}: + """schedule the subtask for the given subtask_id. + returns the scheduled subtask upon success, or raises.""" + return self.get_path_as_json_object('subtask/%s/schedule' % subtask_id) + diff --git a/SAS/TMSS/services/CMakeLists.txt b/SAS/TMSS/services/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..4520a7b4e826a8ad61f4feec3b7d1d263f10495a --- /dev/null +++ b/SAS/TMSS/services/CMakeLists.txt @@ -0,0 +1,2 @@ +lofar_add_package(TMSSSubtaskSchedulingService subtask_scheduling) + diff --git a/SAS/TMSS/services/subtask_scheduling/CMakeLists.txt b/SAS/TMSS/services/subtask_scheduling/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..460e356bc2c99121eb41a48fc27fad7d20a51fac --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/CMakeLists.txt @@ -0,0 +1,8 @@ +lofar_package(TMSSSubtaskSchedulingService 0.1 DEPENDS TMSSClient PyCommon pyparameterset PyMessaging) + +lofar_find_package(PythonInterp 3.4 REQUIRED) + +add_subdirectory(lib) +add_subdirectory(bin) +add_subdirectory(test) + diff --git a/SAS/TMSS/services/subtask_scheduling/bin/CMakeLists.txt b/SAS/TMSS/services/subtask_scheduling/bin/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..07e30a532f710dd1242ba026ad12e9ce014f1125 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/bin/CMakeLists.txt @@ -0,0 +1,4 @@ +lofar_add_bin_scripts(tmss_subtask_scheduling_service) + +# supervisord config files +lofar_add_sysconf_files(tmss_subtask_scheduling_service.ini DESTINATION supervisord.d) diff --git a/SAS/TMSS/services/subtask_scheduling/bin/tmss_subtask_scheduling_service b/SAS/TMSS/services/subtask_scheduling/bin/tmss_subtask_scheduling_service new file mode 100755 index 0000000000000000000000000000000000000000..2ecd686a25fd88e45094bf4cda143e41de1fb61d --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/bin/tmss_subtask_scheduling_service @@ -0,0 +1,24 @@ +#!/usr/bin/python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + + +from lofar.sas.tmss.services.subtask_scheduling import main + +if __name__ == "__main__": + main() diff --git a/SAS/TMSS/services/subtask_scheduling/bin/tmss_subtask_scheduling_service.ini b/SAS/TMSS/services/subtask_scheduling/bin/tmss_subtask_scheduling_service.ini new file mode 100644 index 0000000000000000000000000000000000000000..e43c0d3e66f4534b32c6d6129397a0309a2b95e7 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/bin/tmss_subtask_scheduling_service.ini @@ -0,0 +1,9 @@ +[program:tmss_subtask_scheduling_service] +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec tmss_subtask_scheduling_service' +user=lofarsys +stopsignal=INT ; KeyboardInterrupt +stopasgroup=true ; bash does not propagate signals +stdout_logfile=%(program_name)s.log +redirect_stderr=true +stderr_logfile=NONE +stdout_logfile_maxbytes=0 diff --git a/SAS/TMSS/services/subtask_scheduling/lib/CMakeLists.txt b/SAS/TMSS/services/subtask_scheduling/lib/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..7cf0b591612ccb75bc2a73c1a6f9d1d8a2c2d9da --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/lib/CMakeLists.txt @@ -0,0 +1,10 @@ +lofar_find_package(PythonInterp 3.4 REQUIRED) +include(PythonInstall) + +set(_py_files + subtask_scheduling.py + ) + +python_install(${_py_files} + DESTINATION lofar/sas/tmss/services) + diff --git a/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py b/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py new file mode 100644 index 0000000000000000000000000000000000000000..40912203e7bc7bfbfe20ead7c4cf1c5c6ce4fc17 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 + +# subtask_scheduling.py +# +# Copyright (C) 2015 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id: subtask_scheduling.py 1580 2015-09-30 14:18:57Z loose $ + +""" +The subtask_scheduling service schedules TMSS subtasks. +It listens on the lofar notification message bus for state changes of TMSS subtasks; when a task finished, +it schedules (rest action) all successors that are in state 'defined'. +""" + +import os +from optparse import OptionParser +import logging +logger = logging.getLogger(__name__) + +from lofar.sas.tmss.client.tmssbuslistener import * +from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession + +class TMSSSubTaskSchedulingEventMessageHandler(TMSSSubTaskEventMessageHandler): + ''' + ''' + def __init__(self, tmss_client_credentials_id: str=None): + super().__init__() + self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap(tmss_client_credentials_id) + + def start_handling(self): + self.tmss_client.open() + super().start_handling() + + def stop_handling(self): + super().stop_handling() + self.tmss_client.close() + + def onSubTaskFinished(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskFinished is called upon received a SubTaskFinished message, which is sent when a SubTasks changes state to "Finished". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + logger.info("subtask %s finished. Trying to schedule defined successor subtasks...", subtask_id) + + successors = self.tmss_client.get_subtask_successors(subtask_id, state="defined") + + if not successors: + logger.info("subtask %s finished. No (defined) successor subtasks to schedule...", subtask_id) + + for successor in successors: + try: + suc_subtask_id = successor['url'].split('/')[successor['url'].split('/').index('subtask')+1] #ugly + suc_subtask_state = successor['state'].split('/')[successor['state'].split('/').index('subtask_state')+1] #ugly + + if suc_subtask_state == "defined": + logger.info("trying to schedule successor subtask %s for finished subtask %s", suc_subtask_id, subtask_id) + scheduled_successor = self.tmss_client.schedule_subtask(suc_subtask_id) + suc_subtask_state = scheduled_successor['state'].split('/')[scheduled_successor['state'].split('/').index('subtask_state')+1] #ugly + logger.info("successor subtask %s for finished subtask %s is now has state '%s'", suc_subtask_id, subtask_id, suc_subtask_state) + else: + logger.warning("skipping scheduling of successor subtask %s for finished subtask %s because its state is '%s'", suc_subtask_id, subtask_id, suc_subtask_state) + + except Exception as e: + logger.error(e) + +def create_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str=None): + return TMSSSubTaskBusListener(handler_type=TMSSSubTaskSchedulingEventMessageHandler, + handler_kwargs={'tmss_client_credentials_id': tmss_client_credentials_id}, + exchange=exchange, + broker=broker) + +def main(): + # make sure we run in UTC timezone + os.environ['TZ'] = 'UTC' + + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # Check the invocation arguments + parser = OptionParser('%prog [options]', + description='run the tmss_subtask_scheduling_service which automatically schedules the defined successor tasks for finished subtasks') + parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the messaging broker, default: %default') + parser.add_option('--exchange', dest='exchange', type='string', default=DEFAULT_BUSNAME, help='Name of the exchange on the messaging broker, default: %default') + parser.add_option('-t', '--tmss_client_credentials_id', dest='tmss_client_credentials_id', type='string', + default=os.environ.get("TMSS_CLIENT_DBCREDENTIALS", "TMSSClient"), + help='the credentials id for the file in ~/.lofar/dbcredentials which holds the TMSS http REST api url and credentials, default: %default') + (options, args) = parser.parse_args() + + with create_service(options.exchange, options.broker, options.tmss_client_credentials_id): + waitForInterrupt() + +if __name__ == '__main__': + main() diff --git a/SAS/TMSS/services/subtask_scheduling/test/CMakeLists.txt b/SAS/TMSS/services/subtask_scheduling/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..b9da06a5dc6b27fde81e26c6cc5ba027cae2d821 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/test/CMakeLists.txt @@ -0,0 +1,7 @@ +# $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $ + +if(BUILD_TESTING) + include(LofarCTest) + + lofar_add_test(t_subtask_scheduling_service) +endif() diff --git a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py new file mode 100755 index 0000000000000000000000000000000000000000..b171a012430ea1e10f976dc793bc0ead1fa47c04 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +import unittest +import uuid + +import logging +logger = logging.getLogger(__name__) + +from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment +from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import * +from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator + +from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor +from lofar.sas.tmss.services.subtask_scheduling import create_service +from lofar.common.test_utils import integration_test +from time import sleep +from datetime import datetime, timedelta + +@integration_test +class TestSubtaskSchedulingService(unittest.TestCase): + ''' + Tests for the SubtaskSchedulingService + ''' + @classmethod + def setUpClass(cls) -> None: + cls.TEST_UUID = uuid.uuid1() + + cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID)) + cls.tmp_exchange.open() + + cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address) + cls.tmss_test_env.start() + + cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url, + (tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password)) + + @classmethod + def tearDownClass(cls) -> None: + cls.tmss_test_env.stop() + cls.tmp_exchange.close() + + def test_01_for_expected_behaviour(self): + ''' + This test starts a scheduling service and tmss, creates a chain of subtasks, finishes the first, and checks if the successors are then scheduled. + ''' + + logger.info(' -- test_01_for_expected_behaviour -- ') + + # create and start the service (the object under test) + service = create_service(exchange=self.tmp_exchange.address, tmss_client_credentials_id=self.tmss_test_env.client_credentials.dbcreds_id) + with BusListenerJanitor(service): + # ------------------------- + # long setup of objects.... + + # setup proper template + subtask_template_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.SubtaskTemplate(subtask_type_url=self.test_data_creator.django_api_url + '/subtask_type/qa_files/'), '/subtask_template/') + + # create two subtasks + subtask1_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.Subtask(state="defined", specifications_template_url=subtask_template_url), '/subtask/') + subtask2_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.Subtask(state="defined", specifications_template_url=subtask_template_url), '/subtask/') + + # ugly + subtask1_id = subtask1_url.split('/')[subtask1_url.split('/').index('subtask') + 1] + subtask2_id = subtask2_url.split('/')[subtask2_url.split('/').index('subtask') + 1] + + # connect them + output_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.SubtaskOutput(subtask1_url), '/subtask_output/') + input_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.SubtaskInput(subtask_url=subtask2_url, subtask_output_url=output_url), '/subtask_input/') + + # ... end of long setup of objects + # -------------------------------- + + # now for the real test: set subtask1_id status to finished, and check that subtask2 is then properly scheduled + with self.tmss_test_env.create_tmss_client() as tmss_client: + subtask1 = tmss_client.get_subtask(subtask1_id) + subtask2 = tmss_client.get_subtask(subtask2_id) + + subtask1_status = subtask1['state'].split('/')[subtask1['state'].split('/').index('subtask_state')+1] #ugly + subtask2_status = subtask2['state'].split('/')[subtask2['state'].split('/').index('subtask_state')+1] #ugly + self.assertEqual(subtask1_status, 'defined') + self.assertEqual(subtask2_status, 'defined') + + # the first subtask ran, and is now finished... set it's status. This should trigger the scheduling service to schedule the second subtask. + tmss_client.set_subtask_status(subtask1_id, 'finished') + + # allow some time for the scheduling service to do its thing... + start = datetime.utcnow() + while subtask2_status != 'scheduled': + subtask2 = tmss_client.get_subtask(subtask2_id) + subtask2_status = subtask2['state'].split('/')[subtask2['state'].split('/').index('subtask_state')+1] #ugly + sleep(0.5) + if datetime.utcnow() - start > timedelta(seconds=2): + raise TimeoutError() + + # subtask2 should now be scheduled + self.assertEqual(subtask2_status, 'scheduled') + +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + +if __name__ == '__main__': + #run the unit tests + unittest.main() diff --git a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.run b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.run new file mode 100755 index 0000000000000000000000000000000000000000..a38aefc96f84db6b0d634f11e0524ff4513191b5 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.run @@ -0,0 +1,6 @@ +#!/bin/bash + +# Run the unit test +source python-coverage.sh +python_coverage_test "*tmss*" t_subtask_scheduling_service.py + diff --git a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.sh b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.sh new file mode 100755 index 0000000000000000000000000000000000000000..60abec462c84d1a99cf2df03b1368271772dec55 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_subtask_scheduling_service \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/exceptions.py b/SAS/TMSS/src/tmss/exceptions.py index 0ea1ea394479bb5b63e10ce3a689f739c135ab7c..c77b7226f0474e10745600e2b71f4284b141f8a8 100644 --- a/SAS/TMSS/src/tmss/exceptions.py +++ b/SAS/TMSS/src/tmss/exceptions.py @@ -7,3 +7,12 @@ class SchemaValidationException(TMSSException): class ConversionException(TMSSException): pass + +class SchedulingException(TMSSException): + pass + +class SubtaskSchedulingException(SchedulingException): + pass + +class TaskSchedulingException(SchedulingException): + pass diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py index b067845f2a6c8c6219f35fa8bfdcc622779d918b..48d9ca29ff7bd86466326ddeb51c47ec36c9f4e3 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py @@ -7,13 +7,14 @@ import logging logger = logging.getLogger(__name__) from django.db.models import ForeignKey, CharField, DateTimeField, BooleanField, IntegerField, BigIntegerField, \ - ManyToManyField, CASCADE, SET_NULL, PROTECT, UniqueConstraint + ManyToManyField, CASCADE, SET_NULL, PROTECT, UniqueConstraint, QuerySet from django.contrib.postgres.fields import ArrayField, JSONField from django.contrib.auth.models import User from .specification import AbstractChoice, BasicCommon, Template, NamedCommon # , <TaskBlueprint from enum import Enum from rest_framework.serializers import HyperlinkedRelatedField from django.dispatch import receiver +from django.db.models.expressions import RawSQL from lofar.sas.tmss.tmss.tmssapp.validation import validate_json_against_schema from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME @@ -172,6 +173,28 @@ class Subtask(BasicCommon): content={'subtask_id': subtask_id, 'old_state': old_state, 'new_state': new_state}) tobus.send(msg) + @property + def successors(self) -> QuerySet: + '''return the connect successor subtask(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets) + If you want the result, add .all() like so: my_subtask.successors.all() + ''' + # JS, 20200528: I couldn't make django do a "self-reference" query from the subtask table to the subtask table (via input, output), so I used plain SQL. + return Subtask.objects.filter(id__in=RawSQL("SELECT successor_st.id FROM tmssapp_subtask as successor_st\n" + "INNER JOIN tmssapp_subtaskinput as st_input on st_input.subtask_id = successor_st.id\n" + "INNER JOIN tmssapp_subtaskoutput as st_output on st_output.id = st_input.producer_id\n" + "WHERE st_output.subtask_id = %s", params=[self.id])) + + @property + def predecessors(self) -> QuerySet: + '''return the connect predecessor subtask(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets) + If you want the result, add .all() like so: my_subtask.predecessors.all() + ''' + # JS, 20200528: I couldn't make django do a "self-reference" query from the subtask table to the subtask table (via input, output), so I used plain SQL. + return Subtask.objects.filter(id__in=RawSQL("SELECT predecessor_st.id FROM tmssapp_subtask as predecessor_st\n" + "INNER JOIN tmssapp_subtaskoutput as st_output on st_output.subtask_id = predecessor_st.id\n" + "INNER JOIN tmssapp_subtaskinput as st_input on st_input.producer_id = st_output.id\n" + "WHERE st_input.subtask_id = %s", params=[self.id])) + def save(self, force_insert=False, force_update=False, using=None, update_fields=None): creating = self._state.adding # True on create, False on update diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py index 66c2063359a00ecc0be9d5b7ea03c7eebe9e9e9c..249f2d5a3cbb6c0f642f43633fbf5233bdeda69a 100644 --- a/SAS/TMSS/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -3,6 +3,8 @@ logger = logging.getLogger(__name__) from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema +from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException + from lofar.sas.tmss.tmss.tmssapp.models.specification import * from lofar.sas.tmss.tmss.tmssapp.models.scheduling import * @@ -25,7 +27,7 @@ def create_observation_to_qafile_subtask(observation_subtask: Subtask): observation_subtask.specifications_template.type, SubtaskType.Choices.OBSERVATION.value)) if observation_subtask.state.value == SubtaskState.Choices.DEFINING.value: - raise ValueError("Cannot create %s subtask for subtask id=%d because it is not DEFINED yet" % ( + raise ValueError("Cannot create %s subtask for subtask id=%d because it is not DEFINED" % ( SubtaskType.Choices.QA_FILES.value, observation_subtask.pk)) obs_task_spec = observation_subtask.task_blueprint.specifications_doc @@ -75,13 +77,12 @@ def schedule_qafile_subtask(qafile_subtask: Subtask): ''' # step 0: check pre-requisites - if qafile_subtask.state.value != SubtaskState.Choices.DEFINED.value: - raise ValueError("Cannot schedule subtask id=%d because it is not DEFINED yet. state=%s" % (qafile_subtask.pk, - qafile_subtask.state.value)) + check_prerequities_for_scheduling(qafile_subtask) if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value: - raise ValueError("Cannot schedule subtask id=%d type=%s but type should be %s" % (qafile_subtask.pk, - qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value)) + raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qafile_subtask.pk, + qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value)) + # step 1: set state to SCHEDULING qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) @@ -96,15 +97,16 @@ def schedule_qafile_subtask(qafile_subtask: Subtask): # step 4: create output dataproducts, and link these to the output # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint? - qafile_subtask_dataproduct = Dataproduct.objects.create(filename="L%d_QA.h5" % (qafile_subtask.id,), - directory="/data/qa/qa_files", - dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value), - producer=qafile_subtask.outputs.first(), - specifications_doc="", - specifications_template=DataproductSpecificationsTemplate.objects.first(), # ????? - feedback_doc="", - feedback_template=DataproductFeedbackTemplate.objects.first() # ????? - ) + if qafile_subtask.outputs.first(): + qafile_subtask_dataproduct = Dataproduct.objects.create(filename="L%d_QA.h5" % (qafile_subtask.id,), + directory="/data/qa/qa_files", + dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value), + producer=qafile_subtask.outputs.first(), + specifications_doc="", + specifications_template=DataproductSpecificationsTemplate.objects.first(), # ????? + feedback_doc="", + feedback_template=DataproductFeedbackTemplate.objects.first() # ????? + ) # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it) qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value) @@ -124,8 +126,8 @@ def create_qafile_to_qaplots_subtask(qafile_subtask: Subtask): qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value)) if qafile_subtask.state.value == SubtaskState.Choices.DEFINING.value: - raise ValueError("Cannot create %s subtask for subtask id=%d because it is not DEFINED yet" % ( - SubtaskType.Choices.QA_PLOTS.value, qafile_subtask.pk)) + raise ValueError("Cannot create %s subtask for subtask id=%d because it is not DEFINED. Current state=%s" % ( + SubtaskType.Choices.QA_PLOTS.value, qafile_subtask.pk, qafile_subtask.state.value)) obs_task_spec = qafile_subtask.task_blueprint.specifications_doc obs_task_qaplots_spec = obs_task_spec.get("QA", {}).get("plots", {}) @@ -174,14 +176,12 @@ def schedule_qaplots_subtask(qaplots_subtask: Subtask): ''' # step 0: check pre-requisites - if qaplots_subtask.state.value != SubtaskState.Choices.DEFINED.value: - raise ValueError("Cannot schedule subtask id=%d because it is not DEFINED yet. state=%s" % (qaplots_subtask.pk, - qaplots_subtask.state.value)) + check_prerequities_for_scheduling(qaplots_subtask) if qaplots_subtask.specifications_template.type.value != SubtaskType.Choices.QA_PLOTS.value: - raise ValueError("Cannot schedule subtask id=%d type=%s but type should be %s" % (qaplots_subtask.pk, - qaplots_subtask.specifications_template.type, - SubtaskType.Choices.QA_PLOTS.value)) + raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qaplots_subtask.pk, + qaplots_subtask.specifications_template.type, + SubtaskType.Choices.QA_PLOTS.value)) # step 1: set state to SCHEDULING qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value) @@ -374,3 +374,24 @@ def create_subtask(subtask_template: SubtaskTemplate, subtask_specifications): return Subtask.objects.create(**subtask_data) +def schedule_subtask(subtask: Subtask) -> Subtask: + '''Generic scheduling method for subtasks. Calls the appropiate scheduling method based on the subtask's type.''' + check_prerequities_for_scheduling(subtask) + + if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value: + return schedule_qafile_subtask(subtask) + + if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value: + return schedule_qaplots_subtask(subtask) + + raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." % (subtask.pk, subtask.specifications_template.type.value)) + +def check_prerequities_for_scheduling(subtask: Subtask) -> bool: + if subtask.state.value != SubtaskState.Choices.DEFINED.value: + raise SubtaskSchedulingException("Cannot schedule subtask id=%d because it is not DEFINED. Current state=%s" % (subtask.pk, subtask.state.value)) + + for predecessor in subtask.predecessors.all(): + if predecessor.state.value != SubtaskState.Choices.FINISHED.value: + raise SubtaskSchedulingException("Cannot schedule subtask id=%d because its predecessor id=%s in not FINISHED but state=%s" % (subtask.pk, predecessor.pk, predecessor.state.value)) + + return True \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py index 747bca0f9bf690b175d8df576f41c66fab00753c..48afa70f17b0622e3c2a740b4c97b201c3d0f554 100644 --- a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py @@ -17,6 +17,7 @@ from drf_yasg.inspectors import SwaggerAutoSchema from rest_framework.decorators import action from django.http import HttpResponse, JsonResponse +from rest_framework.response import Response as RestResponse from lofar.sas.tmss.tmss.tmssapp.viewsets.lofar_viewset import LOFARViewSet from lofar.sas.tmss.tmss.tmssapp import models @@ -182,6 +183,40 @@ class SubtaskViewSet(LOFARViewSet): return HttpResponse(parset_str, content_type='text/plain') + @swagger_auto_schema(responses={200: 'The predecessor subtasks of this subtask', + 403: 'forbidden'}, + operation_description="Get the predecessor subtasks of this subtask.") + @action(methods=['get'], detail=True, url_name="predecessors") + def predecessors(self, request, pk=None): + subtask = get_object_or_404(models.Subtask, pk=pk) + predecessors = self.filter_queryset(subtask.predecessors) + serializer = self.get_serializer(predecessors, many=True) + return RestResponse(serializer.data) + + + @swagger_auto_schema(responses={200: 'The successor subtasks of this subtask', + 403: 'forbidden'}, + operation_description="Get the successor subtasks of this subtask.") + @action(methods=['get'], detail=True, url_name="successors") + def successors(self, request, pk=None): + subtask = get_object_or_404(models.Subtask, pk=pk) + successors = self.filter_queryset(subtask.successors) + serializer = self.get_serializer(successors, many=True) + return RestResponse(serializer.data) + + + @swagger_auto_schema(responses={200: 'The a scheduled version of this subtask', + 403: 'forbidden', + 500: 'The subtask could not be scheduled'}, + operation_description="Try to schedule this subtask.") + @action(methods=['get'], detail=True, url_name="schedule") + def schedule(self, request, pk=None): + subtask = get_object_or_404(models.Subtask, pk=pk) + from lofar.sas.tmss.tmss.tmssapp.subtasks import schedule_subtask + scheduled_subtask = schedule_subtask(subtask) + serializer = self.get_serializer(scheduled_subtask) + return RestResponse(serializer.data) + class SubtaskNestedViewSet(LOFARNestedViewSet): queryset = models.Subtask.objects.all() serializer_class = serializers.SubtaskSerializer diff --git a/SAS/TMSS/test/t_tmssapp_scheduling_django_API.py b/SAS/TMSS/test/t_tmssapp_scheduling_django_API.py index 78fd1a0d54856d4b8d7a7b814972301c95518305..6260ec3fbbb7aee89671a9990c2a48db2cfa8581 100755 --- a/SAS/TMSS/test/t_tmssapp_scheduling_django_API.py +++ b/SAS/TMSS/test/t_tmssapp_scheduling_django_API.py @@ -211,6 +211,60 @@ class SubtaskTest(unittest.TestCase): with self.assertRaises(IntegrityError): models.Subtask.objects.create(**test_data) + def test_Subtask_predecessors_and_successors_none(self): + subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + + self.assertEqual(set(), set(subtask1.predecessors.all())) + self.assertEqual(set(), set(subtask2.predecessors.all())) + self.assertEqual(set(), set(subtask1.successors.all())) + self.assertEqual(set(), set(subtask2.successors.all())) + + def test_Subtask_predecessors_and_successors_simple(self): + subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + + output1 = models.SubtaskOutput.objects.create(subtask=subtask1) + models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask2, producer=output1)) + + self.assertEqual(subtask1, subtask2.predecessors.all()[0]) + self.assertEqual(subtask2, subtask1.successors.all()[0]) + + def test_Subtask_predecessors_and_successors_complex(self): + subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask3:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask4:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask5:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + subtask6:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + + # ST1 ---> ST3 ---> ST4 + # | | + # ST2 - -> ST5 ---> ST6 + + output1 = models.SubtaskOutput.objects.create(subtask=subtask1) + output2 = models.SubtaskOutput.objects.create(subtask=subtask2) + output3 = models.SubtaskOutput.objects.create(subtask=subtask3) + output4 = models.SubtaskOutput.objects.create(subtask=subtask4) + output5 = models.SubtaskOutput.objects.create(subtask=subtask5) + output6 = models.SubtaskOutput.objects.create(subtask=subtask6) + + models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask3, producer=output1)) + models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask3, producer=output2)) + models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask4, producer=output3)) + models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask5, producer=output3)) + models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask6, producer=output5)) + + self.assertEqual(set((subtask1, subtask2)), set(subtask3.predecessors.all())) + self.assertEqual(set((subtask4, subtask5)), set(subtask3.successors.all())) + self.assertEqual(set((subtask3,)), set(subtask4.predecessors.all())) + self.assertEqual(set((subtask3,)), set(subtask5.predecessors.all())) + self.assertEqual(set((subtask3,)), set(subtask1.successors.all())) + self.assertEqual(set((subtask3,)), set(subtask2.successors.all())) + self.assertEqual(set(), set(subtask1.predecessors.all())) + self.assertEqual(set(), set(subtask2.predecessors.all())) + self.assertEqual(set(), set(subtask4.successors.all())) + self.assertEqual(set((subtask6,)), set(subtask5.successors.all())) class DataproductTest(unittest.TestCase): def test_Dataproduct_gets_created_with_correct_creation_timestamp(self): diff --git a/SAS/TMSS/test/tmss_test_data_django_models.py b/SAS/TMSS/test/tmss_test_data_django_models.py index 155c1b780f8635cbf69a403f3794736aad27c570..2c7609339029bfe643e2bbc9938d52d74d6b3403 100644 --- a/SAS/TMSS/test/tmss_test_data_django_models.py +++ b/SAS/TMSS/test/tmss_test_data_django_models.py @@ -252,11 +252,16 @@ def SubtaskOutput_test_data(subtask: models.Subtask=None) -> dict: return {"subtask": subtask, "tags":[]} -def SubtaskInput_test_data() -> dict: - return {"subtask": models.Subtask.objects.create(**Subtask_test_data()), +def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.SubtaskOutput=None) -> dict: + if subtask is None: + subtask = models.Subtask.objects.create(**Subtask_test_data()) + + if producer is None: + producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data()) + + return {"subtask": subtask, "task_relation_blueprint": models.TaskRelationBlueprint.objects.create(**TaskRelationBlueprint_test_data()), - "producer": models.SubtaskOutput.objects.create(**SubtaskOutput_test_data()), - #"dataproducts": models.Dataproduct.objects.create(**dpt.get_test_data()), + "producer": producer, "selection_doc": {}, "selection_template": models.SubtaskInputSelectionTemplate.objects.create(**SubtaskInputSelectionTemplate_test_data()), "tags":[]} diff --git a/SAS/TMSS/test/tmss_test_data_rest.py b/SAS/TMSS/test/tmss_test_data_rest.py index 065a5614956f21b96a2dfccdd66a26a666cac063..e3bdab1480e8fa79c9e697d2481ed457f17f5226 100644 --- a/SAS/TMSS/test/tmss_test_data_rest.py +++ b/SAS/TMSS/test/tmss_test_data_rest.py @@ -359,7 +359,7 @@ class TMSSRESTTestDataCreator(): "location": "upstairs", "tags": ['tmss', 'testing']} - def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None): + def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None, state:str="defining"): if cluster_url is None: cluster_url = self.post_data_and_get_url(self.Cluster(), '/cluster/') @@ -374,7 +374,7 @@ class TMSSRESTTestDataCreator(): return {"start_time": datetime.utcnow().isoformat(), "stop_time": datetime.utcnow().isoformat(), - "state": self.django_api_url + '/subtask_state/defining/', + "state": self.django_api_url + '/subtask_state/%s/' % (state,), "specifications_doc": specifications_doc, "task_blueprint": task_blueprint_url, "specifications_template": specifications_template_url, diff --git a/SubSystems/RAServices/RAServices.ini b/SubSystems/RAServices/RAServices.ini index 0f013968492ce3d0a6dadfb22705bf26edf2219f..bc649306d58920185ea4e191d774735985bb638d 100644 --- a/SubSystems/RAServices/RAServices.ini +++ b/SubSystems/RAServices/RAServices.ini @@ -16,3 +16,6 @@ programs=ltastorageoverviewscraper,ltastorageoverviewwebservice [group:Messaging] programs=messagelogger + +[group:TMSS] +programs=tmss_subtask_scheduling_service