diff --git a/SAS/TMSS/backend/services/CMakeLists.txt b/SAS/TMSS/backend/services/CMakeLists.txt index fcf450a17b3faff9e242cc8c0d0160b362129b36..1b2e6dd08f60fa7c43fc28f1453670dbddb47d89 100644 --- a/SAS/TMSS/backend/services/CMakeLists.txt +++ b/SAS/TMSS/backend/services/CMakeLists.txt @@ -11,4 +11,5 @@ lofar_add_package(TMSSSlackWebhookService slack_webhook) lofar_add_package(TMSSWebSocketService websocket) lofar_add_package(TMSSWorkflowService workflow_service) lofar_add_package(TMSSReportRefreshService report_refresh) +lofar_add_package(TMSSLofar2SiblingService lofar2_siblings) diff --git a/SAS/TMSS/backend/services/lofar2_siblings/CMakeLists.txt b/SAS/TMSS/backend/services/lofar2_siblings/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..f6cf877a67f8cffe982c82dab761aa238806584d --- /dev/null +++ b/SAS/TMSS/backend/services/lofar2_siblings/CMakeLists.txt @@ -0,0 +1,4 @@ +lofar_package(TMSSLofar2SiblingService 0.1 DEPENDS TMSSClient PyCommon PyMessaging) + +lofar_add_bin_scripts(tmss_lofar2_sibling_service) + diff --git a/SAS/TMSS/backend/services/lofar2_siblings/tmss_lofar2_sibling_service b/SAS/TMSS/backend/services/lofar2_siblings/tmss_lofar2_sibling_service new file mode 100755 index 0000000000000000000000000000000000000000..29ed75889d9cc8379ac6d0c4ad201a933f862b6b --- /dev/null +++ b/SAS/TMSS/backend/services/lofar2_siblings/tmss_lofar2_sibling_service @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + + +import os +from optparse import OptionParser, OptionGroup +import logging +logger = logging.getLogger(__name__) + +from lofar.sas.tmss.client.tmssbuslistener import * +from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession + + +class TMSSLofar2SiblingEventMessageHandler(TMSSEventMessageHandler): + ''' + ''' + def __init__(self, tmss_client_credentials_id: str="TMSSClient"): + super().__init__() + self.tmss_client = TMSSsession.create_from_dbcreds_for_ldap(tmss_client_credentials_id) + + def start_handling(self): + self.tmss_client.open() + + def stop_handling(self): + self.tmss_client.close() + + def onSchedulingUnitBlueprintCreated(self, id: int): + lofar2_unit = self.tmss_client.create_lofar2_sibling_scheduling_unit(id) + if lofar2_unit is not None: + logger.info("created lofar2 unit: %s", lofar2_unit['url']) + + +def create_lofar2_sibling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str="TMSSClient"): + return TMSSBusListener(handler_type=TMSSLofar2SiblingEventMessageHandler, + handler_kwargs={'tmss_client_credentials_id': tmss_client_credentials_id}, + exchange=exchange, broker=broker) + + +def main(): + # make sure we run in UTC timezone + os.environ['TZ'] = 'UTC' + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # Check the invocation arguments + parser = OptionParser('%prog [options]', + description='run the tmss_lofar2_sibling_service which automatically creates a Lofar2 "sibling" scheduling unit for each newly created lofar1 scheduling unit blueprint') + + group = OptionGroup(parser, 'Messaging options') + group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default') + group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="exchange where the TMSS event messages are published. [default: %default]") + parser.add_option_group(group) + + group = OptionGroup(parser, 'Django options') + parser.add_option_group(group) + group.add_option('-R', '--tmss_client_credentials_id', dest='tmss_client_credentials_id', type='string', default='TMSSClient', help='TMSS django REST API credentials name, default: %default') + + (options, args) = parser.parse_args() + + # check TMSS is up and running via the client + TMSSsession.check_connection_and_exit_on_error(options.tmss_client_credentials_id) + + from lofar.common.util import waitForInterrupt + + with create_lofar2_sibling_service(options.exchange, options.broker, options.tmss_client_credentials_id): + waitForInterrupt() + +if __name__ == '__main__': + main() diff --git a/SAS/TMSS/backend/services/scheduling/lib/constraints.py b/SAS/TMSS/backend/services/scheduling/lib/constraints.py index 1c6d6e9785e286ed66c1e4496e84227614b553c5..42ff379d6e4e7fafea5e8529f72b0d2339e26227 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/constraints.py +++ b/SAS/TMSS/backend/services/scheduling/lib/constraints.py @@ -2323,7 +2323,7 @@ def determine_unschedulable_reason_and_mark_unschedulable_if_needed(scheduling_u msg = "Stations %s are reserved at start_time='%s'" % (','.join([str(s) for s in missing_stations]), proposed_start_time) return mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit, msg) - blocking_units = get_blocking_scheduled_or_observing_units(scheduling_unit) + blocking_units = get_blocking_scheduled_or_observing_units(scheduling_unit, proposed_start_time=proposed_start_time) if blocking_units.exists(): if len(blocking_units) == 1: msg = "Scheduling unit id=%s is blocking this unit from being scheduled" % (blocking_units[0].id, ) diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index f2bdf1c17319505bba6c76e1514009d3dae5785b..1aceae9d9864185232193b2683c592181ef3e8c9 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -247,6 +247,9 @@ class Scheduler: assert (scheduled_unit.status.value == models.SchedulingUnitStatus.Choices.SCHEDULED.value) logger.info("Scheduled fixed_time unit [%s/%s] id=%d at '%s'", i, len(schedulable_units), schedulable_unit.id, at_timestamp) scheduled_units.append(scheduled_unit) + + # schedule any related unit(s) + scheduled_units.extend(self.try_schedule_relational_units(scheduled_unit)) else: unschedulable_unit = determine_unschedulable_reason_and_mark_unschedulable_if_needed(schedulable_unit, at_timestamp, at_timestamp + schedulable_unit.specified_observation_duration, proposed_start_time=at_timestamp, @@ -330,6 +333,9 @@ class Scheduler: if scheduled_unit: scheduled_units.append(scheduled_unit) + # schedule any related unit(s) + scheduled_units.extend(self.try_schedule_relational_units(scheduled_unit)) + # see if we can fit any B-prio units in the new gap(s) in the schedule? scheduled_B_units = self.schedule_B_priority_units_in_gaps_around_scheduling_unit(scheduled_unit) scheduled_units.extend(scheduled_B_units) @@ -747,6 +753,56 @@ class Scheduler: return scheduled_scheduling_unit + def try_schedule_relational_units(self, scheduling_unit: models.SchedulingUnitBlueprint) -> [models.SchedulingUnitBlueprint]: + '''Try scheduling all (if any) unit which have a relational scheduling constraint with the given scheduling_unit (for example, run in parallel).''' + if scheduling_unit.status.value not in models.SchedulingUnitStatus.ACTIVE_STATUS_VALUES: + return [] + + related_units = models.SchedulingUnitBlueprint.objects.filter(obsolete_since__isnull=True). \ + filter(status__value__in=(models.SchedulingUnitStatus.Choices.SCHEDULABLE.value, models.SchedulingUnitStatus.Choices.SCHEDULED.value)). \ + filter(scheduling_constraints_template__name="relational"). \ + filter(scheduling_constraints_doc__other=scheduling_unit.id).all() + + scheduled_units = [] + for related_unit in related_units: + time_offset = timedelta(seconds=related_unit.scheduling_constraints_doc.get('time_offset', 0)) + start_time = scheduling_unit.scheduled_start_time + time_offset + logger.info("unit id=%s is related to unit id=%s and needs to be scheduled with an offset='%s' at start_time='%s'", + related_unit.id, scheduling_unit.id, time_offset, start_time) + scheduled_related_unit = self.try_schedule_unit(related_unit, start_time) + if scheduled_related_unit: + scheduled_units.append(scheduled_related_unit) + else: + determine_unschedulable_reason_and_mark_unschedulable_if_needed(related_unit, + lower_bound=scheduling_unit.scheduled_start_time, + upper_bound=scheduling_unit.scheduled_stop_time, + proposed_start_time=start_time, + gridder=self.fine_gridder, + raise_if_interruped=self._raise_if_triggered) + return scheduled_units + + def unschedule_relational_units(self, scheduling_unit: models.SchedulingUnitBlueprint): + '''unscheduling all (if any) unit which have a relational scheduling constraint with the given scheduling_unit (for example, run in parallel).''' + related_units = models.SchedulingUnitBlueprint.objects.filter(obsolete_since__isnull=True). \ + filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULED.value). \ + filter(scheduling_constraints_template__name="relational"). \ + filter(scheduling_constraints_doc__other=scheduling_unit.id).all() + + for related_unit in related_units: + logger.info("unit id=%s is related to unit id=%s and needs to be unscheduled", related_unit.id, scheduling_unit.id) + unschedule_subtasks_in_scheduling_unit_blueprint(related_unit) + + + def mark_relational_units_as_schedulable(self, scheduling_unit: models.SchedulingUnitBlueprint): + '''mark all (if any) unschedulable units which have a relational scheduling constraint with the given scheduling_unit as schedulable.''' + related_units = models.SchedulingUnitBlueprint.objects.filter(obsolete_since__isnull=True). \ + filter(status__value=models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value). \ + filter(scheduling_constraints_template__name="relational"). \ + filter(scheduling_constraints_doc__other=scheduling_unit.id).all() + for related_unit in related_units: + mark_subtasks_in_scheduling_unit_blueprint_as_schedulable(related_unit) + + def schedule_B_priority_units_in_gaps_around_scheduling_unit(self, scheduling_unit: models.SchedulingUnitBlueprint) -> [models.SchedulingUnitBlueprint]: '''try to schedule one or more scheduling units from queue B in the gap between the given scheduled_unit and its previous observed+ unit''' scheduled_units = [] @@ -794,6 +850,9 @@ class Scheduler: maybe_scheduled_unit = self.try_schedule_unit(best_B_candidate_for_gap.scheduling_unit, best_B_candidate_for_gap.start_time) if maybe_scheduled_unit is not None and maybe_scheduled_unit.status.value==models.SchedulingUnitStatus.Choices.SCHEDULED.value: scheduled_units.append(best_B_candidate_for_gap.scheduling_unit) + + # schedule any related unit(s) + scheduled_units.extend(self.try_schedule_relational_units(maybe_scheduled_unit)) except Exception as e: logger.exception("schedule_B_priority_units_in_gaps: Could not schedule B-queue unit id=%s in gap( ['%s', '%s'). %s", best_B_candidate_for_gap.scheduling_unit.id, lower_bound_start_time, upper_bound_stop_time, str(e)) @@ -995,6 +1054,10 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): logger.info("onSchedulingUnitBlueprintStatusChanged(id=%s, status=%s)",id, status) scheduling_unit = models.SchedulingUnitBlueprint.objects.get(id=id) + if status == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: + self.scheduler.unschedule_relational_units(scheduling_unit) + self.scheduler.mark_relational_units_as_schedulable(scheduling_unit) + # trigger scheduler if needed if scheduling_unit.is_fixed_time_scheduled and self.scheduler.fixed_time_scheduling_enabled: if status == models.SchedulingUnitStatus.Choices.SCHEDULABLE.value: @@ -1016,8 +1079,15 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): at = get_at_constraint_timestamp(scheduling_unit_blueprint) if at is not None: update_subtasks_start_times_for_scheduling_unit(scheduling_unit_blueprint, at) - except: - pass + + # trigger rescheduling of any related unit + if scheduling_unit_blueprint.scheduling_constraints_template.name=="relational": + related_unit_id = scheduling_unit_blueprint.scheduling_constraints_doc.get('other', -1) + related_unit = models.SchedulingUnitBlueprint.objects.get(id=related_unit_id) + self.scheduler.try_schedule_relational_units(related_unit) + except Exception as e: + logger.error(str(e)) + self.onSchedulingUnitBlueprintConstraintsRankOrQueueUpdated(id) def onSchedulingUnitBlueprintRankUpdated(self, id: int, rank: float): @@ -1190,6 +1260,7 @@ def get_dynamically_schedulable_scheduling_units(priority_queue: models.Priority if include_scheduled: states += [models.SchedulingUnitStatus.Choices.SCHEDULED.value] scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value__in=states).filter(obsolete_since__isnull=True) + scheduling_units = scheduling_units.filter(scheduling_constraints_template__name='constraints') scheduling_units = scheduling_units.filter(scheduling_constraints_doc__scheduler='dynamic') if priority_queue is not None: @@ -1204,6 +1275,7 @@ def get_dynamically_schedulable_scheduling_units(priority_queue: models.Priority def get_fixed_time_schedulable_scheduling_units() -> QuerySet: '''get a result QuerySet of all fixed_time schedulable scheduling_units''' scheduling_units = models.SchedulingUnitBlueprint.objects.filter(status__value=models.SchedulingUnitStatus.Choices.SCHEDULABLE.value).filter(obsolete_since__isnull=True) + scheduling_units = scheduling_units.filter(scheduling_constraints_template__name='constraints') scheduling_units = scheduling_units.filter(scheduling_constraints_doc__scheduler='fixed_time') scheduling_units = scheduling_units.order_by('-updated_at') return scheduling_units @@ -1281,6 +1353,7 @@ def unschededule_previously_scheduled_unit_if_needed_and_possible(candidate_sche return unschedule_subtasks_in_scheduling_unit_blueprint(previously_scheduled_scheduling_unit) except models.SchedulingUnitBlueprint.DoesNotExist: pass + return candidate_scheduling_unit def unschededule_blocking_scheduled_units_if_needed_and_possible(candidate_scheduling_unit: models.SchedulingUnitBlueprint, start_time: datetime, gridder: Gridder=None): diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/plots.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/plots.py index ff2d448515e5979c5929c920cadf6fa8bdd0f1b6..bb5b6fb7d09961b8b20ea2c85b21267e59d579b3 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/plots.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/plots.py @@ -48,7 +48,7 @@ class UTC_LST_DateFormatter(mpl.dates.DateFormatter): timestamp_lst = local_sidereal_time_for_utc_and_station_as_pytime(timestamp) if math.isclose(x, self.locs[0], rel_tol=1e-6): # add UTC and LST labels for the first tick - return timestamp.strftime(self.fmt) + "UTC\n" + timestamp_lst.strftime(self.fmt) + "LST" + return timestamp.strftime(self.fmt) + " UTC\n" + timestamp_lst.strftime(self.fmt) + " LST" return timestamp.strftime(self.fmt) + "\n" + timestamp_lst.strftime(self.fmt) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py index 8705583caeb529434aa7ba25fd2e5acc12ad9b0b..7737fb5c9cab6a1fd3238f46dcd3b0a2f83d6baa 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py @@ -395,7 +395,7 @@ def populate_test_data(): logger.info('created test scheduling_unit_draft: %s', scheduling_unit_draft.name) scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) - + return if scheduling_unit_blueprint.scheduled_stop_time: next_fixed_time_start_time = scheduling_unit_blueprint.scheduled_stop_time + timedelta(minutes=5) except Exception as e: @@ -560,7 +560,7 @@ def populate_cycles(apps, schema_editor): def populate_projects(apps, schema_editor): - for name, rank in (("high", ProjectRank.HIGHEST.value), ("normal", ProjectRank.LOWEST.value), ("low", ProjectRank.LOWEST.value)): + for name, rank in (("high", ProjectRank.HIGHEST.value), ("normal", ProjectRank.LOWEST.value), ("low", ProjectRank.LOWEST.value), ("COM_LOFAR2", ProjectRank.HIGHEST.value)): try: with transaction.atomic(): tmss_project = models.Project.objects.create(name=name, @@ -1061,6 +1061,13 @@ def populate_project_permissions(): perm.PATCH.set([]) perm.DELETE.set([]) + perm = ProjectPermission.objects.get(name='schedulingunitblueprint-create_lofar2_sibling') + perm.GET.set([]) + perm.POST.set([]) + perm.PUT.set([]) + perm.PATCH.set([]) + perm.DELETE.set([]) + perm = ProjectPermission.objects.get(name='schedulingunitblueprint-mark_as_obsolete') perm.GET.set([]) perm.POST.set([ProjectRole.objects.get(value=role) for role in ['shared_support', 'friend_of_project']]) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/schemas/scheduling_constraints_template/relational-1.json b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/scheduling_constraints_template/relational-1.json new file mode 100644 index 0000000000000000000000000000000000000000..c663595630e9e2d17aaf76f708d3c9887ce64e1f --- /dev/null +++ b/SAS/TMSS/backend/src/tmss/tmssapp/schemas/scheduling_constraints_template/relational-1.json @@ -0,0 +1,40 @@ +{ + "description": "This schema defines the relational scheduling constraints between a pair of scheduling units", + "name": "relational", + "purpose": "production", + "schema": { + "$id": "https://tmss.lofar.eu/api/schemas/schedulingconstraintstemplate/relational/1#", + "$schema": "http://json-schema.org/draft-06/schema#", + "additionalProperties": false, + "default": { + "other": -1, + "time_offset": 0 + }, + "description": "This schema defines the relational scheduling constraints between a pair of scheduling units", + "patternProperties": { + "^[$]schema$": {} + }, + "properties": { + "other": { + "default": -1, + "description": "The id of the other scheduling unit blueprint in the relation", + "type": "integer", + "title": "Other Unit" + }, + "time_offset": { + "$ref": "https://tmss.lofar.eu/api/schemas/commonschematemplate/datetime/10/#/definitions/timedelta", + "default": 0, + "description": "Time between the other and this unit in seconds. 0=parallel, positive=later, negative=earlier", + "title": "Time Offset" + } + }, + "required": [ + "other" + ], + "title": "relational", + "type": "object", + "version": 1 + }, + "state": "active", + "version": 1 +} diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py index f7ec27588e2ba6216278254dfcd00c2a0985f771..192ec53f2fa68eb9ef2322fcf6e0e58f5a7f0d86 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py @@ -184,8 +184,8 @@ def create_scheduling_unit_draft_from_scheduling_unit_blueprint(scheduling_unit_ logger.debug("create_scheduling_unit_draft_from_scheduling_unit_blueprint(scheduling_unit_blueprint.id=%s)", scheduling_unit_blueprint.pk) with transaction.atomic(): - scheduling_unit_draft_copy = models.SchedulingUnitDraft.objects.create(name="%s (Copy from blueprint)" % (scheduling_unit_blueprint.name,), - description="%s (Copy from blueprint '%s' id=%s)" % (scheduling_unit_blueprint.description or "<no description>", scheduling_unit_blueprint.name,scheduling_unit_blueprint.id), + scheduling_unit_draft_copy = models.SchedulingUnitDraft.objects.create(name="%s (Copy)" % (scheduling_unit_blueprint.name,)[:128], + description="%s (Copy from blueprint id=%s)" % (scheduling_unit_blueprint.description or "<no description>", scheduling_unit_blueprint.id)[:256], scheduling_set=scheduling_unit_blueprint.draft.scheduling_set, observation_strategy_template=scheduling_unit_blueprint.draft.observation_strategy_template, specifications_template=scheduling_unit_blueprint.specifications_template, @@ -571,6 +571,61 @@ def create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit return scheduling_unit_blueprint +def create_lofar2_sibling_scheduling_unit_draft_and_blueprint(scheduling_unit_blueprint: models.SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint: + '''Convenience method: Create a new sibling scheduling_unit_blueprint (and draft), which is a copy of the given scheduling_unit_blueprint but with lofar2 stations, and scheduled in parallel with the original''' + with transaction.atomic(): + # get the current lofar2 stations + from lofar.sas.tmss.tmss.tmssapp.conversions import get_lofar2_stations + lofar2_stations = set(get_lofar2_stations()) + + # do not create a sibling for a unit that is already using lofar2 stations + for obs_task in scheduling_unit_blueprint.observation_tasks.all(): + specified_station_groups = obs_task.specifications_doc.get('station_configuration', {}).get('station_groups', []) + specified_stations = set(sum([group['stations'] for group in specified_station_groups], [])) + if lofar2_stations == specified_stations: + raise BlueprintCreationException("Cannot create a Lofar2 sibling for scheduling unit id=%s because it is already using Lofar2 stations" % (scheduling_unit_blueprint.id,)) + + scheduling_unit_draft = create_scheduling_unit_draft_from_scheduling_unit_blueprint(scheduling_unit_blueprint) + scheduling_unit_draft.name = scheduling_unit_draft.name.replace("(Copy)", "(Lofar2 sibling)") + + # let the commissioners decide if data is to be ingested + scheduling_unit_draft.ingest_permission_required = True + + # move the draft to the COM_LOFAR2 project, siblings scheduling_set + project = models.Project.objects.get(name="COM_LOFAR2") + try: + scheduling_unit_draft.scheduling_set = models.SchedulingSet.objects.get(project=project, name="Lofar2 siblings for Lofar1 schedulingunits") + except models.SchedulingSet.DoesNotExist: + scheduling_unit_draft.scheduling_set = models.SchedulingSet.objects.create(project=project, name="Lofar2 siblings for Lofar1 schedulingunits") + finally: + scheduling_unit_draft.save() + + # make the new unit run in parallel with the original + constraints_template = models.SchedulingConstraintsTemplate.get_latest("relational") + scheduling_unit_draft.scheduling_constraints_template = constraints_template + scheduling_unit_draft.scheduling_constraints_doc = constraints_template.get_default_json_document_for_schema() + scheduling_unit_draft.scheduling_constraints_doc['other'] = scheduling_unit_blueprint.id + scheduling_unit_draft.scheduling_constraints_doc['time_offset'] = 0 + scheduling_unit_draft.save() + + # overwrite any list of original station_groups with the lofar2 station group + lofar2_stations = sorted(list(lofar2_stations)) + for obs_task in scheduling_unit_draft.observation_tasks.all(): + obs_task.specifications_doc['station_configuration']['station_groups'] = [{'stations': lofar2_stations, + 'max_nr_missing': max(0, len(lofar2_stations)-1) + }] + if 'beamformer' in obs_task.specifications_doc: + if 'pipelines' in obs_task.specifications_doc['beamformer']: + for bfpl in obs_task.specifications_doc['beamformer']['pipelines']: + if 'station_groups' in bfpl: + bfpl['station_groups'] = obs_task.specifications_doc['station_configuration']['station_groups'] + obs_task.save() + + # create and return the blueprint, which is immediately schedulable and will be scheduled in parallel to the original + scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + return scheduling_unit_blueprint + + def create_task_blueprint_and_subtasks_from_task_draft(task_draft: models.TaskDraft) -> models.TaskBlueprint: '''Convenience method: Create the task_blueprint, then create the task_blueprint's subtasks, and schedule the ones that are not dependend on predecessors''' with transaction.atomic(): diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py index 3bc53d2abc3705f2809c5d53b0fc5a35ab3d2b4f..26b39ae2c502c8502fcdf1621d6bdcf80d30f961 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py @@ -1386,6 +1386,20 @@ class SchedulingUnitBlueprintViewSet(LOFARViewSet): return HttpResponse(status=200) + @swagger_auto_schema(responses={201: "The new lofar2 sibling SchedulingUnitBlueprint.", + 403: 'forbidden'}, + operation_description="Create a copy of this unit for the lofar2 stations, and schedule it in parallel with this original.") + @action(methods=['post'], detail=True, url_name="create_lofar2_sibling") + def create_lofar2_sibling(self, request, pk=None): + scheduling_unit_blueprint = get_object_or_404(models.SchedulingUnitBlueprint, pk=pk) + sibling_blueprint = create_lofar2_sibling_scheduling_unit_draft_and_blueprint(scheduling_unit_blueprint) + + # return a response with the new serialized sibling_blueprint (with references to the created task_blueprint(s) and (scheduled) subtasks) + return Response(serializers.SchedulingUnitBlueprintSerializer(sibling_blueprint, context={'request':request}).data, + status=status.HTTP_201_CREATED) + + + class SchedulingUnitBlueprintExtendedViewSet(SchedulingUnitBlueprintViewSet): serializer_class = serializers.SchedulingUnitBlueprintExtendedSerializer diff --git a/SAS/TMSS/client/lib/tmss_http_rest_client.py b/SAS/TMSS/client/lib/tmss_http_rest_client.py index 968fc748865d969647efdf483d66bd9c2991ee7e..92ce074dc55de14ef1dfa2ca12b63e76e2dc2c2d 100644 --- a/SAS/TMSS/client/lib/tmss_http_rest_client.py +++ b/SAS/TMSS/client/lib/tmss_http_rest_client.py @@ -893,4 +893,10 @@ class TMSSsession(object): def get_subtask_l2stationspecs(self, subtask_id, retry_count: int=DEFAULT_RETRY_COUNT) -> str: '''get the lofar 2.0 station specifications (as json) for the given subtask''' - return self.get_path_as_json_object('/subtask/%s/l2stationspecs' % (subtask_id,), retry_count=retry_count) \ No newline at end of file + return self.get_path_as_json_object('/subtask/%s/l2stationspecs' % (subtask_id,), retry_count=retry_count) + + def create_lofar2_sibling_scheduling_unit(self, scheduling_unit_blueprint_id: int, retry_count: int=0) -> {}: + """Create a copy of the unit with the given id, but with lofar2 stations, and schedule it in parallel with this original. + returns the new sibling scheduling_unit_blueprint upon success, or raises.""" + return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/create_lofar2_sibling' % (scheduling_unit_blueprint_id), + retry_count=retry_count) diff --git a/SAS/TMSS/deploy/docker-compose.yml b/SAS/TMSS/deploy/docker-compose.yml index 833fc28248106f918a4572f4cb3cc80dbceb3b1d..8ef9f7918dc2e5ead03be7e62680d96bbe30e2da 100644 --- a/SAS/TMSS/deploy/docker-compose.yml +++ b/SAS/TMSS/deploy/docker-compose.yml @@ -227,6 +227,29 @@ services: driver: journald options: tag: tmss_scheduling + lofar2_siblings: + container_name: tmss_lofar2_siblings + image: tmss_lofar2_siblings + build: + context: ./app + dockerfile: Dockerfile + args: + SOURCE_IMAGE: ${SOURCE_IMAGE} + HOME: "/localhome/lofarsys" + restart: unless-stopped + env_file: + - env + environment: + - USER=lofarsys + - HOME=/localhome/lofarsys + command: /bin/bash -c 'source /opt/lofar/lofarinit.sh; exec tmss_lofar2_sibling_service' + depends_on: + db_migrate: + condition: service_completed_successfully + logging: + driver: journald + options: + tag: tmss_lofar2_sibling_service workflow: container_name: tmss_workflow image: tmss_workflow