diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py index 9f7daaa75e5b321f9b79f5b4c89e682ab4e0c4af..e65a83fd9c9a80966ac90560593446b0f28b2086 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py @@ -23,10 +23,10 @@ import logging logger = logging.getLogger(__name__) -class ConditionActivation(FuncActivation): +class ConditionalActivation(FuncActivation): @classmethod def activate(cls, flow_task, prev_activation, token): - activation = super(ConditionActivation, cls).activate(flow_task, prev_activation, token) + activation = super(ConditionalActivation, cls).activate(flow_task, prev_activation, token) if flow_task.condition_check(activation, None): # condition holds on activation @@ -35,9 +35,28 @@ class ConditionActivation(FuncActivation): return activation -class Condition(Signal): +class Conditional(Signal): + """ + This is an augmented "If" and "Signal" node, which arguably implements an BPMN Conditional (https://camunda.com/bpmn/reference/#events-conditional). + Initially this node performs a condition check and only if not met it waits for a signal. + The purpose of this node is to avoid race conditions that can occur when a signal gets send before the Signal + node is reached, but after a prior If node evaluated the corresponding TMSS property based on which it still expects + a signal. + Also see https://support.astron.nl/jira/browse/TMSS-1562 + # todo: I'm not 100% sure about this one... + # I get the race condition part, so I see why we cannot simply to an If node followed by a Signal node. + # What I don't get is why we need to augment the two, since to me they follow very different concepts. + # In BPMN, a Conditional seems to simply block on a condition check until the condition is met. This + # should not be related in any way to Signals. So while I do see that this implementation of a conditionally + # active Signal node works, I think it is A) probably not a correctly named entity ("IfElseSignal"?) and B) + # I wonder if a polling solution is more straight-forward. I would expect a (probably polling) implementation of + # a Conditional Node in Viewflow, but at least I don't see it. But I think I would find an If-Node that loops back + # on itself as long as the condition is not met quite a bit clearer than this solution. + # I can see it's nicer than polling, but somehow this mix of condition check via ORM and triggering signals on + # messages on a property change in the same node is brushing me the wrong way. + """ task_type = "HUMAN" # makes it show up in the unassigned task lists - activation_class = ConditionActivation + activation_class = ConditionalActivation def __init__(self, condition_check, signal, sender=None, task_loader=None, **kwargs): """ @@ -54,7 +73,7 @@ class Condition(Signal): sent with Task instance. """ self.condition_check = condition_check - super(Condition, self).__init__(signal, self.signal_handler, sender, task_loader, **kwargs) + super(Conditional, self).__init__(signal, self.signal_handler, sender, task_loader, **kwargs) @method_decorator(flow.flow_signal) def signal_handler(self, activation, sender, instance, **signal_kwargs): @@ -74,16 +93,21 @@ class Condition(Signal): if isinstance(self.condition_check, ThisObject): self.condition_check = getattr(self.flow_class.instance, self.condition_check.name) - super(Condition, self).ready() + super(Conditional, self).ready() @frontend.register class SchedulingUnitFlow(Flow): + """ + This Flow describes the workflow for scheduling units. Each attribute of a Flow represents a flow task. + Some tasks are done event-driven by the system. The signals are dispatched by the workflow service in response + to SU state changes on the message bus. + """ process_class = models.SchedulingUnitProcess task_class = models.SchedulingUnitTask - #STEP 1 + # 1. Start workflow on SUB becoming schedulable start = ( flow.StartSignal( scheduling_unit_blueprint_schedulable_signal, @@ -91,18 +115,21 @@ class SchedulingUnitFlow(Flow): ).Next(this.wait_scheduled) ) - #STEP 2 + # 2. Wait for SUB to get scheduled wait_scheduled = ( - flow.Signal( + flow.Conditional( + this.is_scheduled_state, scheduling_unit_blueprint_scheduled_signal, this.on_scheduling_unit_blueprint_scheduled, task_loader=this.get_scheduling_unit_task ) .Next(this.wait_observed) ) - + + # 3. Wait for SUB to get observed wait_observed = ( - flow.Signal( + flow.Conditional( + this.is_observed_state, scheduling_unit_blueprint_observed_signal, this.on_scheduling_unit_blueprint_observed, task_loader=this.get_scheduling_unit_task @@ -110,8 +137,7 @@ class SchedulingUnitFlow(Flow): .Next(this.qa_reporting_to) ) - #STEP 1 - #QA Reporting (TO) + # 4. Human action in QA Reporting (TO) view qa_reporting_to = ( flow.View( viewsets.QAReportingTOView, @@ -121,14 +147,14 @@ class SchedulingUnitFlow(Flow): ).Next(this.check_operator_accept) ) - #Quality Acceptable + # 5. if operator accepted -> 6., else -> 13F. check_operator_accept = ( flow.If(lambda activation: activation.process.qa_reporting_to.operator_accept) .Then(this.qa_reporting_sos) - .Else(this.mark_sub) + .Else(this.do_mark_sub_not_accepted) ) - #QA Reporting (SOS) + # 6. Human action in QA Reporting (SOS) view qa_reporting_sos = ( flow.View( viewsets.QAReportingSOSView, @@ -138,14 +164,14 @@ class SchedulingUnitFlow(Flow): ).Next(this.check_sos_accept_show_pi) ) - #Quality Acceptable + # 7. if SOS accepted -> 8., else -> 13F. check_sos_accept_show_pi = ( flow.If(lambda activation: activation.process.qa_reporting_sos.sos_accept_show_pi) .Then(this.pi_verification) - .Else(this.mark_sub) + .Else(this.do_mark_sub_not_accepted) ) - #PI Verification + # 8. Human action in PI Verification view pi_verification = ( flow.View( viewsets.PIVerificationView, @@ -155,7 +181,7 @@ class SchedulingUnitFlow(Flow): ).Next(this.decide_acceptance) ) - #Decide Acceptance + # 9. Human action in Decide Acceptance view decide_acceptance = ( flow.View( viewsets.DecideAcceptanceView, @@ -165,14 +191,14 @@ class SchedulingUnitFlow(Flow): ).Next(this.check_sos_accept_after_pi) ) - #STEP 3 - #Quality Acceptable + # 10. If SOS accepted after PI -> 11., else -> 13F. check_sos_accept_after_pi = ( flow.If(lambda activation: activation.process.decide_acceptance.sos_accept_after_pi) .Then(this.allow_ingest) - .Else(this.mark_sub) + .Else(this.do_mark_sub_not_accepted) ) + # 11. set ingest permission of SUB allow_ingest = ( flow.Handler( this.signal_SUB_allow_ingest, @@ -180,28 +206,38 @@ class SchedulingUnitFlow(Flow): .Next(this.ingest_done) ) - ingest_done = ( - Condition( - this.check_ingest_done, + # 12. wait for ingest to be done (if needed) + wait_ingest_done = ( + Conditional( + this.is_ingest_done, scheduling_unit_blueprint_ingested_signal, task_loader=this.get_scheduling_unit_task ) - .Next(this.mark_sub) + .Next(this.mark_sub_accepted) ) - #Mark SUB Successful/failed - mark_sub = ( + # 13. Mark SUB as accepted + mark_sub_accepted = ( flow.Handler( - this.do_mark_sub + this.do_mark_sub_accepted ).Next(this.check_data_pinned) ) + # 13-F. Mark SUB as not accepted + mark_sub_not_accepted = ( + flow.Handler( + this.do_mark_sub_not_accepted + ).Next(this.check_data_pinned) # Is this useful or can unaccepted data always simply be removed? + ) + + # 14. if output pinned -> 15., else -> 16. check_data_pinned = ( - flow.If(lambda activation: activation.process.su.output_pinned) + flow.If(lambda activation: activation.process.su.output_pinned) # todo: this is currently always true, because it gets set in do_mark_sub .Then(this.unpin_data) .Else(this.delete_data) ) + # 15. Human action in Unpin Data view. unpin_data = ( flow.View( viewsets.UnpinDataView, @@ -211,12 +247,14 @@ class SchedulingUnitFlow(Flow): ).Next(this.delete_data) ) + # 16. Delete the data # TODO: should there not be a 'check_unpin_decision' prior to this? delete_data = ( flow.Handler( - this.do_delete_data + this.do_delete_data # todo: this currently does not do anything ).Next(this.end) ) - + + # 17. End end = flow.End() @method_decorator(flow.flow_start_signal) @@ -230,7 +268,7 @@ class SchedulingUnitFlow(Flow): activation.done() logger.info("workflow process with id=%s started for scheduling unit id=%s name='%s'", activation.process.id, scheduling_unit_blueprint.id, scheduling_unit_blueprint.name) except Process.MultipleObjectsReturned: - logger.info("QA Workflow for process %s already exists",process) + logger.info("Could not start QA workflow process for SUB id=%s because there are already multiple SchedulingUnitProcess objects for it.", scheduling_unit_blueprint.id) return activation @@ -248,18 +286,21 @@ class SchedulingUnitFlow(Flow): logger.info('QA workflow on_scheduling_unit_blueprint_observed scheduling_unit id=%s flow_process=(\'%s\', %s) flow_task=(\'%s\', %s)', scheduling_unit_blueprint.id, str(activation.process), activation.process.status, str(activation.flow_task), activation.status) - def do_mark_sub(self, activation): + def do_mark_sub(self, activation, accepted: bool): + activation.process.su.output_pinned = True # todo: why is this set here? + activation.process.results_accepted = accepted - activation.process.su.output_pinned = True - activation.process.results_accepted = ((activation.process.qa_reporting_to is not None and activation.process.qa_reporting_to.operator_accept) - and (activation.process.qa_reporting_sos is not None and activation.process.qa_reporting_sos.sos_accept_show_pi) - and (activation.process.decide_acceptance is not None and activation.process.decide_acceptance.sos_accept_after_pi)) - activation.process.su.save() activation.process.save() return activation - def do_delete_data(self, activation): + def do_mark_sub_accepted(self, activation): + return self.do_mark_sub(activation, True) + + def do_mark_sub_not_accepted(self, activation): + return self.do_mark_sub(activation, False) + + def do_delete_data(self, activation): # todo: this currently does not actually do anything... activation.process.su.save() activation.process.save() return activation @@ -273,19 +314,26 @@ class SchedulingUnitFlow(Flow): activation.process.su.save() activation.process.save() - - - def check_ingest_done(self, activation, instance): + + def check_SchedulingUnitBlueprint_status(self, activation, instance, expected_status): # todo: deduplicate code if instance is None: instance = activation.process.su - - logger.info("[check ingest done] checking on %s, status %s", instance, instance.status) + + logger.info("[check scheduling unit status] checking on %s, status=%s, expected_status=%s", instance, instance.status, expected_status) from lofar.sas.tmss.tmss.tmssapp.models.specification import SchedulingUnitStatus instance.refresh_from_db() - condition = instance.status.value == SchedulingUnitStatus.Choices.FINISHED.value + condition = instance.status.value == expected_status return condition + def is_ingest_done(self, activation, instance): + return checkSchedulingUnitBlueprint(activation, instance, SchedulingUnitStatus.Choices.FINISHED.value) + + def is_scheduled_state(self, activation, instance): + return checkSchedulingUnitBlueprint(activation, instance, SchedulingUnitStatus.Choices.SCHEDULED.value) + + def is_observed_state(self, activation, instance): # todo: deduplicate code + return checkSchedulingUnitBlueprint(activation, instance, SchedulingUnitStatus.Choices.OBSERVED.value) def get_scheduling_unit_task(self, flow_task, sender, scheduling_unit_blueprint, **kwargs): try: diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py index 680fc20c830636baacad541b65354bfad6a92c1b..8d8b688df9fa49747ccbad35d8fa93c6d56837a0 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py @@ -36,6 +36,25 @@ class SchedulingUnitTask(Task, ProjectPropertyMixin): path_to_project ='flow_process__su__draft__scheduling_set__project' class SchedulingUnitProcess(Process, ProjectPropertyMixin): + """ + The viewflow process object contains attributes that capture the state of a running workflow. + # TODO: Review this when we implement another workflow. + # Is this setup with separate data models for the individual steps reasonable? + # This could simply be a collection of the fields in the individual models. + # It seems as if this is explicitly modeled so that we can have separate views outside of what viewflow offers and + # if I recall correctly than this was mainly driven by the wish that e.g. a report can be changed later. + # This implementation is insofar understandable as the viewflow REST stuff is a non-free feature and we kind of + # created our own viewsets now. But if this is purely to allow alterations at a later point in time, then I would + # give this another thought. It probably makes more sense to come up with a workflow that matches the order of how + # things are actually done then to find ways to allow meddling with things at a point in time where decisions have + # already been made based on the original value. + # This whole setup with tons of workflow-specific models/serializers/views feels very clunky and non-generic for + # something that should be relatively lightweight to implement. + # Also, at least I find this quite confusing because instead of a simple state container, we now have models that + # represent individual steps of a workflow, i.e. their objects represent individual tasks and we have a REST + # interface to use them. However, in viewflow logic there is a separate model for (all kinds of) tasks that is + # usually the interface for the user to interact with the system. + """ su = ForeignKey(SchedulingUnitBlueprint, blank=True, null=True, on_delete=CASCADE) qa_reporting_to = ForeignKey(QAReportingTO, blank=True, null=True, related_name='flow_process', on_delete=CASCADE) qa_reporting_sos = ForeignKey(QAReportingSOS, blank=True, null=True, related_name='flow_process', on_delete=CASCADE)