diff --git a/SAS/TMSS/client/lib/tmssbuslistener.py b/SAS/TMSS/client/lib/tmssbuslistener.py index 75d63297e8d5dfff5403d560c6cbc3843ffcd71e..65a645f5a7557df7d7b2bb39ef389449775cb35d 100644 --- a/SAS/TMSS/client/lib/tmssbuslistener.py +++ b/SAS/TMSS/client/lib/tmssbuslistener.py @@ -113,6 +113,8 @@ class TMSSEventMessageHandler(AbstractMessageHandler): self.onSchedulingUnitBlueprintStatusChanged(**msg.content) elif stripped_subject == 'Setting.Object.Updated': self.onSettingUpdated(**msg.content) + elif stripped_subject == 'SchedulingUnitBlueprint.Object.CannotProceed': + self.onSchedulingUnitBlueprintCannotProceed(**msg.content) else: raise MessageHandlerUnknownSubjectError("TMSSBusListener.handleMessage: unknown subject: %s" % msg.subject) @@ -239,7 +241,12 @@ class TMSSEventMessageHandler(AbstractMessageHandler): :param name: the name of the Setting ''' pass - + + def onSchedulingUnitBlueprintCannotProceed(self, id: int): + '''onSchedulingUnitBlueprintCannotProceed is called upon receiving a SchedulingUnitBlueprint.Object.onSchedulingUnitBlueprintCannotProceed message, which is sent when a SchedulingUnitBlueprints cannot Proceed. + :param id: the TMSS id of the SchedulingUnitBlueprint + ''' + pass class TMSSBusListener(BusListener): diff --git a/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py b/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py index 51532b9390cc3e2b54a2f637f4bc26faf992b4e7..07ca7801b97003d1756307f90fa10c40a61ad67b 100644 --- a/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py +++ b/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py @@ -28,6 +28,8 @@ from lofar.messaging.messagebus import ToBus from lofar.sas.tmss.client.tmssbuslistener import * from lofar.common import dbcredentials from lofar.common.util import single_line_with_single_spaces +from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SchedulingUnitBlueprint + class TMSSPGListener(PostgresListener): '''This class subscribes to the Subtask, TaskDraft/Blueprint & SchedulingUnitDraft/Blueprint tables in the TMSS database @@ -146,7 +148,6 @@ class TMSSPGListener(PostgresListener): def onSubTaskStateUpdated(self, payload = None): payload_dict = json.loads(payload) # send notification for this subtask... - from lofar.sas.tmss.tmss.tmssapp.models.scheduling import Subtask subtask = Subtask.objects.get(id=payload_dict['id']) self._sendNotification(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.'+subtask.state.value.capitalize(), {'id': subtask.id, 'status': subtask.state.value}) @@ -161,12 +162,13 @@ class TMSSPGListener(PostgresListener): self.onSchedulingUnitBlueprintUpdated( {'id': subtask.task_blueprint.scheduling_unit_blueprint.id}) self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.'+subtask.task_blueprint.scheduling_unit_blueprint.status.capitalize(), {'id': subtask.task_blueprint.scheduling_unit_blueprint.id, 'status': subtask.task_blueprint.scheduling_unit_blueprint.status}) + def onTaskBlueprintInserted(self, payload = None): self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', payload) def onTaskBlueprintUpdated(self, payload = None): - self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', payload) + self._sendNotification (TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', payload) def onTaskBlueprintDeleted(self, payload = None): self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Deleted', payload) @@ -185,7 +187,13 @@ class TMSSPGListener(PostgresListener): def onSchedulingUnitBlueprintUpdated(self, payload = None): self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', payload) + + scheduling_unit_blueprint = SchedulingUnitBlueprint.objects.get(id=payload['id']) + if not scheduling_unit_blueprint.can_proceed: + self.onSchedulingUnitBlueprintCannotProceed( {'id': scheduling_unit_blueprint.id}) + + def onSchedulingUnitBlueprintDeleted(self, payload = None): self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Deleted', payload) @@ -209,6 +217,9 @@ class TMSSPGListener(PostgresListener): payload['value'] = payload['value'] in ('true', 'True', 't') self._sendNotification(TMSS_SETTING_OBJECT_EVENT_PREFIX+'.Updated', payload) + def onSchedulingUnitBlueprintCannotProceed(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.CannotProceed', payload) + def create_service(dbcreds, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): '''create a TMSSPGListener instance''' diff --git a/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py index b0b847668bf48905701d8e48adbc0273bb416162..fb4295ce7c99a293e51d05621922f990f6685cc2 100755 --- a/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py +++ b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py @@ -28,8 +28,8 @@ from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator from lofar.messaging.messagebus import TemporaryExchange -from lofar.sas.tmss.services.tmss_postgres_listener import * from lofar.common.test_utils import integration_test +from lofar.common.util import single_line_with_single_spaces from threading import Lock import requests import json @@ -65,6 +65,10 @@ class TestSubtaskSchedulingService(unittest.TestCase): This test starts a TMSSPGListener service and TMSS, creates/updates/deletes subtasks/tasks/schedulingunits, and checks if the correct events are sent. ''' logger.info(' -- test_01_for_expected_behaviour -- ') + from lofar.sas.tmss.services.tmss_postgres_listener import TMSSPGListener, TMSS_SUBTASK_OBJECT_EVENT_PREFIX, \ + TMSS_SUBTASK_STATUS_EVENT_PREFIX, TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX, TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX, \ + TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX, TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX, \ + TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX, TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX class TestTMSSPGListener(TMSSPGListener): '''Helper TMSSPGListener for this test, storing intermediate results, and providing synchronization threading.Events''' diff --git a/SAS/TMSS/services/workflow_service/lib/workflow_service.py b/SAS/TMSS/services/workflow_service/lib/workflow_service.py index c38bde688e87903f9b66a4c9f2d6234814a4c808..731e102810837deb7aae504b109d8a32e9e69e8d 100644 --- a/SAS/TMSS/services/workflow_service/lib/workflow_service.py +++ b/SAS/TMSS/services/workflow_service/lib/workflow_service.py @@ -32,18 +32,30 @@ class SchedulingUnitEventMessageHandler(TMSSEventMessageHandler): def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): try: # import here and not at top of module because we need the django.setup() to be run first, either from this module's main, or from the TMSSTestEnvironment - from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_signal + from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_status_changed_signal from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitBlueprint logger.info("SchedulingUnitBlueprint id=%s status changed to '%s', signalling workflow...", id, status) scheduling_unit_blueprint = SchedulingUnitBlueprint.objects.get(pk=id) - scheduling_unit_blueprint_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint, status=status) + scheduling_unit_blueprint_status_changed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint, status=status) except Exception as e: logger.error(e) + def onSchedulingUnitBlueprintCannotProceed(self, id: int): + try: + # import here and not at top of module because we need the django.setup() to be run first, either from this module's main, or from the TMSSTestEnvironment + from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_cannot_proceed_signal + from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitBlueprint + + logger.info("SchedulingUnitBlueprint id=%s cannot proceeed, signalling workflow...", id) + scheduling_unit_blueprint = SchedulingUnitBlueprint.objects.get(pk=id) + scheduling_unit_blueprint_cannot_proceed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint) + except Exception as e: + logger.error(e) -def create_workflow_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): - return TMSSBusListener(handler_type=SchedulingUnitEventMessageHandler, + +def create_workflow_service(handler_type = SchedulingUnitEventMessageHandler, exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): + return TMSSBusListener(handler_type=handler_type, handler_kwargs={}, exchange=exchange, broker=broker) diff --git a/SAS/TMSS/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/src/tmss/tmssapp/models/specification.py index 6c9ffeaa5c55d4d3a748b40df1b5c8322c770c41..942279a907ac3ac74feeee71f0b2848764c47cd5 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/specification.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/specification.py @@ -711,6 +711,11 @@ class SchedulingUnitBlueprint(NamedCommon): return SchedulingUnitBlueprint.Status.SCHEDULED.value return SchedulingUnitBlueprint.Status.SCHEDULABLE.value + @property + def can_proceed(self) -> bool: + '''Can this scheduling unit proceed with running its tasks?''' + return self.status not in [SchedulingUnitBlueprint.Status.ERROR.value, SchedulingUnitBlueprint.Status.FINISHED.value, SchedulingUnitBlueprint.Status.CANCELLED.value] + def _task_graph_instantiated(self): return self._get_total_nbr_tasks() > 0 diff --git a/SAS/TMSS/src/tmss/workflowapp/CMakeLists.txt b/SAS/TMSS/src/tmss/workflowapp/CMakeLists.txt index 495fd6fd253557a1af5b9ae7c8231db36c5d1083..75a1df5532e2fd948f5ca9b759182b0b0688ae28 100644 --- a/SAS/TMSS/src/tmss/workflowapp/CMakeLists.txt +++ b/SAS/TMSS/src/tmss/workflowapp/CMakeLists.txt @@ -5,7 +5,6 @@ set(_py_files __init__.py admin.py apps.py - signals.py ) python_install(${_py_files} @@ -18,4 +17,5 @@ add_subdirectory(viewsets) add_subdirectory(forms) add_subdirectory(templates) add_subdirectory(tests) +add_subdirectory(signals) add_subdirectory(serializers) diff --git a/SAS/TMSS/src/tmss/workflowapp/flows/schedulingunitflow.py b/SAS/TMSS/src/tmss/workflowapp/flows/schedulingunitflow.py index 0bf572d1e31c9c7817a845a12d8d6abdbbb23f8f..eb8877831d406ad74bdd8e3ed257ffddf59df0c3 100644 --- a/SAS/TMSS/src/tmss/workflowapp/flows/schedulingunitflow.py +++ b/SAS/TMSS/src/tmss/workflowapp/flows/schedulingunitflow.py @@ -14,7 +14,7 @@ from .. import viewsets from lofar.sas.tmss.tmss.tmssapp.models import Subtask from django.dispatch import receiver -from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_signal +from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_status_changed_signal, scheduling_unit_blueprint_cannot_proceed_signal from viewflow import frontend, ThisObject from viewflow.activation import STATUS @@ -83,7 +83,7 @@ class SchedulingUnitFlow(Flow): start = ( flow.StartSignal( - scheduling_unit_blueprint_signal, + scheduling_unit_blueprint_status_changed_signal, this.on_save_can_start, ).Next(this.wait_scheduled) ) @@ -91,21 +91,22 @@ class SchedulingUnitFlow(Flow): wait_scheduled = ( Condition( this.check_condition_scheduled, - scheduling_unit_blueprint_signal, + scheduling_unit_blueprint_status_changed_signal, task_loader=this.get_scheduling_unit_task ) .Next(this.wait_processed) ) - + wait_processed = ( - Condition( + flow.Signal( + scheduling_unit_blueprint_cannot_proceed_signal, this.check_condition_processed, - scheduling_unit_blueprint_signal, task_loader=this.get_scheduling_unit_task ) .Next(this.qa_reporting_to) ) - + + #QA Reporting (TO) qa_reporting_to = ( flow.View( @@ -178,21 +179,18 @@ class SchedulingUnitFlow(Flow): @method_decorator(flow.flow_start_signal) def on_save_can_start(self, activation, sender, instance, status, **signal_kwargs): - if status == "schedulable": try: process = models.SchedulingUnitProcess.objects.get(su=instance) - except Process.DoesNotExist: activation.prepare() activation.process.su = instance activation.done() - logger.info("workflow started") + logger.info("workflow started for scheduling unit id=%s name='%s'", instance.id, instance.name) except Process.MultipleObjectsReturned: logger.info("QA Workflow for process %s already exists",process) - else: - logger.info("no workflow started") + return activation @@ -213,12 +211,11 @@ class SchedulingUnitFlow(Flow): condition = instance.status == "scheduled" return condition - def check_condition_processed(self, activation, instance): - if instance is None: - instance = activation.process.su + @method_decorator(flow.flow_signal) + def check_condition_processed(self, activation, **signal_kwargs): + activation.prepare() + activation.done() - condition = instance.status == "processed" - return condition def get_scheduling_unit_task(self, flow_task, sender, instance, **kwargs): process = models.SchedulingUnitProcess.objects.get(su=instance) diff --git a/SAS/TMSS/src/tmss/workflowapp/migrations/0001_initial.py b/SAS/TMSS/src/tmss/workflowapp/migrations/0001_initial.py index 8119f3254e3b5d89bd1593f16b715b9d6f2d0d7a..34cf8652d9baf350166681742771f206f4cb05d7 100644 --- a/SAS/TMSS/src/tmss/workflowapp/migrations/0001_initial.py +++ b/SAS/TMSS/src/tmss/workflowapp/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 3.0.9 on 2020-12-02 20:16 +# Generated by Django 3.0.9 on 2020-12-15 08:23 from django.db import migrations, models import django.db.models.deletion @@ -34,7 +34,7 @@ class Migration(migrations.Migration): fields=[ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ('sos_report', models.CharField(max_length=150)), - ('quality_within_policy', models.CharField(max_length=150)), + ('quality_within_policy', models.BooleanField(default=False)), ('sos_accept_show_pi', models.BooleanField(default=False)), ], ), diff --git a/SAS/TMSS/src/tmss/workflowapp/models/schedulingunitflow.py b/SAS/TMSS/src/tmss/workflowapp/models/schedulingunitflow.py index d33f462ed653833794709d701e3d0e0be47f05a4..3c1ed87a6036fe77867ad2cce289a7a3776a65b1 100644 --- a/SAS/TMSS/src/tmss/workflowapp/models/schedulingunitflow.py +++ b/SAS/TMSS/src/tmss/workflowapp/models/schedulingunitflow.py @@ -15,7 +15,7 @@ class QAReportingTO(Model): class QAReportingSOS(Model): sos_report = CharField(max_length=150) - quality_within_policy = CharField(max_length=150) + quality_within_policy = BooleanField(default=False) sos_accept_show_pi = BooleanField(default=False) diff --git a/SAS/TMSS/src/tmss/workflowapp/signals.py b/SAS/TMSS/src/tmss/workflowapp/signals.py deleted file mode 100644 index 6087fb1615c6b7a8a5c33f897a4e1cbcce36c6f2..0000000000000000000000000000000000000000 --- a/SAS/TMSS/src/tmss/workflowapp/signals.py +++ /dev/null @@ -1,3 +0,0 @@ -import django.dispatch - -scheduling_unit_blueprint_signal = django.dispatch.Signal() \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/workflowapp/signals/CMakeLists.txt b/SAS/TMSS/src/tmss/workflowapp/signals/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..a28e632130d90b13240f87413a08c3656aa9846e --- /dev/null +++ b/SAS/TMSS/src/tmss/workflowapp/signals/CMakeLists.txt @@ -0,0 +1,11 @@ + +include(PythonInstall) + +set(_py_files + __init__.py + subcannotproceed.py + substatuschanged.py + ) + +python_install(${_py_files} + DESTINATION lofar/sas/tmss/tmss/workflowapp/signals) diff --git a/SAS/TMSS/src/tmss/workflowapp/signals/__init__.py b/SAS/TMSS/src/tmss/workflowapp/signals/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..64cc77936b56fd2055119cb66ec8a210b5c5ad6c --- /dev/null +++ b/SAS/TMSS/src/tmss/workflowapp/signals/__init__.py @@ -0,0 +1,2 @@ +from .subcannotproceed import * +from .substatuschanged import * \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/workflowapp/signals/subcannotproceed.py b/SAS/TMSS/src/tmss/workflowapp/signals/subcannotproceed.py new file mode 100644 index 0000000000000000000000000000000000000000..60c50ad80009d186b204ffb653655a2f73052603 --- /dev/null +++ b/SAS/TMSS/src/tmss/workflowapp/signals/subcannotproceed.py @@ -0,0 +1,3 @@ +import django.dispatch + +scheduling_unit_blueprint_status_changed_signal = django.dispatch.Signal() \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/workflowapp/signals/substatuschanged.py b/SAS/TMSS/src/tmss/workflowapp/signals/substatuschanged.py new file mode 100644 index 0000000000000000000000000000000000000000..04f2913b8e1136c68b303b545bab1f189f34688d --- /dev/null +++ b/SAS/TMSS/src/tmss/workflowapp/signals/substatuschanged.py @@ -0,0 +1,3 @@ +import django.dispatch + +scheduling_unit_blueprint_cannot_proceed_signal = django.dispatch.Signal() \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py b/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py index 78b15269f3860b62b89a3575700723cb670c2d20..6762e5047c80a58dc62daa7dc76efe7a8da29aef 100755 --- a/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py +++ b/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py @@ -6,19 +6,23 @@ import logging logger = logging.getLogger(__name__) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) -logger.warning("TODO: Fix test, look into synchronization issues. Skipped test for now with exit(3)") -exit(3) - from lofar.common.test_utils import skip_integration_tests if skip_integration_tests(): exit(3) -from lofar.messaging.messagebus import TemporaryExchange +from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor +from lofar.sas.tmss.services.workflow_service import create_workflow_service, SchedulingUnitEventMessageHandler + +from time import sleep +from datetime import datetime, timedelta import uuid - +from threading import Thread, Event +from lofar.sas.tmss.client.tmssbuslistener import * -class SchedulingUnitFlowTest(unittest.TestCase): + +class SchedulingUnitFlowTest(unittest.TestCase): + @classmethod def setUpClass(cls) -> None: cls.TEST_UUID = uuid.uuid1() @@ -39,8 +43,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): cls.ra_test_env.start() cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_schemas=True, populate_test_data=False, - start_subtask_scheduler=False, start_postgres_listener=True, start_ra_test_environment=True, - start_dynamic_scheduler=False, enable_viewflow=True, start_workflow_service=True) + start_subtask_scheduler=False, start_postgres_listener=True, start_ra_test_environment=False, + start_dynamic_scheduler=False, enable_viewflow=True, start_workflow_service=False) cls.tmss_test_env.start() @@ -50,7 +54,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): cls.ra_test_env.stop() cls.tmp_exchange.close() - def test_qa_workflow(self): from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow @@ -60,52 +63,110 @@ class SchedulingUnitFlowTest(unittest.TestCase): from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess from viewflow.models import Task + import time + + sync_event_bp_scheduled = Event() + sync_event_bp_cannot_proceed = Event() - #check if one QA Workflow is created after scheduling unit blueprint creation - self.assertEqual(0, len(SchedulingUnitProcess.objects.all())) - strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") - - scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( - name="Test Scheduling Unit UC1", - requirements_doc=strategy_template.template, - requirements_template=strategy_template.scheduling_unit_template, - observation_strategy_template=strategy_template, - copy_reason=models.CopyReason.objects.get(value='template'), - generator_instance_doc="para", - copies=None, - scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) - - create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - - scheduling_unit_draft.refresh_from_db() - task_drafts = scheduling_unit_draft.task_drafts.all() - scheduling_unit_blueprints = scheduling_unit_draft.scheduling_unit_blueprints.all() - scheduling_unit_blueprint = scheduling_unit_blueprints[0] - task_blueprints = scheduling_unit_blueprint.task_blueprints.all() - qa_workflow = SchedulingUnitProcess.objects.all() - self.assertEqual(1, len(qa_workflow)) - - #test that QA workflow have two tasks - self.assertEqual(2, len(Task.objects.all())) - self.assertEqual(Task.objects.get(id=1).flow_task.name, 'start') - self.assertEqual(Task.objects.get(id=1).status, 'DONE') - self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled') - self.assertEqual(Task.objects.get(id=2).status, 'NEW') - - #Change subtask status to scheduled - for task_blueprint in task_blueprints: + + class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): + def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): + super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) + if status == "scheduled": + sync_event_bp_scheduled.set() + + def onSchedulingUnitBlueprintCannotProceed(self, id: int): + super().onSchedulingUnitBlueprintCannotProceed(id=id) + sync_event_bp_cannot_proceed.set() + + service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + exchange=self.tmp_exchange.address) + with BusListenerJanitor(service): + strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") + + scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( + name="Test Scheduling Unit UC1", + requirements_doc=strategy_template.template, + requirements_template=strategy_template.scheduling_unit_template, + observation_strategy_template=strategy_template, + copy_reason=models.CopyReason.objects.get(value='template'), + generator_instance_doc="para", + copies=None, + scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) + + scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + scheduling_unit_draft.refresh_from_db() + + # there is no signal that SchedulingUnitProcess instance was created, + # so we have to wait an poll before we can proceed with the test + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("SchedulingUnitProcess not created within expected time") + + # Yes! the SchedulingUnitProcess was created, let's get it. + scheduling_unit_process_id = SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).id + + #test that QA workflow have two tasks + self.assertEqual(2, Task.objects.filter(process=scheduling_unit_process_id).count()) + #test that there is jut one active task + self.assertEqual(1, SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks().count()) + #check the active task name + self.assertEqual("wait_scheduled", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].flow_task.name, 'start') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') + + #Change subtask status to scheduled + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): for subtask in task_blueprint.subtasks.all(): subtask.state = models.SubtaskState.objects.get(value='scheduled') subtask.save() - #Check the QA Workflow is now with 3 Task - self.assertEqual(3, len(Task.objects.all())) - self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled') - self.assertEqual(Task.objects.get(id=2).status, 'DONE') - self.assertEqual(Task.objects.get(id=3).flow_task.name, 'wait_processed') - self.assertEqual(Task.objects.get(id=3).status, 'NEW') - - + # wait until scheduling unit is scheduled + if not sync_event_bp_scheduled.wait(timeout=10): + raise TimeoutError() + sync_event_bp_scheduled.clear() + + #Check the QA Workflow shoulb have 3 Tasks now + self.assertEqual(3, Task.objects.filter(process=scheduling_unit_process_id).count()) + #test that there is jut one active task + self.assertEqual(1, SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks().count()) + #check the active task name + self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') + + #Change subtask status to cancelled + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='cancelled') + subtask.save() + + if not sync_event_bp_cannot_proceed.wait(timeout=10): + raise TimeoutError() + sync_event_bp_cannot_proceed.clear() + + #Check the QA Workflow is now with 4 Task + self.assertEqual(4, Task.objects.filter(process=scheduling_unit_process_id).count()) + #test that there is jut one active task + self.assertEqual(1, SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks().count()) + #check the active task name + self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') + if __name__ == '__main__': #run the unit tests