diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py index ba96bf7573f49bb4193cb58f4e60f685c8366a06..d7a9932a7e215cb9cb912792c5712fd5620104b5 100644 --- a/LCS/PyCommon/postgres.py +++ b/LCS/PyCommon/postgres.py @@ -331,7 +331,7 @@ class PostgresListener(PostgresDatabaseConnection): # put notification on Queue # let waiting thread handle the callback self.__queue.put((notification.channel, notification.payload)) - else: + elif self.isListening(): # call callback on this listener thread self._callCallback(notification.channel, notification.payload) except Exception as e: diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index abfab5bcc1bcbec4ccc74554293b8ec795f7bb00..ad2a0be67d5eab064a9cecd65ec5cefde9c87707 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -560,8 +560,7 @@ class PipelineControlTMSSHandler(TMSSEventMessageHandler): # !!! TODO Is TMSS supposed to inform Ganglia in future? wget -O - -q "http://ganglia.control.lofar/ganglia/api/events.php?action=add&start_time=now&summary=Pipeline {obsid} FINISHED&host_regex=" else - # notify TMSS that we finished and some error occured - runcmd {setStatus_finished} + # notify TMSS that some error occured runcmd {setStatus_error} fi diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index 552f4d93ae90c3d77e49b076c5e64ba9ff902fba..296c28b0f2d4d1a58bdbf725d70e1567178beb68 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -97,63 +97,42 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: # estimate the lower_bound_start_time earliest_possible_start_time = get_min_earliest_possible_start_time(schedulable_units, datetime.utcnow()+DEFAULT_NEXT_STARTTIME_GAP) - lower_bound_start_time = earliest_possible_start_time - - # estimate the upper_bound_stop_time, which may give us a small timewindow before any next scheduled unit, or a default window of a day - if any([su.interrupts_telescope for su in schedulable_units]): - # ignore what's scheduled if we have triggers - upper_bound_stop_time = lower_bound_start_time + timedelta(days=1) - else: - try: - upper_bound_stop_time = max(su.start_time for su in get_scheduled_scheduling_units(lower=lower_bound_start_time, upper=lower_bound_start_time + timedelta(days=1))) - except ValueError: - upper_bound_stop_time = lower_bound_start_time + timedelta(days=1) - # no need to irritate user in log files with subsecond scheduling precision - lower_bound_start_time = round_to_second_precision(lower_bound_start_time) - upper_bound_stop_time = max(round_to_second_precision(upper_bound_stop_time), lower_bound_start_time) + lower_bound_start_time = round_to_second_precision(earliest_possible_start_time) + upper_bound_stop_time = lower_bound_start_time + timedelta(days=1) # --- core routine --- - while lower_bound_start_time < upper_bound_stop_time: - try: - # try to find the best next scheduling_unit - logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time) - best_scored_scheduling_unit = find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, upper_bound_stop_time) - if best_scored_scheduling_unit: - best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit - best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score - best_start_time = best_scored_scheduling_unit.start_time - - # make sure we don't start earlier than allowed - best_start_time = max(best_start_time, earliest_possible_start_time) + try: + # try to find the best next scheduling_unit + logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time) + best_scored_scheduling_unit = find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, upper_bound_stop_time) + if best_scored_scheduling_unit: + best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit + best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score + best_start_time = best_scored_scheduling_unit.start_time - # make start_time "look nice" for us humans - best_start_time = round_to_second_precision(best_start_time) + # make sure we don't start earlier than allowed + best_start_time = max(best_start_time, earliest_possible_start_time) - logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' weighted_score=%s start_time=%s interrupts_telescope=%s", - best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope) + # make start_time "look nice" for us humans + best_start_time = round_to_second_precision(best_start_time) - if best_scheduling_unit.interrupts_telescope: - cancel_running_observation_if_needed_and_possible(best_scored_scheduling_unit) + logger.info("schedule_next_scheduling_unit: found best candidate id=%s '%s' weighted_score=%s start_time=%s interrupts_telescope=%s", + best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope) - if unschededule_blocking_scheduled_units_if_needed_and_possible(best_scored_scheduling_unit): - # no (old) scheduled scheduling_units in the way, so schedule our candidate! - scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(best_scheduling_unit, start_time=best_start_time) + if best_scheduling_unit.interrupts_telescope: + cancel_running_observation_if_needed_and_possible(best_scored_scheduling_unit) - logger.info("schedule_next_scheduling_unit: scheduled best candidate id=%s '%s' score=%s start_time=%s interrupts_telescope=%s", - best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope) - return scheduled_scheduling_unit + if unschededule_blocking_scheduled_units_if_needed_and_possible(best_scored_scheduling_unit): + # no (old) scheduled scheduling_units in the way, so schedule our candidate! + scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(best_scheduling_unit, start_time=best_start_time) - except SubtaskSchedulingException as e: - logger.error("Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e) + logger.info("schedule_next_scheduling_unit: scheduled best candidate id=%s '%s' score=%s start_time=%s interrupts_telescope=%s", + best_scheduling_unit.id, best_scheduling_unit.name, best_scheduling_unit_score, best_start_time, best_scheduling_unit.interrupts_telescope) + return scheduled_scheduling_unit - # nothing was found, or an error occurred. - # seach again... (loop) with the remaining schedulable_units and new lower_bound_start_time - schedulable_units = get_dynamically_schedulable_scheduling_units() - if len(schedulable_units) == 0: - logger.info("No scheduling units found...") - return - lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, lower_bound_start_time + timedelta(hours=0.5)) + except SubtaskSchedulingException as e: + logger.error("Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e) def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime): @@ -321,8 +300,8 @@ def get_dynamically_schedulable_scheduling_units() -> [models.SchedulingUnitBlue def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> [models.SchedulingUnitBlueprint]: - '''get a list of all scheduled scheduling_units''' - scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled') + '''get a list of all scheduling_units for which at least one 'independent' (with no predecessor like an observation) subtask is scheduled''' + scheduled_subtasks = models.Subtask.independent_subtasks().filter(state__value='scheduled') if lower is not None: scheduled_subtasks = scheduled_subtasks.filter(stop_time__gte=lower) if upper is not None: diff --git a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py index 54edda5d4ff6b5dbc57d32e76dbde6bec45b113e..3d2c663ccbe2711db8d84766d09df29507f5b782 100755 --- a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py @@ -2195,6 +2195,7 @@ class TestTriggers(TestCase): #self.assertGreater(triggered_scheduling_unit_blueprint.start_time, regular_scheduling_unit_blueprint_high2.stop_time) # todo: TMSS-704: Make this pass. Currently starts after high1, but unexpectedly before high2 self.assertGreater(regular_scheduling_unit_blueprint_high2.start_time, regular_scheduling_unit_blueprint_high1.stop_time) + @unittest.skip("ToDo: Fix in TMSS-671") @mock.patch("lofar.sas.tmss.services.scheduling.dynamic_scheduling.cancel_subtask") def test_triggered_scheduling_unit_goes_to_unschedulable_if_it_cannot_cancel_and_does_not_fit(self, cancel_mock): diff --git a/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py b/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py index 6630b0633651d06a4ef81ab62477abefa6408aa6..5e197efa6af95fc9446a20722ad130442b55c9dc 100644 --- a/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py +++ b/SAS/TMSS/backend/services/tmss_postgres_listener/lib/tmss_postgres_listener.py @@ -44,12 +44,12 @@ class TMSSPGListener(PostgresListener): super().__init__(dbcreds=dbcreds) self.event_bus = ToBus(exchange=exchange, broker=broker) - # two cache to keep track of the latest task/scheduling_unit (aggregated) statuses, + # two cache's to keep track of the latest task/scheduling_unit (aggregated) statuses and start/stoptimes, # so we can lookup if the (aggregated) status of the task/scheduling_unit actually changes when a subtask's status changes. # This saves many (aggregated) status updates, where the (aggregated) status isn't changed. # contents of dict is a mapping of the task/su ID to a status,timestamp tuple - self._task_status_cache = {} - self._scheduling_unit_status_cache = {} + self._task_cache = {} + self._scheduling_unit_cache = {} def start(self): @@ -163,6 +163,9 @@ class TMSSPGListener(PostgresListener): def _sendNotification(self, subject, contentDict): try: + if not self.event_bus.is_connected: + return + if isinstance(contentDict, str): contentDict = json.loads(contentDict) @@ -179,50 +182,76 @@ class TMSSPGListener(PostgresListener): def onSubTaskUpdated(self, payload = None): self._sendNotification(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Updated', payload) + subtask_id = json.loads(payload)['id'] + self._send_task_and_or_scheduling_unit_updates_upon_change_of_aggregated_properties(subtask_id) + def onSubTaskDeleted(self, payload = None): self._sendNotification(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Deleted', payload) def onSubTaskStateUpdated(self, payload = None): payload_dict = json.loads(payload) # send notification for this subtask... - from lofar.sas.tmss.tmss.tmssapp.models import Subtask - subtask = Subtask.objects.get(id=payload_dict['id']) - self._sendNotification(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.'+subtask.state.value.capitalize(), - {'id': subtask.id, 'status': subtask.state.value}) - - # ... and also send status change and object update events for the parent task, and schedulingunit, - # because their status is implicitly derived from their subtask(s) - # send both object.updated and status change events - - # check if task status is new or changed... If so, send event. - for task_blueprint in subtask.task_blueprints.all(): - task_id = task_blueprint.id - task_status = task_blueprint.status - if task_id not in self._task_status_cache or self._task_status_cache[task_id][1] != task_status: - # update cache for this task - self._task_status_cache[task_id] = (datetime.utcnow(), task_status) - - # send event(s) - self.onTaskBlueprintUpdated( {'id': task_id}) - self._sendNotification(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.'+task_status.capitalize(), - {'id': task_id, 'status': task_status}) - - # check if scheduling_unit status is new or changed... If so, send event. - scheduling_unit_id = task_blueprint.scheduling_unit_blueprint.id - scheduling_unit_status = task_blueprint.scheduling_unit_blueprint.status - if scheduling_unit_id not in self._scheduling_unit_status_cache or self._scheduling_unit_status_cache[scheduling_unit_id][1] != scheduling_unit_status: - # update cache for this task - self._scheduling_unit_status_cache[scheduling_unit_id] = (datetime.utcnow(), scheduling_unit_status) - - # send event(s) - self.onSchedulingUnitBlueprintUpdated( {'id': scheduling_unit_id}) - self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.'+scheduling_unit_status.capitalize(), - {'id': scheduling_unit_id, 'status': scheduling_unit_status}) + subtask_id = payload_dict['id'] + subtask_state = payload_dict['state_id'] + self._sendNotification(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.'+subtask_state.capitalize(), + {'id': subtask_id, 'status': subtask_state}) + + self._send_task_and_or_scheduling_unit_updates_upon_change_of_aggregated_properties(subtask_id) + + def _send_task_and_or_scheduling_unit_updates_upon_change_of_aggregated_properties(self, subtask_id: int): + '''send status change and object update events for the parent task, and schedulingunit, + because their status is implicitly derived from their subtask(s) + send both object.updated and status change events''' + try: + from lofar.sas.tmss.tmss.tmssapp.models import Subtask + subtask = Subtask.objects.get(id=subtask_id) + + # check if task is new or changed... If so, send event. + for task_blueprint in subtask.task_blueprints.all(): + task_id = task_blueprint.id + task_cached_properties = {'status': task_blueprint.status, + 'start_time': task_blueprint.start_time, + 'stop_time': task_blueprint.stop_time } + + if task_id not in self._task_cache or self._task_cache[task_id][1] != task_cached_properties: + # send generic updated event + self.onTaskBlueprintUpdated( {'id': task_id}) + + # only send status updated event when the status changed + if task_cached_properties['status'] != self._task_cache.get(task_id, (None, {}))[1].get('status'): + self._sendNotification(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.'+task_cached_properties['status'].capitalize(), + {'id': task_id, 'status': task_cached_properties['status']}) + + # update cache for this task + self._task_cache[task_id] = (datetime.utcnow(), task_cached_properties) + + + # check if scheduling_unit status is new or changed... If so, send event. + scheduling_unit = task_blueprint.scheduling_unit_blueprint + scheduling_unit_id = scheduling_unit.id + scheduling_unit_cached_properties = {'status': scheduling_unit.status, + 'start_time': scheduling_unit.start_time, + 'stop_time': scheduling_unit.stop_time } + + if scheduling_unit_id not in self._scheduling_unit_cache or self._scheduling_unit_cache[scheduling_unit_id][1] != scheduling_unit_cached_properties: + # send generic updated event + self.onSchedulingUnitBlueprintUpdated( {'id': scheduling_unit_id}) + + # only send status updated event when the status changed + if scheduling_unit_cached_properties['status'] != self._scheduling_unit_cache.get(scheduling_unit_id, (None, {}))[1].get('status'): + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.'+scheduling_unit_cached_properties['status'].capitalize(), + {'id': scheduling_unit_id, 'status': scheduling_unit_cached_properties['status']}) + + # update cache for this task + self._scheduling_unit_cache[scheduling_unit_id] = (datetime.utcnow(), scheduling_unit_cached_properties) + + except Subtask.DoesNotExist: + pass try: # wipe old entries from cache. # This may result in some odd cases that an event is sent twice, even if the status did not change. That's a bit superfluous, but ok. - for cache in [self._task_status_cache, self._scheduling_unit_status_cache]: + for cache in [self._task_cache, self._scheduling_unit_cache]: for id in list(cache.keys()): if datetime.utcnow() - cache[id][0] > timedelta(days=1): del cache[id] @@ -264,15 +293,6 @@ class TMSSPGListener(PostgresListener): def onSchedulingUnitBlueprintUpdated(self, payload = None): self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', payload) - if isinstance(payload, str): - payload = json.loads(payload) - - from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitBlueprint - scheduling_unit_blueprint = SchedulingUnitBlueprint.objects.get(id=payload['id']) - - if not scheduling_unit_blueprint.can_proceed: - self.onSchedulingUnitBlueprintCannotProceed( {'id': scheduling_unit_blueprint.id}) - def onSchedulingUnitBlueprintIngestPermissionGranted(self, payload=None): self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX + '.IngestPermissionGranted', payload) @@ -317,9 +337,6 @@ class TMSSPGListener(PostgresListener): payload['value'] = strtobool(payload['value']) self._sendNotification(TMSS_SETTING_OBJECT_EVENT_PREFIX+'.Updated', payload) - def onSchedulingUnitBlueprintCannotProceed(self, payload = None): - self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.CannotProceed', payload) - def create_service(dbcreds, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): '''create a TMSSPGListener instance''' diff --git a/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py b/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py index a9afe191cca24730551345b502dc43568e43a7df..87a81c9cac31c43638b041a016bb38580d530b74 100755 --- a/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py +++ b/SAS/TMSS/backend/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py @@ -21,7 +21,7 @@ import unittest import uuid import logging -logger = logging.getLogger(__name__) +logger = logging.getLogger('lofar.'+__name__) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment @@ -160,9 +160,6 @@ class TestSubtaskSchedulingService(unittest.TestCase): self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX + '.Updated', service.subjects.popleft()) self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft()) - self.assertEqual(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.Defined', service.subjects.popleft()) - self.assertEqual({'id': subtask['id'], 'status': 'defined'}, service.contentDicts.popleft()) - self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft()) self.assertEqual({'id': task_blueprint['id']}, service.contentDicts.popleft()) @@ -175,6 +172,9 @@ class TestSubtaskSchedulingService(unittest.TestCase): self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.Schedulable', service.subjects.popleft()) self.assertEqual({'id': su_blueprint['id'], 'status': 'schedulable'}, service.contentDicts.popleft()) + self.assertEqual(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.Defined', service.subjects.popleft()) + self.assertEqual({'id': subtask['id'], 'status': 'defined'}, service.contentDicts.popleft()) + # delete subtask, use direct http delete request on rest api requests.delete(subtask['url'], auth=self.test_data_creator.auth) diff --git a/SAS/TMSS/backend/services/workflow_service/lib/workflow_service.py b/SAS/TMSS/backend/services/workflow_service/lib/workflow_service.py index 656b4a8b18d5b55712ff83e846440b877f8b8d2d..81c63c76cce3f6adada563b8dc3ed50cafc39712 100644 --- a/SAS/TMSS/backend/services/workflow_service/lib/workflow_service.py +++ b/SAS/TMSS/backend/services/workflow_service/lib/workflow_service.py @@ -38,21 +38,16 @@ class SchedulingUnitEventMessageHandler(TMSSEventMessageHandler): logger.info("SchedulingUnitBlueprint id=%s status changed to '%s', signalling workflow...", id, status) scheduling_unit_blueprint = SchedulingUnitBlueprint.objects.get(pk=id) scheduling_unit_blueprint_status_changed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint, status=status) - except Exception as e: - logger.error(e) - def onSchedulingUnitBlueprintCannotProceed(self, id: int): - try: - # import here and not at top of module because we need the django.setup() to be run first, either from this module's main, or from the TMSSTestEnvironment - from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_cannot_proceed_signal - from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitBlueprint + if status == SchedulingUnitBlueprint.Status.OBSERVED.value: + logger.info("SchedulingUnitBlueprint id=%s if fully observed, signalling workflow...", id) - logger.info("SchedulingUnitBlueprint id=%s cannot proceeed, signalling workflow...", id) - scheduling_unit_blueprint = SchedulingUnitBlueprint.objects.get(pk=id) - scheduling_unit_blueprint_cannot_proceed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint) + from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_observed_signal + scheduling_unit_blueprint_observed_signal.send(sender=self.__class__, instance=scheduling_unit_blueprint) except Exception as e: logger.error(e) + def onTaskBlueprintStatusChanged(self, id: int, status:str): try: diff --git a/SAS/TMSS/backend/src/CMakeLists.txt b/SAS/TMSS/backend/src/CMakeLists.txt index 56fb0a64f7243b22828d374cdbd463ffe63924cd..f80c85dc26bfdd8d5a769f107b6b4e0830c00016 100644 --- a/SAS/TMSS/backend/src/CMakeLists.txt +++ b/SAS/TMSS/backend/src/CMakeLists.txt @@ -17,6 +17,7 @@ find_python_module(django_json_widget REQUIRED) # pip3 install django-json-widge find_python_module(jsoneditor REQUIRED) # pip3 install django-jsoneditor find_python_module(jsonschema REQUIRED) # pip3 install jsonschema find_python_module(astropy REQUIRED) # pip3 install astropy +find_python_module(cachetools REQUIRED) # pip3 install cachetools # drf-flex-fields is needed by TMSS, but we can't check for it with find_python_module, # because the package rest_flex_fields is looking for the DJANGO_SETTINGS_MODULE, which diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py index 76d0dc7f0e8530426a173229f714c0a77ff3357a..74dd30b92b0fee17ba8e8d228c5a9ed37563d5d4 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py @@ -641,7 +641,6 @@ class Migration(migrations.Migration): ('ingest_permission_required', models.BooleanField(default=False, help_text='Explicit permission is needed before the task.')), ('ingest_permission_granted_since', models.DateTimeField(help_text='The moment when ingest permission was granted.', null=True)), ('output_pinned', models.BooleanField(default=False, help_text='boolean (default FALSE), which blocks deleting unpinned dataproducts. When toggled ON, backend must pick SUB up for deletion. It also must when dataproducts are unpinned.')), - ('results_accepted', models.BooleanField(default=False, help_text='boolean (default NULL), which records whether the results were accepted, allowing the higher-level accounting to be adjusted.')), ('piggyback_allowed_tbb', models.BooleanField(help_text='Piggyback key for TBB.', null=True)), ('piggyback_allowed_aartfaac', models.BooleanField(help_text='Piggyback key for AARTFAAC.', null=True)), ('priority_rank', models.FloatField(default=0.0, help_text='Priority of this scheduling unit w.r.t. other scheduling units within the same queue and project.')), diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py index a1d3aac80ab38076736ee66f443c4b3b053d2af7..beb610c20436106a31f51e768aed613a984e0f02 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py @@ -506,7 +506,6 @@ class SchedulingUnitBlueprint(ProjectPropertyMixin, TemplateSchemaMixin, NamedCo requirements_template = ForeignKey('SchedulingUnitTemplate', on_delete=CASCADE, help_text='Schema used for requirements_doc (IMMUTABLE).') draft = ForeignKey('SchedulingUnitDraft', related_name='scheduling_unit_blueprints', on_delete=PROTECT, help_text='Scheduling Unit Draft which this run instantiates.') output_pinned = BooleanField(default=False, help_text='boolean (default FALSE), which blocks deleting unpinned dataproducts. When toggled ON, backend must pick SUB up for deletion. It also must when dataproducts are unpinned.') - results_accepted = BooleanField(default=False, help_text='boolean (default NULL), which records whether the results were accepted, allowing the higher-level accounting to be adjusted.') piggyback_allowed_tbb = BooleanField(help_text='Piggyback key for TBB.', null=True) piggyback_allowed_aartfaac = BooleanField(help_text='Piggyback key for AARTFAAC.', null=True) priority_rank = FloatField(null=False, default=0.0, help_text='Priority of this scheduling unit w.r.t. other scheduling units within the same queue and project.') @@ -707,11 +706,6 @@ class SchedulingUnitBlueprint(ProjectPropertyMixin, TemplateSchemaMixin, NamedCo return SchedulingUnitBlueprint.Status.SCHEDULED.value return SchedulingUnitBlueprint.Status.SCHEDULABLE.value - @property - def can_proceed(self) -> bool: - '''Can this scheduling unit proceed with running its tasks?''' - return self.status not in [SchedulingUnitBlueprint.Status.ERROR.value, SchedulingUnitBlueprint.Status.FINISHED.value, SchedulingUnitBlueprint.Status.CANCELLED.value] - def _task_graph_instantiated(self): from .scheduling import Subtask # import here to prevent cirular imports return self._get_total_nbr_tasks() > 0 and \ diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py index 437d82c871b96c9492828f6505b13eba8d4f70ad..bd1db726f7cd3d0b4572011d6acda47937335583 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/flows/schedulingunitflow.py @@ -16,7 +16,7 @@ from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SchedulingUnitBlueprint from lofar.common.datetimeutils import round_to_second_precision from django.dispatch import receiver -from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_status_changed_signal, scheduling_unit_blueprint_cannot_proceed_signal, ingest_task_blueprint_status_changed_signal, obs_task_status_changed_signal +from lofar.sas.tmss.tmss.workflowapp.signals import scheduling_unit_blueprint_status_changed_signal, scheduling_unit_blueprint_observed_signal, ingest_task_blueprint_status_changed_signal, obs_task_status_changed_signal from viewflow import frontend, ThisObject from viewflow.activation import STATUS @@ -103,12 +103,12 @@ class SchedulingUnitFlow(Flow): obs_task_status_changed_signal, task_loader=this.get_scheduling_unit_task ) - .Next(this.wait_processed) + .Next(this.wait_observed) ) - wait_processed = ( + wait_observed = ( flow.Signal( - scheduling_unit_blueprint_cannot_proceed_signal, + scheduling_unit_blueprint_observed_signal, this.check_condition_processed, task_loader=this.get_scheduling_unit_task ) @@ -243,7 +243,7 @@ class SchedulingUnitFlow(Flow): def do_mark_sub(self, activation): activation.process.su.output_pinned = True - activation.process.su.results_accepted = ((activation.process.qa_reporting_to is not None and activation.process.qa_reporting_to.operator_accept) + activation.process.results_accepted = ((activation.process.qa_reporting_to is not None and activation.process.qa_reporting_to.operator_accept) and (activation.process.qa_reporting_sos is not None and activation.process.qa_reporting_sos.sos_accept_show_pi) and (activation.process.decide_acceptance is not None and activation.process.decide_acceptance.sos_accept_after_pi)) diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/migrations/0001_initial.py b/SAS/TMSS/backend/src/tmss/workflowapp/migrations/0001_initial.py index 5a34111ac5ff48565f38e730030b2f1013ee648f..f098ce22a84eed63822235d1627f98f8208c1835 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/migrations/0001_initial.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/migrations/0001_initial.py @@ -62,6 +62,7 @@ class Migration(migrations.Migration): ('qa_reporting_sos', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='workflowapp.QAReportingSOS')), ('qa_reporting_to', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='workflowapp.QAReportingTO')), ('su', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='tmssapp.SchedulingUnitBlueprint')), + ('results_accepted', models.BooleanField(default=None, null=True)) ], options={ 'abstract': False, diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py index 3a3c4c6d63779bf1dd9c19cadea5fb62521b48e7..555be3c66f9f0193613be0459635efa1610cd018 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/models/schedulingunitflow.py @@ -32,8 +32,8 @@ class UnpinData(Model): class SchedulingUnitProcess(Process): su = ForeignKey(SchedulingUnitBlueprint, blank=True, null=True, on_delete=CASCADE) - qa_reporting_to=ForeignKey(QAReportingTO, blank=True, null=True, on_delete=CASCADE) - qa_reporting_sos=ForeignKey(QAReportingSOS, blank=True, null=True, on_delete=CASCADE) - pi_verification=ForeignKey(PIVerification, blank=True, null=True, on_delete=CASCADE) - decide_acceptance=ForeignKey(DecideAcceptance, blank=True, null=True, on_delete=CASCADE) - + qa_reporting_to = ForeignKey(QAReportingTO, blank=True, null=True, on_delete=CASCADE) + qa_reporting_sos = ForeignKey(QAReportingSOS, blank=True, null=True, on_delete=CASCADE) + pi_verification = ForeignKey(PIVerification, blank=True, null=True, on_delete=CASCADE) + decide_acceptance = ForeignKey(DecideAcceptance, blank=True, null=True, on_delete=CASCADE) + results_accepted = BooleanField(default=None, null=True) diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/signals/CMakeLists.txt b/SAS/TMSS/backend/src/tmss/workflowapp/signals/CMakeLists.txt index 91e675015dc8a3d637aeab54b563512c4f0488c0..95f198ef18248d381803b3c7b3305f6598fdcd49 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/signals/CMakeLists.txt +++ b/SAS/TMSS/backend/src/tmss/workflowapp/signals/CMakeLists.txt @@ -3,10 +3,6 @@ include(PythonInstall) set(_py_files __init__.py - subcannotproceed.py - substatuschanged.py - ingesttaskstatuschanged.py - obstaskstatuschanged.py ) python_install(${_py_files} diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/signals/__init__.py b/SAS/TMSS/backend/src/tmss/workflowapp/signals/__init__.py index d40f956d2b2cc7396f32031af66fc247c2ee897e..a28d16511b4229851fe933c1752688c99ae14413 100644 --- a/SAS/TMSS/backend/src/tmss/workflowapp/signals/__init__.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/signals/__init__.py @@ -1,4 +1,6 @@ -from .subcannotproceed import * -from .substatuschanged import * -from .ingesttaskstatuschanged import * -from .obstaskstatuschanged import * \ No newline at end of file +import django.dispatch + +ingest_task_blueprint_status_changed_signal = django.dispatch.Signal() +obs_task_status_changed_signal = django.dispatch.Signal() +scheduling_unit_blueprint_observed_signal = django.dispatch.Signal() +scheduling_unit_blueprint_status_changed_signal = django.dispatch.Signal() \ No newline at end of file diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py b/SAS/TMSS/backend/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py deleted file mode 100644 index e2bfcb4e4c616ec8cec38c1a202e270b18735734..0000000000000000000000000000000000000000 --- a/SAS/TMSS/backend/src/tmss/workflowapp/signals/ingesttaskstatuschanged.py +++ /dev/null @@ -1,3 +0,0 @@ -import django.dispatch - -ingest_task_blueprint_status_changed_signal = django.dispatch.Signal() \ No newline at end of file diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/signals/obstaskstatuschanged.py b/SAS/TMSS/backend/src/tmss/workflowapp/signals/obstaskstatuschanged.py deleted file mode 100644 index b812434d926656a49c6380c801b142a4dfc7102a..0000000000000000000000000000000000000000 --- a/SAS/TMSS/backend/src/tmss/workflowapp/signals/obstaskstatuschanged.py +++ /dev/null @@ -1,3 +0,0 @@ -import django.dispatch - -obs_task_status_changed_signal = django.dispatch.Signal() \ No newline at end of file diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/signals/subcannotproceed.py b/SAS/TMSS/backend/src/tmss/workflowapp/signals/subcannotproceed.py deleted file mode 100644 index 60c50ad80009d186b204ffb653655a2f73052603..0000000000000000000000000000000000000000 --- a/SAS/TMSS/backend/src/tmss/workflowapp/signals/subcannotproceed.py +++ /dev/null @@ -1,3 +0,0 @@ -import django.dispatch - -scheduling_unit_blueprint_status_changed_signal = django.dispatch.Signal() \ No newline at end of file diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/signals/substatuschanged.py b/SAS/TMSS/backend/src/tmss/workflowapp/signals/substatuschanged.py deleted file mode 100644 index 04f2913b8e1136c68b303b545bab1f189f34688d..0000000000000000000000000000000000000000 --- a/SAS/TMSS/backend/src/tmss/workflowapp/signals/substatuschanged.py +++ /dev/null @@ -1,3 +0,0 @@ -import django.dispatch - -scheduling_unit_blueprint_cannot_proceed_signal = django.dispatch.Signal() \ No newline at end of file diff --git a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py index 3568f80d5e207713073a08d9e446c7c3e4e8a790..332c5b04d13e5d6b6f03482d5bd0b362fca49b46 100755 --- a/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py +++ b/SAS/TMSS/backend/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py @@ -4,7 +4,7 @@ import requests import json import logging -logger = logging.getLogger(__name__) +logger = logging.getLogger('lofar.'+__name__) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) from lofar.common.test_utils import skip_integration_tests @@ -23,7 +23,28 @@ from lofar.sas.tmss.client.tmssbuslistener import * class SchedulingUnitFlowTest(unittest.TestCase): + sync_event_bp_scheduled = Event() + sync_event_bp_observed = Event() + class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): + def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): + from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitBlueprint + try: + super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) + except Exception as e: + logger.exception(e) + + if status == SchedulingUnitBlueprint.Status.SCHEDULED.value: + logging.info("Status is %s, sending sync event", status) + SchedulingUnitFlowTest.sync_event_bp_scheduled.set() + elif status == SchedulingUnitBlueprint.Status.OBSERVED.value: + logging.info("Scheduling Unit Blueprint with id %s was fully observed, sending sync event", id) + SchedulingUnitFlowTest.sync_event_bp_observed.set() + + def setUp(self) -> None: + logger.info('---------------------------------------------------------------------------------') + self.sync_event_bp_scheduled.clear() + self.sync_event_bp_observed.clear() @classmethod def setUpClass(cls) -> None: @@ -51,23 +72,6 @@ class SchedulingUnitFlowTest(unittest.TestCase): cls.BASE_URL_WF_API = cls.tmss_test_env.django_server.url.rstrip('/').replace('/api','/workflow_api') cls.AUTH = requests.auth.HTTPBasicAuth(cls.tmss_test_env.ldap_server.dbcreds.user, cls.tmss_test_env.ldap_server.dbcreds.password) - import time - - cls.sync_event_bp_scheduled = Event() - cls.sync_event_bp_cannot_proceed = Event() - - class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): - def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): - super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) - if status == "scheduled": - cls.sync_event_bp_scheduled.set() - - def onSchedulingUnitBlueprintCannotProceed(self, id: int): - super().onSchedulingUnitBlueprintCannotProceed(id=id) - cls.sync_event_bp_cannot_proceed.set() - - - @classmethod def tearDownClass(cls) -> None: cls.tmss_test_env.stop() @@ -78,7 +82,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): from lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow import SchedulingUnitFlow from lofar.sas.tmss.tmss.tmssapp import models - from lofar.sas.tmss.tmss.tmssapp.models import TaskType + from lofar.sas.tmss.tmss.tmssapp.models import TaskType, SchedulingUnitBlueprint from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data @@ -86,25 +90,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess from viewflow.models import Task - sync_event_bp_scheduled = Event() - sync_event_bp_cannot_proceed = Event() - - - class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): - def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): - super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) - - - if status == "scheduled": - logging.info("Status is %s, sending sync event",status) - sync_event_bp_scheduled.set() - - def onSchedulingUnitBlueprintCannotProceed(self, id: int): - super().onSchedulingUnitBlueprintCannotProceed(id=id) - logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) - sync_event_bp_cannot_proceed.set() - - service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + service = create_workflow_service(handler_type=SchedulingUnitFlowTest.TestSchedulingUnitEventMessageHandler, exchange=self.tmp_exchange.address) with BusListenerJanitor(service): strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") @@ -130,8 +116,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): while True: if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("SchedulingUnitProcess not created within expected time") # Yes! the SchedulingUnitProcess was created, let's get it. @@ -149,70 +135,75 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') - #Change subtask status to scheduled + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: for subtask in task_blueprint.subtasks.all(): set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled - if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("waiting for sync_event_bp_scheduled...") + if not self.sync_event_bp_scheduled.wait(timeout=10): logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_scheduled event") + logging.info("Received self.sync_event_bp_scheduled event") poll_starttime = datetime.utcnow() while True: - if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_observed": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_scheduled.clear() + self.sync_event_bp_scheduled.clear() #check the active task name - self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + self.assertEqual("wait_observed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') - #Change subtask status to finished - from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions + # finish the observations for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - task_blueprint.output_pinned=True - task_blueprint.save() - - for subtask in task_blueprint.subtasks.all(): - set_subtask_state_following_allowed_transitions(subtask, 'finished') + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: + for subtask in task_blueprint.subtasks.all(): + set_subtask_state_following_allowed_transitions(subtask, 'finished') - if not sync_event_bp_cannot_proceed.wait(timeout=10): - logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + logging.info("waiting for sync_event_bp_observed...") + if not self.sync_event_bp_observed.wait(timeout=10): + logging.info("sync_event_bp_observed event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_cannot_proceed event") + logging.info("Received self.sync_event_bp_observed event") poll_starttime = datetime.utcnow() while True: if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_cannot_proceed.clear() + self.sync_event_bp_observed.clear() #check the active task name self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') + # schedule, run, finish the pipelines and ingest et al + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + if task_blueprint.specifications_template.type.value != TaskType.Choices.OBSERVATION.value: + for subtask in task_blueprint.subtasks.all(): + set_subtask_state_following_allowed_transitions(subtask, 'finished') + #API: Get current task data = "" response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) @@ -394,25 +385,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess from viewflow.models import Task - sync_event_bp_scheduled = Event() - sync_event_bp_cannot_proceed = Event() - - - class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): - def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): - super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) - - - if status == "scheduled": - logging.info("Status is %s, sending sync event",status) - sync_event_bp_scheduled.set() - - def onSchedulingUnitBlueprintCannotProceed(self, id: int): - super().onSchedulingUnitBlueprintCannotProceed(id=id) - logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) - sync_event_bp_cannot_proceed.set() - - service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + service = create_workflow_service(handler_type=SchedulingUnitFlowTest.TestSchedulingUnitEventMessageHandler, exchange=self.tmp_exchange.address) with BusListenerJanitor(service): @@ -439,8 +412,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): while True: if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("SchedulingUnitProcess not created within expected time") # Yes! the SchedulingUnitProcess was created, let's get it. @@ -458,67 +431,70 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') - #Change subtask status to scheduled + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: for subtask in task_blueprint.subtasks.all(): set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled - if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("waiting for sync_event_bp_scheduled...") + if not self.sync_event_bp_scheduled.wait(timeout=10): logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_scheduled event") + logging.info("Received self.sync_event_bp_scheduled event") poll_starttime = datetime.utcnow() while True: - if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_observed": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_scheduled.clear() + self.sync_event_bp_scheduled.clear() #check the active task name - self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + self.assertEqual("wait_observed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') - #Change subtask status to finished + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - task_blueprint.output_pinned=True - task_blueprint.save() + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: + task_blueprint.output_pinned = True + task_blueprint.save() - for subtask in task_blueprint.subtasks.all(): - set_subtask_state_following_allowed_transitions(subtask, 'finished') + for subtask in task_blueprint.subtasks.all(): + set_subtask_state_following_allowed_transitions(subtask, 'finished') - if not sync_event_bp_cannot_proceed.wait(timeout=10): - logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + logging.info("waiting for sync_event_bp_observed...") + if not self.sync_event_bp_observed.wait(timeout=10): + logging.info("sync_event_bp_observed event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_cannot_proceed event") + logging.info("Received self.sync_event_bp_observed event") poll_starttime = datetime.utcnow() while True: if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_cannot_proceed.clear() + self.sync_event_bp_observed.clear() #check the active task name self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') @@ -606,25 +582,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess from viewflow.models import Task - sync_event_bp_scheduled = Event() - sync_event_bp_cannot_proceed = Event() - - - class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): - def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): - super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) - - - if status == "scheduled": - logging.info("Status is %s, sending sync event",status) - sync_event_bp_scheduled.set() - - def onSchedulingUnitBlueprintCannotProceed(self, id: int): - super().onSchedulingUnitBlueprintCannotProceed(id=id) - logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) - sync_event_bp_cannot_proceed.set() - - service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + service = create_workflow_service(handler_type=SchedulingUnitFlowTest.TestSchedulingUnitEventMessageHandler, exchange=self.tmp_exchange.address) with BusListenerJanitor(service): strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") @@ -650,8 +608,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): while True: if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("SchedulingUnitProcess not created within expected time") # Yes! the SchedulingUnitProcess was created, let's get it. @@ -669,66 +627,69 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') - #Change subtask status to scheduled + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: for subtask in task_blueprint.subtasks.all(): set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled - if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("waiting for sync_event_bp_scheduled...") + if not self.sync_event_bp_scheduled.wait(timeout=10): logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_scheduled event") + logging.info("Received self.sync_event_bp_scheduled event") poll_starttime = datetime.utcnow() while True: - if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_observed": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_scheduled.clear() + self.sync_event_bp_scheduled.clear() #check the active task name - self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + self.assertEqual("wait_observed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') - #Change subtask status to finished + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - task_blueprint.output_pinned=True - task_blueprint.save() + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: + task_blueprint.output_pinned = True + task_blueprint.save() - for subtask in task_blueprint.subtasks.all(): - set_subtask_state_following_allowed_transitions(subtask, 'finished') + for subtask in task_blueprint.subtasks.all(): + set_subtask_state_following_allowed_transitions(subtask, 'finished') - if not sync_event_bp_cannot_proceed.wait(timeout=10): - logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + logging.info("waiting for sync_event_bp_observed...") + if not self.sync_event_bp_observed.wait(timeout=10): + logging.info("sync_event_bp_observed event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_cannot_proceed event") + logging.info("Received self.sync_event_bp_observed event") poll_starttime = datetime.utcnow() while True: if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_cannot_proceed.clear() + self.sync_event_bp_observed.clear() #check the active task name self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') @@ -844,25 +805,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess from viewflow.models import Task - sync_event_bp_scheduled = Event() - sync_event_bp_cannot_proceed = Event() - - - class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): - def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): - super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) - - - if status == "scheduled": - logging.info("Status is %s, sending sync event",status) - sync_event_bp_scheduled.set() - - def onSchedulingUnitBlueprintCannotProceed(self, id: int): - super().onSchedulingUnitBlueprintCannotProceed(id=id) - logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) - sync_event_bp_cannot_proceed.set() - - service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + service = create_workflow_service(handler_type=SchedulingUnitFlowTest.TestSchedulingUnitEventMessageHandler, exchange=self.tmp_exchange.address) with BusListenerJanitor(service): strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") @@ -888,8 +831,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): while True: if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("SchedulingUnitProcess not created within expected time") # Yes! the SchedulingUnitProcess was created, let's get it. @@ -907,66 +850,69 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') - #Change subtask status to scheduled + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: for subtask in task_blueprint.subtasks.all(): set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled - if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("waiting for sync_event_bp_scheduled...") + if not self.sync_event_bp_scheduled.wait(timeout=10): logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_scheduled event") + logging.info("Received self.sync_event_bp_scheduled event") poll_starttime = datetime.utcnow() while True: - if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_observed": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_scheduled.clear() + self.sync_event_bp_scheduled.clear() #check the active task name - self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + self.assertEqual("wait_observed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') - #Change subtask status to finished + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - task_blueprint.output_pinned=True - task_blueprint.save() + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: + task_blueprint.output_pinned = True + task_blueprint.save() - for subtask in task_blueprint.subtasks.all(): - set_subtask_state_following_allowed_transitions(subtask, 'finished') + for subtask in task_blueprint.subtasks.all(): + set_subtask_state_following_allowed_transitions(subtask, 'finished') - if not sync_event_bp_cannot_proceed.wait(timeout=10): - logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + logging.info("waiting for sync_event_bp_observed...") + if not self.sync_event_bp_observed.wait(timeout=10): + logging.info("sync_event_bp_observed event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_cannot_proceed event") + logging.info("Received self.sync_event_bp_observed event") poll_starttime = datetime.utcnow() while True: if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_cannot_proceed.clear() + self.sync_event_bp_observed.clear() #check the active task name self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') @@ -1141,25 +1087,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): from lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow import SchedulingUnitProcess from viewflow.models import Task - sync_event_bp_scheduled = Event() - sync_event_bp_cannot_proceed = Event() - - - class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): - def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): - super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) - - - if status == "scheduled": - logging.info("Status is %s, sending sync event",status) - sync_event_bp_scheduled.set() - - def onSchedulingUnitBlueprintCannotProceed(self, id: int): - super().onSchedulingUnitBlueprintCannotProceed(id=id) - logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event",id) - sync_event_bp_cannot_proceed.set() - - service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + service = create_workflow_service(handler_type=SchedulingUnitFlowTest.TestSchedulingUnitEventMessageHandler, exchange=self.tmp_exchange.address) with BusListenerJanitor(service): strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") @@ -1185,8 +1113,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): while True: if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("SchedulingUnitProcess not created within expected time") # Yes! the SchedulingUnitProcess was created, let's get it. @@ -1204,70 +1132,79 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') - #Change subtask status to scheduled + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: for subtask in task_blueprint.subtasks.all(): set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled - if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("waiting for sync_event_bp_scheduled...") + if not self.sync_event_bp_scheduled.wait(timeout=10): logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_scheduled event") + logging.info("Received self.sync_event_bp_scheduled event") poll_starttime = datetime.utcnow() while True: - if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_processed": + if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "wait_observed": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_scheduled.clear() + self.sync_event_bp_scheduled.clear() #check the active task name - self.assertEqual("wait_processed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) + self.assertEqual("wait_observed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') - #Change subtask status to finished + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - task_blueprint.output_pinned=False - task_blueprint.save() + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: + task_blueprint.output_pinned = False + task_blueprint.save() - for subtask in task_blueprint.subtasks.all(): - set_subtask_state_following_allowed_transitions(subtask, 'finished') + for subtask in task_blueprint.subtasks.all(): + set_subtask_state_following_allowed_transitions(subtask, 'finished') - if not sync_event_bp_cannot_proceed.wait(timeout=10): - logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + logging.info("waiting for sync_event_bp_observed...") + if not self.sync_event_bp_observed.wait(timeout=10): + logging.info("sync_event_bp_observed event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_cannot_proceed event") + logging.info("Received self.sync_event_bp_observed event") poll_starttime = datetime.utcnow() while True: if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name == "qa_reporting_to": break - sleep(0.1) - if datetime.utcnow()-poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow()-poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_cannot_proceed.clear() + self.sync_event_bp_observed.clear() #check the active task name self.assertEqual("qa_reporting_to", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[0].flow_task.name) - self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_processed') + self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].status, 'NEW') + # schedule, run, finish the pipelines and ingest et al + for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): + if task_blueprint.specifications_template.type.value != TaskType.Choices.OBSERVATION.value: + for subtask in task_blueprint.subtasks.all(): + set_subtask_state_following_allowed_transitions(subtask, 'finished') + #API: Get current task data = "" response = requests.post(self.__class__.BASE_URL_WF_API + '/scheduling_unit_flow/qa_scheduling_unit_process/{}/current_task/'.format(scheduling_unit_process_id), json=data, auth=self.__class__.AUTH) @@ -1423,23 +1360,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): from django.contrib.auth import get_user_model User = get_user_model() - sync_event_bp_scheduled = Event() - sync_event_bp_cannot_proceed = Event() - - class TestSchedulingUnitEventMessageHandler(SchedulingUnitEventMessageHandler): - def onSchedulingUnitBlueprintStatusChanged(self, id: int, status: str): - super().onSchedulingUnitBlueprintStatusChanged(id=id, status=status) - - if status == "scheduled": - logging.info("Status is %s, sending sync event", status) - sync_event_bp_scheduled.set() - - def onSchedulingUnitBlueprintCannotProceed(self, id: int): - super().onSchedulingUnitBlueprintCannotProceed(id=id) - logging.info("Scheduling Unit Blueprint with id %s cannot proceed, sending sync event", id) - sync_event_bp_cannot_proceed.set() - - service = create_workflow_service(handler_type=TestSchedulingUnitEventMessageHandler, + service = create_workflow_service(handler_type=SchedulingUnitFlowTest.TestSchedulingUnitEventMessageHandler, exchange=self.tmp_exchange.address) with BusListenerJanitor(service): strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines") @@ -1466,8 +1387,8 @@ class SchedulingUnitFlowTest(unittest.TestCase): while True: if SchedulingUnitProcess.objects.filter(su=scheduling_unit_blueprint).count() > 0: break - sleep(0.1) - if datetime.utcnow() - poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow() - poll_starttime > timedelta(seconds=30): raise TimeoutError("SchedulingUnitProcess not created within expected time") # Yes! the SchedulingUnitProcess was created, let's get it. @@ -1485,32 +1406,33 @@ class SchedulingUnitFlowTest(unittest.TestCase): self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].flow_task.name, 'wait_scheduled') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'NEW') - # Change subtask status to scheduled + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - if task_blueprint.specifications_template.type.value != TaskType.Choices.INGEST.value: + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: for subtask in task_blueprint.subtasks.all(): set_subtask_state_following_allowed_transitions(subtask, 'scheduled') # wait until scheduling unit is scheduled - if not sync_event_bp_scheduled.wait(timeout=10): + logging.info("waiting for sync_event_bp_scheduled...") + if not self.sync_event_bp_scheduled.wait(timeout=10): logging.info("sync_event_bp_scheduled event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_scheduled event") + logging.info("Received self.sync_event_bp_scheduled event") poll_starttime = datetime.utcnow() while True: if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[ - 0].flow_task.name == "wait_processed": + 0].flow_task.name == "wait_observed": break - sleep(0.1) - if datetime.utcnow() - poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow() - poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_scheduled.clear() + self.sync_event_bp_scheduled.clear() # check the active task name - self.assertEqual("wait_processed", + self.assertEqual("wait_observed", SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[ 0].flow_task.name) @@ -1518,33 +1440,35 @@ class SchedulingUnitFlowTest(unittest.TestCase): 'wait_scheduled') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[1].status, 'DONE') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, - 'wait_processed') + 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'NEW') - # Change subtask status to finished + # Change observation subtask status to scheduled from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions for task_blueprint in scheduling_unit_blueprint.task_blueprints.all(): - task_blueprint.output_pinned = False - task_blueprint.save() + if task_blueprint.specifications_template.type.value == TaskType.Choices.OBSERVATION.value: + task_blueprint.output_pinned = False + task_blueprint.save() - for subtask in task_blueprint.subtasks.all(): - set_subtask_state_following_allowed_transitions(subtask, 'finished') + for subtask in task_blueprint.subtasks.all(): + set_subtask_state_following_allowed_transitions(subtask, 'finished') - if not sync_event_bp_cannot_proceed.wait(timeout=10): - logging.info("sync_event_bp_cannot_proceed event not received, raising TimeoutError") + logging.info("waiting for sync_event_bp_observed...") + if not self.sync_event_bp_observed.wait(timeout=10): + logging.info("sync_event_bp_observed event not received, raising TimeoutError") raise TimeoutError() else: - logging.info("Received sync_event_bp_cannot_proceed event") + logging.info("Received self.sync_event_bp_observed event") poll_starttime = datetime.utcnow() while True: if SchedulingUnitProcess.objects.get(su=scheduling_unit_blueprint.id).active_tasks()[ 0].flow_task.name == "qa_reporting_to": break - sleep(0.1) - if datetime.utcnow() - poll_starttime > timedelta(seconds=10): + sleep(0.5) + if datetime.utcnow() - poll_starttime > timedelta(seconds=30): raise TimeoutError("Task not activated within expected time") - sync_event_bp_cannot_proceed.clear() + self.sync_event_bp_observed.clear() # check the active task name self.assertEqual("qa_reporting_to", @@ -1552,7 +1476,7 @@ class SchedulingUnitFlowTest(unittest.TestCase): 0].flow_task.name) self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].flow_task.name, - 'wait_processed') + 'wait_observed') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[2].status, 'DONE') self.assertEqual(Task.objects.filter(process=scheduling_unit_process_id).order_by('id')[3].flow_task.name, 'qa_reporting_to') diff --git a/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py b/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py index b06e29792d44d66480cb033d9db0da6e77cea201..a3d0c15d4dcce292467a5ae4bb71a4ed6f6e5a68 100755 --- a/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py +++ b/SAS/TMSS/backend/test/t_tmssapp_specification_django_API.py @@ -782,7 +782,7 @@ class SchedulingUnitBlueprintTest(unittest.TestCase): scheduling_unit_blueprint.refresh_from_db() # we should be able to modify other fields - scheduling_unit_blueprint.results_accepted = not scheduling_unit_blueprint.results_accepted + scheduling_unit_blueprint.output_pinned = not scheduling_unit_blueprint.output_pinned scheduling_unit_blueprint.save() # but scheduling constraints should be immutable @@ -825,7 +825,7 @@ class SchedulingUnitBlueprintTest(unittest.TestCase): scheduling_unit_blueprint.refresh_from_db() # we should be able to modify other fields - scheduling_unit_blueprint.results_accepted = not scheduling_unit_blueprint.results_accepted + scheduling_unit_blueprint.output_pinned = not scheduling_unit_blueprint.output_pinned scheduling_unit_blueprint.save() # but scheduling constraints should be immutable diff --git a/SAS/TMSS/backend/test/test_utils.py b/SAS/TMSS/backend/test/test_utils.py index 39457890fd86bd48263e10c96ad7653282e0ea03..3f24e55cf19187411df45887a18b92e0d641090a 100644 --- a/SAS/TMSS/backend/test/test_utils.py +++ b/SAS/TMSS/backend/test/test_utils.py @@ -155,6 +155,7 @@ def set_subtask_state_following_allowed_transitions(subtask: typing.Union[Subtas elif subtask.state.value == SubtaskState.Choices.UNSCHEDULING.value: subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value) + logger.info("transitioning subtask id=%s to state='%s' desired_state='%s'", subtask.id, subtask.state.value, desired_state_value) subtask.save() # loop, check in while statement at top if we reached the desired state already. diff --git a/SAS/TMSS/client/lib/tmssbuslistener.py b/SAS/TMSS/client/lib/tmssbuslistener.py index 8831dcfbfcf90d78d219d36dbfa4c4a215165298..53337d73d159d0a1a8915ce51e5aca1b14e437d8 100644 --- a/SAS/TMSS/client/lib/tmssbuslistener.py +++ b/SAS/TMSS/client/lib/tmssbuslistener.py @@ -129,8 +129,6 @@ class TMSSEventMessageHandler(AbstractMessageHandler): self.onProjectQuotaArchiveLocationUpdated(**msg.content) elif stripped_subject == 'ProjectQuotaArchiveLocation.Object.Deleted': self.onProjectQuotaArchiveLocationDeleted(**msg.content) - elif stripped_subject == 'SchedulingUnitBlueprint.Object.CannotProceed': - self.onSchedulingUnitBlueprintCannotProceed(**msg.content) elif stripped_subject == 'SchedulingUnitBlueprint.Object.IngestPermissionGranted': self.onSchedulingUnitBlueprintIngestPermissionGranted(id=msg.content['id'], ingest_permission_granted_since=parser.parse(msg.content['ingest_permission_granted_since'], ignoretz=True)) @@ -297,12 +295,6 @@ class TMSSEventMessageHandler(AbstractMessageHandler): ''' pass - def onSchedulingUnitBlueprintCannotProceed(self, id: int): - '''onSchedulingUnitBlueprintCannotProceed is called upon receiving a SchedulingUnitBlueprint.Object.CannotProceed message, which is sent when a SchedulingUnitBlueprints cannot Proceed. - :param id: the TMSS id of the SchedulingUnitBlueprint - ''' - pass - def onSchedulingUnitBlueprintIngestPermissionGranted(self, id: int, ingest_permission_granted_since: datetime): '''onSchedulingUnitBlueprintIngestPermissionGranted is called upon receiving a SchedulingUnitBlueprint.Object.IngestPermissionGranted message, usually as a result of setting the permissing in the database via the QA Workflow. :param id: the TMSS id of the SchedulingUnitBlueprint