diff --git a/SAS/TMSS/services/workflow_service/lib/workflow_service.py b/SAS/TMSS/services/workflow_service/lib/workflow_service.py
index 885d1905f98bba5e3fe465d1c89d309ee1d9586e..c300c7f70b1a04be57fb2edc3c9c49a5b4035253 100644
--- a/SAS/TMSS/services/workflow_service/lib/workflow_service.py
+++ b/SAS/TMSS/services/workflow_service/lib/workflow_service.py
@@ -52,8 +52,26 @@ class SchedulingUnitEventMessageHandler(TMSSEventMessageHandler):
             scheduling_unit_blueprint_cannot_proceed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint)
         except Exception as e:
             logger.error(e)
+
+    def onTaskBlueprintStatusChanged(self, id: int, status:str):
+        try:
+
+            from lofar.sas.tmss.tmss.tmssapp.models import TaskBlueprint, TaskType, SchedulingUnitBlueprint
+            from lofar.sas.tmss.tmss.workflowapp.signals import ingest_task_blueprint_status_changed_signal
+
+            logger.info("TaskBlueprint id=%s status changed, signalling workflow...", id)
+            task_blueprint = TaskBlueprint.objects.get(pk=id)
+
+            logger.info("TaskBlueprint type is %s , signalling workflow...", task_blueprint.specifications_template.type)
+
+            if task_blueprint.specifications_template.type.value == TaskType.Choices.INGEST.value:
+                logger.info("[INGEST]TaskBlueprint type is %s , signalling workflow...", task_blueprint.specifications_template.type)
+                scheduling_unit_blueprint = SchedulingUnitBlueprint.objects.get(pk=task_blueprint.scheduling_unit_blueprint_id)
+                ingest_task_blueprint_status_changed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint, status=status)
+
+        except Exception as e:
+            logger.error(e)
-
 
 def create_workflow_service(handler_type = SchedulingUnitEventMessageHandler, exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
     return TMSSBusListener(handler_type=handler_type,
                            handler_kwargs={},
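Reviewer note (sketch, not part of the patch): the new onTaskBlueprintStatusChanged handler only reacts to ingest-type task blueprints and forwards them to the workflow by sending ingest_task_blueprint_status_changed_signal with the owning SchedulingUnitBlueprint. A minimal sketch of how such a listener is typically kept running, assuming the bus listener's usual context-manager behaviour and the lofar.common.util.waitForInterrupt helper (both assumptions, not shown in this patch):

    # Sketch only: keep the listener running so TaskBlueprint status events reach the handler above.
    # Assumes this code lives in the same module as create_workflow_service().
    from lofar.common.util import waitForInterrupt
    from lofar.messaging.messagebus import DEFAULT_BUSNAME, DEFAULT_BROKER

    def main():
        # create_workflow_service() returns a TMSSBusListener wired to SchedulingUnitEventMessageHandler
        with create_workflow_service(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
            waitForInterrupt()  # block until Ctrl-C, handling bus events in the background
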
diff --git a/SAS/TMSS/src/tmss/workflowapp/flows/schedulingunitflow.py b/SAS/TMSS/src/tmss/workflowapp/flows/schedulingunitflow.py
index ace997e82c60b5f66f5539e161e91f1182d9a812..63f42e77f0788b0dba1f8ab0f19c18350dedf89a 100644
--- a/SAS/TMSS/src/tmss/workflowapp/flows/schedulingunitflow.py
+++ b/SAS/TMSS/src/tmss/workflowapp/flows/schedulingunitflow.py
@@ -15,7 +15,7 @@
 from .. import forms
 from lofar.sas.tmss.tmss.tmssapp.models import Subtask
 from django.dispatch import receiver
-from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_status_changed_signal, scheduling_unit_blueprint_cannot_proceed_signal
+from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_status_changed_signal, scheduling_unit_blueprint_cannot_proceed_signal, ingest_task_blueprint_status_changed_signal
 from viewflow import frontend, ThisObject
 from viewflow.activation import STATUS
 
@@ -85,18 +85,7 @@ class Condition(Signal):
 
 @frontend.register
 class SchedulingUnitFlow(Flow):
-    """
-    def __init__(self):
-        self.event_bus = ToBus(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER)
-
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        logger.info("[WORKFLOW] Closing event bus to communicate with TMSS on %s ", self.event_bus.exchange)
-        super().stop()
-        self.event_bus.close()
-    """
-
-
+
     process_class = models.SchedulingUnitProcess
 
     #STEP 1
@@ -104,8 +93,7 @@ class SchedulingUnitFlow(Flow):
         flow.StartSignal(
             scheduling_unit_blueprint_status_changed_signal,
             this.on_save_can_start,
-        ).Next(this.allow_ingest)
-        #Next(this.wait_scheduled)
+        ).Next(this.wait_scheduled)
     )
 
     #STEP 2
@@ -194,18 +182,17 @@ class SchedulingUnitFlow(Flow):
         flow.Handler(
             this.signal_SUB_allow_ingest,
         )
-        .Next(this.mark_sub)
+        .Next(this.ingest_done)
     )
 
-    """
+
     ingest_done = (
         Condition(
            this.check_ingest_done,
-           scheduling_unit_blueprint_signal,
+           ingest_task_blueprint_status_changed_signal,
           task_loader=this.get_scheduling_unit_task
        )
        .Next(this.mark_sub)
    )
-    """
     #Mark SUB Successful/failed
     mark_sub = (
@@ -263,12 +250,12 @@ class SchedulingUnitFlow(Flow):
         self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.AllowIngest', payload)
 
         activation.process.save()
-
+
     def _sendNotification(self, subject, content):
         try:
             event_bus = ToBus(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER)
 
-            logger.info("[WORKFLOW] Opening event bus to communicate with TMSS on %s ", event_bus.exchange);
+            logger.info("[WORKFLOW] Opening event bus to communicate with TMSS on %s ", event_bus.exchange)
 
             event_bus.open()
 
@@ -281,13 +268,14 @@ class SchedulingUnitFlow(Flow):
             event_bus.close()
         except Exception as e:
             logger.error(str(e))
-
 
     def check_ingest_done(self, activation, instance):
         if instance is None:
             instance = activation.process.su
 
-        condition = instance.status == "XXXXX"
+        logger.info("[check ingest done] checking on %s, status %s", instance, instance.status)
+
+        condition = instance.status == "finished"
 
         return condition
 
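Illustration (not part of the patch): the ingest_done condition no longer compares against the "XXXXX" placeholder; it now waits for ingest_task_blueprint_status_changed_signal instead of the undefined scheduling_unit_blueprint_signal, and only passes once the scheduling unit reports status 'finished'. The predicate in isolation, with a stub standing in for the SchedulingUnitBlueprint:

    # Standalone sketch of the check performed by SchedulingUnitFlow.check_ingest_done.
    class StubSchedulingUnit:
        def __init__(self, status):
            self.status = status

    def check_ingest_done(instance):
        # only a 'finished' scheduling unit lets the flow proceed to mark_sub
        return instance.status == "finished"

    assert check_ingest_done(StubSchedulingUnit("observing")) is False
    assert check_ingest_done(StubSchedulingUnit("finished")) is True
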
diff --git a/SAS/TMSS/src/tmss/workflowapp/signals/CMakeLists.txt b/SAS/TMSS/src/tmss/workflowapp/signals/CMakeLists.txt
index a28e632130d90b13240f87413a08c3656aa9846e..f35ffe6bb27eb4cd1c440c4023816b8cf5d2c9ef 100644
--- a/SAS/TMSS/src/tmss/workflowapp/signals/CMakeLists.txt
+++ b/SAS/TMSS/src/tmss/workflowapp/signals/CMakeLists.txt
@@ -5,6 +5,7 @@ set(_py_files
     __init__.py
     subcannotproceed.py
     substatuschanged.py
+    ingesttaskstatuschanged.py
     )
 
 python_install(${_py_files}
diff --git a/SAS/TMSS/src/tmss/workflowapp/signals/__init__.py b/SAS/TMSS/src/tmss/workflowapp/signals/__init__.py
index 64cc77936b56fd2055119cb66ec8a210b5c5ad6c..0b64dfb537bbe4ae29286689baa7e10c867d95f0 100644
--- a/SAS/TMSS/src/tmss/workflowapp/signals/__init__.py
+++ b/SAS/TMSS/src/tmss/workflowapp/signals/__init__.py
@@ -1,2 +1,3 @@
 from .subcannotproceed import *
-from .substatuschanged import *
\ No newline at end of file
+from .substatuschanged import *
+from .ingesttaskstatuschanged import *
\ No newline at end of file
diff --git a/SAS/TMSS/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py b/SAS/TMSS/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py
new file mode 100644
index 0000000000000000000000000000000000000000..e2bfcb4e4c616ec8cec38c1a202e270b18735734
--- /dev/null
+++ b/SAS/TMSS/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py
@@ -0,0 +1,3 @@
+import django.dispatch
+
+ingest_task_blueprint_status_changed_signal = django.dispatch.Signal()
\ No newline at end of file
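Usage illustration (not part of the patch): ingest_task_blueprint_status_changed_signal is a plain Django signal. The sender (the workflow service handler above) passes the owning SchedulingUnitBlueprint as instance together with the new task status, so a receiver sees exactly those keyword arguments:

    # Sketch: connecting a receiver; the kwargs mirror the send() call in workflow_service.py.
    from django.dispatch import receiver
    from lofar.sas.tmss.tmss.workflowapp.signals import ingest_task_blueprint_status_changed_signal

    @receiver(ingest_task_blueprint_status_changed_signal)
    def log_ingest_task_status(sender, instance, status, **kwargs):
        # 'instance' is the SchedulingUnitBlueprint owning the ingest task, 'status' the new status value
        print("ingest task of %s changed to %s" % (instance, status))
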
diff --git a/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py b/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py
index d6c1006396f562c9b02871e4716a2efdd207aeb2..e5d09be1737400bbe73fce450dcbb9957865e0e8 100755
--- a/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py
+++ b/SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py
@@ -192,18 +192,107 @@ class SchedulingUnitFlowTest(unittest.TestCase):
 
         self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to')
         self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'ASSIGNED')
-
+
         #API: Perform qa_reporting_to step
         headers = {'content-type': 'application/json'}
         data = '{"operator_report": "Test report", "operator_accept": true}'
         response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers)
+        self.assertEqual(200,response.status_code)
+
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'DONE')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].flow_task.name, 'check_operator_accept')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].status, 'DONE')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].flow_task.name, 'qa_reporting_sos')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].status, 'NEW')
+
+        #API: Get current task
+        data = ""
+        response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH)
+        content = response.content.decode('utf-8')
+        r_dict = json.loads(content)
+        task_id=r_dict[0]['pk']
+
+        #API: Assign to an user
+        data = ""
+        response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH)
         content = response.content.decode('utf-8')
         r_dict = json.loads(content)
         self.assertEqual(200,response.status_code)
 
-        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to')
-        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'DONE')
+        #API: Perform qa_reporting_sos step
+        headers = {'content-type': 'application/json'}
+        data = '{"sos_report": "Test report", "quality_within_policy": true, "sos_accept_show_pi": true}'
+        response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers)
+        self.assertEqual(200,response.status_code)
+
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].flow_task.name, 'qa_reporting_sos')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].status, 'DONE')
+
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].flow_task.name, 'check_sos_accept_show_pi')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].status, 'DONE')
+
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'pi_verification')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'NEW')
+
+        #API: Get current task
+        data = ""
+        response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH)
+        content = response.content.decode('utf-8')
+        r_dict = json.loads(content)
+        task_id=r_dict[0]['pk']
+
+        #API: Assign to an user
+        data = ""
+        response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH)
+        content = response.content.decode('utf-8')
+        r_dict = json.loads(content)
+        self.assertEqual(200,response.status_code)
+
+        #API: Perform pi_verification step
+        headers = {'content-type': 'application/json'}
+        data = '{"pi_report": "Test report", "pi_accept": true}'
+        response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers)
+        content = response.content.decode('utf-8')
+        r_dict = json.loads(content)
+        self.assertEqual(200,response.status_code)
+
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'pi_verification')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'DONE')
+
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].flow_task.name, 'decide_acceptance')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].status, 'NEW')
+
+        #API: Get current task
+        data = ""
+        response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH)
+        content = response.content.decode('utf-8')
+        r_dict = json.loads(content)
+        task_id=r_dict[0]['pk']
+
+        #API: Assign to an user
+        data = ""
+        response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH)
+        content = response.content.decode('utf-8')
+        r_dict = json.loads(content)
+        self.assertEqual(200,response.status_code)
+
+        #API: Perform decide_acceptance step
+        headers = {'content-type': 'application/json'}
+        data = '{"sos_accept_after_pi": true}'
+        response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers)
+        content = response.content.decode('utf-8')
+        r_dict = json.loads(content)
+        self.assertEqual(200,response.status_code)
+
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].flow_task.name, 'decide_acceptance')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].status, 'DONE')
+
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].flow_task.name, 'check_sos_accept_after_pi')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].status, 'DONE')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].flow_task.name, 'allow_ingest')
+        self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].status, 'DONE')
 
 
 
 if __name__ == '__main__':
diff --git a/SAS/TMSS/src/tmss/workflowapp/viewsets/schedulingunitflow.py b/SAS/TMSS/src/tmss/workflowapp/viewsets/schedulingunitflow.py
index 739c4fb25e509a7f81543bc83f8a061b0a406e08..258b4ff6e8775ab8331f4c2e0f4020479ca985d7 100644
--- a/SAS/TMSS/src/tmss/workflowapp/viewsets/schedulingunitflow.py
+++ b/SAS/TMSS/src/tmss/workflowapp/viewsets/schedulingunitflow.py
@@ -252,11 +252,10 @@ class SchedulingUnitTaskExecuteViewSet(mixins.CreateModelMixin,
         if 'qa_scheduling_unit_process_id' in kwargs:
             try:
-                process= models.SchedulingUnitProcess.objects.filter(id=self.kwargs['qa_scheduling_unit_process_id'])[0]
-                task = models.SchedulingUnitProcess.objects.filter(id=self.kwargs['qa_scheduling_unit_process_id'])[0].active_tasks()[0]
+                process= models.SchedulingUnitProcess.objects.get(pk=self.kwargs['qa_scheduling_unit_process_id'])
+                task = models.SchedulingUnitProcess.objects.get(pk=self.kwargs['qa_scheduling_unit_process_id']).active_tasks()[0]
 
                 view = task.flow_task._view_class.as_view()
-
                 act=task.activate()
                 act.prepare()
 
@@ -267,7 +266,7 @@ class SchedulingUnitTaskExecuteViewSet(mixins.CreateModelMixin,
                 request._request.POST['_viewflow_activation-started'] = timezone.now()
                 request._request.POST['_done'] = ''
 
-                response = view(request._request, flow_class=flows.SchedulingUnitFlow, flow_task=flows.SchedulingUnitFlow.qa_reporting_to,
+                response = view(request._request, flow_class=flows.SchedulingUnitFlow, flow_task=task.flow_task,
                     process_pk=process.pk, task_pk=task.pk)
 
                 content = {'Perform Task': 'Task Performed'}
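Reviewer note (sketch, not part of the patch): the perform endpoint now resolves the view from the process's current task (task.flow_task) instead of the hard-coded qa_reporting_to step, and looks the process up with .get(pk=...) rather than filter(...)[0]; that is what lets the extended test above drive qa_reporting_sos, pi_verification and decide_acceptance through the same URL. A sketch of the lookup pattern, assuming the lofar.sas.tmss.tmss.workflowapp models import path and that active_tasks() returns an ordered queryset:

    # Sketch only: fetch a process and its current active task the way the viewset now does.
    # .get(pk=...) raises SchedulingUnitProcess.DoesNotExist for an unknown id, instead of the
    # IndexError that filter(...)[0] used to raise.
    from lofar.sas.tmss.tmss.workflowapp import models

    def get_current_task(process_id):
        process = models.SchedulingUnitProcess.objects.get(pk=process_id)
        active = process.active_tasks()  # active_tasks() as used in the viewset above
        return process, (active[0] if active else None)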