diff --git a/SAS/TMSS/services/subtask_scheduling/CMakeLists.txt b/SAS/TMSS/services/subtask_scheduling/CMakeLists.txt index 7d664ccd6981e1078644aac1c1d1f1deef870aab..460e356bc2c99121eb41a48fc27fad7d20a51fac 100644 --- a/SAS/TMSS/services/subtask_scheduling/CMakeLists.txt +++ b/SAS/TMSS/services/subtask_scheduling/CMakeLists.txt @@ -4,3 +4,5 @@ lofar_find_package(PythonInterp 3.4 REQUIRED) add_subdirectory(lib) add_subdirectory(bin) +add_subdirectory(test) + diff --git a/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py b/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py index a0b02d2e22e4e444258092e2f338151c38ae2837..fcc9043703a1afb0f0883f0ccfa21b664867a8f7 100644 --- a/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py +++ b/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py @@ -79,6 +79,12 @@ class TMSSSubTaskSchedulingEventMessageHandler(TMSSSubTaskEventMessageHandler): except Exception as e: logger.error(e) +def create_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str=None): + return TMSSSubTaskBusListener(handler_type=TMSSSubTaskSchedulingEventMessageHandler, + handler_kwargs={'tmss_client_credentials_id': tmss_client_credentials_id}, + exchange=exchange, + broker=broker) + def main(): # make sure we run in UTC timezone os.environ['TZ'] = 'UTC' @@ -95,10 +101,7 @@ def main(): help='the credentials id for the file in ~/.lofar/dbcredentials which holds the TMSS http REST api url and credentials, default: %default') (options, args) = parser.parse_args() - with TMSSSubTaskBusListener(handler_type=TMSSSubTaskSchedulingEventMessageHandler, - handler_kwargs={'tmss_client_credentials_id': options.tmss_client_credentials_id}, - exchange=options.exchange, - broker=options.broker): + with create_service(options.exchange, options.broker, options.tmss_client_credentials_id): waitForInterrupt() if __name__ == '__main__': diff --git a/SAS/TMSS/services/subtask_scheduling/test/CMakeLists.txt b/SAS/TMSS/services/subtask_scheduling/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..b9da06a5dc6b27fde81e26c6cc5ba027cae2d821 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/test/CMakeLists.txt @@ -0,0 +1,7 @@ +# $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $ + +if(BUILD_TESTING) + include(LofarCTest) + + lofar_add_test(t_subtask_scheduling_service) +endif() diff --git a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py new file mode 100755 index 0000000000000000000000000000000000000000..20b6db5797fcb677433a7a3e73775d7640ff5735 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +import unittest +import uuid + +import logging +logger = logging.getLogger(__name__) + +from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment +from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import * +from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator + +from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor +from lofar.sas.tmss.services.subtask_scheduling import create_service +from lofar.common.test_utils import integration_test +from time import sleep +from datetime import datetime, timedelta + +@integration_test +class TestSubtaskSchedulingService(unittest.TestCase): + ''' + Tests for the SubtaskSchedulingService + ''' + @classmethod + def setUpClass(cls) -> None: + cls.TEST_UUID = uuid.uuid1() + + cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID)) + cls.tmp_exchange.open() + + cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address) + cls.tmss_test_env.start() + + cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url, + (tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password)) + + @classmethod + def tearDownClass(cls) -> None: + cls.tmss_test_env.stop() + cls.tmp_exchange.close() + + def test_01_for_expected_behaviour(self): + ''' + This test starts a scheduling service and tmss, creates a chain of subtasks, finishes the first, and checks if the successors are then scheduled. + ''' + + logger.info(' -- test_01_for_expected_behaviour -- ') + + # create and start the service (the object under test) + service = create_service(exchange=self.tmp_exchange.address, tmss_client_credentials_id=self.tmss_test_env.client_credentials.dbcreds_id) + with BusListenerJanitor(service): + # ------------------------- + # long setup of objects.... + + # setup proper template + subtask_template_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.SubtaskTemplate(subtask_type_url=self.test_data_creator.django_api_url + '/subtask_type/qa_files/'), '/subtask_template/') + + # create two subtasks + subtask1_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.Subtask(state="defined", specifications_template_url=subtask_template_url), '/subtask/') + subtask2_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.Subtask(state="defined", specifications_template_url=subtask_template_url), '/subtask/') + + # ugly + subtask1_id = subtask1_url.split('/')[subtask1_url.split('/').index('subtask') + 1] + subtask2_id = subtask2_url.split('/')[subtask2_url.split('/').index('subtask') + 1] + + # connect them + output_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.SubtaskOutput(subtask1_url), '/subtask_output/') + input_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.SubtaskInput(subtask_url=subtask2_url, subtask_output_url=output_url), '/subtask_input/') + + # ... end of long setup of objects + # -------------------------------- + + # now for the real test: set subtask1_id status to finished, and check that subtask2 is then properly scheduled + with self.tmss_test_env.create_tmss_client() as tmss_client: + subtask1 = tmss_client.get_subtask(subtask1_id) + subtask2 = tmss_client.get_subtask(subtask2_id) + + subtask1_status = subtask1['state'].split('/')[subtask1['state'].split('/').index('subtask_state')+1] #ugly + subtask2_status = subtask2['state'].split('/')[subtask2['state'].split('/').index('subtask_state')+1] #ugly + self.assertEqual(subtask1_status, 'defined') + self.assertEqual(subtask2_status, 'defined') + + # the first subtask ran, and is now finished... set it's status. This should trigger the scheduling service to schedule the second subtask. + tmss_client.set_subtask_status(subtask1_id, 'finished') + + # allow some time for the scheduling service to do its thing... + start = datetime.utcnow() + while subtask2_status != 'scheduled': + subtask2 = tmss_client.get_subtask(subtask2_id) + subtask2_status = subtask2['state'].split('/')[subtask2['state'].split('/').index('subtask_state')+1] #ugly + sleep(0.05) + if datetime.utcnow() - start > timedelta(seconds=10): + raise TimeoutError() + + # subtask2 should now be scheduled + self.assertEqual(subtask2_status, 'scheduled') + +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + +if __name__ == '__main__': + #run the unit tests + unittest.main() diff --git a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.run b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.run new file mode 100755 index 0000000000000000000000000000000000000000..a38aefc96f84db6b0d634f11e0524ff4513191b5 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.run @@ -0,0 +1,6 @@ +#!/bin/bash + +# Run the unit test +source python-coverage.sh +python_coverage_test "*tmss*" t_subtask_scheduling_service.py + diff --git a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.sh b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.sh new file mode 100755 index 0000000000000000000000000000000000000000..60abec462c84d1a99cf2df03b1368271772dec55 --- /dev/null +++ b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_subtask_scheduling_service \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py index 85a0a87a210041588c9be288089a286cf68c568b..6aa7a3304f20bc277fd193667b5c28a14e5ae9c4 100644 --- a/SAS/TMSS/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -96,15 +96,16 @@ def schedule_qafile_subtask(qafile_subtask: Subtask): # step 4: create output dataproducts, and link these to the output # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint? - qafile_subtask_dataproduct = Dataproduct.objects.create(filename="L%d_QA.h5" % (qafile_subtask.id,), - directory="/data/qa/qa_files", - dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value), - producer=qafile_subtask.outputs.first(), - specifications_doc="", - specifications_template=DataproductSpecificationsTemplate.objects.first(), # ????? - feedback_doc="", - feedback_template=DataproductFeedbackTemplate.objects.first() # ????? - ) + if qafile_subtask.outputs.first(): + qafile_subtask_dataproduct = Dataproduct.objects.create(filename="L%d_QA.h5" % (qafile_subtask.id,), + directory="/data/qa/qa_files", + dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value), + producer=qafile_subtask.outputs.first(), + specifications_doc="", + specifications_template=DataproductSpecificationsTemplate.objects.first(), # ????? + feedback_doc="", + feedback_template=DataproductFeedbackTemplate.objects.first() # ????? + ) # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it) qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value) diff --git a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py index f1748e6e8a7f85d01ad3c7841ce93c042d2af79b..48afa70f17b0622e3c2a740b4c97b201c3d0f554 100644 --- a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py @@ -167,9 +167,6 @@ class SubtaskViewSet(LOFARViewSet): filter_class = SubTaskFilter ordering = ('start_time',) - def get_queryset(self): - return super().get_queryset() - @swagger_auto_schema(auto_schema=TextPlainAutoSchema, responses={200: 'A LOFAR parset for this subtask (as plain text)', 403: 'forbidden', diff --git a/SAS/TMSS/test/tmss_test_data_rest.py b/SAS/TMSS/test/tmss_test_data_rest.py index 065a5614956f21b96a2dfccdd66a26a666cac063..e3bdab1480e8fa79c9e697d2481ed457f17f5226 100644 --- a/SAS/TMSS/test/tmss_test_data_rest.py +++ b/SAS/TMSS/test/tmss_test_data_rest.py @@ -359,7 +359,7 @@ class TMSSRESTTestDataCreator(): "location": "upstairs", "tags": ['tmss', 'testing']} - def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None): + def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None, state:str="defining"): if cluster_url is None: cluster_url = self.post_data_and_get_url(self.Cluster(), '/cluster/') @@ -374,7 +374,7 @@ class TMSSRESTTestDataCreator(): return {"start_time": datetime.utcnow().isoformat(), "stop_time": datetime.utcnow().isoformat(), - "state": self.django_api_url + '/subtask_state/defining/', + "state": self.django_api_url + '/subtask_state/%s/' % (state,), "specifications_doc": specifications_doc, "task_blueprint": task_blueprint_url, "specifications_template": specifications_template_url,