Skip to content
Snippets Groups Projects
Commit 3bccd791 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-816: reused radb connections. Testing showed that opening/closing db...

SW-816: reused radb connections. Testing showed that opening/closing db connection too quickly in succession can lead to timeouts while waiting for sockets in TCP_WAIT state.
parent 8a3f87a3
No related branches found
No related tags found
2 merge requests!59Merge LOFAR-Release-4_0 into master,!57Resolve SW-816
...@@ -98,6 +98,7 @@ class ResourceAssigner(object): ...@@ -98,6 +98,7 @@ class ResourceAssigner(object):
def open(self): def open(self):
"""Open connections to various service/buses/databases""" """Open connections to various service/buses/databases"""
logger.debug("resource_assigner opening all bus/db connections")
self.radb.connect() self.radb.connect()
self.rerpc.open() self.rerpc.open()
self.otdbrpc.open() self.otdbrpc.open()
...@@ -106,9 +107,11 @@ class ResourceAssigner(object): ...@@ -106,9 +107,11 @@ class ResourceAssigner(object):
self.curpc.open() self.curpc.open()
self.ra_notification_bus.open() self.ra_notification_bus.open()
self.obscontrol.open() self.obscontrol.open()
logger.info("resource_assigner opened all bus/db connections")
def close(self): def close(self):
"""Close connections to various service/buses/databases""" """Close connections to various service/buses/databases"""
logger.debug("resource_assigner closing all bus/db connections")
self.obscontrol.close() self.obscontrol.close()
self.radb.disconnect() self.radb.disconnect()
self.rerpc.close() self.rerpc.close()
...@@ -117,6 +120,7 @@ class ResourceAssigner(object): ...@@ -117,6 +120,7 @@ class ResourceAssigner(object):
self.sqrpc.close() self.sqrpc.close()
self.curpc.close() self.curpc.close()
self.ra_notification_bus.close() self.ra_notification_bus.close()
logger.info("resource_assigner closed all bus/db connections")
@property @property
@cache @cache
...@@ -269,7 +273,7 @@ class ResourceAssigner(object): ...@@ -269,7 +273,7 @@ class ResourceAssigner(object):
specification_tree=specification_tree, specification_tree=specification_tree,
resource_estimator=self._get_resource_estimates, resource_estimator=self._get_resource_estimates,
resource_availability_checker=self.resource_availability_checker, resource_availability_checker=self.resource_availability_checker,
radbcreds=self.radb_creds, radb=self.radb,
min_starttime=min_starttime, min_starttime=min_starttime,
max_starttime=max_starttime, max_starttime=max_starttime,
duration=duration) duration=duration)
...@@ -278,7 +282,7 @@ class ResourceAssigner(object): ...@@ -278,7 +282,7 @@ class ResourceAssigner(object):
specification_tree=specification_tree, specification_tree=specification_tree,
resource_estimator=self._get_resource_estimates, resource_estimator=self._get_resource_estimates,
resource_availability_checker=self.resource_availability_checker, resource_availability_checker=self.resource_availability_checker,
radbcreds=self.radb_creds) radb=self.radb)
except Exception as e: except Exception as e:
logger.exception('Error in scheduler._schedule_resources: %s', e) #Why are we mentioning _schedule_resources here? logger.exception('Error in scheduler._schedule_resources: %s', e) #Why are we mentioning _schedule_resources here?
return False return False
...@@ -292,7 +296,7 @@ class ResourceAssigner(object): ...@@ -292,7 +296,7 @@ class ResourceAssigner(object):
specification_tree=specification_tree, specification_tree=specification_tree,
resource_estimator=self._get_resource_estimates, resource_estimator=self._get_resource_estimates,
resource_availability_checker=self.resource_availability_checker, resource_availability_checker=self.resource_availability_checker,
radbcreds=self.radb_creds) radb=self.radb)
with basic_scheduler: with basic_scheduler:
(scheduler_result, changed_tasks) = basic_scheduler.allocate_resources() (scheduler_result, changed_tasks) = basic_scheduler.allocate_resources()
return scheduler_result return scheduler_result
......
...@@ -47,7 +47,7 @@ class ScheduleException(Exception): ...@@ -47,7 +47,7 @@ class ScheduleException(Exception):
class BasicScheduler(object): class BasicScheduler(object):
""" A Scheduler that allocates resources at a fixed time. Resources are searched for. """ """ A Scheduler that allocates resources at a fixed time. Resources are searched for. """
def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker, radbcreds=None): def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker, radb: RADatabase):
""" """
Creates a BasicScheduler instance Creates a BasicScheduler instance
...@@ -55,7 +55,7 @@ class BasicScheduler(object): ...@@ -55,7 +55,7 @@ class BasicScheduler(object):
:param specification_tree: the full specification; will be modified where needed with respect to resources :param specification_tree: the full specification; will be modified where needed with respect to resources
:param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates :param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates
:param resource_availability_checker: the ResourceAvailabilityScheduler to be used by the BasicScheduler :param resource_availability_checker: the ResourceAvailabilityScheduler to be used by the BasicScheduler
:param radbcreds: ResourceAssigner database credentials. If None, the default credentials will be used :param radb: a RADatabase instance.
:raises AssertionError if task_id is a negative number or is None :raises AssertionError if task_id is a negative number or is None
""" """
...@@ -64,9 +64,7 @@ class BasicScheduler(object): ...@@ -64,9 +64,7 @@ class BasicScheduler(object):
self.specification_tree = specification_tree self.specification_tree = specification_tree
self.resource_estimator = resource_estimator self.resource_estimator = resource_estimator
self.resource_availability_checker = resource_availability_checker self.resource_availability_checker = resource_availability_checker
self.radb = radb
# We need a DIRECT connection to the database in order to do client-side (that's us) transactions
self.radb = RADatabase(dbcreds=radbcreds)
# Ensure a valid task_id is given, since radb.getTasks() will not raise if task_id equals None # Ensure a valid task_id is given, since radb.getTasks() will not raise if task_id equals None
if task_id < 0: if task_id < 0:
...@@ -92,12 +90,10 @@ class BasicScheduler(object): ...@@ -92,12 +90,10 @@ class BasicScheduler(object):
self.close() self.close()
def open(self): def open(self):
""" Open radb connection""" pass
self.radb.connect()
def close(self): def close(self):
""" Close radb connection""" pass
self.radb.disconnect()
def allocate_resources(self): def allocate_resources(self):
""" """
...@@ -333,7 +329,7 @@ class StationScheduler(BasicScheduler): ...@@ -333,7 +329,7 @@ class StationScheduler(BasicScheduler):
After scheduling, the get_stations() function returns a list of the allocated stations. """ After scheduling, the get_stations() function returns a list of the allocated stations. """
def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker, def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker,
radbcreds=None, radb: RADatabase,
broker=DEFAULT_BROKER): broker=DEFAULT_BROKER):
""" """
Creates a StationScheduler instance Creates a StationScheduler instance
...@@ -342,13 +338,13 @@ class StationScheduler(BasicScheduler): ...@@ -342,13 +338,13 @@ class StationScheduler(BasicScheduler):
:param specification_tree: the full specification; will be modified where needed with respect to resources :param specification_tree: the full specification; will be modified where needed with respect to resources
:param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates :param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates
:param resource_availability_checker: the ResourceAvailabilityChecker instance to use :param resource_availability_checker: the ResourceAvailabilityChecker instance to use
:param radbcreds: the RADB credentials to use :param radb: a RADatabase instance.
:param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command') :param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command')
:param mom_servicename: the MoM Query service name (default: 'momqueryservice') :param mom_servicename: the MoM Query service name (default: 'momqueryservice')
:param broker: the message broker to use for send messages/RPC calls/etc. :param broker: the message broker to use for send messages/RPC calls/etc.
""" """
super(StationScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radbcreds) super(StationScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radb)
# For observations without a fixed station list, we need to derive one. TODO: this likely isnt the condition we want to decide on # For observations without a fixed station list, we need to derive one. TODO: this likely isnt the condition we want to decide on
self.must_derive_station_list = specification_tree["task_type"] == "observation" and specification_tree["specification"]["Observation.VirtualInstrument.stationList"] == [] and specification_tree["station_requirements"] self.must_derive_station_list = specification_tree["task_type"] == "observation" and specification_tree["specification"]["Observation.VirtualInstrument.stationList"] == [] and specification_tree["station_requirements"]
...@@ -489,7 +485,7 @@ class PriorityScheduler(StationScheduler): ...@@ -489,7 +485,7 @@ class PriorityScheduler(StationScheduler):
Conflict resolution is done by killing jobs with lower priority. """ Conflict resolution is done by killing jobs with lower priority. """
def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker, def __init__(self, task_id, specification_tree, resource_estimator, resource_availability_checker,
radbcreds=None, radb: RADatabase,
broker=DEFAULT_BROKER): broker=DEFAULT_BROKER):
""" """
Creates a PriorityScheduler instance Creates a PriorityScheduler instance
...@@ -498,11 +494,11 @@ class PriorityScheduler(StationScheduler): ...@@ -498,11 +494,11 @@ class PriorityScheduler(StationScheduler):
:param specification_tree: the full specification; will be modified where needed with respect to resources :param specification_tree: the full specification; will be modified where needed with respect to resources
:param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates :param resource_estimator: the ResourceEstimator function that turns a specification into resource estimates
:param resource_availability_checker: the ResourceAvailabilityChecker instance to use :param resource_availability_checker: the ResourceAvailabilityChecker instance to use
:param radbcreds: the RADB credentials to use :param radb: a RADatabase instance.
:param broker: the message broker to use for send messages/RPC calls/etc. :param broker: the message broker to use for send messages/RPC calls/etc.
""" """
super(PriorityScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radbcreds) super(PriorityScheduler, self).__init__(task_id, specification_tree, resource_estimator, resource_availability_checker, radb)
self.momqueryservice = MoMQueryRPC.create(broker=broker, timeout=180) self.momqueryservice = MoMQueryRPC.create(broker=broker, timeout=180)
...@@ -719,7 +715,7 @@ class DwellScheduler(PriorityScheduler): ...@@ -719,7 +715,7 @@ class DwellScheduler(PriorityScheduler):
max_starttime, max_starttime,
duration, duration,
resource_availability_checker, resource_availability_checker,
radbcreds=None, radb: RADatabase = None,
broker=DEFAULT_BROKER): broker=DEFAULT_BROKER):
""" """
Create a DwellScheduler instance Create a DwellScheduler instance
...@@ -731,7 +727,7 @@ class DwellScheduler(PriorityScheduler): ...@@ -731,7 +727,7 @@ class DwellScheduler(PriorityScheduler):
:param max_starttime: the task's desired maximum start time :param max_starttime: the task's desired maximum start time
:param duration: the task's duration :param duration: the task's duration
:param resource_availability_checker: the ResourceAvailabilityChecker to use :param resource_availability_checker: the ResourceAvailabilityChecker to use
:param radbcreds: the RADB credentials to use :param radb: a RADatabase instance.
:param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command') :param mom_busname: the MoM Query service bus name (default: 'lofar.ra.command')
:param mom_servicename: the MoM Query service name (default: 'momqueryservice') :param mom_servicename: the MoM Query service name (default: 'momqueryservice')
:param broker: the message broker to use for send messages/RPC calls/etc. :param broker: the message broker to use for send messages/RPC calls/etc.
...@@ -741,7 +737,7 @@ class DwellScheduler(PriorityScheduler): ...@@ -741,7 +737,7 @@ class DwellScheduler(PriorityScheduler):
specification_tree=specification_tree, specification_tree=specification_tree,
resource_estimator=resource_estimator, resource_estimator=resource_estimator,
resource_availability_checker=resource_availability_checker, resource_availability_checker=resource_availability_checker,
radbcreds=radbcreds, radb=radb,
broker=broker) broker=broker)
self.min_starttime = min_starttime self.min_starttime = min_starttime
......
...@@ -443,7 +443,8 @@ class ResourceAssignerTest(RADBCommonTestMixin, unittest.TestCase): ...@@ -443,7 +443,8 @@ class ResourceAssignerTest(RADBCommonTestMixin, unittest.TestCase):
self.logger_mock.warn.side_effect = myprint self.logger_mock.warn.side_effect = myprint
self.logger_mock.error.side_effect = myprint self.logger_mock.error.side_effect = myprint
self.resource_assigner = ResourceAssigner(exchange=self.tmp_exchange.address, radb_dbcreds=self.radb.dbcreds) self.resource_assigner = ResourceAssigner(exchange=self.tmp_exchange.address, radb_dbcreds=self.dbcreds)
self.addCleanup(self.resource_assigner.close)
self.reset_specification_tree() self.reset_specification_tree()
......
...@@ -42,24 +42,29 @@ from lofar.sas.resourceassignment.database.testing.radb_common_testing import RA ...@@ -42,24 +42,29 @@ from lofar.sas.resourceassignment.database.testing.radb_common_testing import RA
class SchedulerTest(RADBCommonTestMixin, unittest.TestCase): class SchedulerTest(RADBCommonTestMixin, unittest.TestCase):
""" create test radb postgres instance, and use that in a ResourceAvailabilityChecker""" """ create test radb postgres instance, and use that in a ResourceAvailabilityChecker"""
def setUp(self): @classmethod
super().setUp() def setUpClass(cls):
self.resource_availability_checker = ResourceAvailabilityChecker(self.radb) super().setUpClass()
self._enforce_limited_station_group_list() cls._enforce_limited_station_group_list()
def _enforce_limited_station_group_list(self): @classmethod
def _enforce_limited_station_group_list(cls):
# for test simplicity, create a simple virtual instrument which makes debugging easier. # for test simplicity, create a simple virtual instrument which makes debugging easier.
# this is safe, because we are working on a test database # this is safe, because we are working on a test database
LIMITED_STATION_GROUP_LIST = ('CS001', 'CS002', 'RS106', 'RS205') LIMITED_STATION_GROUP_LIST = ('CS001', 'CS002', 'RS106', 'RS205')
unwanted_resource_group_ids = [rg['id'] for rg in self.radb.getResourceGroups() unwanted_resource_group_ids = [rg['id'] for rg in cls.radb.getResourceGroups()
if rg['type'] == 'station' and rg['name'] not in LIMITED_STATION_GROUP_LIST] if rg['type'] == 'station' and rg['name'] not in LIMITED_STATION_GROUP_LIST]
self.radb.executeQuery("DELETE FROM virtual_instrument.resource_group rg WHERE rg.id in (%s)" % ( if unwanted_resource_group_ids:
', '.join([str(id) for id in unwanted_resource_group_ids])),) cls.radb.executeQuery("DELETE FROM virtual_instrument.resource_group rg WHERE rg.id in (%s)" % (
self.radb.commit() ', '.join([str(id) for id in unwanted_resource_group_ids])),)
cls.radb.commit()
def setUp(self):
super().setUp()
self.resource_availability_checker = ResourceAvailabilityChecker(self.radb)
class BasicSchedulerTest(SchedulerTest): class BasicSchedulerTest(SchedulerTest):
def new_task(self, mom_otdb_id=0, starttime=None, endtime=None): def new_task(self, mom_otdb_id=0, starttime=None, endtime=None):
...@@ -99,7 +104,7 @@ class BasicSchedulerTest(SchedulerTest): ...@@ -99,7 +104,7 @@ class BasicSchedulerTest(SchedulerTest):
return BasicScheduler(task_id, return BasicScheduler(task_id,
specification_tree if specification_tree else self.get_specification_tree(task_id), specification_tree if specification_tree else self.get_specification_tree(task_id),
resource_estimator if resource_estimator else lambda _:[], resource_estimator if resource_estimator else lambda _:[],
self.resource_availability_checker, self.radb.dbcreds) self.resource_availability_checker, self.radb)
def get_station_bandwidth_max_capacity(self): def get_station_bandwidth_max_capacity(self):
resource_CS001bw0 = [r for r in self.radb.getResources(resource_types="bandwidth", include_availability=True) resource_CS001bw0 = [r for r in self.radb.getResources(resource_types="bandwidth", include_availability=True)
...@@ -239,7 +244,7 @@ class StationSchedulerTest(BasicSchedulerTest): ...@@ -239,7 +244,7 @@ class StationSchedulerTest(BasicSchedulerTest):
return StationScheduler(task_id, return StationScheduler(task_id,
specification_tree if specification_tree else self.get_specification_tree(task_id), specification_tree if specification_tree else self.get_specification_tree(task_id),
resource_estimator if resource_estimator else self.fake_resource_estimator, resource_estimator if resource_estimator else self.fake_resource_estimator,
self.resource_availability_checker, self.radb.dbcreds) self.resource_availability_checker, self.radb)
def fake_resource_estimator(self, specification_tree): def fake_resource_estimator(self, specification_tree):
""" Return an estimate for each station, plus a fixed storage claim of half the available storage capacity. """ """ Return an estimate for each station, plus a fixed storage claim of half the available storage capacity. """
...@@ -507,7 +512,7 @@ class PrioritySchedulerTest(StationSchedulerTest): ...@@ -507,7 +512,7 @@ class PrioritySchedulerTest(StationSchedulerTest):
return PriorityScheduler(task_id, return PriorityScheduler(task_id,
specification_tree if specification_tree else self.get_specification_tree(task_id), specification_tree if specification_tree else self.get_specification_tree(task_id),
resource_estimator if resource_estimator else self.fake_resource_estimator, resource_estimator if resource_estimator else self.fake_resource_estimator,
self.resource_availability_checker, self.radb.dbcreds) self.resource_availability_checker, self.radb)
def test_unschedule_lower_priority_future_task(self): def test_unschedule_lower_priority_future_task(self):
""" """
...@@ -898,7 +903,7 @@ class DwellSchedulerTest(PrioritySchedulerTest): ...@@ -898,7 +903,7 @@ class DwellSchedulerTest(PrioritySchedulerTest):
min_starttime, min_starttime,
max_starttime, max_starttime,
datetime.timedelta(hours=1), # duration datetime.timedelta(hours=1), # duration
self.resource_availability_checker, self.radb.dbcreds) self.resource_availability_checker, self.radb)
def test_no_dwell(self): def test_no_dwell(self):
""" Whether a task will not dwell unnecessarily on an empty system. """ """ Whether a task will not dwell unnecessarily on an empty system. """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment