Skip to content
Snippets Groups Projects
Commit 7d6eb26c authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-190: initial crude dynamic scheduler. To be extended with many features

parent b4834e3a
No related branches found
No related tags found
1 merge request!252Resolve TMSS-190
......@@ -28,86 +28,152 @@
import os
import logging
logger = logging.getLogger(__name__)
from datetime import datetime, timedelta
from django.db.models.aggregates import Max
from datetime import datetime, timedelta, time
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.tasks import schedule_independent_subtasks_in_scheduling_unit_blueprint, unschedule_subtasks_in_scheduling_unit_blueprint
from lofar.sas.tmss.tmss.tmssapp.subtasks import shift_successors_until_after_stop_time
from lofar.sas.tmss.tmss.tmssapp.subtasks import update_subtasks_start_times_for_scheduling_unit
from lofar.sas.tmss.client.tmssbuslistener import *
from lofar.sas.tmss.tmss.exceptions import *
from lofar.common.datetimeutils import round_to_minute_precision
def get_schedulable_scheduling_units() -> [models.SchedulingUnitBlueprint]:
'''get a list of all schedulable scheduling_units'''
defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined')
scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all()
return [su for su in scheduling_units if su.status == 'schedulable']
def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> [models.SchedulingUnitBlueprint]:
'''get a list of all scheduled scheduling_units'''
scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled')
if lower is not None:
scheduled_subtasks = scheduled_subtasks.filter(stop_time__gte=lower)
if upper is not None:
scheduled_subtasks = scheduled_subtasks.filter(start_time__lte=upper)
return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all())
def find_best_next_schedulable_unit(lower_bound_start_time: datetime=None, upper_bound_stop_time: datetime=None, scheduling_units:[models.SchedulingUnitBlueprint]=None) -> (models.SchedulingUnitBlueprint, datetime):
"""
find the best schedulable scheduling_unit which can run withing the given time window, for the given scheduling_units.
Returns a tuple of the best next schedulable scheduling unit and its proposed starttime where it best fits its contraints.
"""
if lower_bound_start_time is None:
lower_bound_start_time = datetime.utcnow()
if not scheduling_units:
scheduling_units = get_schedulable_scheduling_units()
filtered_scheduling_units = []
for scheduling_unit in scheduling_units:
constraints = scheduling_unit.draft.scheduling_constraints_doc
# TODO: filter by the constraints
filtered_scheduling_units.append(scheduling_unit)
if filtered_scheduling_units:
filtered_scheduling_units = sorted(filtered_scheduling_units, key=get_score, reverse=True)
best_next_schedulable_unit = filtered_scheduling_units[0]
# TODO: compute best start_time so the constrainst yields the max score.
best_start_time = get_first_possible_start_time(best_next_schedulable_unit, lower_bound_start_time)
return best_next_schedulable_unit, best_start_time
return None, None
def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
'''find the best next schedulable scheduling unit and try to schedule it.
Overlapping existing scheduled units are unscheduled if their score is lower.
:return: the scheduled scheduling unit.'''
schedulable_units = get_schedulable_scheduling_units()
# estimate the lower_bound_start_time
lower_bound_start_time = get_min_first_possible_start_time(schedulable_units, datetime.utcnow() + timedelta(seconds=90))
# for now (as a proof of concept) UNschedule all scheduled subtasks/scheduling_units first to make room.
# TODO: determine if the new canditate for to be scheduled has a higher 'score' then the current one, and only unschedule it then.
# get all already scheduled scheduling_units
scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled').all()
scheduled_scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct())
while True:
try:
# try to find the best next scheduling_unit
best_scheduling_unit, best_start_time = find_best_next_schedulable_unit(lower_bound_start_time, None, schedulable_units)
best_scheduling_unit_score = get_score(best_scheduling_unit)
# make start_time "look nice" for us humans
best_start_time = round_to_minute_precision(best_start_time)
# unschedule any previously scheduled units
scheduled_scheduling_units = get_scheduled_scheduling_units(lower=best_start_time, upper=best_start_time+best_scheduling_unit.duration)
for scheduled_scheduling_unit in scheduled_scheduling_units:
if best_scheduling_unit_score > get_score(scheduled_scheduling_unit):
unschedule_subtasks_in_scheduling_unit_blueprint(scheduled_scheduling_unit)
# check again... are still there any scheduled_scheduling_units in the way?
scheduled_scheduling_units = get_scheduled_scheduling_units(lower=best_start_time, upper=best_start_time+best_scheduling_unit.duration)
if scheduled_scheduling_units:
# accept current solution with current scheduled_scheduling_units
return None
else:
# schedule it!
scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(best_scheduling_unit, start_time=best_start_time)
return scheduled_scheduling_unit
except SubtaskSchedulingException as e:
logger.error("Could not schedule scheduling_unit id=%s name='%s'. Error: %s", best_scheduling_unit.id, best_scheduling_unit.name, e)
# get all schedualable scheduling_units
defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined')
scheduling_units = models.SchedulingUnitBlueprint.objects.filter(id__in=defined_independend_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct())
# we assume the best_scheduling_unit is now in ERROR state, and should be fixed by a human
# remove it from schedulable_units...
schedulable_units.remove(best_scheduling_unit)
lower_bound_start_time = get_min_first_possible_start_time(schedulable_units, datetime.utcnow() + timedelta(seconds=90))
# and seach again... (loop)
# for now (as a proof of concept), order the scheduling units by project priority_rank, then by creation date
scheduling_units = scheduling_units.order_by('-draft__scheduling_set__project__priority_rank', 'created_at').all()
for scheduling_unit in scheduling_units:
logger.info("scheduling_unit name='%s' project='%s' project_prio=%s created_at=%s start=%s stop=%s duration=%s",
scheduling_unit.name, scheduling_unit.draft.scheduling_set.project.name, scheduling_unit.draft.scheduling_set.project.priority_rank, scheduling_unit.created_at, scheduling_unit.start_time, scheduling_unit.stop_time, scheduling_unit.duration)
for task in scheduling_unit.task_blueprints.all():
logger.info(" task name='%s' start=%s stop=%s duration=%s rel_start=%s", task.name, task.start_time, task.stop_time, task.duration, task.relative_start_time)
for subtask in task.subtasks.order_by('start_time').all():
logger.info(" subtask type='%s' indep=%s start=%s stop=%s duration=%s", subtask.specifications_template.type.value, subtask.predecessors.count()==0, subtask.start_time, subtask.stop_time, subtask.duration)
# for now (as a proof of concept), select the scheduling unit which belongs to the project with the highest rank
first_scheduling_unit = scheduling_units[0]
remaining_scheduling_units = scheduling_units[1:]
while first_scheduling_unit and remaining_scheduling_units:
def assign_start_stop_times_to_schedulable_scheduling_units():
try:
# schedule the first one!
# it can start running now + the needed lofar startup time of 1~2 minutes.
start_time = round_to_minute_precision(datetime.utcnow() + timedelta(seconds=90))
scheduled_scheduling_unit = schedule_independent_subtasks_in_scheduling_unit_blueprint(first_scheduling_unit, start_time=start_time)
break
except SubtaskSchedulingException as e:
logger.error("Could not schedule scheduling_unit %%s '%s'. Error: %s",
first_scheduling_unit.id, first_scheduling_unit.name, e)
if remaining_scheduling_units:
first_scheduling_unit = remaining_scheduling_units[0]
remaining_scheduling_units = remaining_scheduling_units[1:]
last_scheduling_unit_stop_time = max(su.stop_time for su in get_scheduled_scheduling_units() if su.stop_time is not None)
except:
last_scheduling_unit_stop_time = datetime.utcnow()
# update the starttimes of the remaining ones (so they form queue, and can be visualized in a timeline)
last_scheduling_unit_stop_time = scheduled_scheduling_unit.stop_time
for scheduling_unit in remaining_scheduling_units:
defined_independend_subtasks = models.Subtask.independent_subtasks().filter(state__value='defined').filter(task_blueprint__scheduling_unit_blueprint=scheduling_unit).all()
scheduling_units = get_schedulable_scheduling_units()
for subtask in defined_independend_subtasks:
subtask.start_time = last_scheduling_unit_stop_time + subtask.task_blueprint.relative_start_time
subtask.stop_time = subtask.start_time + subtask.specified_duration
subtask.save()
# update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline)
while scheduling_units:
scheduling_unit, start_time = find_best_next_schedulable_unit(lower_bound_start_time=last_scheduling_unit_stop_time, scheduling_units=scheduling_units)
shift_successors_until_after_stop_time(subtask)
if scheduling_unit:
update_subtasks_start_times_for_scheduling_unit(scheduling_unit, start_time)
# keep track of the previous
last_scheduling_unit_stop_time = scheduling_unit.stop_time
scheduling_units = models.SchedulingUnitBlueprint.objects.order_by('-draft__scheduling_set__project__priority_rank', 'created_at').all()
for scheduling_unit in scheduling_units:
logger.info("scheduling_unit name='%s' project='%s' project_prio=%s created_at=%s start=%s stop=%s duration=%s",
scheduling_unit.name, scheduling_unit.draft.scheduling_set.project.name, scheduling_unit.draft.scheduling_set.project.priority_rank, scheduling_unit.created_at, scheduling_unit.start_time, scheduling_unit.stop_time, scheduling_unit.duration)
for task in scheduling_unit.task_blueprints.all():
logger.info(" task name='%s' start=%s stop=%s duration=%s rel_start=%s", task.name, task.start_time, task.stop_time, task.duration, task.relative_start_time)
for subtask in task.subtasks.order_by('start_time').all():
logger.info(" subtask type='%s' indep=%s start=%s stop=%s duration=%s", subtask.specifications_template.type.value, subtask.predecessors.count()==0, subtask.start_time, subtask.stop_time, subtask.duration)
scheduling_units.remove(scheduling_unit)
else:
# search again in a later timeslot
last_scheduling_unit_stop_time = get_min_first_possible_start_time(scheduling_units)
def get_first_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime=None) -> datetime:
now = datetime.utcnow()
if lower_bound is None:
lower_bound = now
constraints = scheduling_unit.draft.scheduling_constraints_doc
# we're not evaluating constraints yet, so return a guestimate of now+1hour
return max(lower_bound, now)+timedelta(hours=1)
def get_min_first_possible_start_time(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime=None) -> datetime:
if lower_bound is None:
lower_bound = datetime.utcnow()
return min(get_first_possible_start_time(scheduling_unit, lower_bound) for scheduling_unit in scheduling_units)
return scheduled_scheduling_unit
def get_score(schedulingunit_blueprint: models.SchedulingUnitBlueprint) -> float:
# for now (as a proof of concept), order the scheduling units by project priority_rank
# TODO: sort by constraints, lst, etc
return schedulingunit_blueprint.draft.scheduling_set.project.priority_rank
class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSSchedulingUnitBlueprintEventMessageHandler):
'''
......@@ -115,8 +181,8 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSSchedulingU
def onTaskBlueprintsAndSubtasksCreated(self, schedulingunit_blueprint_id: int):
logger.info("onTaskBlueprintsAndSubtasksCreated(schedulingunit_blueprint_id=%s): Updating dynamic schedule...", schedulingunit_blueprint_id)
# call schedule_next_scheduling_unit for now, which also updates/schedules all schedulingunit_blueprints, including this one
schedule_next_scheduling_unit()
assign_start_stop_times_to_schedulable_scheduling_units()
def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
......
......@@ -31,6 +31,7 @@ from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
from time import sleep
from datetime import datetime, timedelta
from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema
class TestDynamicScheduling(unittest.TestCase):
......@@ -56,7 +57,8 @@ class TestDynamicScheduling(unittest.TestCase):
cls.ra_test_env = RATestEnvironment(exchange=cls.tmp_exchange.address)
cls.ra_test_env.start()
cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address)
cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address,
populate_schemas=True, populate_test_data=False)
cls.tmss_test_env.start()
......@@ -67,9 +69,31 @@ class TestDynamicScheduling(unittest.TestCase):
cls.tmp_exchange.close()
@staticmethod
def create_simple_observation_scheduling_unit(name:str=None, scheduling_set=None,
obs_duration:int=60,
constraints=None):
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft
constraints_template = models.SchedulingConstraintsTemplate.objects.get(name="constraints")
constraints = add_defaults_to_json_object_for_schema(constraints or {}, constraints_template.schema)
strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="Simple Observation")
scheduling_unit_spec = add_defaults_to_json_object_for_schema(strategy_template.template,
strategy_template.scheduling_unit_template.schema)
# add the scheduling_unit_doc to a new SchedulingUnitDraft instance, and were ready to use it!
return models.SchedulingUnitDraft.objects.create(name=name,
scheduling_set=scheduling_set,
requirements_template=strategy_template.scheduling_unit_template,
requirements_doc=scheduling_unit_spec,
observation_strategy_template=strategy_template,
scheduling_constraints_doc=constraints,
scheduling_constraints_template=constraints_template)
def test_01(self):
# import here, and not at top of module, because the tmsstestenv needs to be running before importing
from lofar.sas.tmss.tmss.tmssapp.populate import create_UC1_scheduling_unit
from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft, schedule_independent_subtasks_in_scheduling_unit_blueprint
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data, Project_test_data
......@@ -81,16 +105,17 @@ class TestDynamicScheduling(unittest.TestCase):
scheduling_set1 = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=project1))
scheduling_set2 = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=project2))
scheduling_unit_draft1 = create_UC1_scheduling_unit("UC1 scheduling unit 1", scheduling_set=scheduling_set1)
scheduling_unit_draft1 = self.create_simple_observation_scheduling_unit("UC1 scheduling unit 1", scheduling_set=scheduling_set1)
scheduling_unit_blueprint1 = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft1)
scheduling_unit_draft2 = create_UC1_scheduling_unit("UC1 scheduling unit 2", scheduling_set=scheduling_set2)
scheduling_unit_draft2 = self.create_simple_observation_scheduling_unit("UC1 scheduling unit 2", scheduling_set=scheduling_set2)
scheduling_unit_blueprint2 = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft2)
# call the method-under-test.
scheduled_scheduling_unit = schedule_next_scheduling_unit()
# we expect the scheduling_unit with the highest project rank to be scheduled first
self.assertIsNotNone(scheduled_scheduling_unit)
self.assertEqual(scheduling_unit_blueprint2.id, scheduled_scheduling_unit.id)
# check the results
......@@ -98,7 +123,7 @@ class TestDynamicScheduling(unittest.TestCase):
start_time__gt=datetime.utcnow())
# we expect the 1st and 2nd Calibrator and the Target observation subtasks of scheduling_unit_blueprint2 to be scheduled
self.assertEqual(3, upcoming_scheduled_subtasks.count())
self.assertEqual(1, upcoming_scheduled_subtasks.count())
for subtask in upcoming_scheduled_subtasks:
self.assertTrue(subtask in models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint=scheduled_scheduling_unit).all())
self.assertEqual(subtask.specifications_template.type.value, models.SubtaskType.Choices.OBSERVATION.value)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment