diff --git a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py index 61544a98a3ea56854344ad80a406ac61c4a3e092..0cc16c7295b1ef151da7dc7fd7f79f6fd0e5a0ea 100644 --- a/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/lib/dynamic_scheduling.py @@ -69,11 +69,11 @@ class Scheduler: self._scheduling_thread = None def __enter__(self): - self.start_scheduling_thread() + self.start_scheduling() return self def __exit__(self, exc_type, exc_val, exc_tb): - self.stop_scheduling_thread() + self.stop_scheduling() def trigger(self): self._do_schedule_event.set() @@ -104,7 +104,8 @@ class Scheduler: def _run_scheduling_loop(self): logger.info("Scheduler starting scheduling thread...") while self._scheduling_thread_running: - if self._do_schedule_event.wait(timeout=10): + logger.debug("Scheduler waiting for trigger to compute new schedule...") + if self._do_schedule_event.wait(timeout=1): logger.info("Scheduler was triggered to compute new schedule...") self._do_schedule_event.clear() @@ -279,7 +280,7 @@ class Scheduler: # aparently this triggered schedunit could not be scheduled due to a scheduled/running unit being in the way. # no need to seek furter, # mark the remaining_defined_triggered_subtasks in the interrupts_telescope-scheduling unit as error and return None.... - remaining_defined_triggered_subtasks = list(models.Subtask.independent_subtasks().filter(task_blueprints__scheduling_unit_blueprint_id=best_scheduling_unit.id, + remaining_defined_triggered_subtasks = list(models.Subtask.independent_subtasks().filter(task_blueprint__scheduling_unit_blueprint_id=best_scheduling_unit.id, state__value=models.SubtaskState.Choices.DEFINED.value).all()) for subtask in remaining_defined_triggered_subtasks: subtask.state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.ERROR.value) @@ -530,6 +531,7 @@ class TMSSDynamicSchedulingMessageHandler(TMSSEventMessageHandler): def create_dynamic_scheduling_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): return TMSSBusListener(handler_type=TMSSDynamicSchedulingMessageHandler, handler_kwargs=None, + num_threads=1, exchange=exchange, broker=broker) diff --git a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py index 7e0005ecfee40e065c71cd766ca319ee9d45cebb..1c011c9122365b03fc2935a3b9a79f3cb6ecaafa 100755 --- a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py +++ b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py @@ -52,7 +52,6 @@ tmss_test_env = TMSSTestEnvironment(exchange=tmp_exchange.address, start_ra_test_environment=True, enable_viewflow=False, start_dynamic_scheduler=False) # do not start the dynamic scheduler in the testenv, because it is the object-under-test. tmss_test_env.start() -from django.test import TestCase def tearDownModule(): tmss_test_env.stop() @@ -63,14 +62,14 @@ from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp.tasks import create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft, update_task_graph_from_specifications_doc, mark_independent_subtasks_in_scheduling_unit_blueprint_as_schedulable from lofar.sas.tmss.tmss.tmssapp.subtasks import unschedule_subtask from lofar.common.postgres import PostgresDatabaseConnection -from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions +from lofar.sas.tmss.test.test_utils import set_subtask_state_following_allowed_transitions, wait_for_scheduling_unit_blueprint_status # the module under test import lofar.sas.tmss.services.scheduling.constraints.template_constraints_v1 as tc1 from lofar.sas.tmss.services.scheduling.dynamic_scheduling import * -class TestDynamicScheduling(TestCase): # Note: we use django.test.TestCase instead of unittest.TestCase to avoid manual cleanup of objects created by other tests +class TestDynamicScheduling(unittest.TestCase): ''' Tests for the Dynamic Scheduling ''' @@ -421,6 +420,63 @@ class TestDynamicScheduling(TestCase): # Note: we use django.test.TestCase inst self.assertEqual(at, scheduling_unit_blueprint.scheduled_start_time) + def test_project_activation_deactivation_event_handling(self): + """ + Test a simple observation with the 'at' constraint and 'fixed_time' scheduler. + First ensure it is unschedulable by making the project suspended. + Then active the project, and see if the scheduler picks it up and schedules it. + """ + # Use at constraint in the near future + at = round_to_second_precision(datetime.utcnow() + timedelta(days=1)) + scheduling_unit_draft = self.create_simple_observation_scheduling_unit_fixed_time(at=at) + scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft) + + # assert blueprint has correct constraints, and is schedulable + self.assertEqual('fixed_time', scheduling_unit_blueprint.scheduling_constraints_doc['scheduler']) + self.assertEqual(at.isoformat(), scheduling_unit_blueprint.scheduling_constraints_doc['time']['at']) + self.assertEqual(scheduling_unit_blueprint.status.value, models.SchedulingUnitStatus.Choices.SCHEDULABLE.value) + + active_project_state = models.ProjectState.objects.get(value=models.ProjectState.Choices.ACTIVE.value) + suspended_project_state = models.ProjectState.objects.get(value=models.ProjectState.Choices.SUSPENDED.value) + self.assertIsNotNone(active_project_state) + self.assertIsNotNone(suspended_project_state) + + # set the scheduling_unit_blueprint's project state to suspended + project = scheduling_unit_blueprint.project + project.project_state = suspended_project_state + project.save() + + # start the dynamic_scheduling_service, which includes eventmessage handling, and a running scheduler + with create_dynamic_scheduling_service(): + # at scheduler startup, all units are evaluated for schedulability. + # assert the scheduling_unit has been marked as unschedulable + scheduling_unit_blueprint.refresh_from_db() + self.assertEqual(models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value, scheduling_unit_blueprint.status.value) + + # set the project's state + project.project_state = active_project_state + project.save() + + # now wait and poll until unit is scheduled, or timeout + scheduling_unit_blueprint = wait_for_scheduling_unit_blueprint_status(scheduling_unit_blueprint.id, models.SchedulingUnitStatus.Choices.SCHEDULED.value) + + # is it scheduled at the right time? + self.assertEqual(models.SchedulingUnitStatus.Choices.SCHEDULED.value, scheduling_unit_blueprint.status.value) + self.assertEqual(at, scheduling_unit_blueprint.scheduled_start_time) + + # set the project's state to inactive again + project.project_state = suspended_project_state + project.save() + + # wait and poll until unit is unschedulabel, or timeout + scheduling_unit_blueprint = wait_for_scheduling_unit_blueprint_status(scheduling_unit_blueprint.id, models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value) + + # is it unschedulable at the right time? + self.assertEqual(models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value, scheduling_unit_blueprint.status.value) + self.assertEqual(at, scheduling_unit_blueprint.scheduled_start_time) + + + @unittest.skip("Skipped to finish work on fixed_time scheduling in TMSS-1135 first (on this TMSS-671-branch). Re-enable this test in TMSS-671 on a new branch") def test_simple_observation_with_at_constraint(self): """ @@ -731,7 +787,7 @@ class TestDynamicScheduling(TestCase): # Note: we use django.test.TestCase inst -class TestDailyConstraints(TestCase): +class TestDailyConstraints(unittest.TestCase): ''' Tests for the constraint checkers used in dynamic scheduling ''' @@ -1316,7 +1372,7 @@ class TestSkyConstraints(unittest.TestCase): self.target_transit_mock.side_effect = None -class TestTimeConstraints(TestCase): +class TestTimeConstraints(unittest.TestCase): """ Tests for the time constraint checkers used in dynamic scheduling with different boundaries Possible time constraints are @@ -2139,7 +2195,7 @@ class TestReservedStations(unittest.TestCase): self.assertTrue(can_run_within_station_reservations(self.scheduling_unit_blueprint)) -class TestTriggers(TestCase): +class TestTriggers(unittest.TestCase): """ Tests for scheduling behavior of triggered observations """ @@ -2627,4 +2683,4 @@ logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=loggin if __name__ == '__main__': #run the unit tests - unittest.main() + unittest.main(defaultTest='TestDynamicScheduling.test_project_activation_deactivation_event_handling')