Skip to content
Snippets Groups Projects
Commit f5308639 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-190: processed review comments. Added documentation. Removed implicit...

TMSS-190: processed review comments. Added documentation. Removed implicit parameters(=None). Factored out slabs of code into simple functions.
parent 535777b5
No related branches found
No related tags found
1 merge request!252Resolve TMSS-190
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
""" """
This __init__ module for this constraints python package defines the 'API' to: This __init__ module for this constraints python package defines the 'API' to:
- filter a list of schedulable scheduling_units by checking their constraints: see method filter_scheduling_units_using_constraints - filter a list of schedulable scheduling_units by checking their constraints: see method filter_scheduling_units_using_constraints
- sort a (possibly filtered) list of schedulable scheduling_units evaluating their constraints and computing a 'finess' score: see method get_sorted_scheduling_units_scored_by_constraints - sort a (possibly filtered) list of schedulable scheduling_units evaluating their constraints and computing a 'fitness' score: see method get_sorted_scheduling_units_scored_by_constraints
These main methods are used in the dynamic_scheduler to pick the next best scheduling unit, and compute the midterm schedule. These main methods are used in the dynamic_scheduler to pick the next best scheduling unit, and compute the midterm schedule.
Currently we have only one SchedulingConstraintsTemplate in TMSS, named 'constraints', version 1. Currently we have only one SchedulingConstraintsTemplate in TMSS, named 'constraints', version 1.
...@@ -53,8 +53,14 @@ class ScoredSchedulingUnit(NamedTuple): ...@@ -53,8 +53,14 @@ class ScoredSchedulingUnit(NamedTuple):
weighted_score: float weighted_score: float
def filter_scheduling_units_using_constraints(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound: datetime, upper_bound: datetime) -> [models.SchedulingUnitBlueprint]: def filter_scheduling_units_using_constraints(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime, upper_bound: datetime) -> [models.SchedulingUnitBlueprint]:
'''return the schedulable scheduling_units which for which the constraints are 'go' within the given timewindow''' """
Filter the given scheduling_units by whether their constraints are met within the given timewindow.
:param lower_bound: evaluate and score the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
:param upper_bound: evaluate and score the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
:param scheduling_units: evaluate/filter these scheduling_units.
Returns a list scheduling_units for which their constraints are met within the given timewindow.
"""
runnable_scheduling_units = [] runnable_scheduling_units = []
for scheduling_unit in scheduling_units: for scheduling_unit in scheduling_units:
try: try:
...@@ -62,7 +68,10 @@ def filter_scheduling_units_using_constraints(scheduling_units:[models.Schedulin ...@@ -62,7 +68,10 @@ def filter_scheduling_units_using_constraints(scheduling_units:[models.Schedulin
runnable_scheduling_units.append(scheduling_unit) runnable_scheduling_units.append(scheduling_unit)
except UnknownTemplateException as e: except UnknownTemplateException as e:
# TODO: how do we notify the user that we cannot dynamically schedule this sub due to an unknown template? # TODO: how do we notify the user that we cannot dynamically schedule this sub due to an unknown template?
# current pragmatic solution: log warning, and set sub state to error via its schedulable subtasks # current pragmatic solution: log warning, and set sub state to error via its schedulable subtasks.
# This ensures that the unit is not schedulable anymore, and forces the user to take action.
# For example, the user can choose a different template,
# or submit a feature request to implement constraint solvers for this new template.
logger.warning(e) logger.warning(e)
for subtask in models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).all(): for subtask in models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).all():
subtask.status = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.ERROR.value) subtask.status = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.ERROR.value)
...@@ -70,11 +79,65 @@ def filter_scheduling_units_using_constraints(scheduling_units:[models.Schedulin ...@@ -70,11 +79,65 @@ def filter_scheduling_units_using_constraints(scheduling_units:[models.Schedulin
return runnable_scheduling_units return runnable_scheduling_units
def get_sorted_scheduling_units_scored_by_constraints(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound:datetime, upper_bound:datetime) -> [ScoredSchedulingUnit]:
scored_scheduling_units = [compute_scores(scheduling_unit, lower_bound, upper_bound)
for scheduling_unit in scheduling_units]
return sorted(scored_scheduling_units, key=lambda x: x.weighted_score, reverse=True)
def get_best_scored_scheduling_unit_scored_by_constraints(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound_start_time:datetime, upper_bound_stop_time:datetime) -> ScoredSchedulingUnit:
"""
get the best scored schedulable scheduling_unit which can run withing the given time window from the given scheduling_units.
:param lower_bound_start_time: evaluate and score the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
:param upper_bound_stop_time: evaluate and score the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
:param scheduling_units: evaluate these scheduling_units.
Returns a ScoredSchedulingUnit struct with the best next schedulable scheduling unit and its proposed start_time where it best fits its contraints.
"""
sorted_scored_scheduling_units = sort_scheduling_units_scored_by_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time)
if sorted_scored_scheduling_units:
# they are sorted best to worst, so return/use first.
best_scored_scheduling_unit = sorted_scored_scheduling_units[0]
return best_scored_scheduling_unit
return None
def sort_scheduling_units_scored_by_constraints(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> [ScoredSchedulingUnit]:
"""
Compute the score and proposed start_time for all given scheduling_units. Return them sorted by their weighted_score.
:param lower_bound_start_time: evaluate and score the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
:param upper_bound_stop_time: evaluate and score the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
:param scheduling_units: evaluate these scheduling_units.
Returns a list of ScoredSchedulingUnit structs with the score details, a weighted_score and a proposed start_time where it best fits its contraints.
"""
scored_scheduling_units = []
for scheduling_unit in scheduling_units:
try:
scored_scheduling_unit = compute_scores(scheduling_unit, lower_bound_start_time, upper_bound_stop_time)
# check and ensure that the proposed start_time is within the required [lower_bound_start_time, upper_bound_stop_time] window.
schedulable_unit = scored_scheduling_unit.scheduling_unit
proposed_start_time = scored_scheduling_unit.start_time
proposed_stop_time = proposed_start_time + schedulable_unit.duration
if proposed_start_time < lower_bound_start_time:
raise DynamicSchedulingException("The best next schedulable scheduling_unit id=%s has a proposed start_time '%s' before the given lower bound '%s'" % (
schedulable_unit.id, proposed_start_time, lower_bound_start_time))
if proposed_stop_time > upper_bound_stop_time:
raise DynamicSchedulingException("The best next schedulable scheduling_unit id=%s has a proposed stop_time '%s' after the given upper bound '%s'" % (
schedulable_unit.id, proposed_stop_time, upper_bound_stop_time))
scored_scheduling_units.append(scored_scheduling_unit)
except (UnknownTemplateException, DynamicSchedulingException) as e:
# TODO: how do we notify the user that we cannot dynamically schedule this sub due to an unknown template?
# current pragmatic solution: log warning, and set sub state to error via its schedulable subtasks.
# This ensures that the unit is not schedulable anymore, and forces the user to take action.
# For example, the user can choose a different template,
# or submit a feature request to implement constraint solvers for this new template.
logger.warning(e)
for subtask in models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=scheduling_unit.id).all():
subtask.status = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.ERROR.value)
subtask.save()
return sorted(scored_scheduling_units, key=lambda x: x.weighted_score, reverse=True)
################## helper methods ################################################################# ################## helper methods #################################################################
...@@ -118,9 +181,8 @@ def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: ...@@ -118,9 +181,8 @@ def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:
scheduling_unit.id, constraints_template.name, constraints_template.version)) scheduling_unit.id, constraints_template.name, constraints_template.version))
def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime) -> datetime:
def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime=None) -> datetime: '''determine the earliest possible start_time for the given scheduling unit, taking into account all its constraints'''
'''determine the earliest possible starttime for the given scheduling unit, taking into account all its constraints'''
constraints_template = scheduling_unit.draft.scheduling_constraints_template constraints_template = scheduling_unit.draft.scheduling_constraints_template
# choose appropriate method based on template (strategy pattern), or raise # choose appropriate method based on template (strategy pattern), or raise
...@@ -131,14 +193,12 @@ def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBluep ...@@ -131,14 +193,12 @@ def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBluep
# TODO: if we get more constraint templates or versions, then add a check here and import and use the new module with the constraint methods for that specific template. (strategy pattern) # TODO: if we get more constraint templates or versions, then add a check here and import and use the new module with the constraint methods for that specific template. (strategy pattern)
raise UnknownTemplateException("Cannot compute first possible starttime for scheduling_unit id=%s, because we have no constraint checker for scheduling constraints template '%s' version=%s" % ( raise UnknownTemplateException("Cannot compute earliest possible start_time for scheduling_unit id=%s, because we have no constraint checker for scheduling constraints template '%s' version=%s" % (
scheduling_unit.id, constraints_template.name, constraints_template.version)) scheduling_unit.id, constraints_template.name, constraints_template.version))
def get_min_earliest_possible_start_time(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime=None) -> datetime: def get_min_earliest_possible_start_time(scheduling_units: [models.SchedulingUnitBlueprint], lower_bound: datetime) -> datetime:
'''deterimine the earliest possible starttime over all given scheduling units, taking into account all their constraints''' '''deterimine the earliest possible starttime over all given scheduling units, taking into account all their constraints'''
if lower_bound is None:
lower_bound = datetime.utcnow()
try: try:
return min(get_earliest_possible_start_time(scheduling_unit, lower_bound) for scheduling_unit in scheduling_units) return min(get_earliest_possible_start_time(scheduling_unit, lower_bound) for scheduling_unit in scheduling_units)
except ValueError: except ValueError:
......
...@@ -92,12 +92,7 @@ def can_run_within_timewindow_with_sky_constraints(scheduling_unit: models.Sched ...@@ -92,12 +92,7 @@ def can_run_within_timewindow_with_sky_constraints(scheduling_unit: models.Sched
return True # for now, ignore sky contraints. return True # for now, ignore sky contraints.
def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime=None) -> datetime: def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: datetime) -> datetime:
now = datetime.utcnow()
if lower_bound is None:
lower_bound = now
constraints = scheduling_unit.draft.scheduling_constraints_doc constraints = scheduling_unit.draft.scheduling_constraints_doc
try: try:
...@@ -128,7 +123,7 @@ def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBluep ...@@ -128,7 +123,7 @@ def get_earliest_possible_start_time(scheduling_unit: models.SchedulingUnitBluep
logger.exception(str(e)) logger.exception(str(e))
# no constraints dictating starttime? make a guesstimate. # no constraints dictating starttime? make a guesstimate.
return max(lower_bound, now) return lower_bound
def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:datetime, upper_bound:datetime) -> ScoredSchedulingUnit: def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:datetime, upper_bound:datetime) -> ScoredSchedulingUnit:
...@@ -149,7 +144,7 @@ def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound: ...@@ -149,7 +144,7 @@ def compute_scores(scheduling_unit: models.SchedulingUnitBlueprint, lower_bound:
# add "common" scores which do not depend on constraints, such as project rank and creation date # add "common" scores which do not depend on constraints, such as project rank and creation date
# TODO: should be normalized! # TODO: should be normalized!
scores['project_rank'] = scheduling_unit.draft.scheduling_set.project.priority_rank scores['project_rank'] = scheduling_unit.draft.scheduling_set.project.priority_rank
scores['age'] = (datetime.utcnow() - scheduling_unit.created_at).total_seconds() #scores['age'] = (datetime.utcnow() - scheduling_unit.created_at).total_seconds()
try: try:
# TODO: apply weights. Needs some new weight model in django, probably linked to constraints_template. # TODO: apply weights. Needs some new weight model in django, probably linked to constraints_template.
......
...@@ -60,43 +60,25 @@ def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) -> ...@@ -60,43 +60,25 @@ def get_scheduled_scheduling_units(lower:datetime=None, upper:datetime=None) ->
return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all()) return list(models.SchedulingUnitBlueprint.objects.filter(id__in=scheduled_subtasks.values('task_blueprint__scheduling_unit_blueprint_id').distinct()).all())
def find_best_next_schedulable_unit(lower_bound_start_time: datetime=None, upper_bound_stop_time: datetime=None, scheduling_units:[models.SchedulingUnitBlueprint]=None) -> ScoredSchedulingUnit: def find_best_next_schedulable_unit(scheduling_units:[models.SchedulingUnitBlueprint], lower_bound_start_time: datetime, upper_bound_stop_time: datetime) -> ScoredSchedulingUnit:
""" """
find the best schedulable scheduling_unit which can run withing the given time window, for the given scheduling_units. find the best schedulable scheduling_unit which can run withing the given time window from the given scheduling_units.
Returns a ScoredSchedulingUnit struct with a.o. the best next schedulable scheduling unit and its proposed starttime where it best fits its contraints. :param lower_bound_start_time: evaluate the constrains at and after lower_bound_start_time. The returned unit has a start_time guaranteed at or after lower_bound_start_time.
:param upper_bound_stop_time: evaluate the constrains before upper_bound_stop_time. The returned unit has a stop_time guaranteed before upper_bound_stop_time.
:param scheduling_units: evaluate these scheduling_units.
Returns a ScoredSchedulingUnit struct with the best next schedulable scheduling unit and its proposed start_time where it best fits its contraints.
""" """
if lower_bound_start_time is None:
lower_bound_start_time = datetime.utcnow()
if upper_bound_stop_time is None:
upper_bound_stop_time = datetime.utcnow() + timedelta(days=365)
# ensure upper is greater than or equal to lower # ensure upper is greater than or equal to lower
upper_bound_stop_time = max(lower_bound_start_time, upper_bound_stop_time) upper_bound_stop_time = max(lower_bound_start_time, upper_bound_stop_time)
while lower_bound_start_time < upper_bound_stop_time: filtered_scheduling_units = filter_scheduling_units_using_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time)
if not scheduling_units:
scheduling_units = get_schedulable_scheduling_units()
filtered_scheduling_units = filter_scheduling_units_using_constraints(scheduling_units, lower_bound_start_time, upper_bound_stop_time)
if filtered_scheduling_units:
sorted_scored_scheduling_units = get_sorted_scheduling_units_scored_by_constraints(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time)
best_scored_scheduling_unit = sorted_scored_scheduling_units[0]
best_next_schedulable_unit = best_scored_scheduling_unit.scheduling_unit if filtered_scheduling_units:
best_start_time = best_scored_scheduling_unit.start_time best_scored_scheduling_unit = get_best_scored_scheduling_unit_scored_by_constraints(filtered_scheduling_units, lower_bound_start_time, upper_bound_stop_time)
return best_scored_scheduling_unit
if best_start_time >= lower_bound_start_time and best_start_time+best_next_schedulable_unit.duration < upper_bound_stop_time:
return best_scored_scheduling_unit
else:
# try again a bit further into the future. There might be a spot there...
# ToDo: apply smarter guessed step size
lower_bound_start_time += timedelta(minutes=10)
else:
# no scheduling units...
return None
# no filtered scheduling units found...
logger.info("No schedulable scheduling units found which meet the requirements between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
return None return None
...@@ -104,26 +86,29 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: ...@@ -104,26 +86,29 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
'''find the best next schedulable scheduling unit and try to schedule it. '''find the best next schedulable scheduling unit and try to schedule it.
Overlapping existing scheduled units are unscheduled if their score is lower. Overlapping existing scheduled units are unscheduled if their score is lower.
:return: the scheduled scheduling unit.''' :return: the scheduled scheduling unit.'''
# --- setup of needed variables ---
schedulable_units = get_schedulable_scheduling_units() schedulable_units = get_schedulable_scheduling_units()
# estimate the lower_bound_start_time # estimate the lower_bound_start_time
lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, datetime.utcnow() + DEFAULT_INTER_OBSERVATION_GAP) lower_bound_start_time = get_min_earliest_possible_start_time(schedulable_units, datetime.utcnow())
# estimate the upper_bound_stop_time, which may give us a small timewindow before any next scheduled unit, or a default window of a day # estimate the upper_bound_stop_time, which may give us a small timewindow before any next scheduled unit, or a default window of a day
try: try:
upper_bound_stop_time = max(su.start_time for su in get_scheduled_scheduling_units(lower=lower_bound_start_time) if su.start_time is not None) upper_bound_stop_time = max(su.start_time for su in get_scheduled_scheduling_units(lower=lower_bound_start_time, upper=lower_bound_start_time + timedelta(days=1)))
except: except ValueError:
upper_bound_stop_time = lower_bound_start_time + timedelta(days=1) upper_bound_stop_time = lower_bound_start_time + timedelta(days=1)
# no need to irritate user in log files with subsecond scheduling precision # no need to irritate user in log files with subsecond scheduling precision
lower_bound_start_time = round_to_second_precision(lower_bound_start_time) lower_bound_start_time = round_to_second_precision(lower_bound_start_time)
upper_bound_stop_time = max(round_to_second_precision(upper_bound_stop_time), lower_bound_start_time) upper_bound_stop_time = max(round_to_second_precision(upper_bound_stop_time), lower_bound_start_time)
# --- core routine ---
while lower_bound_start_time < upper_bound_stop_time: while lower_bound_start_time < upper_bound_stop_time:
try: try:
# try to find the best next scheduling_unit # try to find the best next scheduling_unit
logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule starting between %s and %s", lower_bound_start_time, upper_bound_stop_time) logger.info("schedule_next_scheduling_unit: searching for best scheduling unit to schedule starting between '%s' and '%s'", lower_bound_start_time, upper_bound_stop_time)
best_scored_scheduling_unit = find_best_next_schedulable_unit(lower_bound_start_time, upper_bound_stop_time, schedulable_units) best_scored_scheduling_unit = find_best_next_schedulable_unit(schedulable_units, lower_bound_start_time, upper_bound_stop_time)
if best_scored_scheduling_unit: if best_scored_scheduling_unit:
best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit best_scheduling_unit = best_scored_scheduling_unit.scheduling_unit
best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score best_scheduling_unit_score = best_scored_scheduling_unit.weighted_score
...@@ -171,7 +156,6 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint: ...@@ -171,7 +156,6 @@ def schedule_next_scheduling_unit() -> models.SchedulingUnitBlueprint:
def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime=None): def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_time: datetime=None):
'''''' ''''''
logger.info("Estimating mid-term schedule...") logger.info("Estimating mid-term schedule...")
if lower_bound_start_time is None: if lower_bound_start_time is None:
...@@ -184,7 +168,9 @@ def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_ti ...@@ -184,7 +168,9 @@ def assign_start_stop_times_to_schedulable_scheduling_units(lower_bound_start_ti
# update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline) # update the start_times of the remaining ones (so they form queue, and can be visualized in a timeline)
while scheduling_units: while scheduling_units:
best_scored_scheduling_unit = find_best_next_schedulable_unit(lower_bound_start_time=lower_bound_start_time, scheduling_units=scheduling_units) best_scored_scheduling_unit = find_best_next_schedulable_unit(scheduling_units,
lower_bound_start_time=lower_bound_start_time,
upper_bound_stop_time=lower_bound_start_time + timedelta(days=365))
if best_scored_scheduling_unit: if best_scored_scheduling_unit:
scheduling_unit = best_scored_scheduling_unit.scheduling_unit scheduling_unit = best_scored_scheduling_unit.scheduling_unit
......
...@@ -27,44 +27,70 @@ from lofar.common.test_utils import skip_integration_tests ...@@ -27,44 +27,70 @@ from lofar.common.test_utils import skip_integration_tests
if skip_integration_tests(): if skip_integration_tests():
exit(3) exit(3)
from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor TEST_UUID = uuid.uuid1()
from time import sleep
from datetime import datetime, timedelta from datetime import datetime, timedelta
from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema
from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
tmp_exchange = TemporaryExchange("t_dynamic_scheduling_%s" % (TEST_UUID,))
tmp_exchange.open()
class TestDynamicScheduling(unittest.TestCase): # override DEFAULT_BUSNAME
''' import lofar
Tests for the Dynamic Scheduling lofar.messaging.config.DEFAULT_BUSNAME = tmp_exchange.address
'''
@classmethod
def setUpClass(cls) -> None:
cls.TEST_UUID = uuid.uuid1()
cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID)) from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
cls.tmp_exchange.open() tmss_test_env = TMSSTestEnvironment(exchange=tmp_exchange.address,
populate_schemas=True, populate_test_data=False,
start_postgres_listener=True, start_subtask_scheduler=False,
start_ra_test_environment=True, enable_viewflow=False,
start_dynamic_scheduler=False) # do not start the dynamic scheduler in the testenv, because it is the object-under-test.
tmss_test_env.start()
# override DEFAULT_BUSNAME def tearDownModule():
import lofar tmss_test_env.stop()
lofar.messaging.config.DEFAULT_BUSNAME = cls.tmp_exchange.address tmp_exchange.close()
# import here, and not at top of module, because DEFAULT_BUSNAME needs to be set before importing from lofar.sas.tmss.test.tmss_test_data_django_models import *
from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft
cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, # the module under test
populate_schemas=True, populate_test_data=False, from lofar.sas.tmss.services.scheduling.dynamic_scheduling import *
start_postgres_listener=True, start_subtask_scheduler=False,
start_ra_test_environment=True, enable_viewflow=False,
start_dynamic_scheduler=False) # do not start the dynamic scheduler in the testenv, because it is the object-under-test.
cls.tmss_test_env.start()
class TestDynamicScheduling(unittest.TestCase):
'''
Tests for the Dynamic Scheduling
'''
@classmethod @classmethod
def tearDownClass(cls) -> None: def setUpClass(cls) -> None:
cls.tmss_test_env.stop() # make some re-usable projects with high/low priority
cls.tmp_exchange.close() cls.project_low = models.Project.objects.create(**Project_test_data("dynamic scheduling test project %s"% (uuid.uuid4(),), priority_rank=1))
cls.project_high = models.Project.objects.create(**Project_test_data("dynamic scheduling test project %s"% (uuid.uuid4(),), priority_rank=2))
cls.scheduling_set_low = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=cls.project_low))
cls.scheduling_set_high = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=cls.project_high))
def setUp(self) -> None:
for scheduling_set in [self.scheduling_set_low, self.scheduling_set_high]:
for scheduling_unit_draft in scheduling_set.scheduling_unit_drafts.all():
for scheduling_unit_blueprint in scheduling_unit_draft.scheduling_unit_blueprints.all():
for task_blueprint in scheduling_unit_blueprint.task_blueprints.all():
for subtask in task_blueprint.subtasks.all():
for output in subtask.outputs.all():
for dataproduct in output.dataproducts.all():
dataproduct.delete()
for consumer in output.consumers.all():
consumer.delete()
output.delete()
for input in subtask.inputs.all():
input.delete()
subtask.delete()
task_blueprint.draft.delete()
task_blueprint.delete()
scheduling_unit_blueprint.delete()
scheduling_unit_draft.delete()
@staticmethod @staticmethod
def create_simple_observation_scheduling_unit(name:str=None, scheduling_set=None, def create_simple_observation_scheduling_unit(name:str=None, scheduling_set=None,
...@@ -90,47 +116,35 @@ class TestDynamicScheduling(unittest.TestCase): ...@@ -90,47 +116,35 @@ class TestDynamicScheduling(unittest.TestCase):
def test_two_simple_observations_no_constraints_different_project_priority(self): def test_two_simple_observations_no_constraints_different_project_priority(self):
from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft scheduling_unit_draft_low = self.create_simple_observation_scheduling_unit("scheduling unit 1", scheduling_set=self.scheduling_set_low)
from lofar.sas.tmss.tmss.tmssapp import models scheduling_unit_blueprint_low = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft_low)
from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data, Project_test_data
from lofar.sas.tmss.services.scheduling.dynamic_scheduling import schedule_next_scheduling_unit, assign_start_stop_times_to_schedulable_scheduling_units
project1 = models.Project.objects.create(**Project_test_data("dynamic scheduling test project %s"% (uuid.uuid4(),), priority_rank=1))
project2 = models.Project.objects.create(**Project_test_data("dynamic scheduling test project %s"% (uuid.uuid4(),), priority_rank=2))
scheduling_set1 = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=project1))
scheduling_set2 = models.SchedulingSet.objects.create(**SchedulingSet_test_data(project=project2))
scheduling_unit_draft1 = self.create_simple_observation_scheduling_unit("scheduling unit 1", scheduling_set=scheduling_set1)
scheduling_unit_blueprint1 = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft1)
scheduling_unit_draft2 = self.create_simple_observation_scheduling_unit("scheduling unit 2", scheduling_set=scheduling_set2) scheduling_unit_draft_high = self.create_simple_observation_scheduling_unit("scheduling unit 2", scheduling_set=self.scheduling_set_high)
scheduling_unit_blueprint2 = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft2) scheduling_unit_blueprint_high = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft_high)
# call the method-under-test. # call the method-under-test.
scheduled_scheduling_unit = schedule_next_scheduling_unit() scheduled_scheduling_unit = schedule_next_scheduling_unit()
# we expect the scheduling_unit with the highest project rank to be scheduled first # we expect the scheduling_unit with the highest project rank to be scheduled first
self.assertIsNotNone(scheduled_scheduling_unit) self.assertIsNotNone(scheduled_scheduling_unit)
self.assertEqual(scheduling_unit_blueprint2.id, scheduled_scheduling_unit.id) self.assertEqual(scheduling_unit_blueprint_high.id, scheduled_scheduling_unit.id)
# check the results # check the results
# we expect the sub2 to be scheduled, and sub1 to have a starttime after sub2 # we expect the sub2 to be scheduled, and sub1 to have a starttime after sub2
scheduling_unit_blueprint1.refresh_from_db() scheduling_unit_blueprint_low.refresh_from_db()
scheduling_unit_blueprint2.refresh_from_db() scheduling_unit_blueprint_high.refresh_from_db()
self.assertEqual(scheduling_unit_blueprint1.status, 'schedulable') self.assertEqual(scheduling_unit_blueprint_low.status, 'schedulable')
self.assertEqual(scheduling_unit_blueprint2.status, 'scheduled') self.assertEqual(scheduling_unit_blueprint_high.status, 'scheduled')
# check the scheduled subtask # check the scheduled subtask
upcoming_scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled', upcoming_scheduled_subtasks = models.Subtask.objects.filter(state__value='scheduled',
start_time__gt=datetime.utcnow(), task_blueprint__scheduling_unit_blueprint__in=(scheduling_unit_blueprint_low, scheduling_unit_blueprint_high)).all()
task_blueprint__scheduling_unit_blueprint__in=(scheduling_unit_blueprint1, scheduling_unit_blueprint2)).all()
self.assertEqual(1, upcoming_scheduled_subtasks.count()) self.assertEqual(1, upcoming_scheduled_subtasks.count())
self.assertEqual(scheduling_unit_blueprint2.id, upcoming_scheduled_subtasks[0].task_blueprint.scheduling_unit_blueprint.id) self.assertEqual(scheduling_unit_blueprint_high.id, upcoming_scheduled_subtasks[0].task_blueprint.scheduling_unit_blueprint.id)
# create the "mid-term schedule" (consisting the the one remaining sub1) # create the "mid-term schedule" (consisting the the one remaining sub1)
assign_start_stop_times_to_schedulable_scheduling_units() assign_start_stop_times_to_schedulable_scheduling_units()
self.assertGreater(scheduling_unit_blueprint1.start_time, scheduling_unit_blueprint2.start_time) self.assertGreater(scheduling_unit_blueprint_low.start_time, scheduling_unit_blueprint_high.start_time)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
......
...@@ -23,6 +23,10 @@ class SubtaskSchedulingException(SchedulingException): ...@@ -23,6 +23,10 @@ class SubtaskSchedulingException(SchedulingException):
class TaskSchedulingException(SchedulingException): class TaskSchedulingException(SchedulingException):
pass pass
class DynamicSchedulingException(SchedulingException):
pass
class UnknownTemplateException(TMSSException): class UnknownTemplateException(TMSSException):
'''raised when TMSS trying to base its processing routines on the chosen template, but this specific template is unknown.''' '''raised when TMSS trying to base its processing routines on the chosen template, but this specific template is unknown.'''
pass pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment