From 9f01b924884669141342ca8fd22576b6e51e0c1b Mon Sep 17 00:00:00 2001 From: unknown <n.santhanam@redkarma.eu> Date: Wed, 24 Feb 2021 19:56:26 +0530 Subject: [PATCH] TMSS-492 Updated based on Review Comments --- .../scheduling/lib/dynamic_scheduling.py | 2 + .../lib/tmss_postgres_listener.py | 19 +- .../websocket/lib/websocket_service.py | 2 +- .../workflow_service/lib/workflow_service.py | 19 +- SAS/TMSS/backend/src/remakemigrations.py | 10 - .../tmss/tmssapp/migrations/0001_initial.py | 5 +- .../tmss/tmssapp/migrations/0002_populate.py | 10 - .../src/tmss/tmssapp/models/specification.py | 8 + .../tmss/tmssapp/serializers/specification.py | 2 +- .../workflowapp/flows/schedulingunitflow.py | 38 +- .../workflowapp/migrations/0001_initial.py | 9 +- .../workflowapp/models/schedulingunitflow.py | 5 +- .../tmss/workflowapp/signals/CMakeLists.txt | 1 + .../src/tmss/workflowapp/signals/__init__.py | 3 +- .../signals/obstaskstatuschanged.py | 3 + .../tests/t_workflow_qaworkflow.py | 1171 ++++++++++++++++- .../viewsets/schedulingunitflow.py | 27 + SAS/TMSS/backend/test/t_adapter.py | 18 +- SAS/TMSS/backend/test/test_utils.py | 12 +- SAS/TMSS/client/lib/tmssbuslistener.py | 9 + .../tmss_webapp/.vscode/settings.json | 3 + SAS/TMSS/frontend/tmss_webapp/package.json | 85 ++ SAS/TMSS/frontend/tmss_webapp/src/App.css | 8 - .../src/components/JSONEditor/JEditor.js | 37 +- .../components/Timeline/CalendarTimeline.js | 5 +- .../src/layout/sass/_timeline.scss | 39 +- .../routes/Scheduling/ViewSchedulingUnit.js | 4 +- .../tmss_webapp/src/routes/Timeline/view.js | 49 +- .../signals/ingesttaskstatuschanged.py | 3 - 29 files changed, 1461 insertions(+), 145 deletions(-) create mode 100644 SAS/TMSS/backend/src/tmss/workflowapp/signals/obstaskstatuschanged.py create mode 100644 SAS/TMSS/frontend/tmss_webapp/.vscode/settings.json create mode 100644 SAS/TMSS/frontend/tmss_webapp/package.json delete mode 100644 SAS/TMSS/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index b9d63c82307..d921996e78f 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -40,6 +40,7 @@ from threading import Thread, Event from lofar.sas.tmss.services.scheduling.constraints import * + # LOFAR needs to have a gap in between observations to (re)initialize hardware. DEFAULT_INTER_OBSERVATION_GAP = timedelta(seconds=60) DEFAULT_NEXT_STARTTIME_GAP = timedelta(seconds=180) @@ -251,6 +252,7 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): logger.info("%s was set to %s: triggering update of dynamic schedule...", name, value) self._do_schedule_event.set() + def _scheduling_loop(self): while self._scheduling_thread_running: if self._do_schedule_event.wait(timeout=10): diff --git a/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py b/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py index 6e3ba46c7e3..6ed29be0f7a 100644 --- a/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py +++ b/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py @@ -28,6 +28,8 @@ from lofar.messaging.messagebus import ToBus from lofar.sas.tmss.client.tmssbuslistener import * from lofar.common import dbcredentials from lofar.common.util import single_line_with_single_spaces +from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SchedulingUnitBlueprint, TaskBlueprint +from distutils.util import strtobool class TMSSPGListener(PostgresListener): @@ -73,6 +75,9 @@ class TMSSPGListener(PostgresListener): self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'delete')) self.subscribe('tmssapp_taskblueprint_delete', self.onTaskBlueprintDeleted) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update', column_name='output_pinned', quote_column_value=False)) + self.subscribe('tmssapp_taskblueprint_update_column_output_pinned'[:63], self.onTaskBlueprintOutputPinningUpdated) + # TaskDraft self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskdraft', 'insert')) @@ -172,7 +177,6 @@ class TMSSPGListener(PostgresListener): def onSubTaskStateUpdated(self, payload = None): payload_dict = json.loads(payload) # send notification for this subtask... - from lofar.sas.tmss.tmss.tmssapp.models import Subtask subtask = Subtask.objects.get(id=payload_dict['id']) self._sendNotification(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.'+subtask.state.value.capitalize(), {'id': subtask.id, 'status': subtask.state.value}) @@ -198,6 +202,14 @@ class TMSSPGListener(PostgresListener): def onTaskBlueprintDeleted(self, payload = None): self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + def onTaskBlueprintOutputPinningUpdated(self, payload = None): + self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.OutputPinningUpdated', payload) + + if isinstance(payload, str): + payload = json.loads(payload) + task_blueprint = TaskBlueprint.objects.get(id=payload['id']) + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', {'id': task_blueprint.scheduling_unit_blueprint.id}) + def onTaskDraftInserted(self, payload = None): self._sendNotification(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Created', payload) @@ -215,15 +227,12 @@ class TMSSPGListener(PostgresListener): if isinstance(payload, str): payload = json.loads(payload) - - from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitBlueprint scheduling_unit_blueprint = SchedulingUnitBlueprint.objects.get(id=payload['id']) if not scheduling_unit_blueprint.can_proceed: self.onSchedulingUnitBlueprintCannotProceed( {'id': scheduling_unit_blueprint.id}) def onSchedulingUnitBlueprintIngestPermissionGranted(self, payload=None): - logger.info("onSchedulingUnitBlueprintIngestPermissionGranted payload=%s", payload) self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.IngestPermissionGranted', payload) def onSchedulingUnitBlueprintDeleted(self, payload = None): @@ -264,7 +273,7 @@ class TMSSPGListener(PostgresListener): payload = json.loads(payload) payload['name'] = payload['name_id'] del payload['name_id'] - payload['value'] = payload['value'] in ('true', 'True', 't') + payload['value'] = strtobool(payload['value']) self._sendNotification(TMSS_SETTING_OBJECT_EVENT_PREFIX+'.Updated', payload) def onSchedulingUnitBlueprintCannotProceed(self, payload = None): diff --git a/SAS/TMSS/backend/services/websocket/lib/websocket_service.py b/SAS/TMSS/backend/services/websocket/lib/websocket_service.py index 64aa14b82dc..698c4e70be8 100644 --- a/SAS/TMSS/backend/services/websocket/lib/websocket_service.py +++ b/SAS/TMSS/backend/services/websocket/lib/websocket_service.py @@ -79,7 +79,7 @@ class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler): self.t = Thread(target=start_ws_server) self.t.start() - if not socket_started_event.wait(3): + if not socket_started_event.wait(10): raise RuntimeError("Could not start websocket server on port %s"%self.websocket_port) super().start_handling() diff --git a/SAS/TMSS/backend/services/workflow_service/lib/workflow_service.py b/SAS/TMSS/backend/services/workflow_service/lib/workflow_service.py index 829dea74683..656b4a8b18d 100644 --- a/SAS/TMSS/backend/services/workflow_service/lib/workflow_service.py +++ b/SAS/TMSS/backend/services/workflow_service/lib/workflow_service.py @@ -57,20 +57,23 @@ class SchedulingUnitEventMessageHandler(TMSSEventMessageHandler): try: from lofar.sas.tmss.tmss.tmssapp.models import TaskBlueprint, TaskType, SchedulingUnitBlueprint - from lofar.sas.tmss.tmss.workflowapp.signals import ingest_task_blueprint_status_changed_signal - logger.info("TaskBlueprint id=%s status changed, signalling workflow...", id) task_blueprint = TaskBlueprint.objects.get(pk=id) - logger.info("TaskBlueprint type is %s , signalling workflow...", task_blueprint.specifications_template.type) - - if task_blueprint.specifications_template.type.value == TaskType.Choices.INGEST.value: - logger.info("[INGEST]TaskBlueprint type is %s , signalling workflow...", task_blueprint.specifications_template.type) + if task_blueprint.specifications_template.type.value in (TaskType.Choices.OBSERVATION.value, TaskType.Choices.INGEST.value): + logger.info("TaskBlueprint id=%s type=%s , signalling workflow...", task_blueprint.id, task_blueprint.specifications_template.type) scheduling_unit_blueprint = SchedulingUnitBlueprint.objects.get(pk=task_blueprint.scheduling_unit_blueprint_id) - ingest_task_blueprint_status_changed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint, status=status) + + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: + from lofar.sas.tmss.tmss.workflowapp.signals import obs_task_status_changed_signal + obs_task_status_changed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint, status=status) + + if task_blueprint.specifications_template.type.value == TaskType.Choices.INGEST.value: + from lofar.sas.tmss.tmss.workflowapp.signals import ingest_task_blueprint_status_changed_signal + ingest_task_blueprint_status_changed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint, status=status) except Exception as e: - logger.error(e) + logger.error(e) def create_workflow_service(handler_type = SchedulingUnitEventMessageHandler, exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): return TMSSBusListener(handler_type=handler_type, diff --git a/SAS/TMSS/backend/src/remakemigrations.py b/SAS/TMSS/backend/src/remakemigrations.py index a80266cbb4a..6a4ee430ffd 100755 --- a/SAS/TMSS/backend/src/remakemigrations.py +++ b/SAS/TMSS/backend/src/remakemigrations.py @@ -75,16 +75,6 @@ class Migration(migrations.Migration): # Start SubTask id with 2 000 000 to avoid overlap with 'old' (test/production) OTDB operations = [ migrations.RunSQL('ALTER SEQUENCE tmssapp_SubTask_id_seq RESTART WITH 2000000;'), - migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_taskblueprintsummary; " - "CREATE OR REPLACE VIEW tmssapp_taskblueprintsummary AS " - "SELECT tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_subtask.id AS subtask_id, tmssapp_subtask.state_id AS substate, tmssapp_subtasktemplate.type_id AS subtask_type" - " FROM tmssapp_subtask LEFT JOIN tmssapp_taskblueprint ON tmssapp_taskblueprint.id = tmssapp_subtask.task_blueprint_id" - " LEFT JOIN tmssapp_subtasktemplate ON tmssapp_subtasktemplate.id = tmssapp_subtask.specifications_template_id;"), - migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_schedulingunitblueprintsummary; " - "CREATE OR REPLACE VIEW tmssapp_schedulingunitblueprintsummary AS " - "SELECT row_number() OVER () AS id, tmssapp_schedulingunitblueprint.id AS sub_id, tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_tasktemplate.type_id AS task_type, 'unknown' AS derived_task_status" - " FROM tmssapp_taskblueprint LEFT JOIN tmssapp_schedulingunitblueprint ON tmssapp_schedulingunitblueprint.id = tmssapp_taskblueprint.scheduling_unit_blueprint_id" - " LEFT JOIN tmssapp_tasktemplate ON tmssapp_tasktemplate.id = tmssapp_taskblueprint.specifications_template_id;"), migrations.RunPython(populate_choices), migrations.RunPython(populate_settings), migrations.RunPython(populate_misc), diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py index 4b979319b6d..92a733c311d 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 3.0.9 on 2021-02-11 13:51 +# Generated by Django 3.0.9 on 2021-02-18 15:51 from django.conf import settings import django.contrib.postgres.fields @@ -580,6 +580,9 @@ class Migration(migrations.Migration): ('do_cancel', models.BooleanField()), ('ingest_permission_required', models.BooleanField(default=False, help_text='Explicit permission is needed before the task.')), ('ingest_permission_granted_since', models.DateTimeField(help_text='The moment when ingest permission was granted.', null=True)), + ('output_data_allowed_to_be_ingested', models.BooleanField(default=False, help_text='boolean (default FALSE), which blocks Ingest Tasks from starting if OFF. When toggled ON, backend must scan for startable Ingest Tasks.')), + ('output_data_allowed_to_be_deleted', models.BooleanField(default=False, help_text='boolean (default FALSE), which blocks deleting unpinned dataproducts. When toggled ON, backend must pick SUB up for deletion. It also must when dataproducts are unpinned.')), + ('results_accepted', models.BooleanField(default=False, help_text='boolean (default NULL), which records whether the results were accepted, allowing the higher-level accounting to be adjusted.')), ], options={ 'abstract': False, diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0002_populate.py b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0002_populate.py index 023594b67ad..92baffd4c15 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0002_populate.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0002_populate.py @@ -16,16 +16,6 @@ class Migration(migrations.Migration): # Start SubTask id with 2 000 000 to avoid overlap with 'old' (test/production) OTDB operations = [ migrations.RunSQL('ALTER SEQUENCE tmssapp_SubTask_id_seq RESTART WITH 2000000;'), - migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_taskblueprintsummary; " - "CREATE OR REPLACE VIEW tmssapp_taskblueprintsummary AS " - "SELECT tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_subtask.id AS subtask_id, tmssapp_subtask.state_id AS substate, tmssapp_subtasktemplate.type_id AS subtask_type" - " FROM tmssapp_subtask LEFT JOIN tmssapp_taskblueprint ON tmssapp_taskblueprint.id = tmssapp_subtask.task_blueprint_id" - " LEFT JOIN tmssapp_subtasktemplate ON tmssapp_subtasktemplate.id = tmssapp_subtask.specifications_template_id;"), - migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_schedulingunitblueprintsummary; " - "CREATE OR REPLACE VIEW tmssapp_schedulingunitblueprintsummary AS " - "SELECT row_number() OVER () AS id, tmssapp_schedulingunitblueprint.id AS sub_id, tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_tasktemplate.type_id AS task_type, 'unknown' AS derived_task_status" - " FROM tmssapp_taskblueprint LEFT JOIN tmssapp_schedulingunitblueprint ON tmssapp_schedulingunitblueprint.id = tmssapp_taskblueprint.scheduling_unit_blueprint_id" - " LEFT JOIN tmssapp_tasktemplate ON tmssapp_tasktemplate.id = tmssapp_taskblueprint.specifications_template_id;"), migrations.RunPython(populate_choices), migrations.RunPython(populate_settings), migrations.RunPython(populate_misc), diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py index 44ecd759f2e..7198dfcf39f 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py @@ -405,6 +405,9 @@ class SchedulingUnitBlueprint(NamedCommon): ingest_permission_granted_since = DateTimeField(auto_now_add=False, null=True, help_text='The moment when ingest permission was granted.') requirements_template = ForeignKey('SchedulingUnitTemplate', on_delete=CASCADE, help_text='Schema used for requirements_doc (IMMUTABLE).') draft = ForeignKey('SchedulingUnitDraft', related_name='scheduling_unit_blueprints', on_delete=CASCADE, help_text='Scheduling Unit Draft which this run instantiates.') + output_data_allowed_to_be_ingested = BooleanField(default=False, help_text='boolean (default FALSE), which blocks Ingest Tasks from starting if OFF. When toggled ON, backend must scan for startable Ingest Tasks.') + output_data_allowed_to_be_deleted = BooleanField(default=False, help_text='boolean (default FALSE), which blocks deleting unpinned dataproducts. When toggled ON, backend must pick SUB up for deletion. It also must when dataproducts are unpinned.') + results_accepted = BooleanField(default=False, help_text='boolean (default NULL), which records whether the results were accepted, allowing the higher-level accounting to be adjusted.') def save(self, force_insert=False, force_update=False, using=None, update_fields=None): annotate_validate_add_defaults_to_doc_using_template(self, 'requirements_doc', 'requirements_template') @@ -526,6 +529,11 @@ class SchedulingUnitBlueprint(NamedCommon): '''Can this scheduling unit proceed with running its tasks?''' return self.status not in [SchedulingUnitBlueprint.Status.ERROR.value, SchedulingUnitBlueprint.Status.FINISHED.value, SchedulingUnitBlueprint.Status.CANCELLED.value] + @property + def output_pinned(self) -> bool: + '''Is the output_pinned flag True for any of its task_blueprints?''' + return any(task.output_pinned for task in self.task_blueprints.all()) + def _task_graph_instantiated(self): return self._get_total_nbr_tasks() > 0 diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py index 43e4ac09f9a..650d1f68166 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py @@ -274,7 +274,7 @@ class SchedulingUnitBlueprintSerializer(DynamicRelationalHyperlinkedModelSeriali class Meta: model = models.SchedulingUnitBlueprint fields = '__all__' - extra_fields = ['task_blueprints', 'duration', 'start_time', 'stop_time', 'status', 'observed_end_time'] + extra_fields = ['task_blueprints', 'duration', 'start_time', 'stop_time', 'status', 'observed_end_time', 'output_pinned'] expandable_fields = { 'requirements_template': 'lofar.sas.tmss.tmss.tmssapp.serializers.SchedulingUnitTemplateSerializer', 'draft': 'lofar.sas.tmss.tmss.tmssapp.serializers.SchedulingUnitDraftSerializer', diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py index 14b27ffb25d..d98d0e652a7 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py @@ -16,7 +16,7 @@ from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SchedulingUnitBlueprint from lofar.common.datetimeutils import round_to_second_precision from django.dispatch import receiver -from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_status_changed_signal, scheduling_unit_blueprint_cannot_proceed_signal, ingest_task_blueprint_status_changed_signal +from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_status_changed_signal, scheduling_unit_blueprint_cannot_proceed_signal, ingest_task_blueprint_status_changed_signal, obs_task_status_changed_signal from viewflow import frontend, ThisObject from viewflow.activation import STATUS @@ -100,7 +100,7 @@ class SchedulingUnitFlow(Flow): wait_scheduled = ( Condition( this.check_condition_scheduled, - scheduling_unit_blueprint_status_changed_signal, + obs_task_status_changed_signal, task_loader=this.get_scheduling_unit_task ) .Next(this.wait_processed) @@ -198,6 +198,27 @@ class SchedulingUnitFlow(Flow): mark_sub = ( flow.Handler( this.do_mark_sub + ).Next(this.check_data_pinned) + ) + + check_data_pinned = ( + flow.If(lambda activation: activation.process.su.output_pinned) + .Then(this.unpin_data) + .Else(this.delete_data) + ) + + unpin_data = ( + flow.View( + viewsets.UnpinDataView, + task_description='Unpin Data' + ).Permission( + auto_create=True + ).Next(this.delete_data) + ) + + delete_data = ( + flow.Handler( + this.do_delete_data ).Next(this.end) ) @@ -221,12 +242,18 @@ class SchedulingUnitFlow(Flow): def do_mark_sub(self, activation): - activation.process.can_delete = True - activation.process.results_accepted = ((activation.process.qa_reporting_to is not None and activation.process.qa_reporting_to.operator_accept) + activation.process.su.output_data_allowed_to_be_deleted = True + activation.process.su.results_accepted = ((activation.process.qa_reporting_to is not None and activation.process.qa_reporting_to.operator_accept) and (activation.process.qa_reporting_sos is not None and activation.process.qa_reporting_sos.sos_accept_show_pi) and (activation.process.decide_acceptance is not None and activation.process.decide_acceptance.sos_accept_after_pi)) + + activation.process.su.save() + activation.process.save() + return activation - logger.info("End of schedulingunit workflow: can_delete: %s, results_accepted: %s", activation.process.can_delete, activation.process.results_accepted) + def do_delete_data(self, activation): + activation.process.su.save() + activation.process.save() return activation def check_condition_scheduled(self, activation, instance): @@ -245,6 +272,7 @@ class SchedulingUnitFlow(Flow): logger.info("granting ingest permission for scheduling unit blueprint id=%s", activation.process.su.id) activation.process.su.ingest_permission_granted_since = round_to_second_precision(datetime.utcnow()) + activation.process.su.output_data_allowed_to_be_ingested = True activation.process.su.save() activation.process.save() diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/migrations/0001_initial.py b/SAS/TMSS/backend/src/tmss/workflowapp/migrations/0001_initial.py index 7b5662531ee..5a34111ac5f 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/migrations/0001_initial.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/migrations/0001_initial.py @@ -21,6 +21,13 @@ class Migration(migrations.Migration): ('sos_accept_after_pi', models.BooleanField(default=False)), ], ), + migrations.CreateModel( + name='UnpinData', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('unpin_data', models.BooleanField(default=False)), + ], + ), migrations.CreateModel( name='PIVerification', fields=[ @@ -50,8 +57,6 @@ class Migration(migrations.Migration): name='SchedulingUnitProcess', fields=[ ('process_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='viewflow.Process')), - ('can_delete', models.BooleanField(default=False)), - ('results_accepted', models.BooleanField(default=False)), ('decide_acceptance', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='workflowapp.DecideAcceptance')), ('pi_verification', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='workflowapp.PIVerification')), ('qa_reporting_sos', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='workflowapp.QAReportingSOS')), diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py index 392a7168f10..3a3c4c6d637 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py @@ -27,6 +27,8 @@ class PIVerification(Model): class DecideAcceptance(Model): sos_accept_after_pi = BooleanField(default=False) +class UnpinData(Model): + unpin_data = BooleanField(default=False) class SchedulingUnitProcess(Process): su = ForeignKey(SchedulingUnitBlueprint, blank=True, null=True, on_delete=CASCADE) @@ -34,5 +36,4 @@ class SchedulingUnitProcess(Process): qa_reporting_sos=ForeignKey(QAReportingSOS, blank=True, null=True, on_delete=CASCADE) pi_verification=ForeignKey(PIVerification, blank=True, null=True, on_delete=CASCADE) decide_acceptance=ForeignKey(DecideAcceptance, blank=True, null=True, on_delete=CASCADE) - can_delete = BooleanField(default=False) - results_accepted = BooleanField(default=False) + diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/signals/CMakeLists.txt b/SAS/TMSS/backend/src/tmss/workflowapp/signals/CMakeLists.txt index f35ffe6bb27..91e675015dc 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/signals/CMakeLists.txt +++ b/SAS/TMSS/backend/src/tmss/workflowapp/signals/CMakeLists.txt @@ -6,6 +6,7 @@ set(_py_files subcannotproceed.py substatuschanged.py ingesttaskstatuschanged.py + obstaskstatuschanged.py ) python_install(${_py_files} diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/signals/__init__.py b/SAS/TMSS/backend/src/tmss/workflowapp/signals/__init__.py index 0b64dfb537b..d40f956d2b2 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/signals/__init__.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/signals/__init__.py @@ -1,3 +1,4 @@ from .subcannotproceed import * from .substatuschanged import * -from .ingesttaskstatuschanged import * \ No newline at end of file +from .ingesttaskstatuschanged import * +from .obstaskstatuschanged import * \ No newline at end of file diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/signals/obstaskstatuschanged.py b/SAS/TMSS/backend/src/tmss/workflowapp/signals/obstaskstatuschanged.py new file mode 100644 index 00000000000..b812434d926 --- /dev/null +++ b/SAS/TMSS/backend/src/tmss/workflowapp/signals/obstaskstatuschanged.py @@ -0,0 +1,3 @@ +import django.dispatch + +obs_task_status_changed_signal = django.dispatch.Signal() \ No newline at end of file diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py index 694e4403597..7185040d6f0 100755 --- a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py @@ -7,10 +7,6 @@ import logging logger = logging.getLogger(__name__) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) -""" -logger.warning("TODO: Fix test, look into synchronization issues. Skipped test for now with exit(3)") -exit(3) -""" from lofar.common.test_utils import skip_integration_tests if skip_integration_tests(): exit(3) @@ -27,7 +23,8 @@ from lofar.sas.tmss.client.tmssbuslistener import * class SchedulingUnitFlowTest(unittest.TestCase): - + + @classmethod def setUpClass(cls) -> None: cls.TEST_UUID = uuid.uuid1() @@ -54,6 +51,23 @@ class SchedulingUnitFlowTest(unittest.TestCase): cls.BASE_URL_WF_API = cls.tmss_test_env.django_server.url.rstrip('/').replace('/api','/workflow_api') cls.AUTH = requests.auth.HTTPBasicAuth(cls.tmss_test_env.ldap_server.dbcreds.user, cls.tmss_test_env.ldap_server.dbcreds.password) + import time + + cls.sync_event_bp_scheduled = Event() + cls.sync_event_bp_cannot_proceed = Event() + + + class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): + def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): + super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) + if status == "scheduled": + cls.sync_event_bp_scheduled.set() + + def onSchedulingUnitBlueprintCannotProceed(self, id: int): + super().onSchedulingUnitBlueprintCannotProceed(id=id) + cls.sync_event_bp_cannot_proceed.set() + + @classmethod def tearDownClass(cls) -> None: @@ -61,8 +75,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): cls.ra_test_env.stop() cls.tmp_exchange.close() - #TODO: split the test_qa_workflow, to test different branches of the workflow - def test_qa_workflow(self): + def test_qa_workflow_complete(self): from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow from lofar.sas.tmss.tmss.tmssapp import models @@ -73,7 +86,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess from viewflow.models import Task - import time sync_event_bp_scheduled = Event() sync_event_bp_cannot_proceed = Event() @@ -82,13 +94,17 @@ class SchedulingUnitFlowTest(unittest.TestCase): class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) + + if status == "scheduled": + logging.info("Status is %s, sending sync event",status) sync_event_bp_scheduled.set() def onSchedulingUnitBlueprintCannotProceed(self, id: int): super().onSchedulingUnitBlueprintCannotProceed(id=id) + logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) sync_event_bp_cannot_proceed.set() - + service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, exchange=self.tmp_exchange.address) with BusListenerJanitor(service): @@ -105,8 +121,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, - specifications_template__type__value=TaskType.Choices.INGEST.value) + #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + # specifications_template__type__value=TaskType.Choices.INGEST.value) scheduling_unit_draft.refresh_from_db() # there is no signal that SchedulingUnitProcess instance was created, @@ -125,10 +141,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): prev_ingest_permission_granted_since = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).ingest_permission_granted_since self.assertEqual(prev_ingest_permission_granted_since, None) - #test that QA workflow have two tasks - self.assertEqual(2, Task.objects.filter(process=scheduling_unit_process_id).count()) - #test that there is jut one active task - self.assertEqual(1, SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks().count()) #check the active task name self.assertEqual("wait_scheduled", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) @@ -137,6 +149,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') + #Change subtask status to scheduled for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: @@ -144,18 +157,23 @@ class SchedulingUnitFlowTest(unittest.TestCase): subtask.state = models.SubtaskState.objects.get(value='scheduled') subtask.save() - - - # wait until scheduling unit is scheduled if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") raise TimeoutError() + else: + logging.info("Received sync_event_bp_scheduled event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + sync_event_bp_scheduled.clear() - #Check the QA Workflow shoulb have 3 Tasks now - self.assertEqual(3, Task.objects.filter(process=scheduling_unit_process_id).count()) - #test that there is jut one active task - self.assertEqual(1, SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks().count()) + #check the active task name self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) @@ -164,21 +182,30 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') - #Change subtask status to cancelled + #Change subtask status to finished for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: - for subtask in task_blueprint.subtasks.all(): - subtask.state = models.SubtaskState.objects.get(value='cancelled') - subtask.save() + task_blueprint.output_pinned=True + task_blueprint.save() + + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='finished') + subtask.save() if not sync_event_bp_cannot_proceed.wait(timeout=10): + logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") raise TimeoutError() + else: + logging.info("Received sync_event_bp_cannot_proceed event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + sync_event_bp_cannot_proceed.clear() - #Check the QA Workflow is now with 4 Task - self.assertEqual(4, Task.objects.filter(process=scheduling_unit_process_id).count()) - #test that there is jut one active task - self.assertEqual(1, SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks().count()) #check the active task name self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) @@ -308,6 +335,1088 @@ class SchedulingUnitFlowTest(unittest.TestCase): #verify that ingest_permission_granted_since is now a valid datetime ingest_permission_granted_since = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).ingest_permission_granted_since self.assertEqual(True,isinstance(ingest_permission_granted_since, datetime)) + + #verify that output_data_allowed_to_be_ingested is now True + output_data_allowed_to_be_ingested = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_ingested + self.assertEqual(True,output_data_allowed_to_be_ingested) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].flow_task.name, 'ingest_done') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].flow_task.name, 'mark_sub') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].status, 'DONE') + + output_data_allowed_to_be_deleted = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_deleted + self.assertEqual(True,output_data_allowed_to_be_deleted) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].flow_task.name, 'check_data_pinned') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].flow_task.name, 'unpin_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform decide_acceptance step + headers = {'content-type': 'application/json'} + data = '{"unpin_data": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].flow_task.name, 'unpin_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[15].flow_task.name, 'delete_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[15].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[16].flow_task.name, 'end') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[16].status, 'DONE') + + + def test_qa_workflow_qa_reporting_to_no(self): + from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow + + from lofar.sas.tmss.tmss.tmssapp import models + from lofar.sas.tmss.tmss.tmssapp.models import TaskType + + from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft + from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data + + from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess + from viewflow.models import Task + + sync_event_bp_scheduled = Event() + sync_event_bp_cannot_proceed = Event() + + + class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): + def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): + super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) + + + if status == "scheduled": + logging.info("Status is %s, sending sync event",status) + sync_event_bp_scheduled.set() + + def onSchedulingUnitBlueprintCannotProceed(self, id: int): + super().onSchedulingUnitBlueprintCannotProceed(id=id) + logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) + sync_event_bp_cannot_proceed.set() + + service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + exchange=self.tmp_exchange.address) + with BusListenerJanitor(service): + + strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") + + scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( + name="Test Scheduling Unit UC1", + requirements_doc=strategy_template.template, + requirements_template=strategy_template.scheduling_unit_template, + observation_strategy_template=strategy_template, + copy_reason=models.CopyReason.objects.get(value='template'), + generator_instance_doc="para", + copies=None, + scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) + + scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + # specifications_template__type__value=TaskType.Choices.INGEST.value) + scheduling_unit_draft.refresh_from_db() + + # there is no signal that SchedulingUnitProcess instance was created, + # so we have to wait an poll before we can proceed with the test + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("SchedulingUnitProcess not created within expected time") + + # Yes! the SchedulingUnitProcess was created, let's get it. + scheduling_unit_process_id = SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).id + + prev_ingest_permission_granted_since = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).ingest_permission_granted_since + self.assertEqual(prev_ingest_permission_granted_since, None) + + + self.assertEqual("wait_scheduled", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].flow_task.name, 'start') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') + + + #Change subtask status to scheduled + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='scheduled') + subtask.save() + + # wait until scheduling unit is scheduled + if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") + raise TimeoutError() + else: + logging.info("Received sync_event_bp_scheduled event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + + sync_event_bp_scheduled.clear() + + + #check the active task name + self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') + + #Change subtask status to finished + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + task_blueprint.output_pinned=True + task_blueprint.save() + + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='finished') + subtask.save() + + if not sync_event_bp_cannot_proceed.wait(timeout=10): + logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + raise TimeoutError() + else: + logging.info("Received sync_event_bp_cannot_proceed event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + + sync_event_bp_cannot_proceed.clear() + + + #check the active task name + self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'ASSIGNED') + + #API: Perform qa_reporting_to step + headers = {'content-type': 'application/json'} + data = '{"operator_report": "Test report", "operator_accept": false}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].flow_task.name, 'check_operator_accept') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].flow_task.name, 'mark_sub') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].flow_task.name, 'check_data_pinned') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'unpin_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'NEW') + + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform decide_acceptance step + headers = {'content-type': 'application/json'} + data = '{"unpin_data": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'unpin_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].flow_task.name, 'delete_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].flow_task.name, 'end') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].status, 'DONE') + + + def test_qa_workflow_qa_reporting_sos_no(self): + from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow + + from lofar.sas.tmss.tmss.tmssapp import models + from lofar.sas.tmss.tmss.tmssapp.models import TaskType + + from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft + from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data + + from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess + from viewflow.models import Task + + sync_event_bp_scheduled = Event() + sync_event_bp_cannot_proceed = Event() + + + class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): + def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): + super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) + + + if status == "scheduled": + logging.info("Status is %s, sending sync event",status) + sync_event_bp_scheduled.set() + + def onSchedulingUnitBlueprintCannotProceed(self, id: int): + super().onSchedulingUnitBlueprintCannotProceed(id=id) + logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) + sync_event_bp_cannot_proceed.set() + + service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + exchange=self.tmp_exchange.address) + with BusListenerJanitor(service): + strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") + + scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( + name="Test Scheduling Unit UC1", + requirements_doc=strategy_template.template, + requirements_template=strategy_template.scheduling_unit_template, + observation_strategy_template=strategy_template, + copy_reason=models.CopyReason.objects.get(value='template'), + generator_instance_doc="para", + copies=None, + scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) + + scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + # specifications_template__type__value=TaskType.Choices.INGEST.value) + scheduling_unit_draft.refresh_from_db() + + # there is no signal that SchedulingUnitProcess instance was created, + # so we have to wait an poll before we can proceed with the test + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("SchedulingUnitProcess not created within expected time") + + # Yes! the SchedulingUnitProcess was created, let's get it. + scheduling_unit_process_id = SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).id + + prev_ingest_permission_granted_since = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).ingest_permission_granted_since + self.assertEqual(prev_ingest_permission_granted_since, None) + + #check the active task name + self.assertEqual("wait_scheduled", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].flow_task.name, 'start') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') + + + #Change subtask status to scheduled + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='scheduled') + subtask.save() + + # wait until scheduling unit is scheduled + if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") + raise TimeoutError() + else: + logging.info("Received sync_event_bp_scheduled event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + + sync_event_bp_scheduled.clear() + + + #check the active task name + self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') + + #Change subtask status to finished + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + task_blueprint.output_pinned=True + task_blueprint.save() + + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='finished') + subtask.save() + + if not sync_event_bp_cannot_proceed.wait(timeout=10): + logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + raise TimeoutError() + else: + logging.info("Received sync_event_bp_cannot_proceed event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + + sync_event_bp_cannot_proceed.clear() + + #check the active task name + self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'ASSIGNED') + + #API: Perform qa_reporting_to step + headers = {'content-type': 'application/json'} + data = '{"operator_report": "Test report", "operator_accept": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].flow_task.name, 'check_operator_accept') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].flow_task.name, 'qa_reporting_sos') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform qa_reporting_sos step + headers = {'content-type': 'application/json'} + data = '{"sos_report": "Test report", "quality_within_policy": true, "sos_accept_show_pi": false}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].flow_task.name, 'qa_reporting_sos') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].flow_task.name, 'check_sos_accept_show_pi') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'mark_sub') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'DONE') + + output_data_allowed_to_be_deleted = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_deleted + self.assertEqual(True,output_data_allowed_to_be_deleted) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].flow_task.name, 'check_data_pinned') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].flow_task.name, 'unpin_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform decide_acceptance step + headers = {'content-type': 'application/json'} + data = '{"unpin_data": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].flow_task.name, 'unpin_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].flow_task.name, 'delete_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].flow_task.name, 'end') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].status, 'DONE') + + def test_qa_workflow_qa_quality_acceptable_no(self): + from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow + + from lofar.sas.tmss.tmss.tmssapp import models + from lofar.sas.tmss.tmss.tmssapp.models import TaskType + + from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft + from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data + + from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess + from viewflow.models import Task + + sync_event_bp_scheduled = Event() + sync_event_bp_cannot_proceed = Event() + + + class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): + def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): + super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) + + + if status == "scheduled": + logging.info("Status is %s, sending sync event",status) + sync_event_bp_scheduled.set() + + def onSchedulingUnitBlueprintCannotProceed(self, id: int): + super().onSchedulingUnitBlueprintCannotProceed(id=id) + logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) + sync_event_bp_cannot_proceed.set() + + service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + exchange=self.tmp_exchange.address) + with BusListenerJanitor(service): + strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") + + scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( + name="Test Scheduling Unit UC1", + requirements_doc=strategy_template.template, + requirements_template=strategy_template.scheduling_unit_template, + observation_strategy_template=strategy_template, + copy_reason=models.CopyReason.objects.get(value='template'), + generator_instance_doc="para", + copies=None, + scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) + + scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + # specifications_template__type__value=TaskType.Choices.INGEST.value) + scheduling_unit_draft.refresh_from_db() + + # there is no signal that SchedulingUnitProcess instance was created, + # so we have to wait an poll before we can proceed with the test + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("SchedulingUnitProcess not created within expected time") + + # Yes! the SchedulingUnitProcess was created, let's get it. + scheduling_unit_process_id = SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).id + + prev_ingest_permission_granted_since = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).ingest_permission_granted_since + self.assertEqual(prev_ingest_permission_granted_since, None) + + #check the active task name + self.assertEqual("wait_scheduled", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].flow_task.name, 'start') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') + + + #Change subtask status to scheduled + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='scheduled') + subtask.save() + + # wait until scheduling unit is scheduled + if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") + raise TimeoutError() + else: + logging.info("Received sync_event_bp_scheduled event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + + sync_event_bp_scheduled.clear() + + + #check the active task name + self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') + + #Change subtask status to finished + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + task_blueprint.output_pinned=True + task_blueprint.save() + + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='finished') + subtask.save() + + if not sync_event_bp_cannot_proceed.wait(timeout=10): + logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + raise TimeoutError() + else: + logging.info("Received sync_event_bp_cannot_proceed event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + + sync_event_bp_cannot_proceed.clear() + + #check the active task name + self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'ASSIGNED') + + #API: Perform qa_reporting_to step + headers = {'content-type': 'application/json'} + data = '{"operator_report": "Test report", "operator_accept": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].flow_task.name, 'check_operator_accept') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].flow_task.name, 'qa_reporting_sos') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform qa_reporting_sos step + headers = {'content-type': 'application/json'} + data = '{"sos_report": "Test report", "quality_within_policy": true, "sos_accept_show_pi": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].flow_task.name, 'qa_reporting_sos') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].flow_task.name, 'check_sos_accept_show_pi') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'pi_verification') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform pi_verification step + headers = {'content-type': 'application/json'} + data = '{"pi_report": "Test report", "pi_accept": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'pi_verification') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].flow_task.name, 'decide_acceptance') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform decide_acceptance step + headers = {'content-type': 'application/json'} + data = '{"sos_accept_after_pi": false}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].flow_task.name, 'decide_acceptance') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].flow_task.name, 'check_sos_accept_after_pi') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].flow_task.name, 'mark_sub') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].status, 'DONE') + + output_data_allowed_to_be_deleted = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_deleted + self.assertEqual(True,output_data_allowed_to_be_deleted) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].flow_task.name, 'check_data_pinned') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].flow_task.name, 'unpin_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform decide_acceptance step + headers = {'content-type': 'application/json'} + data = '{"unpin_data": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].flow_task.name, 'unpin_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].flow_task.name, 'delete_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].flow_task.name, 'end') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].status, 'DONE') + + def test_qa_workflow_qa_is_data_pinned_no(self): + from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow + + from lofar.sas.tmss.tmss.tmssapp import models + from lofar.sas.tmss.tmss.tmssapp.models import TaskType + + from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft + from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data + + from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess + from viewflow.models import Task + + sync_event_bp_scheduled = Event() + sync_event_bp_cannot_proceed = Event() + + + class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): + def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): + super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) + + + if status == "scheduled": + logging.info("Status is %s, sending sync event",status) + sync_event_bp_scheduled.set() + + def onSchedulingUnitBlueprintCannotProceed(self, id: int): + super().onSchedulingUnitBlueprintCannotProceed(id=id) + logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) + sync_event_bp_cannot_proceed.set() + + service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + exchange=self.tmp_exchange.address) + with BusListenerJanitor(service): + strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") + + scheduling_unit_draft = models.SchedulingUnitDraft.objects.create( + name="Test Scheduling Unit UC1", + requirements_doc=strategy_template.template, + requirements_template=strategy_template.scheduling_unit_template, + observation_strategy_template=strategy_template, + copy_reason=models.CopyReason.objects.get(value='template'), + generator_instance_doc="para", + copies=None, + scheduling_set=models.SchedulingSet.objects.create(**SchedulingSet_test_data())) + + scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + #ingest_subtask = models.Subtask.objects.get(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint.id, + # specifications_template__type__value=TaskType.Choices.INGEST.value) + scheduling_unit_draft.refresh_from_db() + + # there is no signal that SchedulingUnitProcess instance was created, + # so we have to wait an poll before we can proceed with the test + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("SchedulingUnitProcess not created within expected time") + + # Yes! the SchedulingUnitProcess was created, let's get it. + scheduling_unit_process_id = SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).id + + prev_ingest_permission_granted_since = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).ingest_permission_granted_since + self.assertEqual(prev_ingest_permission_granted_since, None) + + #check the active task name + self.assertEqual("wait_scheduled", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].flow_task.name, 'start') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[0].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') + + + #Change subtask status to scheduled + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='scheduled') + subtask.save() + + # wait until scheduling unit is scheduled + if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") + raise TimeoutError() + else: + logging.info("Received sync_event_bp_scheduled event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + + sync_event_bp_scheduled.clear() + + + #check the active task name + self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') + + #Change subtask status to finished + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + task_blueprint.output_pinned=False + task_blueprint.save() + + for subtask in task_blueprint.subtasks.all(): + subtask.state = models.SubtaskState.objects.get(value='finished') + subtask.save() + + if not sync_event_bp_cannot_proceed.wait(timeout=10): + logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + raise TimeoutError() + else: + logging.info("Received sync_event_bp_cannot_proceed event") + poll_starttime = datetime.utcnow() + while True: + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": + break + sleep(0.1) + if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + raise TimeoutError("Task not activated within expected time") + + sync_event_bp_cannot_proceed.clear() + + #check the active task name + self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'ASSIGNED') + + #API: Perform qa_reporting_to step + headers = {'content-type': 'application/json'} + data = '{"operator_report": "Test report", "operator_accept": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].flow_task.name, 'check_operator_accept') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[4].status, 'DONE') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].flow_task.name, 'qa_reporting_sos') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform qa_reporting_sos step + headers = {'content-type': 'application/json'} + data = '{"sos_report": "Test report", "quality_within_policy": true, "sos_accept_show_pi": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].flow_task.name, 'qa_reporting_sos') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[5].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].flow_task.name, 'check_sos_accept_show_pi') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[6].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'pi_verification') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform pi_verification step + headers = {'content-type': 'application/json'} + data = '{"pi_report": "Test report", "pi_accept": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].flow_task.name, 'pi_verification') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[7].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].flow_task.name, 'decide_acceptance') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].status, 'NEW') + + #API: Get current task + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + task_id=r_dict[0]['pk'] + + #API: Assign to an user + data = "" + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_task/{}/assign/'.format(task_id), json=data, auth=self.__class__.AUTH) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + #API: Perform decide_acceptance step + headers = {'content-type': 'application/json'} + data = '{"sos_accept_after_pi": true}' + response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/perform/'.format(scheduling_unit_process_id), data=data, auth=self.__class__.AUTH, headers=headers) + content = response.content.decode('utf-8') + r_dict = json.loads(content) + self.assertEqual(200,response.status_code) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].flow_task.name, 'decide_acceptance') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[8].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].flow_task.name, 'check_sos_accept_after_pi') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[9].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].flow_task.name, 'allow_ingest') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[10].status, 'DONE') + + #verify that ingest_permission_granted_since is now a valid datetime + ingest_permission_granted_since = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).ingest_permission_granted_since + self.assertEqual(True,isinstance(ingest_permission_granted_since, datetime)) + + #verify that output_data_allowed_to_be_ingested is now True + output_data_allowed_to_be_ingested = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_ingested + self.assertEqual(True,output_data_allowed_to_be_ingested) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].flow_task.name, 'ingest_done') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[11].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].flow_task.name, 'mark_sub') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[12].status, 'DONE') + + output_data_allowed_to_be_deleted = models.SchedulingUnitBlueprint.objects.get(pk=scheduling_unit_process_id).output_data_allowed_to_be_deleted + self.assertEqual(True,output_data_allowed_to_be_deleted) + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].flow_task.name, 'check_data_pinned') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[13].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].flow_task.name, 'delete_data') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[14].status, 'DONE') + + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[15].flow_task.name, 'end') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[15].status, 'DONE') + if __name__ == '__main__': #run the unit tests diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/viewsets/schedulingunitflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/viewsets/schedulingunitflow.py index 258b4ff6e87..6e684b4d119 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/viewsets/schedulingunitflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/viewsets/schedulingunitflow.py @@ -160,6 +160,33 @@ class DecideAcceptanceView(FlowMixin, generic.CreateView): logging.info('Activation done') self.activation.done() +class UnpinDataView(FlowMixin, generic.CreateView): + template_name = 'qa_reporting.html' + + model = models.UnpinData + fields = [ + 'unpin_data' + ] + + def form_valid(self, form): + report_data = form.save(commit=False) + report_data.save() + + self.activation.process.unpin_data = report_data + self.activation.process.save() + self.activation_done() + try: + return redirect(self.get_success_url()) + except NoReverseMatch as e: + return + + def activation_done(self, *args, **kwargs): + # TODO: Should Wait for data to be unpinned? + """Finish the task activation.""" + logging.info('Activation done') + self.activation.done() + + class SchedulingUnitTaskAssignViewSet(mixins.CreateModelMixin, #mixins.ListModelMixin, diff --git a/SAS/TMSS/backend/test/t_adapter.py b/SAS/TMSS/backend/test/t_adapter.py index b80e7db82c7..36d0faf94e1 100755 --- a/SAS/TMSS/backend/test/t_adapter.py +++ b/SAS/TMSS/backend/test/t_adapter.py @@ -252,11 +252,12 @@ _isCobalt=T subtask_pipe: models.Subtask = models.Subtask.objects.create(**subtask_data) subtask_pipe_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pipe)) + test_dir = "/tmp/test/data/%s" % uuid.uuid4() empty_feedback_template = models.DataproductFeedbackTemplate.objects.get(name='empty') - dataproduct_obs_out1:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L220133_SAP000_SB000_uv.MS', producer=subtask_obs_output, feedback_template=empty_feedback_template)) - dataproduct_obs_out2: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L220133_SAP000_SB001_uv.MS', producer=subtask_obs_output, feedback_template=empty_feedback_template)) - dataproduct_pipe_out1: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L99307_SB000_uv.dppp.MS', producer=subtask_pipe_output, feedback_template=empty_feedback_template)) - dataproduct_pipe_out2: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L99307_SB001_uv.dppp.MS', producer=subtask_pipe_output, feedback_template=empty_feedback_template)) + dataproduct_obs_out1:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L220133_SAP000_SB000_uv.MS', directory=test_dir, producer=subtask_obs_output, feedback_template=empty_feedback_template)) + dataproduct_obs_out2: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L220133_SAP000_SB001_uv.MS', directory=test_dir, producer=subtask_obs_output, feedback_template=empty_feedback_template)) + dataproduct_pipe_out1: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L99307_SB000_uv.dppp.MS', directory=test_dir, producer=subtask_pipe_output, feedback_template=empty_feedback_template)) + dataproduct_pipe_out2: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L99307_SB001_uv.dppp.MS', directory=test_dir, producer=subtask_pipe_output, feedback_template=empty_feedback_template)) models.DataproductTransform.objects.create(input=dataproduct_obs_out1, output=dataproduct_pipe_out1, identity=True) models.DataproductTransform.objects.create(input=dataproduct_obs_out2, output=dataproduct_pipe_out2, identity=True) @@ -285,11 +286,12 @@ _isCobalt=T subtask_pipe: models.Subtask = models.Subtask.objects.create(**subtask_data) subtask_pipe_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask_pipe)) + test_dir = "/tmp/test/data/%s" % uuid.uuid4() empty_feedback_template = models.DataproductFeedbackTemplate.objects.get(name='empty') - dataproduct_obs_out1:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L220133_SAP000_SB000_uv.MS', producer=subtask_obs_output, feedback_template=empty_feedback_template)) - dataproduct_obs_out2: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L220133_SAP000_SB001_uv.MS', producer=subtask_obs_output, feedback_template=empty_feedback_template)) - dataproduct_pipe_out1: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L99307_SB000_uv.dppp.MS', producer=subtask_pipe_output, feedback_template=empty_feedback_template)) - dataproduct_pipe_out2: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L99307_SB001_uv.dppp.MS', producer=subtask_pipe_output, feedback_template=empty_feedback_template)) + dataproduct_obs_out1:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L220133_SAP000_SB000_uv.MS', directory=test_dir, producer=subtask_obs_output, feedback_template=empty_feedback_template)) + dataproduct_obs_out2: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L220133_SAP000_SB001_uv.MS', directory=test_dir, producer=subtask_obs_output, feedback_template=empty_feedback_template)) + dataproduct_pipe_out1: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L99307_SB000_uv.dppp.MS', directory=test_dir, producer=subtask_pipe_output, feedback_template=empty_feedback_template)) + dataproduct_pipe_out2: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(filename='L99307_SB001_uv.dppp.MS', directory=test_dir, producer=subtask_pipe_output, feedback_template=empty_feedback_template)) models.DataproductTransform.objects.create(input=dataproduct_obs_out1, output=dataproduct_pipe_out1, identity=True) models.DataproductTransform.objects.create(input=dataproduct_obs_out2, output=dataproduct_pipe_out2, identity=True) diff --git a/SAS/TMSS/backend/test/test_utils.py b/SAS/TMSS/backend/test/test_utils.py index c7d1aaa6823..afed5e6e43f 100644 --- a/SAS/TMSS/backend/test/test_utils.py +++ b/SAS/TMSS/backend/test/test_utils.py @@ -395,8 +395,8 @@ class TMSSTestEnvironment: # start the websocket service, so the changes in the database are posted (via the messagebus) to an http web socket # this implies that _start_pg_listener should be true as well self._start_pg_listener = True - from lofar.sas.tmss.services.websocket_service import create_service - self.websocket_service = create_service(exchange=self._exchange, broker=self._broker) + from lofar.sas.tmss.services.websocket_service import create_service as create_websocket_service, DEFAULT_WEBSOCKET_PORT + self.websocket_service = create_websocket_service(exchange=self._exchange, broker=self._broker, websocket_port=find_free_port(DEFAULT_WEBSOCKET_PORT)) service_threads.append(threading.Thread(target=self.websocket_service.start_listening)) service_threads[-1].start() @@ -423,9 +423,9 @@ class TMSSTestEnvironment: if self._start_feedback_service: try: - from lofar.sas.tmss.services.feedback_handling import TMSSFeedbackListener - self.feedback_service = TMSSFeedbackListener() - service_threads.append(threading.Thread(target=self.feedback_service.start_handling)) + from lofar.sas.tmss.services.feedback_handling import create_service as create_feedback_service + self.feedback_service = create_feedback_service(exchange=self._exchange, broker=self._broker) + service_threads.append(threading.Thread(target=self.feedback_service.start_listening)) service_threads[-1].start() except Exception as e: logger.exception(e) @@ -458,7 +458,7 @@ class TMSSTestEnvironment: self.postgres_listener = None if self.feedback_service is not None: - self.feedback_service.stop_handling() + self.feedback_service.stop_listening() self.feedback_service = None if self.websocket_service is not None: diff --git a/SAS/TMSS/client/lib/tmssbuslistener.py b/SAS/TMSS/client/lib/tmssbuslistener.py index ed788255749..8831dcfbfcf 100644 --- a/SAS/TMSS/client/lib/tmssbuslistener.py +++ b/SAS/TMSS/client/lib/tmssbuslistener.py @@ -134,6 +134,8 @@ class TMSSEventMessageHandler(AbstractMessageHandler): elif stripped_subject == 'SchedulingUnitBlueprint.Object.IngestPermissionGranted': self.onSchedulingUnitBlueprintIngestPermissionGranted(id=msg.content['id'], ingest_permission_granted_since=parser.parse(msg.content['ingest_permission_granted_since'], ignoretz=True)) + elif stripped_subject == 'TaskBlueprint.Object.OutputPinningUpdated': + self.onTaskBlueprintOutputPinningUpdated(**msg.content) else: raise MessageHandlerUnknownSubjectError("TMSSBusListener.handleMessage: unknown subject: %s" % msg.subject) @@ -307,6 +309,13 @@ class TMSSEventMessageHandler(AbstractMessageHandler): :param ingest_permission_granted_since: the timestamp when the permission was granted ''' pass + + def onTaskBlueprintOutputPinningUpdated(self, id: int, output_pinned: bool): + '''onTaskBlueprintOutputPinningUpdated is called upon receiving a TaskBlueprint.Object.OutputPinningUpdated message, usually as a result of a change on the TaskBlueprint output_pinned field. + :param id: the TMSS id of the TaskBlueprint + :param output_pinned: True if the output of this task is pinned to disk, that is, forbidden to be removed. + ''' + pass class TMSSBusListener(BusListener): diff --git a/SAS/TMSS/frontend/tmss_webapp/.vscode/settings.json b/SAS/TMSS/frontend/tmss_webapp/.vscode/settings.json new file mode 100644 index 00000000000..3b664107303 --- /dev/null +++ b/SAS/TMSS/frontend/tmss_webapp/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "git.ignoreLimitWarning": true +} \ No newline at end of file diff --git a/SAS/TMSS/frontend/tmss_webapp/package.json b/SAS/TMSS/frontend/tmss_webapp/package.json new file mode 100644 index 00000000000..e9cc1d244a2 --- /dev/null +++ b/SAS/TMSS/frontend/tmss_webapp/package.json @@ -0,0 +1,85 @@ +{ + "name": "tmss_gui", + "version": "0.1.0", + "private": true, + "dependencies": { + "@ag-grid-community/all-modules": "^24.1.0", + "@apidevtools/json-schema-ref-parser": "^9.0.6", + "@fortawesome/fontawesome-free": "^5.13.1", + "@json-editor/json-editor": "^2.3.0", + "@kevincobain2000/json-to-html-table": "^1.0.1", + "@testing-library/jest-dom": "^4.2.4", + "@testing-library/react": "^9.3.2", + "@testing-library/user-event": "^7.1.2", + "ag-grid-community": "^24.1.0", + "ag-grid-react": "^24.1.1", + "axios": "^0.19.2", + "bootstrap": "^4.5.0", + "cleave.js": "^1.6.0", + "flatpickr": "^4.6.3", + "font-awesome": "^4.7.0", + "history": "^5.0.0", + "interactjs": "^1.9.22", + "jspdf": "^2.3.0", + "jspdf-autotable": "^3.5.13", + "katex": "^0.12.0", + "lodash": "^4.17.19", + "match-sorter": "^4.1.0", + "moment": "^2.27.0", + "node-sass": "^4.12.0", + "papaparse": "^5.3.0", + "primeflex": "^1.3.0", + "primeicons": "^4.0.0", + "primereact": "^4.2.2", + "prop-types": "^15.7.2", + "react": "^16.13.1", + "react-app-polyfill": "^1.0.6", + "react-bootstrap": "^1.0.1", + "react-bootstrap-datetimepicker": "0.0.22", + "react-calendar-timeline": "^0.27.0", + "react-dom": "^16.13.1", + "react-frame-component": "^4.1.2", + "react-json-to-table": "^0.1.7", + "react-json-view": "^1.19.1", + "react-loader-spinner": "^3.1.14", + "react-router-dom": "^5.2.0", + "react-scripts": "^3.4.2", + "react-split-pane": "^0.1.92", + "react-table": "^7.2.1", + "react-table-plugins": "^1.3.1", + "react-transition-group": "^2.5.1", + "react-websocket": "^2.1.0", + "reactstrap": "^8.5.1", + "styled-components": "^5.1.1", + "suneditor-react": "^2.14.10", + "typescript": "^3.9.5", + "yup": "^0.29.1" + }, + "scripts": { + "start": "react-scripts start", + "build": "react-scripts build", + "test": "react-scripts test", + "eject": "react-scripts eject" + }, + "proxy": "http://127.0.0.1:8008/", + "eslintConfig": { + "extends": "react-app" + }, + "browserslist": { + "production": [ + ">0.2%", + "not dead", + "not op_mini all" + ], + "development": [ + "last 1 chrome version", + "last 1 firefox version", + "last 1 safari version" + ] + }, + "devDependencies": { + "customize-cra": "^0.9.1", + "react-app-rewired": "^1.6.2", + "babel-polyfill": "^6.26.0" + } +} diff --git a/SAS/TMSS/frontend/tmss_webapp/src/App.css b/SAS/TMSS/frontend/tmss_webapp/src/App.css index b12be252f74..9edaec7d97d 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/App.css +++ b/SAS/TMSS/frontend/tmss_webapp/src/App.css @@ -233,11 +233,3 @@ div[data-schemapath='root.$schema'] { display: inline !important; } -.alignTimeLineHeader { - display: flex; - justify-content: space-between; - padding-top: 10px; -} -.sub-header { - display: inline-block; -} \ No newline at end of file diff --git a/SAS/TMSS/frontend/tmss_webapp/src/components/JSONEditor/JEditor.js b/SAS/TMSS/frontend/tmss_webapp/src/components/JSONEditor/JEditor.js index 4430720ff46..f66ef0a6bef 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/components/JSONEditor/JEditor.js +++ b/SAS/TMSS/frontend/tmss_webapp/src/components/JSONEditor/JEditor.js @@ -40,18 +40,12 @@ function Jeditor(props) { if (property["$ref"] && !property["$ref"].startsWith("#")) { // 1st level reference of the object const refUrl = property["$ref"]; let newRef = refUrl.substring(refUrl.indexOf("#")); - //>>>>>> TODO if pointin works fine, remove these commented lines - // if (refUrl.endsWith("/pointing")) { // For type pointing - // schema.definitions["pointing"] = (await $RefParser.resolve(refUrl)).get(newRef); - // property["$ref"] = newRef; - // } else { // General object to resolve if any reference in child level - // property = await resolveSchema((await $RefParser.resolve(refUrl)).get(newRef)); - // } let defKey = refUrl.substring(refUrl.lastIndexOf("/")+1); schema.definitions[defKey] = (await $RefParser.resolve(refUrl)).get(newRef); property["$ref"] = newRef; - if(schema.definitions[defKey].type && schema.definitions[defKey].type === 'array'){ - let resolvedItems = await resolveSchema(schema.definitions[defKey].items); + if(schema.definitions[defKey].type && (schema.definitions[defKey].type === 'array' + || schema.definitions[defKey].type === 'object')){ + let resolvedItems = await resolveSchema(schema.definitions[defKey]); schema.definitions = {...schema.definitions, ...resolvedItems.definitions}; delete resolvedItems['definitions']; } @@ -67,32 +61,35 @@ function Jeditor(props) { } properties[propertyKey] = property; } - } else if (schema["oneOf"]) { // Reference in OneOf array + } else if (schema["oneOf"] || schema["anyOf"]) { // Reference in OneOf/anyOf array + let defKey = schema["oneOf"]?"oneOf":"anyOf"; let resolvedOneOfList = [] - for (const oneOfProperty of schema["oneOf"]) { + for (const oneOfProperty of schema[defKey]) { const resolvedOneOf = await resolveSchema(oneOfProperty); resolvedOneOfList.push(resolvedOneOf); + if (resolvedOneOf.definitions) { + schema.definitions = {...schema.definitions, ...resolvedOneOf.definitions}; + } } - schema["oneOf"] = resolvedOneOfList; + schema[defKey] = resolvedOneOfList; } else if (schema["$ref"] && !schema["$ref"].startsWith("#")) { //reference in oneOf list item const refUrl = schema["$ref"]; let newRef = refUrl.substring(refUrl.indexOf("#")); - //>>>>>> TODO: If pointing works fine, remove these commented lines - // if (refUrl.endsWith("/pointing")) { - // schema.definitions["pointing"] = (await $RefParser.resolve(refUrl)).get(newRef); - // schema["$ref"] = newRef; - // } else { - // schema = await resolveSchema((await $RefParser.resolve(refUrl)).get(newRef)); - // } let defKey = refUrl.substring(refUrl.lastIndexOf("/")+1); schema.definitions[defKey] = (await $RefParser.resolve(refUrl)).get(newRef); - if (schema.definitions[defKey].properties) { + if (schema.definitions[defKey].properties || schema.definitions[defKey].type === "object" + || schema.definitions[defKey].type === "array") { let property = await resolveSchema(schema.definitions[defKey]); schema.definitions = {...schema.definitions, ...property.definitions}; delete property['definitions']; schema.definitions[defKey] = property; } schema["$ref"] = newRef; + } else if(schema["type"] === "array") { // reference in array items definition + let resolvedItems = await resolveSchema(schema["items"]); + schema.definitions = {...schema.definitions, ...resolvedItems.definitions}; + delete resolvedItems['definitions']; + schema["items"] = resolvedItems; } return schema; } diff --git a/SAS/TMSS/frontend/tmss_webapp/src/components/Timeline/CalendarTimeline.js b/SAS/TMSS/frontend/tmss_webapp/src/components/Timeline/CalendarTimeline.js index d57f1f7ea36..f0210bc1b3a 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/components/Timeline/CalendarTimeline.js +++ b/SAS/TMSS/frontend/tmss_webapp/src/components/Timeline/CalendarTimeline.js @@ -21,6 +21,7 @@ import 'react-calendar-timeline/lib/Timeline.css'; import { Calendar } from 'primereact/calendar'; import { Checkbox } from 'primereact/checkbox'; import { ProgressSpinner } from 'primereact/progressspinner'; +import { CustomPageSpinner } from '../CustomPageSpinner'; import UIConstants from '../../utils/ui.constants'; // Label formats for day headers based on the interval label width @@ -1248,6 +1249,7 @@ export class CalendarTimeline extends Component { * @param {Object} props */ async updateTimeline(props) { + this.setState({ showSpinner: true }); let group = DEFAULT_GROUP.concat(props.group); if (!this.props.showSunTimings && this.state.viewType === UIConstants.timeline.types.NORMAL) { props.items = await this.addStationSunTimes(this.state.defaultStartTime, this.state.defaultEndTime, props.group, props.items); @@ -1256,12 +1258,13 @@ export class CalendarTimeline extends Component { } else if (this.state.viewType === UIConstants.timeline.types.WEEKVIEW) { props.items = await this.addWeekSunTimes(this.state.defaultStartTime, this.state.defaultEndTime, group, props.items); } - this.setState({group: group, items: _.orderBy(props.items, ['type'], ['desc'])}); + this.setState({group: group, showSpinner: false, items: _.orderBy(props.items, ['type'], ['desc'])}); } render() { return ( <React.Fragment> + <CustomPageSpinner visible={this.state.showSpinner} /> {/* Toolbar for the timeline */} <div className={`p-fluid p-grid timeline-toolbar ${this.props.className}`}> {/* Clock Display */} diff --git a/SAS/TMSS/frontend/tmss_webapp/src/layout/sass/_timeline.scss b/SAS/TMSS/frontend/tmss_webapp/src/layout/sass/_timeline.scss index fb4eaf2706b..82ec299e6de 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/layout/sass/_timeline.scss +++ b/SAS/TMSS/frontend/tmss_webapp/src/layout/sass/_timeline.scss @@ -352,4 +352,41 @@ .timeline-popover:after { display: none !important; -} \ No newline at end of file +} + +.p-multiselect-items-wrapper { + height: 120px !important; +} + +.p-multiselect-header .p-multiselect-close { + position: absolute; + right: -30px; + top: .375em; + display: block; + border: 0 none; +} + +body .p-multiselect-panel .p-multiselect-header .p-multiselect-filter-container .p-multiselect-filter-icon { + color: #007ad9; + top: 50%; + margin-top: -0.5em; + right: -1em; + left: auto; +} +body .p-multiselect-panel .p-multiselect-header .p-multiselect-filter-container .p-inputtext { + padding: 0.520em; + padding-right: 6em; +} +.alignTimeLineHeader { + display: flex; + justify-content: space-between; + +} +.sub-header { + display: inline-block; +} +.body .p-inputswitch { + width: 3em; + height: 1.75em; + // top: -3px; +} diff --git a/SAS/TMSS/frontend/tmss_webapp/src/routes/Scheduling/ViewSchedulingUnit.js b/SAS/TMSS/frontend/tmss_webapp/src/routes/Scheduling/ViewSchedulingUnit.js index 8638bb966b7..539ef6a9895 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/routes/Scheduling/ViewSchedulingUnit.js +++ b/SAS/TMSS/frontend/tmss_webapp/src/routes/Scheduling/ViewSchedulingUnit.js @@ -252,7 +252,7 @@ class ViewSchedulingUnit extends Component{ // Common keys for Task and Blueprint let commonkeys = ['id','created_at','description','name','tags','updated_at','url','do_cancel','relative_start_time','relative_stop_time','start_time','stop_time','duration','status']; for(const task of schedulingUnit.task_drafts){ - let scheduletask = []; + let scheduletask = {}; scheduletask['tasktype'] = 'Draft'; scheduletask['actionpath'] = '/task/view/draft/'+task['id']; scheduletask['blueprint_draft'] = _.map(task['task_blueprints'], 'url'); @@ -274,7 +274,7 @@ class ViewSchedulingUnit extends Component{ scheduletask.produced_by_ids = task.produced_by_ids; for(const blueprint of task['task_blueprints']){ - let taskblueprint = []; + let taskblueprint = {}; taskblueprint['tasktype'] = 'Blueprint'; taskblueprint['actionpath'] = '/task/view/blueprint/'+blueprint['id']; taskblueprint['blueprint_draft'] = blueprint['draft']; diff --git a/SAS/TMSS/frontend/tmss_webapp/src/routes/Timeline/view.js b/SAS/TMSS/frontend/tmss_webapp/src/routes/Timeline/view.js index 0b818804e62..4ea87d8fd8f 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/routes/Timeline/view.js +++ b/SAS/TMSS/frontend/tmss_webapp/src/routes/Timeline/view.js @@ -5,7 +5,8 @@ import _ from 'lodash'; import Websocket from 'react-websocket'; // import SplitPane, { Pane } from 'react-split-pane'; -import {InputSwitch} from 'primereact/inputswitch'; +import { InputSwitch } from 'primereact/inputswitch'; +import { CustomPageSpinner } from '../../components/CustomPageSpinner'; import AppLoader from '../../layout/components/AppLoader'; import PageHeader from '../../layout/components/PageHeader'; @@ -24,7 +25,8 @@ import { Dropdown } from 'primereact/dropdown'; import { OverlayPanel } from 'primereact/overlaypanel'; import { RadioButton } from 'primereact/radiobutton'; import { TieredMenu } from 'primereact/tieredmenu'; -import {MultiSelect} from 'primereact/multiselect'; +import { MultiSelect } from 'primereact/multiselect'; +//import { TRUE } from 'node-sass'; // Color constant for SU status @@ -100,6 +102,7 @@ export class TimelineView extends Component { } async componentDidMount() { + this.setState({ loader: true }); // Fetch all details from server and prepare data to pass to timeline and table components const promises = [ ProjectService.getProjectList(), ScheduleService.getSchedulingUnitsExtended('blueprint'), @@ -181,16 +184,17 @@ export class TimelineView extends Component { this.suConstraintTemplates = suConstraintTemplates; }); this.setState({suBlueprints: suBlueprints, suDrafts: suDrafts, group: group, suSets: suSets, + loader: false, projects: projects, suBlueprintList: suList, items: items, currentUTC: currentUTC, isLoading: false, currentStartTime: defaultStartTime, currentEndTime: defaultEndTime}); this.mainStationGroups = responses[7]; this.mainStationGroupOptions = Object.keys(responses[7]).map(value => ({ value })); - }); + }); } setSelectedStationGroup(value) { - this.setState({ selectedStationGroup: value }); + this.setState({ selectedStationGroup: value}); } /** @@ -402,7 +406,7 @@ export class TimelineView extends Component { groupSUStations(stationList) { let suStationGroups = {}; for (const group in this.mainStationGroups) { - suStationGroups[group] = _.intersection(this.mainStationGroups[group], stationList); + suStationGroups[group] = _.intersection(this.mainStationGroups[group],stationList); } return suStationGroups; } @@ -631,7 +635,7 @@ export class TimelineView extends Component { let timelineItem = (this.state.showSUs || this.state.stationView)?this.getTimelineItem(suBlueprint):null; if (this.state.stationView) { this.getStationItemGroups(suBlueprint, timelineItem, this.allStationsGroup, items); - } else { + } else { if (timelineItem) { items.push(timelineItem); if (!_.find(group, {'id': suBlueprint.suDraft.id})) { @@ -653,13 +657,14 @@ export class TimelineView extends Component { if (this.timeline) { this.timeline.updateTimeline({group: this.state.stationView ? this.getStationsByGroupName() : _.orderBy(_.uniqBy(group, 'id'),["parent", "start"], ['asc', 'asc']), items: items}); } + } - getStationsByGroupName() { + getStationsByGroupName() { let stations = []; this.state.selectedStationGroup.forEach((i) => { - stations = [...stations, ...this.mainStationGroups[i]]; - }); + stations = [...stations, ...this.mainStationGroups[i]]; + }); stations = stations.map(i => ({id: i, title: i})); return stations; } @@ -668,7 +673,6 @@ export class TimelineView extends Component { this.closeSUDets(); this.setState({stationView: e.value}); } - showOptionMenu(event) { this.optionsMenu.toggle(event); } @@ -838,6 +842,9 @@ export class TimelineView extends Component { render() { if (this.state.redirect) { return <Redirect to={ {pathname: this.state.redirect} }></Redirect> + } + if (this.state.loader) { + return <AppLoader /> } const isSUDetsVisible = this.state.isSUDetsVisible; const isTaskDetsVisible = this.state.isTaskDetsVisible; @@ -857,7 +864,6 @@ export class TimelineView extends Component { {icon: 'fa-calendar-alt',title:'Week View', props : { pathname: `/su/timelineview/week`}} ]} /> - { this.state.isLoading ? <AppLoader /> : <div className="p-grid"> {/* SU List Panel */} @@ -892,28 +898,31 @@ export class TimelineView extends Component { <i className="pi pi-step-forward"></i> </button> </div> + <div className={`timeline-view-toolbar ${this.state.stationView && 'alignTimeLineHeader'}`}> <div className="sub-header"> - <label>Station View</label> - <InputSwitch checked={this.state.stationView} onChange={(e) => {this.setStationView(e)}} /> - {this.state.stationView && + <label >Station View</label> + <InputSwitch checked={this.state.stationView} onChange={(e) => {this.setStationView(e)}} /> + { this.state.stationView && <> - <label style={{marginLeft: '15px'}}>Stations Group</label> + <label style={{marginLeft: '20px'}}>Stations Group</label> <MultiSelect data-testid="stations" id="stations" optionLabel="value" optionValue="value" filter={true} + style={{top:'2px'}} tooltip="Select Stations" value={this.state.selectedStationGroup} options={this.mainStationGroupOptions} placeholder="Select Stations" onChange={(e) => this.setSelectedStationGroup(e.value)} /> - </> + </> } </div> + {this.state.stationView && <div className="sub-header"> - <label style={{marginLeft: '15px'}}>Reservation</label> + <label style={{marginLeft: '20px'}}>Reservation</label> <Dropdown optionLabel="name" optionValue="name" - style={{fontSize: '10px', top: '-5px'}} + style={{top:'2px'}} value={this.state.reservationFilter} options={this.reservationReasons} filter showClear={true} filterBy="name" @@ -934,6 +943,7 @@ export class TimelineView extends Component { </> } </div> + <Timeline ref={(tl)=>{this.timeline=tl}} group={this.state.group} items={this.state.items} @@ -1011,7 +1021,8 @@ export class TimelineView extends Component { </OverlayPanel> {!this.state.isLoading && <Websocket url={process.env.REACT_APP_WEBSOCKET_URL} onOpen={this.onConnect} onMessage={this.handleData} onClose={this.onDisconnect} /> } - </React.Fragment> + </React.Fragment> + ); } diff --git a/SAS/TMSS/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py b/SAS/TMSS/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py deleted file mode 100644 index e2bfcb4e4c6..00000000000 --- a/SAS/TMSS/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py +++ /dev/null @@ -1,3 +0,0 @@ -import django.dispatch - -ingest_task_blueprint_status_changed_signal = django.dispatch.Signal() \ No newline at end of file -- GitLab