diff --git a/SAS/TMSS/backend/services/workflow_service/test/t_workflow_qaworkflow.py b/SAS/TMSS/backend/services/workflow_service/test/t_workflow_qaworkflow.py index ae57faf9585853693348c96bec3c71b41fc37ec4..b1fb8448cd36d99acf5c0c8c9032cc4fafc05187 100755 --- a/SAS/TMSS/backend/services/workflow_service/test/t_workflow_qaworkflow.py +++ b/SAS/TMSS/backend/services/workflow_service/test/t_workflow_qaworkflow.py @@ -6,21 +6,13 @@ import json import logging logger = logging.getLogger('lofar.'+__name__) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - -from lofar.common.test_utils import skip_integration_tests -if skip_integration_tests(): - exit(3) - -#TODO: TMSS-656 re-enable this test -exit(3) - from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor from lofar.sas.tmss.services.workflow_service import create_workflow_service, SchedulingUnitEventMessageHandler from time import sleep -from datetime import datetime, timedelta +from datetime import timedelta import uuid -from threading import Thread, Event +from threading import Event from lofar.sas.tmss.client.tmssbuslistener import * @@ -31,7 +23,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): - from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitBlueprint try: super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) except Exception as e: @@ -79,10 +70,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): cls.tmp_exchange.close() def test_qa_workflow_complete(self): - from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow - from lofar.sas.tmss.tmss.tmssapp import models - from lofar.sas.tmss.tmss.tmssapp.models import TaskType, SchedulingUnitBlueprint + from lofar.sas.tmss.tmss.tmssapp.models import TaskType from lofar.sas.tmss.tmss.tmssapp.tasks import create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft, update_task_graph_from_specifications_doc from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data @@ -381,8 +370,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): def test_qa_workflow_qa_reporting_to_no(self): - from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow - from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.models import TaskType @@ -575,8 +562,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): def test_qa_workflow_qa_reporting_sos_no(self): - from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow - from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.models import TaskType @@ -600,8 +585,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): update_task_graph_from_specifications_doc(scheduling_unit_draft, strategy_template.template) scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, - # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() # there is no signal that SchedulingUnitProcess instance was created, @@ -796,8 +779,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].status, 'DONE') def test_qa_workflow_qa_quality_acceptable_no(self): - from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow - from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.models import TaskType @@ -820,8 +801,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): update_task_graph_from_specifications_doc(scheduling_unit_draft, strategy_template.template) scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, - # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() # there is no signal that SchedulingUnitProcess instance was created, @@ -1075,8 +1054,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].status, 'DONE') def test_qa_workflow_qa_is_data_pinned_no(self): - from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow - from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.models import TaskType @@ -1100,8 +1077,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): update_task_graph_from_specifications_doc(scheduling_unit_draft, strategy_template.template) scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, - # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() # there is no signal that SchedulingUnitProcess instance was created, @@ -1373,8 +1348,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft( scheduling_unit_draft) - # ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, - # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() # there is no signal that SchedulingUnitProcess instance was created, @@ -1512,7 +1485,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): # API: Assign to a user by email, as operator, which should work data = "" user_email = 'someone@astron.nl' - from requests.auth import HTTPDigestAuth response = requests.post( self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/?user_email={}'.format( task_id, user_email), json=data, auth=('operator', 'operator')) diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py index 967d57990d1bad2862ed9b78d3485347b6b11ba7..05c043910b2ade74081c74524a13242941e09464 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py @@ -10,6 +10,7 @@ from .. import viewsets from lofar.common.datetimeutils import round_to_second_precision from lofar.sas.tmss.tmss.workflowapp.signals import * +from lofar.sas.tmss.tmss.tmssapp.models.specification import SchedulingUnitStatus from viewflow import frontend, ThisObject from viewflow.activation import STATUS @@ -117,7 +118,7 @@ class SchedulingUnitFlow(Flow): # 2. Wait for SUB to get scheduled wait_scheduled = ( - flow.Conditional( + Conditional( this.is_scheduled_state, scheduling_unit_blueprint_scheduled_signal, this.on_scheduling_unit_blueprint_scheduled, @@ -128,7 +129,7 @@ class SchedulingUnitFlow(Flow): # 3. Wait for SUB to get observed wait_observed = ( - flow.Conditional( + Conditional( this.is_observed_state, scheduling_unit_blueprint_observed_signal, this.on_scheduling_unit_blueprint_observed, @@ -234,7 +235,7 @@ class SchedulingUnitFlow(Flow): check_data_pinned = ( flow.If(lambda activation: activation.process.su.output_pinned) .Then(this.decide_unpin_data) - .Else(this.delete_data) + .Else(this.wait_finished) # .Else(this.delete_data) -> todo: progress to check_cleanup_task_present when implemented ) # 15. Human action in Unpin Data view. @@ -244,7 +245,7 @@ class SchedulingUnitFlow(Flow): task_description='Unpin Data' ).Permission( auto_create=True - ).Next(this.delete_data) # -> progress to check_cleanup_task_present when implemented + ).Next(this.wait_finished) # .Else(this.delete_data) -> todo: progress to check_cleanup_task_present when implemented ) # todo: decide how to handle if there is no cleanup task in the SUB. @@ -341,22 +342,21 @@ class SchedulingUnitFlow(Flow): logger.info("[check scheduling unit status] checking on %s, status=%s, expected_status=%s", instance, instance.status, expected_status) - from lofar.sas.tmss.tmss.tmssapp.models.specification import SchedulingUnitStatus instance.refresh_from_db() condition = instance.status.value == expected_status return condition def is_ingested_state(self, activation, instance): - return checkSchedulingUnitBlueprint(activation, instance, SchedulingUnitStatus.Choices.INGESTED.value) + return self.check_SchedulingUnitBlueprint_status(activation, instance, SchedulingUnitStatus.Choices.INGESTED.value) def is_scheduled_state(self, activation, instance): - return checkSchedulingUnitBlueprint(activation, instance, SchedulingUnitStatus.Choices.SCHEDULED.value) + return self.check_SchedulingUnitBlueprint_status(activation, instance, SchedulingUnitStatus.Choices.SCHEDULED.value) def is_observed_state(self, activation, instance): # todo: deduplicate code - return checkSchedulingUnitBlueprint(activation, instance, SchedulingUnitStatus.Choices.OBSERVED.value) + return self.check_SchedulingUnitBlueprint_status(activation, instance, SchedulingUnitStatus.Choices.OBSERVED.value) def is_finished_state(self, activation, instance): # todo: deduplicate code - return checkSchedulingUnitBlueprint(activation, instance, SchedulingUnitStatus.Choices.FINISHED.value) + return self.check_SchedulingUnitBlueprint_status(activation, instance, SchedulingUnitStatus.Choices.FINISHED.value) def get_scheduling_unit_task(self, flow_task, sender, scheduling_unit_blueprint, **kwargs): try: diff --git a/SAS/TMSS/backend/test/t_reports.py b/SAS/TMSS/backend/test/t_reports.py index 62c6960932b74e43f70d64d44cadd36fc856f7e2..7addae67f792fb73aa1029cc1fd090955f2c8451 100755 --- a/SAS/TMSS/backend/test/t_reports.py +++ b/SAS/TMSS/backend/test/t_reports.py @@ -34,7 +34,7 @@ exit_with_skipped_code_if_skip_integration_tests() # (ignore pycharm unused import statement, python unittests does use at RunTime the tmss_test_environment_unittest_setup module) from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment -tmss_test_env = TMSSTestEnvironment(start_workflow_service=True, enable_viewflow=True) +tmss_test_env = TMSSTestEnvironment() try: tmss_test_env.start() tmss_test_env.populate_schemas_and_connectors() @@ -298,8 +298,7 @@ class ReportTest(unittest.TestCase): su_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(su_draft) if i in (0, 1): # Set results_accepted flag so we have a successful and a failed SUBs - results_accepted = True if i == 0 else False - su_blueprint.results_accepted = results_accepted + su_blueprint.results_accepted = i == 0 su_blueprint.save() # Set subtasks statuses, Workflow flags, etc.