Skip to content
Snippets Groups Projects
Commit 024d89e0 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-419: proper setup of workflow test. The test still fails, and can be improved a lot.

parent 6f1485ee
No related branches found
No related tags found
1 merge request!268Resolve TMSS-419
...@@ -10,80 +10,95 @@ from lofar.common.test_utils import skip_integration_tests ...@@ -10,80 +10,95 @@ from lofar.common.test_utils import skip_integration_tests
if skip_integration_tests(): if skip_integration_tests():
exit(3) exit(3)
# Do Mandatory setup step: from lofar.messaging.messagebus import TemporaryExchange
# use setup/teardown magic for tmss test database, ldap server and django server import uuid
# (ignore pycharm unused import statement, python unittests does use at RunTime the tmss_test_environment_unittest_setup module)
from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import *
tmss_test_env.populate_schemas()
from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
from lofar.sas.tmss.test.tmss_test_data_django_models import *
# import and setup rest test data creator
from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
rest_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)
from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset
from lofar.sas.tmss.tmss.tmssapp.adapters.sip import generate_sip_for_dataproduct
from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import generate_dataproduct_feedback_from_subtask_feedback_and_set_finished
from lofar.common.json_utils import get_default_json_object_for_schema
from lofar.lta.sip import constants
from datetime import datetime, timedelta
from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.subtasks import *
from lofar.sas.tmss.tmss.tmssapp.tasks import *
from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import *
from viewflow.models import Task
class SchedulingUnitFlowTest(unittest.TestCase): class SchedulingUnitFlowTest(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.TEST_UUID = uuid.uuid1()
cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
cls.tmp_exchange.open()
# override DEFAULT_BUSNAME
import lofar
lofar.messaging.config.DEFAULT_BUSNAME = cls.tmp_exchange.address
# import here, and not at top of module, because DEFAULT_BUSNAME needs to be set before importing
from lofar.sas.resourceassignment.resourceassigner.test.ra_test_environment import RATestEnvironment
from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
cls.ra_test_env = RATestEnvironment(exchange=cls.tmp_exchange.address)
cls.ra_test_env.start()
cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_schemas=True, populate_test_data=False,
start_subtask_scheduler=False, start_postgres_listener=True, start_ra_test_environment=True,
start_dynamic_scheduler=False, enable_viewflow=True, start_workflow_service=True)
cls.tmss_test_env.start()
@classmethod
def tearDownClass(cls) -> None:
cls.tmss_test_env.stop()
cls.ra_test_env.stop()
cls.tmp_exchange.close()
def test_qa_workflow(self): def test_qa_workflow(self):
#check if one QA Workflow is created after scheduling unit blueprint creation from lofar.sas.tmss.tmss.tmssapp import models
self.assertEqual(0, len(SchedulingUnitProcess.objects.all())) from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft
strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data
scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess
name="Test Scheduling Unit UC1", from viewflow.models import Task
requirements_doc=strategy_template.template,
requirements_template=strategy_template.scheduling_unit_template, #check if one QA Workflow is created after scheduling unit blueprint creation
observation_strategy_template=strategy_template, self.assertEqual(0, len(SchedulingUnitProcess.objects.all()))
copy_reason=models.CopyReason.objects.get(value='template'), strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines")
generator_instance_doc="para",
copies=None, scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(
scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) name="Test Scheduling Unit UC1",
requirements_doc=strategy_template.template,
create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) requirements_template=strategy_template.scheduling_unit_template,
observation_strategy_template=strategy_template,
scheduling_unit_draft.refresh_from_db() copy_reason=models.CopyReason.objects.get(value='template'),
self.task_drafts = scheduling_unit_draft.task_drafts.all() generator_instance_doc="para",
self.scheduling_unit_blueprints = scheduling_unit_draft.scheduling_unit_blueprints.all() copies=None,
self.scheduling_unit_blueprint = self.scheduling_unit_blueprints[0] scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data()))
self.task_blueprints = self.scheduling_unit_blueprint.task_blueprints.all()
self.qa_workflow = SchedulingUnitProcess.objects.all() create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft)
self.assertEqual(1, len(self.qa_workflow)) scheduling_unit_draft.refresh_from_db()
self.task_drafts = scheduling_unit_draft.task_drafts.all()
#test that QA workflow have two tasks self.scheduling_unit_blueprints = scheduling_unit_draft.scheduling_unit_blueprints.all()
self.assertEqual(2, len(Task.objects.all())) self.scheduling_unit_blueprint = self.scheduling_unit_blueprints[0]
self.assertEqual(Task.objects.get(id=1).flow_task.name, 'start') self.task_blueprints = self.scheduling_unit_blueprint.task_blueprints.all()
self.assertEqual(Task.objects.get(id=1).status, 'DONE') self.qa_workflow = SchedulingUnitProcess.objects.all()
self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled')
self.assertEqual(Task.objects.get(id=2).status, 'NEW') self.assertEqual(1, len(self.qa_workflow))
#Change subtask status to scheduled #test that QA workflow have two tasks
for task_blueprint in self.task_blueprints: self.assertEqual(2, len(Task.objects.all()))
for subtask in task_blueprint.subtasks.all(): self.assertEqual(Task.objects.get(id=1).flow_task.name, 'start')
subtask.state = models.SubtaskState.objects.get(value='scheduled') self.assertEqual(Task.objects.get(id=1).status, 'DONE')
subtask.save() self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled')
self.assertEqual(Task.objects.get(id=2).status, 'NEW')
#Check the QA Workflow is now with 3 Task
self.assertEqual(3, len(Task.objects.all())) #Change subtask status to scheduled
self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled') for task_blueprint in self.task_blueprints:
self.assertEqual(Task.objects.get(id=2).status, 'DONE') for subtask in task_blueprint.subtasks.all():
self.assertEqual(Task.objects.get(id=3).flow_task.name, 'wait_processed') subtask.state = models.SubtaskState.objects.get(value='scheduled')
self.assertEqual(Task.objects.get(id=3).status, 'NEW') subtask.save()
#Check the QA Workflow is now with 3 Task
self.assertEqual(3, len(Task.objects.all()))
self.assertEqual(Task.objects.get(id=2).flow_task.name, 'wait_scheduled')
self.assertEqual(Task.objects.get(id=2).status, 'DONE')
self.assertEqual(Task.objects.get(id=3).flow_task.name, 'wait_processed')
self.assertEqual(Task.objects.get(id=3).status, 'NEW')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment