From 16fed478844fff60eb4eebfb9d00b11740b3b748 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rn=20K=C3=BCnsem=C3=B6ller?= <jkuensem@physik.uni-bielefeld.de>
Date: Mon, 27 Sep 2021 18:34:57 +0200
Subject: [PATCH] TMSS-159: Change subtask start/stop times to various flavors

---
 SAS/TMSS/backend/src/migrate_momdb_to_tmss.py |  4 +-
 .../src/tmss/tmssapp/adapters/feedback.py     |  2 +-
 .../src/tmss/tmssapp/adapters/parset.py       |  8 +-
 .../src/tmss/tmssapp/adapters/reports.py      |  2 +-
 .../backend/src/tmss/tmssapp/adapters/sip.py  |  4 +-
 .../tmss/tmssapp/migrations/0001_initial.py   | 16 ++--
 .../src/tmss/tmssapp/models/scheduling.py     | 18 +++--
 .../src/tmss/tmssapp/models/specification.py  |  8 +-
 SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py | 76 +++++++++----------
 .../src/tmss/tmssapp/viewsets/scheduling.py   | 10 ++-
 SAS/TMSS/client/lib/tmss_http_rest_client.py  | 30 ++++----
 11 files changed, 96 insertions(+), 82 deletions(-)

diff --git a/SAS/TMSS/backend/src/migrate_momdb_to_tmss.py b/SAS/TMSS/backend/src/migrate_momdb_to_tmss.py
index 98f9cc1042a..14d0e112e99 100755
--- a/SAS/TMSS/backend/src/migrate_momdb_to_tmss.py
+++ b/SAS/TMSS/backend/src/migrate_momdb_to_tmss.py
@@ -509,8 +509,8 @@ def create_subtask_trees_for_project_in_momdb(project_mom2id, project):
                     "tags": ["migrated_from_MoM", "migration_incomplete"],  # todo: set complete once it is verified that all info is present
                     "priority": project.priority_rank,  # todo: correct to derive from project?
                     # optional:
-                    "start_time": start_time,
-                    "stop_time": stop_time
+                    "scheduled_on_sky_start_time": start_time,
+                    "scheduled_on_sky_stop_time": stop_time
                     # "created_or_updated_by_user" = None,
                     # "raw_feedback" = None,
                     #"cluster": None  # I guess from lofar_observation.storage_cluster_id
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py
index 33d19956337..9fdbd7d3d80 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/feedback.py
@@ -371,7 +371,7 @@ def process_feedback_for_subtask_and_set_to_finished_if_complete(subtask: Subtas
     if subtask.state.value == SubtaskState.objects.get(value='started').value:
         logger.info("received feedback for subtask id=%s while it is still running (state=%s). Setting state to 'finishing' and stop_time to now.", subtask.id, subtask.state.value)
         subtask.state = SubtaskState.objects.get(value='finishing')
-        subtask.stop_time = datetime.utcnow()
+        subtask.scheduled_on_sky_stop_time = datetime.utcnow()
         subtask.save()

     # the submitted feedback_doc is (should be) a plain text document in parset format
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
index 9729aba5286..897e065cdc2 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
@@ -317,8 +317,8 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
     parset["Observation.processSubtype"] = "Beam Observation"
     parset["Observation.Campaign.name"] = subtask.project.name
     parset["Observation.Campaign.PI"] = "classified"  # pulp needs the PI to be non-empty
-    parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time
-    parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time
+    parset["Observation.startTime"] = formatDatetime(subtask.scheduled_on_sky_start_time) if isinstance(subtask.scheduled_on_sky_start_time, datetime) else subtask.scheduled_on_sky_start_time
+    parset["Observation.stopTime"] = formatDatetime(subtask.scheduled_on_sky_stop_time) if isinstance(subtask.scheduled_on_sky_stop_time, datetime) else subtask.scheduled_on_sky_stop_time
     parset["Observation.strategy"] = "default"  # maybe not mandatory?

     # ----------------------------
@@ -424,8 +424,8 @@ def _common_parset_dict_for_pipeline_schemas(subtask: models.Subtask) -> dict:
     parset["Observation.momID"] = 0  # Needed by MACScheduler
     parset["Observation.otdbID"] = subtask.pk  # HACK: the pipeline uses otdbID as the sasID. our tmssID>2000000 to prevent clashes. TODO: replace all otdbID's by sasID.
     parset["Observation.tmssID"] = subtask.pk
-    parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time
-    parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time
+    parset["Observation.startTime"] = formatDatetime(subtask.scheduled_on_sky_start_time) if isinstance(subtask.scheduled_on_sky_start_time, datetime) else subtask.scheduled_on_sky_start_time
+    parset["Observation.stopTime"] = formatDatetime(subtask.scheduled_on_sky_stop_time) if isinstance(subtask.scheduled_on_sky_stop_time, datetime) else subtask.scheduled_on_sky_stop_time

     parset["Observation.processType"] = "Pipeline"

diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/reports.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/reports.py
index 1af1533c1b7..bddd07685d1 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/reports.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/reports.py
@@ -387,7 +387,7 @@ def _get_subs_and_durations_from_project(project_pk: int, start: datetime, stop:
         # Get ingest info
         dataproducts = models.Dataproduct.objects.filter(producer__subtask__specifications_template__type='ingest').filter(producer__subtask__state__value='finished').filter(producer__subtask__task_blueprint__scheduling_unit_blueprint=sub)
         # TODO: Maybe it would be useful to implement an 'ingested_stop_time' (and even an 'ingested_start_time' and consequently an 'ingested_duration'?)
-        sub_info['ingested_date'] = max(dataproducts, key=lambda x: x.producer.subtask.stop_time).producer.subtask.stop_time if dataproducts else None
+        sub_info['ingested_date'] = max(dataproducts, key=lambda x: x.producer.subtask.scheduled_on_sky_stop_time).producer.subtask.scheduled_on_sky_stop_time if dataproducts else None
         sub_info['ingested_data_size'] = dataproducts.aggregate(Sum('size'))['size__sum']

         # Gather durations
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py
index 9cf6144b953..1621708ea60 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/sip.py
@@ -137,8 +137,8 @@ def create_sip_representation_for_subtask(subtask: Subtask):
         name = str(subtask.id)
         process_map = siplib.ProcessMap(strategyname=subtask.specifications_template.name,
                                         strategydescription=subtask.specifications_template.description,
-                                        starttime=subtask.start_time,
-                                        duration=isodate.duration_isoformat(datetime.timedelta(seconds=round((subtask.stop_time-subtask.start_time).total_seconds()))),
+                                        starttime=subtask.scheduled_on_sky_start_time,
+                                        duration=isodate.duration_isoformat(datetime.timedelta(seconds=round((subtask.scheduled_on_sky_stop_time-subtask.scheduled_on_sky_start_time).total_seconds()))),
                                         identifier=subtask_sip_identifier,
                                         observation_identifier=subtask_sip_identifier,
                                         relations=[])  # todo, not sure this is still needed, can be empty for now (potentially scheduling_unit?)
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py
index b39c19f3e09..f5433184dee 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 3.0.9 on 2021-09-02 19:01
+# Generated by Django 3.0.9 on 2021-09-27 14:27

 from django.conf import settings
 import django.contrib.auth.models
@@ -754,9 +754,15 @@ class Migration(migrations.Migration):
                 ('tags', django.contrib.postgres.fields.ArrayField(base_field=models.CharField(max_length=128), blank=True, default=list, help_text='User-defined search keywords for object.', size=8)),
                 ('created_at', models.DateTimeField(auto_now_add=True, help_text='Moment of object creation.')),
                 ('updated_at', models.DateTimeField(auto_now=True, help_text='Moment of last object update.')),
-                ('start_time', models.DateTimeField(help_text='Start this subtask at the specified time (NULLable).', null=True)),
-                ('stop_time', models.DateTimeField(help_text='Stop this subtask at the specified time (NULLable).', null=True)),
-                ('primary', models.BooleanField(default=False, help_text='TRUE if this is the one-and-only primary subtask in a parent TaskBlueprint.')),
+                ('scheduled_process_start_time', models.DateTimeField(help_text='The time the system will (try to) start the process (NULLable).', null=True)),
+                ('scheduled_process_stop_time', models.DateTimeField(help_text='The time the process is expected to stop (NULLable).', null=True)),
+                ('actual_process_start_time', models.DateTimeField(help_text='The time the process actually started (NULLable).', null=True)),
+                ('actual_process_stop_time', models.DateTimeField(help_text='The time the process actually stopped (NULLable).', null=True)),
+                ('scheduled_on_sky_start_time', models.DateTimeField(help_text='The time the observation will start recording (NULLable).', null=True)),
+                ('scheduled_on_sky_stop_time', models.DateTimeField(help_text='The time the observation will stop recording (NULLable).', null=True)),
+                ('actual_on_sky_start_time', models.DateTimeField(help_text='The time the observation actually started recording (NULLable).', null=True)),
+                ('actual_on_sky_stop_time', models.DateTimeField(help_text='The time the observation actually stopped recording (NULLable).', null=True)),
+                ('primary', models.BooleanField(db_index=True, default=False, help_text='TRUE if this is the one-and-only primary subtask in a parent TaskBlueprint.')),
                 ('specifications_doc', django.contrib.postgres.fields.jsonb.JSONField(help_text='Final specifications, as input for the controller.')),
                 ('raw_feedback', models.CharField(help_text='The raw feedback for this Subtask', max_length=1048576, null=True)),
             ],
@@ -1233,7 +1239,7 @@ class Migration(migrations.Migration):
         migrations.AddField(
             model_name='subtask',
             name='task_blueprint',
-            field=models.ForeignKey(help_text='The parent TaskBlueprint.', null=True, on_delete=django.db.models.deletion.PROTECT, related_name='subtasks', to='tmssapp.TaskBlueprint'),
+            field=models.ForeignKey(help_text='The parent TaskBlueprint.', on_delete=django.db.models.deletion.PROTECT, related_name='subtasks', to='tmssapp.TaskBlueprint'),
         ),
         migrations.AddConstraint(
             model_name='stationtimeline',
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py
index 5ad73a12bce..2445ff5b650 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py
@@ -145,8 +145,14 @@ class Subtask(BasicCommon, ProjectPropertyMixin, TemplateSchemaMixin):
     inspection plots on the observed data, etc.
     Each task has a specific configuration, will have resources allocated to it, and represents a single run.
     """
-    start_time = DateTimeField(null=True, help_text='Start this subtask at the specified time (NULLable).')
-    stop_time = DateTimeField(null=True, help_text='Stop this subtask at the specified time (NULLable).')
+    scheduled_process_start_time = DateTimeField(null=True, help_text='The time the system will (try to) start the process (NULLable).')
+    scheduled_process_stop_time = DateTimeField(null=True, help_text='The time the process is expected to stop (NULLable).')
+    actual_process_start_time = DateTimeField(null=True, help_text='The time the process actually started (NULLable).')
+    actual_process_stop_time = DateTimeField(null=True, help_text='The time the process actually stopped (NULLable).')
+    scheduled_on_sky_start_time = DateTimeField(null=True, help_text='The time the observation will start recording (NULLable).')  # previously start_time
+    scheduled_on_sky_stop_time = DateTimeField(null=True, help_text='The time the observation will stop recording (NULLable).')  # previously stop_time
+    actual_on_sky_start_time = DateTimeField(null=True, help_text='The time the observation actually started recording (NULLable).')
+    actual_on_sky_stop_time = DateTimeField(null=True, help_text='The time the observation actually stopped recording (NULLable).')
     state = ForeignKey('SubtaskState', null=False, on_delete=PROTECT, related_name='task_states', help_text='Subtask state (see Subtask State Machine).')
     primary = BooleanField(default=False, db_index=True, help_text='TRUE if this is the one-and-only primary subtask in a parent TaskBlueprint.')
     specifications_doc = JSONField(help_text='Final specifications, as input for the controller.')
@@ -168,9 +174,9 @@ class Subtask(BasicCommon, ProjectPropertyMixin, TemplateSchemaMixin):
     @property
     def duration(self) -> timedelta:
         '''the duration of this subtask (stop-start), or 0 if start/stop are None'''
-        if self.start_time is None or self.stop_time is None:
+        if self.scheduled_on_sky_start_time is None or self.scheduled_on_sky_stop_time is None:
             return timedelta(seconds=0)
-        return self.stop_time - self.start_time
+        return self.scheduled_on_sky_stop_time - self.scheduled_on_sky_start_time

     @property
     def specified_duration(self) -> timedelta:
@@ -274,7 +280,7 @@ class Subtask(BasicCommon, ProjectPropertyMixin, TemplateSchemaMixin):
         if self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
             # progress for an observation is just how far we are into the duration
-            num_seconds_running = max(0, (datetime.utcnow() - self.start_time).total_seconds())
+            num_seconds_running = max(0, (datetime.utcnow() - self.scheduled_on_sky_start_time).total_seconds())
             return min(1.0, float(num_seconds_running) / float(self.duration.total_seconds()))

         # TODO: add more progress computations for more subtask types if possible
@@ -301,7 +307,7 @@ class Subtask(BasicCommon, ProjectPropertyMixin, TemplateSchemaMixin):
         # check if we have a start time when scheduling
         if self.state.value == SubtaskState.Choices.SCHEDULED.value and self.__original_state_id == SubtaskState.Choices.SCHEDULING.value:
-            if self.start_time is None:
+            if self.scheduled_on_sky_start_time is None:
                 raise SubtaskSchedulingException("Cannot schedule subtask id=%s when start time is 'None'."
                                                  % (self.pk, ))

         # ensure there is and will be exactly one primary subtask per parent task_blueprint
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py
index 47ab3ea87a3..ad46892d869 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py
@@ -1104,9 +1104,9 @@ class TaskBlueprint(ProjectPropertyMixin, TemplateSchemaMixin, NamedCommon):
     def start_time(self) -> datetime or None:
         '''return the earliest start time of all subtasks of this task
         '''
-        subtasks_with_start_time = list(filter(lambda x: x.start_time is not None, self.subtasks.all()))
+        subtasks_with_start_time = list(filter(lambda x: x.scheduled_on_sky_start_time is not None, self.subtasks.all()))
         if subtasks_with_start_time:
-            return min(subtasks_with_start_time, key=lambda x: x.start_time).start_time
+            return min(subtasks_with_start_time, key=lambda x: x.scheduled_on_sky_start_time).scheduled_on_sky_start_time
         else:
             return None

@@ -1114,9 +1114,9 @@
     def stop_time(self) -> datetime or None:
         '''return the latest stop time of all subtasks of this task
         '''
-        subtasks_with_stop_time = list(filter(lambda x: x.stop_time is not None, self.subtasks.all()))
+        subtasks_with_stop_time = list(filter(lambda x: x.scheduled_on_sky_stop_time is not None, self.subtasks.all()))
         if subtasks_with_stop_time:
-            return max(subtasks_with_stop_time, key=lambda x: x.stop_time).stop_time
+            return max(subtasks_with_stop_time, key=lambda x: x.scheduled_on_sky_stop_time).scheduled_on_sky_stop_time
         else:
             return None

diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
index 718a8a1ecaf..674eeb5d327 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
@@ -485,8 +485,8 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB

     # step 1: create subtask in defining state
     cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
-    subtask_data = { "start_time": None,
-                     "stop_time": None,
+    subtask_data = { "scheduled_on_sky_start_time": None,
+                     "scheduled_on_sky_stop_time": None,
                      "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                      "specifications_doc": specifications_doc,
                      "task_blueprint": task_blueprint,
@@ -545,8 +545,8 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask)
     qafile_subtask_spec['nr_of_timestamps'] = obs_task_qafile_spec.get("nr_of_timestamps")
     validate_json_against_schema(qafile_subtask_spec, qafile_subtask_template.schema)

-    qafile_subtask_data = { "start_time": None,
-                            "stop_time": None,
+    qafile_subtask_data = { "scheduled_on_sky_start_time": None,
+                            "scheduled_on_sky_stop_time": None,
                             "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                             "task_blueprint": observation_subtask.task_blueprint,
                             "specifications_template": qafile_subtask_template,
@@ -623,8 +623,8 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta
     qaplots_subtask_spec_doc['crosscorrelation'] = obs_task_qaplots_spec.get("crosscorrelation")
     validate_json_against_schema(qaplots_subtask_spec_doc, qaplots_subtask_template.schema)

-    qaplots_subtask_data = { "start_time": None,
-                             "stop_time": None,
+    qaplots_subtask_data = { "scheduled_on_sky_start_time": None,
+                             "scheduled_on_sky_stop_time": None,
                              "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                              "task_blueprint": qafile_subtask.task_blueprint,
                              "specifications_template": qaplots_subtask_template,
@@ -673,8 +673,8 @@ def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, s
     subtask_specs = generate_subtask_specs_from_task_spec_func(task_specs_with_defaults, default_subtask_specs)

     cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
-    subtask_data = { "start_time": None,
-                     "stop_time": None,
+    subtask_data = { "scheduled_on_sky_start_time": None,
+                     "scheduled_on_sky_stop_time": None,
                      "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                      "task_blueprint": task_blueprint,
                      "specifications_template": subtask_template,
@@ -728,8 +728,8 @@ def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) ->
     default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache)
     subtask_specs = default_subtask_specs  # todo: translate specs from task to subtask once we have non-empty templates
     cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
-    subtask_data = {"start_time": None,
-                    "stop_time": None,
+    subtask_data = {"scheduled_on_sky_start_time": None,
+                    "scheduled_on_sky_stop_time": None,
                     "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                     "task_blueprint": task_blueprint,
                     "specifications_template": subtask_template,
@@ -770,8 +770,8 @@ def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) ->
     subtask_template = SubtaskTemplate.objects.get(name='cleanup')
     subtask_specs = get_default_json_object_for_schema(subtask_template.schema, cache=_schema_cache)
     cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
-    subtask_data = {"start_time": None,
-                    "stop_time": None,
+    subtask_data = {"scheduled_on_sky_start_time": None,
+                    "scheduled_on_sky_stop_time": None,
                     "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                     "task_blueprint": task_blueprint,
                     "specifications_template": subtask_template,
@@ -808,9 +808,9 @@ def schedule_subtask(subtask: Subtask) -> Subtask:
     '''Generic scheduling method for subtasks. Calls the appropiate scheduling method based on the subtask's type.'''
     check_prerequities_for_scheduling(subtask)

-    if (subtask.start_time is None or subtask.start_time < datetime.utcnow()) and subtask.predecessors.count() > 0:
+    if (subtask.scheduled_on_sky_start_time is None or subtask.scheduled_on_sky_start_time < datetime.utcnow()) and subtask.predecessors.count() > 0:
         # this is a successor task that can start now. Auto assign nice start_time just a bit in the future.
-        subtask.start_time = round_to_second_precision(datetime.utcnow()+timedelta(seconds=30))
+        subtask.scheduled_on_sky_start_time = round_to_second_precision(datetime.utcnow()+timedelta(seconds=30))
         subtask.save()

     try:
@@ -911,8 +911,8 @@ def update_subtasks_start_times_for_scheduling_unit(scheduling_unit: SchedulingU


 def update_start_time_and_shift_successors_until_after_stop_time(subtask: Subtask, start_time: datetime):
-    subtask.start_time = start_time
-    subtask.stop_time = subtask.start_time + subtask.specified_duration
+    subtask.scheduled_on_sky_start_time = start_time
+    subtask.scheduled_on_sky_stop_time = subtask.scheduled_on_sky_start_time + subtask.specified_duration
     subtask.save()

     shift_successors_until_after_stop_time(subtask)
@@ -921,7 +921,7 @@ def update_start_time_and_shift_successors_until_after_stop_time(subtask: Subtas
 def shift_successors_until_after_stop_time(subtask: Subtask):
     for successor in subtask.successors:
         # by default, let the successor directly follow this tasks...
-        successor_start_time = subtask.stop_time
+        successor_start_time = subtask.scheduled_on_sky_stop_time

         # ... but adjust it if there is a scheduling_relation with an offset.
         # so, check if these successive subtasks have different task_blueprint parents
@@ -949,8 +949,8 @@ def clear_defined_subtasks_start_stop_times_for_scheduling_unit(scheduling_unit:
     for task_blueprint in scheduling_unit.task_blueprints.all():
         defined_subtasks = task_blueprint.subtasks.filter(state__value='defined').all()
         for subtask in defined_subtasks:
-            subtask.start_time = None
-            subtask.stop_time = None
+            subtask.scheduled_on_sky_start_time = None
+            subtask.scheduled_on_sky_stop_time = None
             subtask.save()


@@ -982,8 +982,8 @@ def _create_ra_specification(_subtask):
             'task_type': _subtask.specifications_template.type.value.lower(),
             'task_subtype': parset_dict.get("Observation.processSubtype","").lower(),
             'status': 'prescheduled' if _subtask.state.value == SubtaskState.Choices.SCHEDULING.value else 'approved',
-            'starttime': _subtask.start_time,
-            'endtime': _subtask.stop_time,
+            'starttime': _subtask.scheduled_on_sky_start_time,
+            'endtime': _subtask.scheduled_on_sky_stop_time,
             'cluster': _subtask.cluster.name,
             'station_requirements': [],
             'specification': parset_dict }
@@ -1324,7 +1324,7 @@ def schedule_observation_subtask(observation_subtask: Subtask):
     # step 1a: check start/stop times
     # start time should be known. If not raise. Then the user and/or scheduling service should supply a properly calculated/estimated start_time first.
-    if observation_subtask.start_time is None:
+    if observation_subtask.scheduled_on_sky_start_time is None:
         raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because it has no start_time" % (observation_subtask.pk,
                                                                                                                               observation_subtask.specifications_template.type))

@@ -1334,7 +1334,7 @@ def schedule_observation_subtask(observation_subtask: Subtask):
                                                                                                                            observation_subtask.specified_duration))

     # always update the stop_time according to the spec
-    observation_subtask.stop_time = observation_subtask.start_time + observation_subtask.specified_duration
+    observation_subtask.scheduled_on_sky_stop_time = observation_subtask.scheduled_on_sky_start_time + observation_subtask.specified_duration

     # step 2: define input dataproducts
     # NOOP: observations take no inputs
@@ -1370,8 +1370,8 @@ def schedule_observation_subtask(observation_subtask: Subtask):
         saps = [SAP.objects.create(specifications_doc={ "name": "L%s_SAP%03d_%s" % (observation_subtask.id, sap_nr, pointing['name']),
                                                         "pointing": pointing['pointing'],
-                                                        "time": {"start_time": observation_subtask.start_time.isoformat()+'Z',
-                                                                 "duration": (observation_subtask.stop_time - observation_subtask.start_time).total_seconds()},
+                                                        "time": {"start_time": observation_subtask.scheduled_on_sky_start_time.isoformat()+'Z',
+                                                                 "duration": (observation_subtask.scheduled_on_sky_stop_time - observation_subtask.scheduled_on_sky_start_time).total_seconds()},
                                                         "antennas": {
                                                             "antenna_set": antennaset,
                                                             "fields": antennafields
@@ -1696,10 +1696,10 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
     # step 1a: check start/stop times
     # not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
-    if pipeline_subtask.start_time is None:
+    if pipeline_subtask.scheduled_on_sky_start_time is None:
         now = datetime.utcnow()
         logger.info("pipeline id=%s has no starttime. assigned default: %s", pipeline_subtask.pk, formatDatetime(now))
-        pipeline_subtask.start_time = now
+        pipeline_subtask.scheduled_on_sky_start_time = now

     if pipeline_subtask.specified_duration < timedelta(seconds=1):
         raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because its specified duration is too short: %s" % (pipeline_subtask.pk,
@@ -1707,7 +1707,7 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
                                                                                                                                                  pipeline_subtask.specified_duration))

     # always update the stop_time according to the spec
-    pipeline_subtask.stop_time = pipeline_subtask.start_time + pipeline_subtask.specified_duration
+    pipeline_subtask.scheduled_on_sky_stop_time = pipeline_subtask.scheduled_on_sky_start_time + pipeline_subtask.specified_duration

     # step 2: link input dataproducts
     if pipeline_subtask.inputs.count() == 0:
@@ -1774,8 +1774,8 @@ def schedule_ingest_subtask(ingest_subtask: Subtask):
     # not very relevant for ingest subtasks, but it's nice for the user to see when the ingest task was scheduled.
     # please note that an ingest subtask may idle for some time while it is in the ingest queue.
     # the actual start/stop times are set by the IngestTMSSAdapter when the subtask starts and stops.
-    ingest_subtask.start_time = max([pred.stop_time for pred in ingest_subtask.predecessors] + [datetime.utcnow()])
-    ingest_subtask.stop_time = ingest_subtask.start_time + timedelta(hours=6)
+    ingest_subtask.scheduled_on_sky_start_time = max([pred.scheduled_on_sky_stop_time for pred in ingest_subtask.predecessors] + [datetime.utcnow()])
+    ingest_subtask.scheduled_on_sky_stop_time = ingest_subtask.scheduled_on_sky_start_time + timedelta(hours=6)

     # step 2: link input dataproducts
     if ingest_subtask.inputs.count() == 0:
@@ -1852,8 +1852,8 @@ def schedule_cleanup_subtask(cleanup_subtask: Subtask):
     # not very relevant for ingest subtasks, but it's nice for the user to see when the cleanup task was scheduled.
     # please note that an cleanup subtask may idle for some time while it is in the cleanup queue.
     # the actual start/stop times are set by the IngestTMSSAdapter when the subtask starts and stops.
-    cleanup_subtask.start_time = max([pred.stop_time for pred in cleanup_subtask.predecessors] + [datetime.utcnow()])
-    cleanup_subtask.stop_time = cleanup_subtask.start_time + timedelta(hours=1)
+    cleanup_subtask.scheduled_on_sky_start_time = max([pred.scheduled_on_sky_stop_time for pred in cleanup_subtask.predecessors] + [datetime.utcnow()])
+    cleanup_subtask.scheduled_on_sky_stop_time = cleanup_subtask.scheduled_on_sky_start_time + timedelta(hours=1)

     # step 2: link input dataproducts
     if cleanup_subtask.inputs.count() == 0:
@@ -1898,15 +1898,15 @@ def schedule_copy_subtask(copy_subtask: Subtask):
     # step 1a: check start/stop times
     # not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
-    if copy_subtask.start_time is None:
+    if copy_subtask.scheduled_on_sky_start_time is None:
         now = datetime.utcnow()
         logger.info("copy id=%s has no starttime. assigned default: %s", copy_subtask.pk, formatDatetime(now))
-        copy_subtask.start_time = now
+        copy_subtask.scheduled_on_sky_start_time = now

-    if copy_subtask.stop_time is None:
-        stop_time = copy_subtask.start_time + timedelta(hours=+1)
+    if copy_subtask.scheduled_on_sky_stop_time is None:
+        stop_time = copy_subtask.scheduled_on_sky_start_time + timedelta(hours=+1)
         logger.info("copy id=%s has no stop_time. assigned default: %s", copy_subtask.pk, formatDatetime(stop_time))
assigned default: %s", copy_subtask.pk, formatDatetime(stop_time)) - copy_subtask.stop_time = stop_time + copy_subtask.scheduled_on_sky_stop_time = stop_time # step 2: link input dataproducts if copy_subtask.inputs.count() == 0: @@ -1945,7 +1945,7 @@ def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprin for subtask in independent_subtasks: if start_time is not None: - subtask.start_time = start_time + subtask.scheduled_on_sky_start_time = start_time schedule_subtask_and_update_successor_start_times(subtask) return independent_subtasks diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py index 25b4d7b9aa5..18f108dd5cd 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/scheduling.py @@ -142,8 +142,10 @@ class SubTaskFilter(filters.FilterSet): model = Subtask fields = { 'state__value': ['exact'], - 'start_time': ['lt', 'gt'], - 'stop_time': ['lt', 'gt'], + 'scheduled_on_sky_start_time': ['lt', 'gt'], + 'scheduled_on_sky_stop_time': ['lt', 'gt'], + 'actual_on_sky_start_time': ['lt', 'gt'], + 'actual_on_sky_stop_time': ['lt', 'gt'], 'cluster__name': ['exact', 'icontains'], } filter_overrides = FILTER_OVERRIDES @@ -153,7 +155,7 @@ class SubtaskViewSet(LOFARViewSet): queryset = models.Subtask.objects.all() serializer_class = serializers.SubtaskSerializer filter_class = SubTaskFilter - ordering = ('start_time',) + ordering = ('scheduled_on_sky_start_time',) # performance boost: select the related models in a single db call. queryset = queryset.select_related('state', 'specifications_template', 'specifications_template__type', 'cluster', 'created_or_updated_by_user') @@ -370,7 +372,7 @@ class SubtaskNestedViewSet(LOFARNestedViewSet): queryset = models.Subtask.objects.all() serializer_class = serializers.SubtaskSerializer filter_class = SubTaskFilter - ordering = ('start_time',) + ordering = ('scheduled_on_sky_start_time',) # performance boost: select the related models in a single db call. 
     queryset = queryset.select_related('state', 'specifications_template', 'specifications_template__type', 'cluster', 'created_or_updated_by_user')
diff --git a/SAS/TMSS/client/lib/tmss_http_rest_client.py b/SAS/TMSS/client/lib/tmss_http_rest_client.py
index 5ad370e07e9..e5a6eb81d0e 100644
--- a/SAS/TMSS/client/lib/tmss_http_rest_client.py
+++ b/SAS/TMSS/client/lib/tmss_http_rest_client.py
@@ -150,7 +150,7 @@ class TMSSsession(object):
         '''set the status for the given subtask, and return the subtask with its new state, or raise on error'''
         json_doc = {'state': "%s/subtask_state/%s/" % (self.api_url, status)}
         if status == 'finishing' or status == 'cancelling':
-            json_doc['stop_time'] = datetime.utcnow().isoformat()
+            json_doc['scheduled_on_sky_stop_time'] = datetime.utcnow().isoformat()

         logger.info("updating subtask id=%s status to '%s'", subtask_id, status)
         response = self.session.patch(url='%s/subtask/%s/' % (self.api_url, subtask_id), json=json_doc)
@@ -219,22 +219,22 @@ class TMSSsession(object):

     def get_subtasks(self, state: str=None, cluster: str=None,
-                     start_time_less_then: datetime=None, start_time_greater_then: datetime=None,
-                     stop_time_less_then: datetime = None, stop_time_greater_then: datetime = None) -> list:
+                     scheduled_on_sky_start_time_less_then: datetime=None, scheduled_on_sky_start_time_greater_then: datetime=None,
+                     scheduled_on_sky_stop_time_less_then: datetime = None, scheduled_on_sky_stop_time_greater_then: datetime = None) -> list:
         '''get subtasks (as list of dicts) filtered by the given parameters'''
         clauses = {}
         if state is not None:
             clauses["state__value"] = state
         if cluster is not None:
             clauses["cluster__name"] = cluster
-        if start_time_less_then is not None:
-            clauses["start_time__lt="] = formatDatetime(start_time_less_then)
-        if start_time_greater_then is not None:
-            clauses["start_time__gt"] = formatDatetime(start_time_greater_then)
-        if stop_time_less_then is not None:
-            clauses["stop_time__lt"] = formatDatetime(stop_time_less_then)
-        if stop_time_greater_then is not None:
-            clauses["stop_time__gt"] = formatDatetime(stop_time_greater_then)
+        if scheduled_on_sky_start_time_less_then is not None:
+            clauses["scheduled_on_sky_start_time__lt"] = formatDatetime(scheduled_on_sky_start_time_less_then)
+        if scheduled_on_sky_start_time_greater_then is not None:
+            clauses["scheduled_on_sky_start_time__gt"] = formatDatetime(scheduled_on_sky_start_time_greater_then)
+        if scheduled_on_sky_stop_time_less_then is not None:
+            clauses["scheduled_on_sky_stop_time__lt"] = formatDatetime(scheduled_on_sky_stop_time_less_then)
+        if scheduled_on_sky_stop_time_greater_then is not None:
+            clauses["scheduled_on_sky_stop_time__gt"] = formatDatetime(scheduled_on_sky_stop_time_greater_then)
         return self.get_path_as_json_object("subtask", clauses)
@@ -407,11 +407,11 @@ class TMSSsession(object):
             return result.content.decode('utf-8')
         raise Exception("Could not specify observation for task %s.\nResponse: %s" % (task_id, result))

-    def schedule_subtask(self, subtask_id: int, start_time: datetime=None, retry_count: int=0) -> {}:
-        """schedule the subtask for the given subtask_id at the given start_time. If start_time==None then already (pre)set start_time is used.
+    def schedule_subtask(self, subtask_id: int, scheduled_on_sky_start_time: datetime=None, retry_count: int=0) -> {}:
+        """schedule the subtask for the given subtask_id at the given scheduled_on_sky_start_time. If scheduled_on_sky_start_time==None then already (pre)set scheduled_on_sky_start_time is used.
         returns the scheduled subtask upon success, or raises."""
-        if start_time is not None:
-            self.session.patch(self.get_full_url_for_path('subtask/%s' % subtask_id), {'start_time': datetime.utcnow()})
+        if scheduled_on_sky_start_time is not None:
+            self.session.patch(self.get_full_url_for_path('subtask/%s' % subtask_id), {'scheduled_on_sky_start_time': scheduled_on_sky_start_time})
         return self.post_to_path_and_get_result_as_json_object('subtask/%s/schedule' % (subtask_id), retry_count=retry_count)

     def unschedule_subtask(self, subtask_id: int, retry_count: int=0) -> {}:
-- 
GitLab
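
Note for existing deployments: this patch rewrites tmssapp/migrations/0001_initial.py in place, so a database that already applied the original initial migration would need a separate schema migration for the renamed and added Subtask columns. A minimal sketch of such a follow-up migration is given below; the file name (e.g. 0002_subtask_time_flavors.py) and its place in the migration chain are illustrative assumptions, not part of this patch.

# Hypothetical follow-up migration (sketch), e.g. tmssapp/migrations/0002_subtask_time_flavors.py.
# RenameField keeps existing start/stop values; the six new time flavors start out NULL.
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [('tmssapp', '0001_initial')]

    operations = [
        migrations.RenameField(model_name='subtask', old_name='start_time', new_name='scheduled_on_sky_start_time'),
        migrations.RenameField(model_name='subtask', old_name='stop_time', new_name='scheduled_on_sky_stop_time'),
        migrations.AddField(model_name='subtask', name='scheduled_process_start_time',
                            field=models.DateTimeField(null=True, help_text='The time the system will (try to) start the process (NULLable).')),
        migrations.AddField(model_name='subtask', name='scheduled_process_stop_time',
                            field=models.DateTimeField(null=True, help_text='The time the process is expected to stop (NULLable).')),
        migrations.AddField(model_name='subtask', name='actual_process_start_time',
                            field=models.DateTimeField(null=True, help_text='The time the process actually started (NULLable).')),
        migrations.AddField(model_name='subtask', name='actual_process_stop_time',
                            field=models.DateTimeField(null=True, help_text='The time the process actually stopped (NULLable).')),
        migrations.AddField(model_name='subtask', name='actual_on_sky_start_time',
                            field=models.DateTimeField(null=True, help_text='The time the observation actually started recording (NULLable).')),
        migrations.AddField(model_name='subtask', name='actual_on_sky_stop_time',
                            field=models.DateTimeField(null=True, help_text='The time the observation actually stopped recording (NULLable).')),
    ]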
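For reference, a small usage sketch of the renamed client calls. It assumes an already authenticated TMSSsession instance named client (how it is constructed is outside this patch), and the subtask id is made up for illustration.

# Sketch only: `client` is assumed to be an authenticated TMSSsession; subtask id 12345 is a made-up example.
from datetime import datetime, timedelta

now = datetime.utcnow()

# subtasks that are scheduled to start recording on sky within the next hour
upcoming = client.get_subtasks(state='scheduled',
                               scheduled_on_sky_start_time_greater_then=now,
                               scheduled_on_sky_start_time_less_then=now + timedelta(hours=1))

# (re)schedule a subtask a little in the future using the renamed keyword argument
scheduled = client.schedule_subtask(subtask_id=12345,
                                    scheduled_on_sky_start_time=now + timedelta(minutes=2))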