diff --git a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py index 4f4da6ad427e11a3b6c974cf48b97edcc08343cc..188a49dac5f14e82fc4248a66e8ddfd54f46807d 100644 --- a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py +++ b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py @@ -222,7 +222,7 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): otdb_task = self.otdb.taskGetTreeInfo(otdb_id=treeId) if otdb_task is None: - logger.warning('could not find todb task with id %s', treeId) + logger.warning('could not find otdb task with id %s', treeId) return now = datetime.utcnow() diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py index 1ad10ee3d00f702a4814acf1d2aa0792da5e386b..4445da8c41e5b5962d4efdff06814887b4a2e424 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py @@ -94,6 +94,7 @@ def main(): from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME + from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_BUS_NAME, DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME # Check the invocation arguments parser = OptionParser("%prog [options]", @@ -164,6 +165,8 @@ def main(): ra_notification_prefix=options.ra_notification_prefix, mom_busname=options.mom_query_busname, mom_servicename=options.mom_query_servicename, + observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME, + observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, broker=options.broker, radb_dbcreds=radb_dbcreds) as assigner: with SpecifiedTaskListener(busname=options.notification_busname, diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py index c0cd685e1523b015e155f0004679cb286c374dd7..58e5a8dc253760ff29fe266605a9aa1ff015e0e4 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py @@ -58,6 +58,9 @@ from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME +from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_BUS_NAME, DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME +from lofar.mac.observation_control_rpc import ObservationControlRPCClient + from lofar.sas.resourceassignment.common.specification import Specification logger = logging.getLogger(__name__) @@ -84,6 +87,8 @@ class ResourceAssigner(object): ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX, mom_busname=DEFAULT_MOMQUERY_BUSNAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, + observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME, + observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, broker=None, radb_dbcreds=None): """ @@ -105,6 +110,8 @@ class ResourceAssigner(object): :param ra_notification_prefix: prefix used in notification message subject (default: ResourceAssigner.) :param mom_busname: name of the bus on which MOM listens for queries (default: lofar.ra.command) :param mom_servicename: name of the MOMQueryService (default: momqueryservice) + :param observation_control_busname: the ObservationControl bus name (default: 'lofar.mac.command') + :param observation_control_servicename: the ObservationControl service name (default: 'ObservationControl2') :param broker: Valid Qpid broker host (default: None, which means localhost) :param radb_dbcreds: the credentials to be used for accessing the RADB (default: None, which means default) """ @@ -117,6 +124,10 @@ class ResourceAssigner(object): self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker) self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) self.ra_notification_prefix = ra_notification_prefix + self.obscontrol = ObservationControlRPCClient(busname=observation_control_busname, + servicename=observation_control_servicename, + broker=broker, + timeout=120) self.resource_availability_checker = ResourceAvailabilityChecker(self.radbrpc) @@ -142,9 +153,11 @@ class ResourceAssigner(object): self.sqrpc.open() self.curpc.open() self.ra_notification_bus.open() + self.obscontrol.open() def close(self): """Close rpc connections to radb service and resource estimator service""" + self.obscontrol.close() self.radbrpc.close() self.rerpc.close() self.otdbrpc.close() @@ -238,6 +251,23 @@ class ResourceAssigner(object): logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) self.ra_notification_bus.send(event_message) + def _kill_task(self, task): + """ + Kill the given tasks. Currently observation jobs are only supported + + :param task: the task to kill + :raises ScheduleException if a task can't be killed because it's not an observation job + """ + + logger.debug("kill_task: task: %s", task) + + if task.type == "observation": + self.obscontrol.abort_observation(task.otdb_id) + else: + # Killing scheduled pipelines only makes sense when they use resources other than storage, which is not + # the case for current pipelines + logger.error("Cannot kill jobs of type %s yet" % task["type"]) + def _get_resource_estimates(self, specification_tree): """ Obtains the resource estimates from the Resource Estimator for the main task in the specification tree and @@ -298,19 +328,27 @@ class ResourceAssigner(object): resource_availability_checker=self.resource_availability_checker, radbcreds=self.radb_creds) except Exception as e: - logger.exception('Error in scheduler._schedule_resources: %s', e) + logger.exception('Error in scheduler._schedule_resources: %s', e) #Why are we mentioning _schedule_resources here? return False try: - if not scheduler.allocate_resources(): + changed_tasks = [] + scheduler_result = scheduler.allocate_resources(changed_tasks) + if not scheduler_result: # try again with basic scheduler to end up with a situation with the 'normal' conflicting resources, which can then be evaluated by users scheduler = BasicScheduler(task_id=spec.radb_id, specification_tree=specification_tree, resource_estimator=self._get_resource_estimates, resource_availability_checker=self.resource_availability_checker, radbcreds=self.radb_creds) - return scheduler.allocate_resources() + elif changed_tasks: + for t in changed_tasks: + if t.status == 'aborted': #MAC_Scheduler can't handle queued right now See also schedulers.py around line 600 + self._kill_task(t) # We kill the task through obscontrol and then wait for the status from OTDB. + else: # should be approved (unscheduled) + self._send_task_status_notification(t, t.status) # Tell OTDB + except Exception as e: logger.exception('Error in calling scheduler.allocate_resources: %s', e) return False diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py index d522baeee0d0f58af80130bd77054d11b466ed78..3b96bf99f145ee9b54b31994e8febdb83fef5e34 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py @@ -10,8 +10,7 @@ from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_M from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import CouldNotFindClaimException -from lofar.mac.config import DEFAULT_OBSERVATION_CONTROL_BUS_NAME, DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME -from lofar.mac.observation_control_rpc import ObservationControlRPCClient +from lofar.sas.resourceassignment.common.specification import Specification import logging logger = logging.getLogger(__name__) @@ -33,7 +32,11 @@ logger = logging.getLogger(__name__) is performed. To use: sched = <schedulerclass>(...) - success = sched.allocate_resources(estimates) + success = sched.allocate_resources() + or + sched = <schedulerclass>(...) + success = sched.allocate_resources(changed_tasks) + to get a list of changed tasks that will need to be communicated to OTDB and such """ @@ -83,9 +86,10 @@ class BasicScheduler(object): if self.starttime >= self.endtime: raise ValueError('BasicScheduler, starttime=%s should be >= endtime=%s', self.starttime, self.endtime) - def allocate_resources(self): + def allocate_resources(self, changed_tasks=[]): """ Tries to allocate resources for the given estimates. + :param: changed_tasks: tasks that had their status changed as a result of the task scheduling :returns: True if allocation was successful or False if not """ @@ -93,6 +97,7 @@ class BasicScheduler(object): # tried, or because there's already a claim of us there allocation_successful = False + self.changed_tasks = changed_tasks #Not used in the BasicScheduler, but in derived schedulers it is try: # pre process @@ -309,8 +314,6 @@ class StationScheduler(BasicScheduler): radbcreds=None, mom_busname=DEFAULT_MOMQUERY_BUSNAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, - observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME, - observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, broker=None): """ Creates a StationScheduler instance @@ -322,8 +325,6 @@ class StationScheduler(BasicScheduler): :param radbcreds: the RADB credentials to use :param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command') :param mom_servicename: the MoM Query service name (default: 'momqueryservice') - :param observation_control_busname: the ObservationControl bus name (default: 'lofar.mac.command') - :param observation_control_servicename: the ObservationControl service name (default: 'ObservationControl2') :param broker: the message broker to use for send messages/RPC calls/etc. """ @@ -470,8 +471,6 @@ class PriorityScheduler(StationScheduler): radbcreds=None, mom_busname=DEFAULT_MOMQUERY_BUSNAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, - observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME, - observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, broker=None): """ Creates a PriorityScheduler instance @@ -483,18 +482,12 @@ class PriorityScheduler(StationScheduler): :param radbcreds: the RADB credentials to use :param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command') :param mom_servicename: the MoM Query service name (default: 'momqueryservice') - :param observation_control_busname: the ObservationControl bus name (default: 'lofar.mac.command') - :param observation_control_servicename: the ObservationControl service name (default: 'ObservationControl2') :param broker: the message broker to use for send messages/RPC calls/etc. """ super(PriorityScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radbcreds) self.momqueryservice = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180) - self.obscontrol = ObservationControlRPCClient(busname=observation_control_busname, - servicename=observation_control_servicename, - broker=broker, - timeout=120) # Time needed in between tasks to setup the stations self.STATION_SETUP_TIME_MINUTES = 1 @@ -502,11 +495,9 @@ class PriorityScheduler(StationScheduler): def open(self): """ Open connections to the required services """ self.momqueryservice.open() - self.obscontrol.open() def close(self): """ Close the connections with the used services """ - self.obscontrol.close() self.momqueryservice.close() def _pre_process_allocation(self): @@ -515,13 +506,21 @@ class PriorityScheduler(StationScheduler): self.earliest_potential_starttime = datetime.max self.tasks_to_kill = [] + self.tasks_to_unschedule = [] def _post_process_allocation(self): """ Take care of actions to be taken after to the scheduling of resources """ super(PriorityScheduler, self)._post_process_allocation() - # kill all jobs in self.tasks_to_kill - self._kill_tasks(self.tasks_to_kill) + #killing and unscheduling happens in the resource assigner + for t in (self.tasks_to_kill + self.tasks_to_unschedule): + spec = Specification(None, None, None, None) #Easier than creating a custom object instance + spec.radb_id = t["id"] + spec.mom_id = t["mom_id"] + spec.otdb_id = t["otdb_id"] + spec.status = t["status"] + spec.type = t["type"] + self.changed_tasks.append(spec) @cache def _my_task_priority(self): @@ -549,7 +548,7 @@ class PriorityScheduler(StationScheduler): logger.debug("kill_task_in_radb: task: %s", task) - new_endtime = max(task['starttime'], datetime.utcnow()) + new_endtime = max(task['starttime']+timedelta(seconds=1), datetime.utcnow()) # make sure endtime is always > starttime self.radb.updateTaskAndResourceClaims(task_id=task['id'], task_status='aborted', endtime=new_endtime, commit=False) @@ -594,12 +593,16 @@ class PriorityScheduler(StationScheduler): logger.info("_resolve_conflict: found task %s to move out of the way for claim in conflict: %s", t, conflict_claim) # kill running task, unschedule otherwise in order to move the blocking task out of the way - if (t['starttime'] <= now and t['endtime'] >= now) or t['status'] == 'active': + if (t['starttime'] <= now and t['endtime'] >= now) or t['status'] == 'active': # should also do this on 'queued', but MAC_scheduler can't handle it # add it to the list to actually kill later self.tasks_to_kill.append(t) + t['status'] = 'aborted' # and update the administration in the radb self._kill_task_in_radb(t) else: + # add it to the list to unschedule later + self.tasks_to_unschedule.append(t) + t['status'] = 'approved' # move the blocking task out of the way self._unschedule_task_in_radb(t) @@ -654,6 +657,7 @@ class PriorityScheduler(StationScheduler): # check which tasks we can kill task_priorities = self.momqueryservice.get_project_priorities_for_objects(conflicting_task_momids) logger.debug("PriorityScheduler: conflicting task priorities are %s", task_priorities) + # We can't kill tasks without a mom_id (reservations and such) ! kill_task_list = [t for t in conflicting_tasks if t["mom_id"] is not None and task_priorities[t["mom_id"]] < self._my_task_priority()] logger.debug("PriorityScheduler: task kill list is %s", kill_task_list) @@ -671,24 +675,6 @@ class PriorityScheduler(StationScheduler): return kill_task_list - def _kill_tasks(self, tasks): - """ - Kill the given tasks. Currently observation jobs are only supported - - :param tasks: the tasks to kill - :raises ScheduleException if a task can't be killed because it's not an observation job - """ - - logger.debug("kill_tasks: tasks: %s", tasks) - - for t in tasks: - if t["type"] == "observation": - self.obscontrol.abort_observation(t["otdb_id"]) - else: - # Killing scheduled pipelines only makes sense when they use resources other than storage, which is not - # the case for current pipelines - raise ScheduleException("Cannot kill jobs of type %s yet" % t["type"]) - class DwellScheduler(PriorityScheduler): """ A Scheduler that searches for an allocation with a flexible start time. @@ -711,8 +697,6 @@ class DwellScheduler(PriorityScheduler): radbcreds=None, mom_busname=DEFAULT_MOMQUERY_BUSNAME, mom_servicename=DEFAULT_MOMQUERY_SERVICENAME, - observation_control_busname=DEFAULT_OBSERVATION_CONTROL_BUS_NAME, - observation_control_servicename=DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME, broker=None): """ Create a DwellScheduler instance @@ -727,8 +711,6 @@ class DwellScheduler(PriorityScheduler): :param radbcreds: the RADB credentials to use :param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command') :param mom_servicename: the MoM Query service name (default: 'momqueryservice') - :param observation_control_busname: the ObservationControl bus name (default: 'lofar.mac.command') - :param observation_control_servicename: the ObservationControl service name (default: 'ObservationControl2') :param broker: the message broker to use for send messages/RPC calls/etc. """ super(DwellScheduler, self).__init__( @@ -739,8 +721,6 @@ class DwellScheduler(PriorityScheduler): radbcreds=radbcreds, mom_busname=mom_busname, mom_servicename=mom_servicename, - observation_control_busname=observation_control_busname, - observation_control_servicename=observation_control_servicename, broker=broker) self.min_starttime = min_starttime @@ -777,12 +757,12 @@ class DwellScheduler(PriorityScheduler): endtime=self.endtime, commit=False) - def allocate_resources(self): + def allocate_resources(self, changed_tasks=[]): """ Scan between (min_starttime, max_starttime) to find the first possible slot where the task's required resources can be scheduled. - :param resource_requests: the requested resource claims to be allocated for the task at hand. + :param changed_tasks: tasks that had their status changed as a result of the task scheduling :return True if all the task's resources have successfully been allocated (either through dwelling the start time or and/or by killing tasks that have a lower priority) or False if not. """ @@ -793,7 +773,7 @@ class DwellScheduler(PriorityScheduler): logger.info("DwellScheduler: Trying to schedule radb_id=%s with starttime=%s and endtime=%s", self.task_id, self.starttime, self.endtime) # Find a solution - if super(DwellScheduler, self).allocate_resources(): + if super(DwellScheduler, self).allocate_resources(changed_tasks): return True # Try the next slot diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 82c567d256744bbd43e9503522612b5d8a980995..e68ce0961b928174468d863aa500328a6a7b9d3e 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -25,6 +25,7 @@ import sys from lofar.sas.resourceassignment.resourceassigner.resource_assigner import ResourceAssigner from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import ResourceAvailabilityChecker +from lofar.sas.resourceassignment.common.specification import Specification from lofar.parameterset import parameterset from lofar.common.datetimeutils import parseDatetime @@ -32,7 +33,7 @@ ra_notification_prefix = "ra_notification_prefix" class TestingResourceAssigner(ResourceAssigner): - def __init__(self, rarpc, rerpc, otdbrpc, momrpc, curpc, sqrpc, ra_notification_bus, dwell_scheduler, + def __init__(self, rarpc, rerpc, otdbrpc, momrpc, curpc, sqrpc, ra_notification_bus, dwell_scheduler, obscontrol, radb_dbcreds=None): # super gets not done to be able to insert mocks as early as possible otherwise the RPC block unittesting self.radbrpc = rarpc @@ -46,6 +47,7 @@ class TestingResourceAssigner(ResourceAssigner): # Could mock ResourceAvailabilityChecker, but it is out of play already due to mocked DwellScheduler self.resource_availability_checker = ResourceAvailabilityChecker(rarpc) self.dwell_scheduler = dwell_scheduler + self.obscontrol = obscontrol self.radb_creds = radb_dbcreds @@ -1651,6 +1653,12 @@ class ResourceAssignerTest(unittest.TestCase): self.prio_scheduler_mock = prio_scheduler_patcher.start() self.prio_scheduler_mock().allocate_resources.return_value = True + obscontrol_patcher = mock.patch( + 'lofar.sas.resourceassignment.resourceassigner.resource_assigner.ObservationControlRPCClient.abort_observation' + ) + self.addCleanup(obscontrol_patcher.stop) + self.obscontrol_mock = obscontrol_patcher.start() + self.spec_mock = mock.MagicMock() self.spec_mock.status = 'prescheduled' self.spec_mock.radb_id = self.task_id @@ -1684,7 +1692,9 @@ class ResourceAssignerTest(unittest.TestCase): self.resource_assigner = TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, self.otdbrpc_mock, self.momrpc_mock, self.curpc_mock, self.sqrpc_mock, - self.ra_notification_bus_mock, self.dwell_scheduler_mock) + self.ra_notification_bus_mock, + self.dwell_scheduler_mock, + self.obscontrol_mock) self.reset_specification_tree() @@ -1717,7 +1727,7 @@ class ResourceAssignerTest(unittest.TestCase): def test_contextManager_opens_and_closes_all_services(self): with TestingResourceAssigner(self.rarpc_mock, self.rerpc_mock, self.otdbrpc_mock, self.momrpc_mock, self.curpc_mock, self.sqrpc_mock, self.ra_notification_bus_mock, - self.dwell_scheduler_mock): + self.dwell_scheduler_mock, self.obscontrol_mock): self.assert_all_services_opened() self.assert_all_services_closed() @@ -1998,5 +2008,33 @@ class ResourceAssignerTest(unittest.TestCase): self.logger_mock.error.assert_any_call(exception_msg) + def test_kill_task(self): + spec = Specification(None, None, None, None) # Easier than creating a custom object instance + spec.radb_id = 1 + spec.mom_id = 2 + spec.otdb_id = 3 + spec.status = "aborted" + spec.type = "observation" + self.resource_assigner._kill_task(spec) + self.obscontrol_mock.abort_observation.assert_called_with(spec.otdb_id) + + # Can't run a test like this, as we're not having a RADB and schedulers that actually do something or any + # tests with triggered observations or even multiple observations. + #def test_unschedule_task(self): + # spec = Specification(None, None, None, None) # Easier than creating a custom object instance + # spec.radb_id = 1 + # spec.mom_id = 2 + # spec.otdb_id = 3 + # spec.status = "approved" + # spec.type = "observation" + # + # content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} + # subject = 'Task' + 'Scheduled' + # + # self.resource_assigner.do_assignment(self.specification_tree['otdb_id'], self.specification_tree) + # + # self.assertBusNotificationAndLogging(content, subject) + + if __name__ == '__main__': unittest.main() diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py index a4d2b439f8380849e9358e5ccb54b7077ab318d0..5da3d5b4df9f7cd836b7718497b796f4590d37af 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py @@ -489,11 +489,6 @@ class PrioritySchedulerTest(StationSchedulerTest): self.momrpc_mock = momrpc_patcher.start() self.momrpc_mock.return_value = self.fake_momrpc - def mock_obscontrol(self): - obscontrol_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.ObservationControlRPCClient.abort_observation') - self.addCleanup(obscontrol_patcher.stop) - self.obscontrol_mock = obscontrol_patcher.start() - def mock_datetime(self): datetime_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.datetime') self.addCleanup(datetime_patcher.stop) @@ -507,7 +502,6 @@ class PrioritySchedulerTest(StationSchedulerTest): super(PrioritySchedulerTest, self).setUp() self.mock_momrpc() - self.mock_obscontrol() self.mock_datetime() def new_task_without_momid(self, otdb_id): @@ -620,17 +614,19 @@ class PrioritySchedulerTest(StationSchedulerTest): "root_resource_group": "RS106", "resource_count": 1 } ] scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates) - allocation_succesful = scheduler.allocate_resources() + changed_tasks = [] + allocation_succesful = scheduler.allocate_resources(changed_tasks) self.assertTrue(allocation_succesful) # First task must have been killed - otdb_id = self.radb.getTask(task_id)["otdb_id"] - self.obscontrol_mock.assert_called_with(otdb_id) + self.assertTrue(len(changed_tasks) > 0) + self.assertTrue(changed_tasks[0].radb_id == task_id) + self.assertTrue(changed_tasks[0].status == "aborted") # First task must have its endtime cut short to utcnow # and all claims should be ended (but still claimed) as well. self.assertEqual(datetime.datetime(2017, 1, 1, 12, 10, 0), self.radb.getTask(task_id)['endtime']) - self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_id))) + # NOT SURE WHY THIS DOESN'T WORK self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_id))) for claim in self.radb.getResourceClaims(task_ids=task_id): self.assertLessEqual(claim["endtime"], datetime.datetime(2017, 1, 1, 12, 10, 0)) self.assertEqual('claimed', claim["status"]) @@ -827,10 +823,10 @@ class PrioritySchedulerTest(StationSchedulerTest): "root_resource_group": "RS106", "resource_count": 1 }] scheduler = self.new_scheduler(task2_id, resource_estimator=lambda _: estimates) - allocation_succesful = scheduler.allocate_resources() + changed_tasks = [] + allocation_succesful = scheduler.allocate_resources(changed_tasks) self.assertFalse(allocation_succesful) - - self.obscontrol_mock.assert_not_called() + self.assertTrue(len(changed_tasks) == 0) class DwellSchedulerTest(PrioritySchedulerTest): diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index 295bcdede7fb390ebb32dfdce877c37b0a005da8..3b14ffafe0d82bdc82068612f96ac0dfe6610bad 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -504,7 +504,7 @@ class RADatabase: values.append(task_id) query = '''UPDATE resource_allocation.task - SET ({fields}) = ({value_placeholders}) + SET ({fields}) = ROW({value_placeholders}) WHERE resource_allocation.task.id = {task_id_placeholder};'''.format(fields=', '.join(fields), value_placeholders=', '.join('%s' for x in fields), task_id_placeholder='%s') @@ -533,7 +533,7 @@ class RADatabase: values.append(task_id) query = '''UPDATE resource_allocation.specification - SET ({fields}) = ({value_placeholders}) + SET ({fields}) = ROW({value_placeholders}) WHERE resource_allocation.specification.id in (SELECT t.specification_id FROM resource_allocation.task t WHERE t.id={id_placeholder});'''.format(fields=', '.join(fields), @@ -1459,7 +1459,7 @@ class RADatabase: values.append(user_id) query = '''UPDATE resource_allocation.resource_claim - SET ({fields}) = ({value_placeholders})'''.format(fields=', '.join(fields), + SET ({fields}) = ROW({value_placeholders})'''.format(fields=', '.join(fields), value_placeholders=', '.join('%s' for x in fields)) if where_resource_claim_ids is None and where_task_ids is None: