diff --git a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py index 5119377be0cb340df0d23e41852db35efcc6f524..b248eaa6d11257ccfba52ea2e31184999d864507 100644 --- a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py @@ -28,86 +28,152 @@ import os import logging logger = logging.getLogger(__name__) -from datetime import datetime, timedelta -from django.db.models.aggregates import Max +from datetime import datetime, timedelta, time from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint -from lofar.sas.tmss.tmss.tmssapp.subtasks import shift_successors_until_after_stop_time +from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit from lofar.sas.tmss.client.tmssbuslistener import * from lofar.sas.tmss.tmss.exceptions import * from lofar.common.datetimeutils import round_to_minute_precision -def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: +def get_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]: + '''get a list of all schedulable scheduling_units''' + defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined') + scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all() + return [su for su in scheduling_units if su.status == 'schedulable'] - # for now (as a proof of concept) UNschedule all scheduled subtasks/scheduling_units first to make room. - # TODO: determine if the new canditate for to be scheduled has a higher 'score' then the current one, and only unschedule it then. 
- # get all already scheduled scheduling_units - scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled').all() - scheduled_scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()) - for scheduled_scheduling_unit in scheduled_scheduling_units: - unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) +def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> [models.SchedulingUnitBlueprint]: + '''get a list of all scheduled scheduling_units''' + scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled') + if lower is not None: + scheduled_subtasks = scheduled_subtasks.filter(stop_time__gte=lower) + if upper is not None: + scheduled_subtasks = scheduled_subtasks.filter(start_time__lte=upper) + return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all()) - # get all schedualable scheduling_units - defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined') - scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()) - # for now (as a proof of concept), order the scheduling units by project priority_rank, then by creation date - scheduling_units = scheduling_units.order_by('-draft__scheduling_set__project__priority_rank', 'created_at').all() +def find_best_next_schedulable_unit(lower_bound_start_time: datetime=None, upper_bound_stop_time: datetime=None, scheduling_units:[models.SchedulingUnitBlueprint]=None) -> (models.SchedulingUnitBlueprint, datetime): + """ + find the best schedulable scheduling_unit which can run within the given time window, for the given scheduling_units. 
+ Returns a tuple of the best next schedulable scheduling unit and its proposed starttime where it best fits its constraints. + """ + if lower_bound_start_time is None: + lower_bound_start_time = datetime.utcnow() + + if not scheduling_units: + scheduling_units = get_schedulable_scheduling_units() + filtered_scheduling_units = [] for scheduling_unit in scheduling_units: - logger.info("scheduling_unit name='%s' project='%s' project_prio=%s created_at=%s start=%s stop=%s duration=%s", - scheduling_unit.name, scheduling_unit.draft.scheduling_set.project.name, scheduling_unit.draft.scheduling_set.project.priority_rank, scheduling_unit.created_at, scheduling_unit.start_time, scheduling_unit.stop_time, scheduling_unit.duration) - for task in scheduling_unit.task_blueprints.all(): - logger.info(" task name='%s' start=%s stop=%s duration=%s rel_start=%s", task.name, task.start_time, task.stop_time, task.duration, task.relative_start_time) - for subtask in task.subtasks.order_by('start_time').all(): - logger.info(" subtask type='%s' indep=%s start=%s stop=%s duration=%s", subtask.specifications_template.type.value, subtask.predecessors.count()==0, subtask.start_time, subtask.stop_time, subtask.duration) - - # for now (as a proof of concept), select the scheduling unit which belongs to the project with the highest rank - first_scheduling_unit = scheduling_units[0] - remaining_scheduling_units = scheduling_units[1:] - - while first_scheduling_unit and remaining_scheduling_units: + constraints = scheduling_unit.draft.scheduling_constraints_doc + + # TODO: filter by the constraints + filtered_scheduling_units.append(scheduling_unit) + + if filtered_scheduling_units: + filtered_scheduling_units = sorted(filtered_scheduling_units, key=get_score, reverse=True) + best_next_schedulable_unit = filtered_scheduling_units[0] + + # TODO: compute best start_time so the constraints yield the max score. 
+ best_start_time = get_first_possible_start_time(best_next_schedulable_unit, lower_bound_start_time) + + return best_next_schedulable_unit, best_start_time + + return None, None + + +def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: + '''find the best next schedulable scheduling unit and try to schedule it. + Overlapping existing scheduled units are unscheduled if their score is lower. + :return: the scheduled scheduling unit, or None if already scheduled units remain in the way.''' + schedulable_units = get_schedulable_scheduling_units() + + # estimate the lower_bound_start_time + lower_bound_start_time = get_min_first_possible_start_time(schedulable_units, datetime.utcnow() + timedelta(seconds=90)) + + while True: try: - # schedule the first one! - # it can start running now + the needed lofar startup time of 1~2 minutes. - start_time = round_to_minute_precision(datetime.utcnow() + timedelta(seconds=90)) - scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(first_scheduling_unit, start_time=start_time) - break + # try to find the best next scheduling_unit + best_scheduling_unit, best_start_time = find_best_next_schedulable_unit(lower_bound_start_time, None, schedulable_units) + best_scheduling_unit_score = get_score(best_scheduling_unit) + + # make start_time "look nice" for us humans + best_start_time = round_to_minute_precision(best_start_time) + + # unschedule any previously scheduled overlapping units which have a lower score + scheduled_scheduling_units = get_scheduled_scheduling_units(lower=best_start_time, upper=best_start_time+best_scheduling_unit.duration) + for scheduled_scheduling_unit in scheduled_scheduling_units: + if best_scheduling_unit_score > get_score(scheduled_scheduling_unit): + unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit) + + # check again... are there still any scheduled_scheduling_units in the way? 
+ scheduled_scheduling_units = get_scheduled_scheduling_units(lower=best_start_time, upper=best_start_time+best_scheduling_unit.duration) + if scheduled_scheduling_units: + # accept current solution with current scheduled_scheduling_units + return None + else: + # schedule it! + scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(best_scheduling_unit, start_time=best_start_time) + return scheduled_scheduling_unit except SubtaskSchedulingException as e: - logger.error("Could not schedule scheduling_unit %%s '%s'. Error: %s", - first_scheduling_unit.id, first_scheduling_unit.name, e) - if remaining_scheduling_units: - first_scheduling_unit = remaining_scheduling_units[0] - remaining_scheduling_units = remaining_scheduling_units[1:] + logger.error("Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e) - # update the starttimes of the remaining ones (so they form queue, and can be visualized in a timeline) - last_scheduling_unit_stop_time = scheduled_scheduling_unit.stop_time - for scheduling_unit in remaining_scheduling_units: - defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined').filter(task_blueprint__scheduling_unit_blueprint=scheduling_unit).all() + # we assume the best_scheduling_unit is now in ERROR state, and should be fixed by a human + # remove it from schedulable_units... + schedulable_units.remove(best_scheduling_unit) + lower_bound_start_time = get_min_first_possible_start_time(schedulable_units, datetime.utcnow() + timedelta(seconds=90)) + # and search again... 
(loop) - for subtask in defined_independend_subtasks: - subtask.start_time = last_scheduling_unit_stop_time + subtask.task_blueprint.relative_start_time - subtask.stop_time = subtask.start_time + subtask.specified_duration - subtask.save() - shift_successors_until_after_stop_time(subtask) - # keep track of the previous - last_scheduling_unit_stop_time = scheduling_unit.stop_time +def assign_start_stop_times_to_schedulable_scheduling_units(): + try: + last_scheduling_unit_stop_time = max(su.stop_time for su in get_scheduled_scheduling_units() if su.stop_time is not None) + except: + last_scheduling_unit_stop_time = datetime.utcnow() - scheduling_units = models.SchedulingUnitBlueprint.objects.order_by('-draft__scheduling_set__project__priority_rank', 'created_at').all() - for scheduling_unit in scheduling_units: - logger.info("scheduling_unit name='%s' project='%s' project_prio=%s created_at=%s start=%s stop=%s duration=%s", - scheduling_unit.name, scheduling_unit.draft.scheduling_set.project.name, scheduling_unit.draft.scheduling_set.project.priority_rank, scheduling_unit.created_at, scheduling_unit.start_time, scheduling_unit.stop_time, scheduling_unit.duration) - for task in scheduling_unit.task_blueprints.all(): - logger.info(" task name='%s' start=%s stop=%s duration=%s rel_start=%s", task.name, task.start_time, task.stop_time, task.duration, task.relative_start_time) - for subtask in task.subtasks.order_by('start_time').all(): - logger.info(" subtask type='%s' indep=%s start=%s stop=%s duration=%s", subtask.specifications_template.type.value, subtask.predecessors.count()==0, subtask.start_time, subtask.stop_time, subtask.duration) + scheduling_units = get_schedulable_scheduling_units() + + # update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline) + while scheduling_units: + scheduling_unit, start_time = find_best_next_schedulable_unit(lower_bound_start_time=last_scheduling_unit_stop_time, 
scheduling_units=scheduling_units) + + if scheduling_unit: + update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time) + + # keep track of the previous + last_scheduling_unit_stop_time = scheduling_unit.stop_time + + scheduling_units.remove(scheduling_unit) + else: + # search again in a later timeslot + last_scheduling_unit_stop_time = get_min_first_possible_start_time(scheduling_units) + + +def get_first_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime=None) -> datetime: + now = datetime.utcnow() + + if lower_bound is None: + lower_bound = now + + constraints = scheduling_unit.draft.scheduling_constraints_doc + + # we're not evaluating constraints yet, so return a guesstimate of max(lower_bound, now)+1hour + return max(lower_bound, now)+timedelta(hours=1) + + +def get_min_first_possible_start_time(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime=None) -> datetime: + if lower_bound is None: + lower_bound = datetime.utcnow() + return min(get_first_possible_start_time(scheduling_unit, lower_bound) for scheduling_unit in scheduling_units) - return scheduled_scheduling_unit +def get_score(schedulingunit_blueprint: models.SchedulingUnitBlueprint) -> float: + # for now (as a proof of concept), order the scheduling units by project priority_rank + # TODO: sort by constraints, lst, etc + return schedulingunit_blueprint.draft.scheduling_set.project.priority_rank class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSSchedulingUnitBlueprintEventMessageHandler): ''' @@ -115,8 +181,8 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSSchedulingU def onTaskBlueprintsAndSubtasksCreated(self, schedulingunit_blueprint_id: int): logger.info("onTaskBlueprintsAndSubtasksCreated(schedulingunit_blueprint_id=%s): Updating dynamic schedule...", schedulingunit_blueprint_id) - # call schedule_next_scheduling_unit for now, which also updates/schedules all schedulingunit_blueprints, 
including this one schedule_next_scheduling_unit() + assign_start_stop_times_to_schedulable_scheduling_units() def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): diff --git a/SAS/TMSS/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/services/scheduling/test/t_dynamic_scheduling.py index dc28e006909fd6987336ace0c549ebecf8517364..f21010cc40c4a5475b6c469c492c46b21289906a 100755 --- a/SAS/TMSS/services/scheduling/test/t_dynamic_scheduling.py +++ b/SAS/TMSS/services/scheduling/test/t_dynamic_scheduling.py @@ -31,6 +31,7 @@ from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor from time import sleep from datetime import datetime, timedelta +from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema class TestDynamicScheduling(unittest.TestCase): @@ -56,7 +57,8 @@ class TestDynamicScheduling(unittest.TestCase): cls.ra_test_env = RATestEnvironment(exchange=cls.tmp_exchange.address) cls.ra_test_env.start() - cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address) + cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, + populate_schemas=True, populate_test_data=False) cls.tmss_test_env.start() @@ -67,9 +69,31 @@ class TestDynamicScheduling(unittest.TestCase): cls.tmp_exchange.close() + @staticmethod + def create_simple_observation_scheduling_unit(name:str=None, scheduling_set=None, + obs_duration:int=60, + constraints=None): + from lofar.sas.tmss.tmss.tmssapp import models + from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft + + constraints_template = models.SchedulingConstraintsTemplate.objects.get(name="constraints") + constraints = add_defaults_to_json_object_for_schema(constraints or {}, constraints_template.schema) + + strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="Simple Observation") + 
scheduling_unit_spec = add_defaults_to_json_object_for_schema(strategy_template.template, + strategy_template.scheduling_unit_template.schema) + + # add the scheduling_unit_doc to a new SchedulingUnitDraft instance, and we're ready to use it! + return models.SchedulingUnitDraft.objects.create(name=name, + scheduling_set=scheduling_set, + requirements_template=strategy_template.scheduling_unit_template, + requirements_doc=scheduling_unit_spec, + observation_strategy_template=strategy_template, + scheduling_constraints_doc=constraints, + scheduling_constraints_template=constraints_template) + + def test_01(self): - # import here, and not at top of module, because the tmsstestenv needs to be running before importing - from lofar.sas.tmss.tmss.tmssapp.populate import create_UC1_scheduling_unit from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft, schedule_independent_subtasks_in_scheduling_unit_blueprint from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data, Project_test_data @@ -81,16 +105,17 @@ class TestDynamicScheduling(unittest.TestCase): scheduling_set1 = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=project1)) scheduling_set2 = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=project2)) - scheduling_unit_draft1 = create_UC1_scheduling_unit("UC1 scheduling unit 1", scheduling_set=scheduling_set1) + scheduling_unit_draft1 = self.create_simple_observation_scheduling_unit("UC1 scheduling unit 1", scheduling_set=scheduling_set1) scheduling_unit_blueprint1 = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft1) - scheduling_unit_draft2 = create_UC1_scheduling_unit("UC1 scheduling unit 2", scheduling_set=scheduling_set2) + scheduling_unit_draft2 = self.create_simple_observation_scheduling_unit("UC1 scheduling unit 2", scheduling_set=scheduling_set2) 
scheduling_unit_blueprint2 = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft2) # call the method-under-test. scheduled_scheduling_unit = schedule_next_scheduling_unit() # we expect the scheduling_unit with the highest project rank to be scheduled first + self.assertIsNotNone(scheduled_scheduling_unit) self.assertEqual(scheduling_unit_blueprint2.id, scheduled_scheduling_unit.id) # check the results @@ -98,7 +123,7 @@ class TestDynamicScheduling(unittest.TestCase): start_time__gt=datetime.utcnow()) # we expect the 1st and 2nd Calibrator and the Target observation subtasks of scheduling_unit_blueprint2 to be scheduled - self.assertEqual(3, upcoming_scheduled_subtasks.count()) + self.assertEqual(1, upcoming_scheduled_subtasks.count()) for subtask in upcoming_scheduled_subtasks: self.assertTrue(subtask in models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint=scheduled_scheduling_unit).all()) self.assertEqual(subtask.specifications_template.type.value, models.SubtaskType.Choices.OBSERVATION.value)