diff --git a/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py b/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py index 61f1ec46246061129079bc906c5fe6cd9fc92541..4adf6ebd761b11aa4f5effc5867386456da918ad 100755 --- a/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py +++ b/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py @@ -10,80 +10,95 @@ from lofar.common.test_utils import skip_integration_tests if skip_integration_tests(): exit(3) -# Do Mandatory setup step: -# use setup/teardown magic for tmss test database, ldap server and django server -# (ignore pycharm unused import statement, python unittests does use at RunTime the tmss_test_environment_unittest_setup module) -from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import * -tmss_test_env.populate_schemas() - -from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment -from lofar.sas.tmss.test.tmss_test_data_django_models import * - -# import and setup rest test data creator -from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator -rest_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH) - -from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset -from lofar.sas.tmss.tmss.tmssapp.adapters.sip import generate_sip_for_dataproduct -from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import generate_dataproduct_feedback_from_subtask_feedback_and_set_finished -from lofar.common.json_utils import get_default_json_object_for_schema -from lofar.lta.sip import constants -from datetime import datetime, timedelta -from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC -from lofar.sas.tmss.tmss.tmssapp import models -from lofar.sas.tmss.tmss.tmssapp.subtasks import * -from lofar.sas.tmss.tmss.tmssapp.tasks import * - -from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import * -from viewflow.models import Task +from lofar.messaging.messagebus import TemporaryExchange +import uuid class SchedulingUnitFlowTest(unittest.TestCase): - + + @classmethod + def setUpClass(cls) -> None: + cls.TEST_UUID = uuid.uuid1() + + cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID)) + cls.tmp_exchange.open() + + # override DEFAULT_BUSNAME + import lofar + lofar.messaging.config.DEFAULT_BUSNAME = cls.tmp_exchange.address + + # import here, and not at top of module, because DEFAULT_BUSNAME needs to be set before importing + from lofar.sas.resourceassignment.resourceassigner.test.ra_test_environment import RATestEnvironment + from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment + from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator + + cls.ra_test_env = RATestEnvironment(exchange=cls.tmp_exchange.address) + cls.ra_test_env.start() + + cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_schemas=True, populate_test_data=False, + start_subtask_scheduler=False, start_postgres_listener=True, start_ra_test_environment=True, + start_dynamic_scheduler=False, enable_viewflow=True, start_workflow_service=True) + cls.tmss_test_env.start() + + + @classmethod + def tearDownClass(cls) -> None: + cls.tmss_test_env.stop() + cls.ra_test_env.stop() + cls.tmp_exchange.close() + + def test_qa_workflow(self): - #check if one QA Workflow is created after scheduling unit blueprint creation - self.assertEqual(0, len(SchedulingUnitProcess.objects.all())) - strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") - - scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( - name="Test Scheduling Unit UC1", - requirements_doc=strategy_template.template, - requirements_template=strategy_template.scheduling_unit_template, - observation_strategy_template=strategy_template, - copy_reason=models.CopyReason.objects.get(value='template'), - generator_instance_doc="para", - copies=None, - scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) - - create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - - scheduling_unit_draft.refresh_from_db() - self.task_drafts = scheduling_unit_draft.task_drafts.all() - self.scheduling_unit_blueprints = scheduling_unit_draft.scheduling_unit_blueprints.all() - self.scheduling_unit_blueprint = self.scheduling_unit_blueprints[0] - self.task_blueprints = self.scheduling_unit_blueprint.task_blueprints.all() - self.qa_workflow = SchedulingUnitProcess.objects.all() - - self.assertEqual(1, len(self.qa_workflow)) - - #test that QA workflow have two tasks - self.assertEqual(2, len(Task.objects.all())) - self.assertEqual(Task.objects.get(id=1).flow_task.name, 'start') - self.assertEqual(Task.objects.get(id=1).status, 'DONE') - self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled') - self.assertEqual(Task.objects.get(id=2).status, 'NEW') - - #Change subtask status to scheduled - for task_blueprint in self.task_blueprints: - for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='scheduled') - subtask.save() - - #Check the QA Workflow is now with 3 Task - self.assertEqual(3, len(Task.objects.all())) - self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled') - self.assertEqual(Task.objects.get(id=2).status, 'DONE') - self.assertEqual(Task.objects.get(id=3).flow_task.name, 'wait_processed') - self.assertEqual(Task.objects.get(id=3).status, 'NEW') + from lofar.sas.tmss.tmss.tmssapp import models + from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft + from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data + + from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess + from viewflow.models import Task + + #check if one QA Workflow is created after scheduling unit blueprint creation + self.assertEqual(0, len(SchedulingUnitProcess.objects.all())) + strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") + + scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( + name="Test Scheduling Unit UC1", + requirements_doc=strategy_template.template, + requirements_template=strategy_template.scheduling_unit_template, + observation_strategy_template=strategy_template, + copy_reason=models.CopyReason.objects.get(value='template'), + generator_instance_doc="para", + copies=None, + scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) + + create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + + scheduling_unit_draft.refresh_from_db() + self.task_drafts = scheduling_unit_draft.task_drafts.all() + self.scheduling_unit_blueprints = scheduling_unit_draft.scheduling_unit_blueprints.all() + self.scheduling_unit_blueprint = self.scheduling_unit_blueprints[0] + self.task_blueprints = self.scheduling_unit_blueprint.task_blueprints.all() + self.qa_workflow = SchedulingUnitProcess.objects.all() + + self.assertEqual(1, len(self.qa_workflow)) + + #test that QA workflow have two tasks + self.assertEqual(2, len(Task.objects.all())) + self.assertEqual(Task.objects.get(id=1).flow_task.name, 'start') + self.assertEqual(Task.objects.get(id=1).status, 'DONE') + self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.get(id=2).status, 'NEW') + + #Change subtask status to scheduled + for task_blueprint in self.task_blueprints: + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='scheduled') + subtask.save() + + #Check the QA Workflow is now with 3 Task + self.assertEqual(3, len(Task.objects.all())) + self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.get(id=2).status, 'DONE') + self.assertEqual(Task.objects.get(id=3).flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.get(id=3).status, 'NEW')