From 3bccd7918a3265f67ba9fefc4425f93226fa25d3 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Thu, 19 Sep 2019 12:55:21 +0200 Subject: [PATCH] SW-816: reused radb connections. Testing showed that opening/closing db connection too quickly in succession can lead to timeouts while waiting for sockets in TCP_WAIT state. --- .../ResourceAssigner/lib/resource_assigner.py | 10 ++++-- .../ResourceAssigner/lib/schedulers.py | 32 ++++++++----------- .../test/t_resourceassigner.py | 3 +- .../ResourceAssigner/test/t_schedulers.py | 31 ++++++++++-------- 4 files changed, 41 insertions(+), 35 deletions(-) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py index 2b498c030d4..113d6ab2198 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py @@ -98,6 +98,7 @@ class ResourceAssigner(object): def open(self): """Open connections to various service/buses/databases""" + logger.debug("resource_assigner opening all bus/db connections") self.radb.connect() self.rerpc.open() self.otdbrpc.open() @@ -106,9 +107,11 @@ class ResourceAssigner(object): self.curpc.open() self.ra_notification_bus.open() self.obscontrol.open() + logger.info("resource_assigner opened all bus/db connections") def close(self): """Close connections to various service/buses/databases""" + logger.debug("resource_assigner closing all bus/db connections") self.obscontrol.close() self.radb.disconnect() self.rerpc.close() @@ -117,6 +120,7 @@ class ResourceAssigner(object): self.sqrpc.close() self.curpc.close() self.ra_notification_bus.close() + logger.info("resource_assigner closed all bus/db connections") @property @cache @@ -269,7 +273,7 @@ class ResourceAssigner(object): specification_tree=specification_tree, resource_estimator=self._get_resource_estimates, resource_availability_checker=self.resource_availability_checker, - radbcreds=self.radb_creds, + radb=self.radb, min_starttime=min_starttime, max_starttime=max_starttime, duration=duration) @@ -278,7 +282,7 @@ class ResourceAssigner(object): specification_tree=specification_tree, resource_estimator=self._get_resource_estimates, resource_availability_checker=self.resource_availability_checker, - radbcreds=self.radb_creds) + radb=self.radb) except Exception as e: logger.exception('Error in scheduler._schedule_resources: %s', e) #Why are we mentioning _schedule_resources here? return False @@ -292,7 +296,7 @@ class ResourceAssigner(object): specification_tree=specification_tree, resource_estimator=self._get_resource_estimates, resource_availability_checker=self.resource_availability_checker, - radbcreds=self.radb_creds) + radb=self.radb) with basic_scheduler: (scheduler_result, changed_tasks) = basic_scheduler.allocate_resources() return scheduler_result diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py index 0f17dae22dd..334008e6119 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulers.py @@ -47,7 +47,7 @@ class ScheduleException(Exception): class BasicScheduler(object): """ A Scheduler that allocates resources at a fixed time. Resources are searched for. """ - def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker, radbcreds=None): + def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker, radb: RADatabase): """ Creates a BasicScheduler instance @@ -55,7 +55,7 @@ class BasicScheduler(object): :param specification_tree: the full specification; will be modified where needed with respect to resources :param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates :param resource_availability_checker: the ResourceAvailabilityScheduler to be used by the BasicScheduler - :param radbcreds: ResourceAssigner database credentials. If None, the default credentials will be used + :param radb: a RADatabase instance. :raises AssertionError if task_id is a negative number or is None """ @@ -64,9 +64,7 @@ class BasicScheduler(object): self.specification_tree = specification_tree self.resource_estimator = resource_estimator self.resource_availability_checker = resource_availability_checker - - # We need a DIRECT connection to the database in order to do client-side (that's us) transactions - self.radb = RADatabase(dbcreds=radbcreds) + self.radb = radb # Ensure a valid task_id is given, since radb.getTasks() will not raise if task_id equals None if task_id < 0: @@ -92,12 +90,10 @@ class BasicScheduler(object): self.close() def open(self): - """ Open radb connection""" - self.radb.connect() + pass def close(self): - """ Close radb connection""" - self.radb.disconnect() + pass def allocate_resources(self): """ @@ -333,7 +329,7 @@ class StationScheduler(BasicScheduler): After scheduling, the get_stations() function returns a list of the allocated stations. """ def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker, - radbcreds=None, + radb: RADatabase, broker=DEFAULT_BROKER): """ Creates a StationScheduler instance @@ -342,13 +338,13 @@ class StationScheduler(BasicScheduler): :param specification_tree: the full specification; will be modified where needed with respect to resources :param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates :param resource_availability_checker: the ResourceAvailabilityChecker instance to use - :param radbcreds: the RADB credentials to use + :param radb: a RADatabase instance. :param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command') :param mom_servicename: the MoM Query service name (default: 'momqueryservice') :param broker: the message broker to use for send messages/RPC calls/etc. """ - super(StationScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radbcreds) + super(StationScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radb) # For observations without a fixed station list, we need to derive one. TODO: this likely isnt the condition we want to decide on self.must_derive_station_list = specification_tree["task_type"] == "observation" and specification_tree["specification"]["Observation.VirtualInstrument.stationList"] == [] and specification_tree["station_requirements"] @@ -489,7 +485,7 @@ class PriorityScheduler(StationScheduler): Conflict resolution is done by killing jobs with lower priority. """ def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker, - radbcreds=None, + radb: RADatabase, broker=DEFAULT_BROKER): """ Creates a PriorityScheduler instance @@ -498,11 +494,11 @@ class PriorityScheduler(StationScheduler): :param specification_tree: the full specification; will be modified where needed with respect to resources :param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates :param resource_availability_checker: the ResourceAvailabilityChecker instance to use - :param radbcreds: the RADB credentials to use + :param radb: a RADatabase instance. :param broker: the message broker to use for send messages/RPC calls/etc. """ - super(PriorityScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radbcreds) + super(PriorityScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radb) self.momqueryservice = MoMQueryRPC.create(broker=broker, timeout=180) @@ -719,7 +715,7 @@ class DwellScheduler(PriorityScheduler): max_starttime, duration, resource_availability_checker, - radbcreds=None, + radb: RADatabase = None, broker=DEFAULT_BROKER): """ Create a DwellScheduler instance @@ -731,7 +727,7 @@ class DwellScheduler(PriorityScheduler): :param max_starttime: the task's desired maximum start time :param duration: the task's duration :param resource_availability_checker: the ResourceAvailabilityChecker to use - :param radbcreds: the RADB credentials to use + :param radb: a RADatabase instance. :param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command') :param mom_servicename: the MoM Query service name (default: 'momqueryservice') :param broker: the message broker to use for send messages/RPC calls/etc. @@ -741,7 +737,7 @@ class DwellScheduler(PriorityScheduler): specification_tree=specification_tree, resource_estimator=resource_estimator, resource_availability_checker=resource_availability_checker, - radbcreds=radbcreds, + radb=radb, broker=broker) self.min_starttime = min_starttime diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 9500af8328f..17cfcb1e8b6 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -443,7 +443,8 @@ class ResourceAssignerTest(RADBCommonTestMixin, unittest.TestCase): self.logger_mock.warn.side_effect = myprint self.logger_mock.error.side_effect = myprint - self.resource_assigner = ResourceAssigner(exchange=self.tmp_exchange.address, radb_dbcreds=self.radb.dbcreds) + self.resource_assigner = ResourceAssigner(exchange=self.tmp_exchange.address, radb_dbcreds=self.dbcreds) + self.addCleanup(self.resource_assigner.close) self.reset_specification_tree() diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py index 028b2e65b4a..85efbd6aa7c 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py @@ -42,24 +42,29 @@ from lofar.sas.resourceassignment.database.testing.radb_common_testing import RA class SchedulerTest(RADBCommonTestMixin, unittest.TestCase): """ create test radb postgres instance, and use that in a ResourceAvailabilityChecker""" - def setUp(self): - super().setUp() - self.resource_availability_checker = ResourceAvailabilityChecker(self.radb) - self._enforce_limited_station_group_list() + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._enforce_limited_station_group_list() - def _enforce_limited_station_group_list(self): + @classmethod + def _enforce_limited_station_group_list(cls): # for test simplicity, create a simple virtual instrument which makes debugging easier. # this is safe, because we are working on a test database LIMITED_STATION_GROUP_LIST = ('CS001', 'CS002', 'RS106', 'RS205') - unwanted_resource_group_ids = [rg['id'] for rg in self.radb.getResourceGroups() + unwanted_resource_group_ids = [rg['id'] for rg in cls.radb.getResourceGroups() if rg['type'] == 'station' and rg['name'] not in LIMITED_STATION_GROUP_LIST] - self.radb.executeQuery("DELETE FROM virtual_instrument.resource_group rg WHERE rg.id in (%s)" % ( - ', '.join([str(id) for id in unwanted_resource_group_ids])),) - self.radb.commit() + if unwanted_resource_group_ids: + cls.radb.executeQuery("DELETE FROM virtual_instrument.resource_group rg WHERE rg.id in (%s)" % ( + ', '.join([str(id) for id in unwanted_resource_group_ids])),) + cls.radb.commit() + def setUp(self): + super().setUp() + self.resource_availability_checker = ResourceAvailabilityChecker(self.radb) class BasicSchedulerTest(SchedulerTest): def new_task(self, mom_otdb_id=0, starttime=None, endtime=None): @@ -99,7 +104,7 @@ class BasicSchedulerTest(SchedulerTest): return BasicScheduler(task_id, specification_tree if specification_tree else self.get_specification_tree(task_id), resource_estimator if resource_estimator else lambda _:[], - self.resource_availability_checker, self.radb.dbcreds) + self.resource_availability_checker, self.radb) def get_station_bandwidth_max_capacity(self): resource_CS001bw0 = [r for r in self.radb.getResources(resource_types="bandwidth", include_availability=True) @@ -239,7 +244,7 @@ class StationSchedulerTest(BasicSchedulerTest): return StationScheduler(task_id, specification_tree if specification_tree else self.get_specification_tree(task_id), resource_estimator if resource_estimator else self.fake_resource_estimator, - self.resource_availability_checker, self.radb.dbcreds) + self.resource_availability_checker, self.radb) def fake_resource_estimator(self, specification_tree): """ Return an estimate for each station, plus a fixed storage claim of half the available storage capacity. """ @@ -507,7 +512,7 @@ class PrioritySchedulerTest(StationSchedulerTest): return PriorityScheduler(task_id, specification_tree if specification_tree else self.get_specification_tree(task_id), resource_estimator if resource_estimator else self.fake_resource_estimator, - self.resource_availability_checker, self.radb.dbcreds) + self.resource_availability_checker, self.radb) def test_unschedule_lower_priority_future_task(self): """ @@ -898,7 +903,7 @@ class DwellSchedulerTest(PrioritySchedulerTest): min_starttime, max_starttime, datetime.timedelta(hours=1), # duration - self.resource_availability_checker, self.radb.dbcreds) + self.resource_availability_checker, self.radb) def test_no_dwell(self): """ Whether a task will not dwell unnecessarily on an empty system. """ -- GitLab