diff --git a/LCS/Messaging/python/messaging/config.py b/LCS/Messaging/python/messaging/config.py index ac35034e35fdaed0212f7c772089a7d7881aedef..03ed391446fd7af57fe94487b2db853c870621eb 100644 --- a/LCS/Messaging/python/messaging/config.py +++ b/LCS/Messaging/python/messaging/config.py @@ -8,7 +8,7 @@ DEFAULT_BROKER = "scu001.control.lofar" if isProductionEnvironment() else \ "scu199.control.lofar" if isTestEnvironment() else \ "localhost" -DEFAULT_PORT = 5675 if isProductionEnvironment() or isTestEnvironment() else 5672 +DEFAULT_PORT = 5675 DEFAULT_USER = "guest" DEFAULT_PASSWORD = "guest" diff --git a/LCS/Messaging/python/messaging/test/t_RPC.py b/LCS/Messaging/python/messaging/test/t_RPC.py index 39203d9e2f10a0aad1fecd0f1edd71732f69c5d7..6d769f1e44c46a62f9dab566bcbc3827394ea5cd 100644 --- a/LCS/Messaging/python/messaging/test/t_RPC.py +++ b/LCS/Messaging/python/messaging/test/t_RPC.py @@ -54,7 +54,7 @@ class TestRPC(unittest.TestCase): self.assertTrue(service.is_listening()) self.assertTrue(service.is_running()) - with RPCClient(service_name=TEST_SERVICE_NAME, exchange=tmp_exchange.address, timeout=0.2) as rpc_client: + with RPCClient(service_name=TEST_SERVICE_NAME, exchange=tmp_exchange.address, timeout=1) as rpc_client: self.assertEqual("foo", rpc_client.execute("my_public_method1")) self.assertEqual(("bar", 42), rpc_client.execute("my_public_method2", 42)) diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py index d3e35d048639663738d6552d07c6f61af27b063f..1ffe9e117dcd2909f2cd4145794ce84ae0e467ec 100644 --- a/LCS/PyCommon/postgres.py +++ b/LCS/PyCommon/postgres.py @@ -201,6 +201,11 @@ class PostgresDatabaseConnection: '''returns the database name''' return self._dbcreds.database + @property + def dbcreds(self) -> DBCredentials: + '''returns the database credentials''' + return self._dbcreds + @property def is_connected(self) -> bool: return self._connection is not None and self._connection.closed==0 diff --git a/LCS/PyCommon/subprocess_utils.py b/LCS/PyCommon/subprocess_utils.py index 35bf9ef5a053cb3f43bee5de679ce5e97c13438b..d5c78f98c32d23bc01381841b4e1bb5530b59aaf 100644 --- a/LCS/PyCommon/subprocess_utils.py +++ b/LCS/PyCommon/subprocess_utils.py @@ -90,7 +90,7 @@ def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32): proc.kill() raise - PopenResult = namedtuple('PopenResult', ['returncode', 'stdout', 'stderr'], verbose=False) + PopenResult = namedtuple('PopenResult', ['returncode', 'stdout', 'stderr']) results = [PopenResult(p.returncode, p.stdout.read(), p.stderr.read()) for p in procs] diff --git a/LCS/PyCommon/util.py b/LCS/PyCommon/util.py index ae27862ebeb7b7be37769a0b5ff4f79aa8451228..7bf8680948d4d1c480ccfa8c30c770797e7b8d2a 100644 --- a/LCS/PyCommon/util.py +++ b/LCS/PyCommon/util.py @@ -186,10 +186,10 @@ def single_line_with_single_spaces(lines: str) -> str: lines = str(lines) line = lines.replace('\n', ' ').replace('\t', ' ') - lenght = len(line) + length = len(line) while True: line = line.replace(' ', ' ') - new_lenght = len(line) - if new_lenght == lenght: + new_length = len(line) + if new_length == length: return line - lenght = new_lenght + length = new_length diff --git a/LCS/pyparameterset/src/__init__.py b/LCS/pyparameterset/src/__init__.py index dd18b4e2b1a3cd797399faf137d82f87bf782131..0309b12e714afe059a475d0711d96e1ccb1580a4 100755 --- a/LCS/pyparameterset/src/__init__.py +++ b/LCS/pyparameterset/src/__init__.py @@ -268,4 +268,4 @@ class parameterset(PyParameterSet): def __str__(self): """:returns the parset in a human readable string (lines of key=value, sorted by key)""" - return '\n'.join("%s = %s" % (key, self[key]) for key in sorted(self.keys())) \ No newline at end of file + return '\n'.join("%s=%s" % (key, self[key]) for key in sorted(self.keys())) \ No newline at end of file diff --git a/LCS/pyparameterset/test/tpyparameterset.stdout b/LCS/pyparameterset/test/tpyparameterset.stdout index 854ff79614fe997b77ac86c35ef8793cad8338c2..b56db0b5cc46853b0bc8cb8c8db70538a3a42232 100644 --- a/LCS/pyparameterset/test/tpyparameterset.stdout +++ b/LCS/pyparameterset/test/tpyparameterset.stdout @@ -141,4 +141,3 @@ vec=[1,2,3] vecbool=[true,false,true] vecexp=[1..3,5..10] vecnest=[[1..3,5*10],[5..10]] - diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/CMakeLists.txt b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/CMakeLists.txt index 76bf2d12ba7ff71f10127bffea976f1d31937124..5d8309898fa9306afb6e33b925d4d5f16573aa2a 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/CMakeLists.txt +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/CMakeLists.txt @@ -1,4 +1,4 @@ -lofar_package(LTAIngestTransferServer 2.0 DEPENDS LTACommon LTAIngestCommon LTAIngestServerCommon PyMessaging PyCommon) +lofar_package(LTAIngestTransferServer 2.0 DEPENDS LTACommon LTAIngestCommon LTAIngestServerCommon PyMessaging PyCommon MoMSimpleAPIs MessageLogger) include(PythonInstall) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py index 449adc7f7028a2b41d0a27e1efd042ff072bae8e..9a38fadc6d9975cbaa9aa46fea557569755ad8d1 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py @@ -10,9 +10,9 @@ from lofar.lta.ingest.server.config import MOM_BASE_URL from lofar.common import isProductionEnvironment from lofar.lta.ingest.server.sip import * from lofar.common.util import humanreadablesize -from lofar.mom.simpleapis.momhttpclient import BaseMomClient +from lofar.mom.simpleapis.momhttpclient import BaseMoMClient -class MoMClient(BaseMomClient): +class MoMClient(BaseMoMClient): """This is an HTTP client that knows how to use the Single Sign On of Mom2. It is used instead of a SOAP client, because SOAPpy doesn't support diff --git a/LTA/ltastorageoverview/lib/scraper.py b/LTA/ltastorageoverview/lib/scraper.py index 7ad4e84c577a26fcd38143f8e88fc5a6d64890ae..493cf943ceb3a1130b26bae2ff582ddcf6889ea2 100755 --- a/LTA/ltastorageoverview/lib/scraper.py +++ b/LTA/ltastorageoverview/lib/scraper.py @@ -40,7 +40,7 @@ from random import random, randint logger = logging.getLogger() -VISIT_INTERVAL = datetime.timedelta(days=3) +VISIT_INTERVAL = datetime.timedelta(days=7) LEXAR_HOST = 'ingest@lexar004.offline.lofar' class FileInfo: @@ -249,18 +249,17 @@ class ResultGetterThread(threading.Thread): '''Helper class to query Locations asynchronously for results. Gets the result for the first Location in the locations deque and appends it to the results deque Appends the subdirectory Locations at the end of the locations deque for later processing''' - def __init__(self, dbcreds, dir_id, log_queries=False): + def __init__(self, dbcreds, dir_id): threading.Thread.__init__(self) self.daemon = True self.dbcreds = dbcreds - self.log_queries = log_queries self.dir_id = dir_id def run(self): '''A single location is pop\'ed from the locations deque and the results are queried. Resulting subdirectories are appended to the locations deque''' try: - with store.LTAStorageDb(self.dbcreds, self.log_queries) as db: + with store.LTAStorageDb(self.dbcreds) as db: dir = db.directory(self.dir_id) if not dir: @@ -279,7 +278,7 @@ class ResultGetterThread(threading.Thread): def rescheduleVisit(): for i in range(5): try: - with store.LTAStorageDb(self.dbcreds, self.log_queries) as db: + with store.LTAStorageDb(self.dbcreds) as db: logger.info('Rescheduling %s for new visit.' % (location.path(),)) db.updateDirectoryLastVisitTime(self.dir_id, datetime.datetime.utcnow() - VISIT_INTERVAL + datetime.timedelta(mins=1)) break @@ -291,7 +290,7 @@ class ResultGetterThread(threading.Thread): result = location.getResult() logger.info(result) - with store.LTAStorageDb(self.dbcreds, self.log_queries) as db: + with store.LTAStorageDb(self.dbcreds) as db: # convert the result.files list into a dict #with (filename, dir_id) as key and a tuple with all file info as value result_file_tuple_dict = {} @@ -376,7 +375,7 @@ class ResultGetterThread(threading.Thread): logger.error('Error while scanning %s\n%s' % (location.path(), str(e))) if 'does not exist' in str(e): - with store.LTAStorageDb(self.dbcreds, self.log_queries) as db: + with store.LTAStorageDb(self.dbcreds) as db: db.deleteDirectory(self.dir_id) else: rescheduleVisit() @@ -384,7 +383,7 @@ class ResultGetterThread(threading.Thread): except Exception as e: logger.exception(str(e)) - with store.LTAStorageDb(self.dbcreds, self.log_queries) as db: + with store.LTAStorageDb(self.dbcreds) as db: logger.info('Rescheduling dir_id %d for new visit.' % (self.dir_id,)) db.updateDirectoryLastVisitTime(self.dir_id, datetime.datetime.utcnow() - VISIT_INTERVAL) @@ -458,7 +457,6 @@ def main(): help='Name of the bus exchange on the broker on which the ingest notifications are published, default: %default') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') - parser.add_option('-Q', '--log-queries', dest='log_queries', action='store_true', help='log all pqsl queries') parser.add_option_group(dbcredentials.options_group(parser)) parser.set_defaults(dbcredentials="LTASO") (options, args) = parser.parse_args() @@ -471,7 +469,7 @@ def main(): dbcreds = dbcredentials.parse_options(options) logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword()) - db = store.LTAStorageDb(dbcreds, options.log_queries) + db = store.LTAStorageDb(dbcreds) populateDbWithLTASitesAndRootDirs(db) # for each site we want one or more ResultGetterThreads @@ -508,7 +506,8 @@ def main(): # spawn new ResultGetterThreads # do not overload this host system - while (numLocationsInQueues() > 0 and + num_waiting = numLocationsInQueues() + while (num_waiting > 0 and totalNumGetters() < options.parallel and os.getloadavg()[0] < 4*multiprocessing.cpu_count()): sitesStats = db.visitStats(datetime.datetime.utcnow() - VISIT_INTERVAL) @@ -547,13 +546,15 @@ def main(): logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id) # make and start a new ResultGetterThread the location deque of the chosen site - newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.log_queries) + newGetter = ResultGetterThread(dbcreds, chosen_dir_id) newGetter.start() getters[chosen_site_name].append(newGetter) cleanupFinishedGetters() - logger.info('numLocationsInQueues=%d totalNumGetters=%d siteQueueLengths: %s load_5min: %.1f' % (numLocationsInQueues(), + # refresh num_waiting + num_waiting = numLocationsInQueues() + logger.info('numLocationsInQueues=%d totalNumGetters=%d siteQueueLengths: %s load_5min: %.1f' % (num_waiting, totalNumGetters(), ' '.join(['%s:%d' % (name, stats['queue_length']) for name, stats in list(sitesStats.items())]), os.getloadavg()[0])) @@ -561,7 +562,7 @@ def main(): # sleep before main loop next iteration # to wait for some results # and some getters to finish - time.sleep(5 if numLocationsInQueues() <= options.parallel else 0.25) + time.sleep(30 if num_waiting <= options.parallel else 0.25) # all locations were processed diff --git a/LTA/ltastorageoverview/lib/store.py b/LTA/ltastorageoverview/lib/store.py index 61edb8f3bb65095ef07013d18e1daea3a9861d01..fcf2730da15285e8d5c8fdf9784f33ac1588a66a 100644 --- a/LTA/ltastorageoverview/lib/store.py +++ b/LTA/ltastorageoverview/lib/store.py @@ -41,10 +41,8 @@ class LTAStorageDb(PostgresDatabaseConnection): def __init__(self, dbcreds=None): """Create an instance of a LTAStorageDb :param dbcredentials.DBCredentials dbcreds: the credential for logging in into the db - :param bool log_queries: do or don't log all queries """ - super(LTAStorageDb, self).__init__(dbcreds=dbcreds, - log_queries=log_queries) + super(LTAStorageDb, self).__init__(dbcreds=dbcreds) def insertSite(self, siteName, srmurl): """insert a site into the database diff --git a/LTA/ltastorageoverview/test/integration_test_store.py b/LTA/ltastorageoverview/test/integration_test_store.py index 5a11b0335cdfcc8ada1a57f61815c1e3b4439e2c..d7450eebd2cef7b04393cfaff3c5f8c70052a837 100755 --- a/LTA/ltastorageoverview/test/integration_test_store.py +++ b/LTA/ltastorageoverview/test/integration_test_store.py @@ -40,7 +40,7 @@ class IntegrationTestLTAStorageDb(CommonLTAStorageDbTest): 2) test if the automatically computed tree- and dirstats are correct. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: base_time = datetime.utcnow() base_time -= timedelta(seconds=base_time.second, microseconds=base_time.microsecond) diff --git a/LTA/ltastorageoverview/test/test_ingesteventhandler.py b/LTA/ltastorageoverview/test/test_ingesteventhandler.py index 2214a846fad7a34c78519f8bda3256fbccb1e94e..893d9a9bfa56eaa416f457a81d203a8bfdf80c56 100755 --- a/LTA/ltastorageoverview/test/test_ingesteventhandler.py +++ b/LTA/ltastorageoverview/test/test_ingesteventhandler.py @@ -34,7 +34,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): super(TestLTASOIngestEventHandler, self).setUp() # fill empty database with simple sites and root dirs - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: db.insertSiteIfNotExists('siteA', 'srm://siteA.foo.bar:8443') db.insertSiteIfNotExists('siteB', 'srm://siteB.foo.bar:8443') @@ -48,7 +48,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): def _markAllDirectoriesRecentlyVisited(self): """pretend that all dirs were recently visited """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: db.executeQuery('''update scraper.last_directory_visit set visit_date=%s;''', (datetime.utcnow(), )) db.commit() @@ -56,7 +56,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): def test_01_schedule_srmurl_for_visit_unknown_site(self): """ try to schedule some unknown site's surl. Should raise. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: handler = LTASOIngestEventHandler(dbcreds=self.dbcreds) with self.assertRaises(LookupError) as context: @@ -68,7 +68,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): """ Test core method _mark_directory_for_a_visit for all known root dirs. Should set the last visit time for each dir way in the past. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: handler = LTASOIngestEventHandler(dbcreds=self.dbcreds) now = datetime.utcnow() @@ -91,7 +91,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): """ Test core method _insert_missing_directory_tree_if_needed for all known root dirs. Should result in new directory entries in the database for the new sub directories only. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: handler = LTASOIngestEventHandler(dbcreds=self.dbcreds) for site in db.sites(): @@ -132,7 +132,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): """ Test core method _insert_missing_directory_tree_if_needed for a path with an unknown root dir Should raise LookupError. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: handler = LTASOIngestEventHandler(dbcreds=self.dbcreds) for site in db.sites(): @@ -145,7 +145,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): """ Test higher level method _schedule_srmurl_for_visit for all known root dirs. Should result in marking the dir matching the surl as being the dir which should be visited next. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: handler = LTASOIngestEventHandler(dbcreds=self.dbcreds) for site in db.sites(): @@ -172,7 +172,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): """ Test higher level method _schedule_srmurl_for_visit for all new unknown subdirs of the known root dirs. Should result in marking the dir matching the surl as being the dir which should be visited next. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: handler = LTASOIngestEventHandler(dbcreds=self.dbcreds) for site in db.sites(): @@ -209,7 +209,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): """ Test higher level method _schedule_srmurl_for_visit for a path with an unknown root dir Should raise LookupError. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: handler = LTASOIngestEventHandler(dbcreds=self.dbcreds) for site in db.sites(): @@ -254,7 +254,7 @@ class TestLTASOIngestEventHandler(CommonLTAStorageDbTest): sync_event.set() with SyncedLTASOIngestEventHandler(self.dbcreds, busname=busname): - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: for site in db.sites(): for root_dir in db.rootDirectoriesForSite(site['id']): self._markAllDirectoriesRecentlyVisited() diff --git a/LTA/ltastorageoverview/test/test_lso_webservice.py b/LTA/ltastorageoverview/test/test_lso_webservice.py index b19f70ceff3d9d7bfbcb69a316a2c087e35d845f..7375d9b282d075ef293d50e7fa3066766ae7592b 100755 --- a/LTA/ltastorageoverview/test/test_lso_webservice.py +++ b/LTA/ltastorageoverview/test/test_lso_webservice.py @@ -80,7 +80,7 @@ def setUpModule(): logger.info('finished setting up test LTASO database') - webservice.db = store.LTAStorageDb(dbcreds, True) + webservice.db = store.LTAStorageDb(dbcreds) logger.info('filling test LTASO database with test data') webservice.db.insertSiteIfNotExists('siteA', 'srm://siteA.org') diff --git a/LTA/ltastorageoverview/test/test_scraper.py b/LTA/ltastorageoverview/test/test_scraper.py index d274f66d7485a5e230a32d70027cacfe6c7ad516..12278b4968088ef31e5f93d43b065dc97b998bcb 100755 --- a/LTA/ltastorageoverview/test/test_scraper.py +++ b/LTA/ltastorageoverview/test/test_scraper.py @@ -21,6 +21,7 @@ import logging +import unittest from common_test_ltastoragedb import * from lofar.lta.ltastorageoverview import scraper diff --git a/LTA/ltastorageoverview/test/test_store.py b/LTA/ltastorageoverview/test/test_store.py index d7c5bda6e7d3e2461df14625e8b367818d55fd78..d4095f5465ab50cfa86a1ba016f5278da8a6c7b1 100755 --- a/LTA/ltastorageoverview/test/test_store.py +++ b/LTA/ltastorageoverview/test/test_store.py @@ -25,7 +25,7 @@ from pprint import pformat from common_test_ltastoragedb import * from lofar.lta.ltastorageoverview import store -from lofar.common.postgres import FETCH_ALL +from lofar.common.postgres import FETCH_ALL, PostgresDBQueryExecutionError import logging logger = logging.getLogger(__name__) @@ -33,7 +33,7 @@ logger = logging.getLogger(__name__) class TestLTAStorageDb(CommonLTAStorageDbTest): def testSites(self): - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: siteA_id = db.insertSiteIfNotExists('siteA', 'srm://siteA.org') siteB_id = db.insertSiteIfNotExists('siteB', 'srm://siteB.org') @@ -50,7 +50,7 @@ class TestLTAStorageDb(CommonLTAStorageDbTest): self.assertEqual('siteB', site['name']) def testRootDirs(self): - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: siteA_id = db.insertSiteIfNotExists('siteA', 'srm://siteA.org') siteB_id = db.insertSiteIfNotExists('siteB', 'srm://siteB.org') @@ -96,12 +96,12 @@ class TestLTAStorageDb(CommonLTAStorageDbTest): self.assertEqual([], root_dirs_non_existing_site) def testNonExistingDir(self): - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: dir = db.directoryByName('fjsdka;58432aek5843rfsjd8-sa') self.assertEqual(None, dir) def testLeastRecentlyVisitedDirectory(self): - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: db.insertSiteIfNotExists('siteA', 'srm://siteA.org') dir_ids = [] @@ -127,7 +127,7 @@ class TestLTAStorageDb(CommonLTAStorageDbTest): self.assertEquals(dir_ids[2], lvr_dir_id) def testDuplicateSubDirs(self): - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: db.insertSiteIfNotExists('siteA', 'srm://siteA.org') db.insertSiteIfNotExists('siteB', 'srm://siteB.org') @@ -142,8 +142,8 @@ class TestLTAStorageDb(CommonLTAStorageDbTest): self.assertNotEquals(None, subDirA2_id) self.assertNotEquals(None, subDirB1_id) - subDirA1a_id = db.insertSubDirectory('foo', dirA_id) - self.assertEquals(None, subDirA1a_id) + with self.assertRaises(PostgresDBQueryExecutionError): + db.insertSubDirectory('foo', dirA_id) def _fill_test_db_with_sites_and_root_dirs(self, db): """ @@ -162,7 +162,7 @@ class TestLTAStorageDb(CommonLTAStorageDbTest): """ Test core method _insertMissingDirectoryTreeIfNeeded for all known root dirs. Should result in new directory entries in the database for the new sub directories only. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: self._fill_test_db_with_sites_and_root_dirs(db) for site in db.sites(): @@ -200,7 +200,7 @@ class TestLTAStorageDb(CommonLTAStorageDbTest): """ Test core method _insertMissingDirectoryTreeIfNeeded for a path with an unknown root dir Should raise LookupError. """ - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: self._fill_test_db_with_sites_and_root_dirs(db) for site in db.sites(): @@ -211,7 +211,7 @@ class TestLTAStorageDb(CommonLTAStorageDbTest): self.assertTrue('Could not find parent root dir' in str(context.exception)) def testProjectsAndObservations(self): - with store.LTAStorageDb(self.dbcreds, True) as db: + with store.LTAStorageDb(self.dbcreds) as db: #first insert a lot of data... db.insertSiteIfNotExists('juelich', 'srm://lofar-srm.fz-juelich.de:8443') db.insertSiteIfNotExists('sara', 'srm://srm.grid.sara.nl:8443') diff --git a/QA/QA_Service/lib/QABusListener.py b/QA/QA_Service/lib/QABusListener.py index 6961d69d84da257ea401a8ba8122676e341b0a60..a1c77127c18a9399d03565f2433c6b7aa901fdca 100644 --- a/QA/QA_Service/lib/QABusListener.py +++ b/QA/QA_Service/lib/QABusListener.py @@ -26,6 +26,7 @@ Typical usage is to derive your own subclass from QABusListener and implement th from lofar.messaging.messagebus import BusListener, AbstractMessageHandler, LofarMessage, EventMessage from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.qa.service.config import DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX +from lofar.common.util import single_line_with_single_spaces import logging @@ -62,19 +63,19 @@ class QAEventMessageHandler(AbstractMessageHandler): raise ValueError("QAEventMessageHandler.handleMessage: unknown subject: %s" % msg.subject) def onConvertedMS2Hdf5(self, msg_content): - pass + logger.info("%s.onConvertedMS2Hdf5(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) def onClustered(self, msg_content): - pass + logger.info("%s.onClustered(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) def onCreatedInspectionPlots(self, msg_content): - pass + logger.info("%s.onCreatedInspectionPlots(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) def onFinished(self, msg_content): - pass + logger.info("%s.onFinished(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) def onError(self, msg_content): - pass + logger.info("%s.onError(%s)", self.__class__.__name__, single_line_with_single_spaces(msg_content)) class QABusListener(BusListener): def __init__(self, @@ -85,9 +86,12 @@ class QABusListener(BusListener): num_threads: int = 1, broker: str = DEFAULT_BROKER): """ - RABusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. - Typical usage is to derive your own subclass from RABusListener and implement the specific on<SomeMessage> methods that you are interested in. + QABusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. + Typical usage is to derive your own subclass from QABusListener and implement the specific on<SomeMessage> methods that you are interested in. """ + if not issubclass(handler_type, QAEventMessageHandler): + raise TypeError("handler_type should be a QAEventMessageHandler subclass") + super().__init__(handler_type, handler_kwargs, exchange, routing_key, None, num_threads, broker) diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py index 7895225d95236f6e7cb42a51f25b7bed32e3ceb5..3226cf27d82aaeacef074630fa6dd4670b029ab7 100644 --- a/QA/QA_Service/lib/qa_service.py +++ b/QA/QA_Service/lib/qa_service.py @@ -32,6 +32,8 @@ from lofar.common.cep4_utils import * logger = logging.getLogger(__name__) +QA_BASE_DIR = '/data/qa' + #TODO: idea: convert periodically while observing? class QAOTDBEventMessageHandler(UsingToBusMixin, OTDBEventMessageHandler): @@ -40,7 +42,7 @@ class QAOTDBEventMessageHandler(UsingToBusMixin, OTDBEventMessageHandler): upon observation/pipeline completion. The qa processes convert MS (measurement sets) to hdf5 qa files, and then starts generating plots from the hdf5 file. ''' - def __init__(self, qa_base_dir = '/data/qa'): + def __init__(self, qa_base_dir = QA_BASE_DIR): """ Instantiate a QAService which listens on the given messagebus for Completion messages. See also the superclass, OTDBEventMessageHandler. @@ -245,6 +247,16 @@ class QAOTDBEventMessageHandler(UsingToBusMixin, OTDBEventMessageHandler): logging.exception('error in _cluster_h5_file: %s', e) self._send_event_message('Error', {'otdb_id': otdb_id, 'message': str(e)}) +class QAService(OTDBBusListener): + def __init__(self, qa_base_dir=QA_BASE_DIR, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + """ + The QAService is a QAService handling otdb events via the QAOTDBEventMessageHandler. + :param exchange: valid message exchange address + :param broker: valid broker host (default: None, which means localhost) + """ + super().__init__(handler_type=QAOTDBEventMessageHandler, + handler_kwargs={'qa_base_dir': qa_base_dir}, + exchange=exchange, broker=broker) def main(): @@ -269,8 +281,7 @@ def main(): logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) #start the qa service - with OTDBBusListener(handler_type=QAOTDBEventMessageHandler, - exchange=options.exchange, broker=options.broker): + with QAService(exchange=options.exchange, broker=options.broker): #loop and wait for messages or interrupt. waitForInterrupt() diff --git a/QA/QA_Service/test/t_qa_service.py b/QA/QA_Service/test/t_qa_service.py index 9061968ce9ecdfbbd8b8d691452584bc801fb2c2..7ebb12e93d66d74eaaa8383412873c94a8a45c0a 100755 --- a/QA/QA_Service/test/t_qa_service.py +++ b/QA/QA_Service/test/t_qa_service.py @@ -31,47 +31,54 @@ logger = logging.getLogger(__name__) from lofar.qa.service.qa_service import QAService from lofar.qa.service.QABusListener import * from lofar.qa.hdf5_io import * -from lofar.messaging.messagebus import TemporaryQueue +from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor from lofar.messaging.messages import EventMessage from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT # the tests below test is multi threaded (even multi process) -# define a QABusListener-derivative to handle synchronization (set the *_events) -class SynchronizingQABusListener(QABusListener): +# define a SynchronizationQABusListener-derivative to handle synchronization (set the *_events) +class SynchronizationQABusListener(QABusListener): + class SynchronizationQAEventMessageHandler(QAEventMessageHandler): + def __init__(self, listener): + super().__init__() + self.listener = listener + + def onConvertedMS2Hdf5(self, msg_content): + self.listener.converted_msg_content = msg_content + self.listener.converted_event.set() + + def onCreatedInspectionPlots(self, msg_content): + self.listener.plotted_msg_content = msg_content + self.listener.plotted_event.set() + + def onFinished(self, msg_content): + self.listener.finished_msg_content = msg_content + self.listener.finished_event.set() + + def onClustered(self, msg_content): + self.listener.clustered_msg_content = msg_content + self.listener.clustered_event.set() + + def onError(self, msg_content): + self.listener.error_msg_content = msg_content + self.listener.error_event.set() + ''' - the tests below test is multi threaded (even multi process) + the tests below test are multi threaded (even multi process) this QABusListener-derivative handles synchronization (set the *_events) and stores the msg_content results for expected result checking ''' - def __init__(self, busname): - super(SynchronizingQABusListener, self).__init__(busname=busname) + def __init__(self, exchange): + super().__init__(handler_type=SynchronizationQABusListener.SynchronizationQAEventMessageHandler, + handler_kwargs={'listener':self}, + exchange=exchange) self.converted_event = Event() self.clustered_event = Event() self.plotted_event = Event() self.finished_event = Event() self.error_event = Event() - def onConvertedMS2Hdf5(self, msg_content): - self.converted_msg_content = msg_content - self.converted_event.set() - - def onCreatedInspectionPlots(self, msg_content): - self.plotted_msg_content = msg_content - self.plotted_event.set() - - def onFinished(self, msg_content): - self.finished_msg_content = msg_content - self.finished_event.set() - - def onClustered(self, msg_content): - self.clustered_msg_content = msg_content - self.clustered_event.set() - - def onError(self, msg_content): - self.error_msg_content = msg_content - self.error_event.set() - class TestQAService(unittest.TestCase): ''' @@ -79,21 +86,17 @@ class TestQAService(unittest.TestCase): ''' def setUp(self): ''' - quite complicated setup to setup test qpid-queues + quite complicated setup to setup test message-exchanges/queues and mock away ssh calls to cep4 and mock away dockerized commands ''' - self.tmp_qa_queue = TemporaryQueue(__class__.__name__ + "_qa_notification") - self.tmp_qa_queue.open() - self.addCleanup(self.tmp_qa_queue.close) - - self.tmp_otdb_queue = TemporaryQueue(__class__.__name__ + "_qa_notification") - self.tmp_otdb_queue.open() - self.addCleanup(self.tmp_otdb_queue.close) - self.TEST_UUID = uuid.uuid1() self.TEST_OTDB_ID = 999999 + self.tmp_exchange = TemporaryExchange("%s_%s" % (__class__.__name__, self.TEST_UUID)) + self.tmp_exchange.open() + self.addCleanup(self.tmp_exchange.close) + # where to store the test results self.TEST_DIR = '/tmp/qa_service_%s' % self.TEST_UUID self.TEST_H5_FILE = 'L%s.MS_extract.h5' % (self.TEST_OTDB_ID,) @@ -159,7 +162,7 @@ class TestQAService(unittest.TestCase): def send_otdb_task_completing_event(self): '''helper method: create a ToBus and send a completing EventMessage''' - with self.tmp_otdb_queue.create_tobus() as sender: + with self.tmp_exchange.create_tobus() as sender: msg = EventMessage(subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, content={"treeID": self.TEST_OTDB_ID, "state": 'completing', @@ -213,12 +216,10 @@ class TestQAService(unittest.TestCase): self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker # start the QAService (the object under test) - with QAService(qa_notification_busname=self.tmp_qa_queue.address, - otdb_notification_busname=self.tmp_otdb_queue.address, - qa_base_dir=self.TEST_DIR): + with BusListenerJanitor(QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR)): # start listening for QA event messages from the QAService - with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener: + with BusListenerJanitor(SynchronizationQABusListener(exchange=self.tmp_exchange.address)) as qa_listener: # trigger a qa process by sending otdb task completing event # this will result in the QAService actually doing its magic self.send_otdb_task_completing_event() @@ -315,11 +316,10 @@ class TestQAService(unittest.TestCase): self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker # start the QAService (the object under test) - with QAService(qa_notification_busname=self.tmp_qa_queue.address, - otdb_notification_busname=self.tmp_otdb_queue.address, - qa_base_dir=self.TEST_DIR): + with BusListenerJanitor(QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR)): + # start listening for QA event messages from the QAService - with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener: + with BusListenerJanitor(SynchronizationQABusListener(exchange=self.tmp_exchange.address)) as qa_listener: # trigger a qa process by sending otdb task completing event # this will result in the QAService actually doing its magic self.send_otdb_task_completing_event() @@ -384,11 +384,10 @@ class TestQAService(unittest.TestCase): self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker # start the QAService (the object under test) - with QAService(qa_notification_busname=self.tmp_qa_queue.address, - otdb_notification_busname=self.tmp_otdb_queue.address, - qa_base_dir=self.TEST_DIR): + with BusListenerJanitor(QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR)): + # start listening for QA event messages from the QAService - with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener: + with BusListenerJanitor(SynchronizationQABusListener(exchange=self.tmp_exchange.address)) as qa_listener: # trigger a qa process by sending otdb task completing event # this will result in the QAService actually doing its magic self.send_otdb_task_completing_event() @@ -436,11 +435,10 @@ class TestQAService(unittest.TestCase): self.wrap_command_in_cep4_cpu_node_ssh_call_mock.side_effect = mocked_wrap_command_in_cep4_cpu_node_ssh_call # start the QAService (the object under test) - with QAService(qa_notification_busname=self.tmp_qa_queue.address, - otdb_notification_busname=self.tmp_otdb_queue.address, - qa_base_dir=self.TEST_DIR): + with BusListenerJanitor(QAService(exchange=self.tmp_exchange.address, qa_base_dir=self.TEST_DIR)): + # start listening for QA event messages from the QAService - with SynchronizingQABusListener(self.tmp_qa_queue.address) as qa_listener: + with BusListenerJanitor(SynchronizationQABusListener(exchange=self.tmp_exchange.address)) as qa_listener: # trigger a qa process by sending otdb task completing event # this will result in the QAService actually doing its magic self.send_otdb_task_completing_event() @@ -461,8 +459,8 @@ class TestQAService(unittest.TestCase): self.ssh_cmd_list_mock.assert_not_called() -if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) +if __name__ == '__main__': #run the unit tests unittest.main() diff --git a/SAS/DataManagement/ResourceTool/resourcetool.py b/SAS/DataManagement/ResourceTool/resourcetool.py index 33664d7c1276caeafef39feb8ef1dd59c0e1705c..182e784f2ea11d816c3d0d930ac6276ffeff1b78 100755 --- a/SAS/DataManagement/ResourceTool/resourcetool.py +++ b/SAS/DataManagement/ResourceTool/resourcetool.py @@ -47,7 +47,7 @@ import logging from datetime import datetime, timedelta from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC +from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.common.util import humanreadablesize logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.WARN) @@ -393,7 +393,7 @@ def main(args): status = 0 radb = None try: - radb = RARPC(busname=options.busname, broker=options.broker) + radb = RADBRPC.create(exchange=options.busname, broker=options.broker) db_resource_list = radb.getResources(resource_types=options.resource_type, include_availability=True) diff --git a/SAS/DataManagement/ResourceTool/test/tresourcetool.py b/SAS/DataManagement/ResourceTool/test/tresourcetool.py index 5e7eec1289c4b02d256f30e34187e977ed433eb7..ebe10b83fb1bc22627240c314ce2f2e222695357 100755 --- a/SAS/DataManagement/ResourceTool/test/tresourcetool.py +++ b/SAS/DataManagement/ResourceTool/test/tresourcetool.py @@ -175,6 +175,11 @@ class RADB_mock: radb_mock = RADB_mock() class RADBRPC_mock: + + @staticmethod + def create(**kwargs): + return RADBRPC_mock() + def __init__(self, busname=None, # don't care about any of these here servicename=None, broker=None, @@ -321,7 +326,7 @@ logger = logging.getLogger(__name__) # Hook the mock class in place after import and before running the program import lofar.sas.datamanagement.resourcetool.resourcetool as rt -rt.RARPC = RADBRPC_mock +rt.RADBRPC = RADBRPC_mock # List all resources and claims (no time range bounds) rv = rt.main(['-G', 'INSTRUMENT', '-T', 'None', '-S', 'None']) diff --git a/SAS/MoM/MoMQueryService/MoMQueryServiceServer/test/t_momqueryservice.py b/SAS/MoM/MoMQueryService/MoMQueryServiceServer/test/t_momqueryservice.py index 2205c19ae078a9db4216bb4a5e1f07c5cbc39ed5..6c35f5954fb7c0e4a2d7b5ef2c0ab2e80204ce91 100755 --- a/SAS/MoM/MoMQueryService/MoMQueryServiceServer/test/t_momqueryservice.py +++ b/SAS/MoM/MoMQueryService/MoMQueryServiceServer/test/t_momqueryservice.py @@ -41,7 +41,8 @@ except ImportError as e: from lofar.common.dbcredentials import Credentials from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC -from lofar.mom.momqueryservice.momqueryservice import MoMDatabaseWrapper, MoMQueryServiceMessageHandler + +from lofar.mom.momqueryservice.momqueryservice import MoMQueryServiceMessageHandler, MoMDatabaseWrapper trigger_specification = '<?xml version="1.0" encoding="UTF-8"?>\ <p:trigger xsi:schemaLocation="http://www.astron.nl/LofarTrigger LofarTrigger.xsd"\ @@ -1011,11 +1012,11 @@ class TestMoMDatabaseWrapper(unittest.TestCase): project_priority = 42 def setUp(self): - logger_patcher = mock.patch('lofar.mom.momqueryservice.momqueryservice.logger') + logger_patcher = mock.patch('lofar.mom.simpleapis.momdbclient.logger') self.addCleanup(logger_patcher.stop) self.logger_mock = logger_patcher.start() - mysql_patcher = mock.patch('lofar.mom.momqueryservice.momqueryservice.connector') + mysql_patcher = mock.patch('lofar.mom.simpleapis.momdbclient.connector') self.addCleanup(mysql_patcher.stop) self.mysql_mock = mysql_patcher.start() @@ -1532,6 +1533,7 @@ class TestMoMDatabaseWrapper(unittest.TestCase): self.assertIsNone(self.mom_database_wrapper.get_storagemanager(1234)) +#TODO: remove skip of unittest @unittest.skip("Skipping integration test") class IntegrationTestMoMDatabaseWrapper(unittest.TestCase): database_credentials = Credentials() @@ -1563,6 +1565,7 @@ class IntegrationTestMoMDatabaseWrapper(unittest.TestCase): # create db wrapper for tests self.database_credentials.port = self.mysqld.dsn()['port'] + self.mom_database_wrapper = MoMDatabaseWrapper(self.database_credentials) logger.info('...finished setting up test MoM database') diff --git a/SAS/OTDB_Services/test/t_TreeStatusEvents.py b/SAS/OTDB_Services/test/t_TreeStatusEvents.py index bfe9a61ed303487c444fd2d3f53473282169d9ba..0b070818a72eaa82b7a11afa068d022809d9d91c 100644 --- a/SAS/OTDB_Services/test/t_TreeStatusEvents.py +++ b/SAS/OTDB_Services/test/t_TreeStatusEvents.py @@ -75,21 +75,21 @@ try: otdb_connection = pg.connect(**database_credentials.pg_connect_options()) - with TemporaryQueue(__name__) as tmp_queue: - with tmp_queue.create_frombus() as frombus: - - t = threading.Thread(target=create_service, args=(tmp_queue.address, database_credentials)) - t.daemon = True - t.start() - - otdb_connection.query("select setTreeState(1, %d, %d::INT2,'%s')" % (1099266, 500, False)) - msg = frombus.receive(timeout=5) # TreeStateEVent are send every 2 seconds - frombus.ack() - msg.show() - try: - ok = (msg.body['treeID'] == 1099266 and msg.body['state'] == 'queued') - except IndexError: - ok = False + with TemporaryExchange(__name__) as tmp_exchange: + with TemporaryQueue(__name__, exchange=tmp_exchange.address) as tmp_queue: + with tmp_queue.create_frombus() as frombus: + + t = threading.Thread(target=create_service, args=(tmp_exchange.address, database_credentials)) + t.daemon = True + t.start() + + otdb_connection.query("select setTreeState(1, %d, %d::INT2,'%s')" % (1099266, 500, False)) + msg = frombus.receive(timeout=5, acknowledge=True) # TreeStateEVent are send every 2 seconds + logger.info(msg) + try: + ok = (msg.content['treeID'] == 1099266 and msg.content['state'] == 'queued') + except IndexError: + ok = False sys.exit(not ok) # 0 = success finally: diff --git a/SAS/ResourceAssignment/RATaskSpecifiedService/test/tRATaskSpecified.py b/SAS/ResourceAssignment/RATaskSpecifiedService/test/tRATaskSpecified.py index 2152d0cf57a7de23678ce9f1b2c3c6d3293b4829..ecfe05af7b6f191b95434ec5c0cdb1de67248606 100644 --- a/SAS/ResourceAssignment/RATaskSpecifiedService/test/tRATaskSpecified.py +++ b/SAS/ResourceAssignment/RATaskSpecifiedService/test/tRATaskSpecified.py @@ -11,7 +11,7 @@ from unittest import mock from lofar.sas.resourceassignment.rataskspecified.RATaskSpecified import \ RATaskSpecifiedOTDBEventMessageHandler -from mock import MagicMock +from unittest.mock import MagicMock import logging logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py index 7e2f199093779d67925abcd82ad05ba4e41c4024..2ebb425e4353574d7b4fc90a33d495ccc8c4564f 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/resource_assigner.py @@ -29,8 +29,8 @@ import logging from lofar.common.cache import cache from lofar.messaging.messages import EventMessage from lofar.messaging.messagebus import ToBus -from lofar.messaging import RPCClient from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.common.util import single_line_with_single_spaces from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.sas.otdb.otdbrpc import OTDBRPC @@ -46,7 +46,7 @@ from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC from lofar.mac.observation_control_rpc import ObservationControlRPCClient -from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_RESOURCEESTIMATOR_SERVICENAME +from lofar.sas.resourceassignment.resourceassignmentestimator.rpc import ResourceEstimatorRPC from lofar.sas.resourceassignment.common.specification import Specification @@ -73,7 +73,7 @@ class ResourceAssigner(object): """ self.radbrpc = RADBRPC.create(exchange=exchange, broker=broker) - self.rerpc = RPCClient(service_name=DEFAULT_RESOURCEESTIMATOR_SERVICENAME, exchange=exchange, broker=broker) + self.rerpc = ResourceEstimatorRPC.create(exchange=exchange, broker=broker) self.otdbrpc = OTDBRPC.create(exchange=exchange, broker=broker) self.momrpc = MoMQueryRPC.create(exchange=exchange, broker=broker) self.sqrpc = StorageQueryRPC.create(exchange=exchange, broker=broker) @@ -200,7 +200,7 @@ class ResourceAssigner(object): subject = 'Task' + new_status[0].upper() + new_status[1:] #TODO this is MAGIC, needs explanation! event_message = EventMessage(subject="%s.%s" % (DEFAULT_RA_NOTIFICATION_PREFIX, subject), content=content) - logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) + logger.info('Sending notification %s: %s' % (subject, single_line_with_single_spaces(content))) self.ra_notification_bus.send(event_message) def _kill_task(self, task): @@ -233,7 +233,7 @@ class ResourceAssigner(object): otdb_id = specification_tree['otdb_id'] - estimates = self.rerpc.execute("get_estimated_resources", specification_tree=specification_tree) + estimates = self.rerpc.get_estimated_resources(specification_tree) logger.info('Resource Estimator reply = %s', estimates) if estimates['errors']: diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.py index 30f663c6f9911bbd12d3666ce5f8c34007a17801..1a5ba3f2542daafbcd66fb7f74e6974aa995f7fc 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resource_availability_checker.py @@ -156,7 +156,7 @@ class ResourceAvailabilityCheckerTest(unittest.TestCase): self.successor_task_mom_ids = [self.successor_task_mom_id] self.predecessor_task_mom_ids = [self.predecessor_task_mom_id] - rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC') + rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RADBRPC') self.addCleanup(rarpc_patcher.stop) self.rarpc_mock = rarpc_patcher.start() self.rarpc_mock.getTask.side_effect = get_task_side_effect diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py index 54ec83545d3d6f2ae92d74ac26654e640b4da801..2061b5a7ce1ff400904517e6dec312cf5697d3c5 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py @@ -26,13 +26,12 @@ import sys import logging logging.basicConfig(format='%(asctime)s %(levelname)s %(process)s %(message)s', level=logging.INFO) -from lofar.sas.resourceassignment.resourceassigner.resource_assigner import ResourceAssigner +from lofar.sas.resourceassignment.resourceassigner.resource_assigner import ResourceAssigner, DEFAULT_RA_NOTIFICATION_PREFIX from lofar.sas.resourceassignment.resourceassigner.resource_availability_checker import ResourceAvailabilityChecker from lofar.sas.resourceassignment.common.specification import Specification from lofar.parameterset import parameterset from lofar.common.datetimeutils import parseDatetime - -ra_notification_prefix = "ra_notification_prefix" +from lofar.common.util import single_line_with_single_spaces class TestingResourceAssigner(ResourceAssigner): @@ -46,7 +45,6 @@ class TestingResourceAssigner(ResourceAssigner): self.curpc = curpc self.sqrpc = sqrpc self.ra_notification_bus = ra_notification_bus - self.ra_notification_prefix = ra_notification_prefix # Could mock ResourceAvailabilityChecker, but it is out of play already due to mocked DwellScheduler self.resource_availability_checker = ResourceAvailabilityChecker(rarpc) self.dwell_scheduler = dwell_scheduler @@ -563,7 +561,7 @@ class ResourceAssignerTest(unittest.TestCase): self.successor_task_mom_ids = [self.successor_task_mom_id] self.predecessor_task_mom_ids = [self.predecessor_task_mom_id] - rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC') + rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RADBRPC') self.addCleanup(rarpc_patcher.stop) self.rarpc_mock = rarpc_patcher.start() self.rarpc_mock.getTask.side_effect = get_task_side_effect @@ -1600,15 +1598,14 @@ class ResourceAssignerTest(unittest.TestCase): {'name': 'min_inter_task_delay', 'value': 60}, {'name': 'max_fill_ratio_CEP4_bandwidth', 'value': 0.75} ] - def rerpc_mock(params, timeout=10): - specification_tree = params["specification_tree"] + def rerpc_mock_get_estimated_resources(specification_tree): otdb_id = specification_tree['otdb_id'] - return self.rerpc_replymessage[str(otdb_id)], self.rerpc_status + return self.rerpc_replymessage[str(otdb_id)] - rerpc_patcher = mock.patch('lofar.messaging.RPC') + rerpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentestimator.rpc.ResourceEstimatorRPC') self.addCleanup(rerpc_patcher.stop) self.rerpc_mock = rerpc_patcher.start() - self.rerpc_mock.side_effect = rerpc_mock + self.rerpc_mock.get_estimated_resources.side_effect = rerpc_mock_get_estimated_resources otdbrpc_patcher = mock.patch('lofar.sas.otdb.otdbrpc') self.addCleanup(otdbrpc_patcher.stop) @@ -1788,7 +1785,7 @@ class ResourceAssignerTest(unittest.TestCase): def test_get_resource_estimates_should_request_needed_resources(self): self.resource_assigner._get_resource_estimates(self.specification_tree) - self.rerpc_mock.assert_any_call({"specification_tree": self.specification_tree}, timeout=10) + self.rerpc_mock.get_estimated_resources.any_calls_with(self.specification_tree) # def test_do_assignment_logs_when_otdb_id_not_needed_resources(self): # self.spec_mock.radb_id = self.otdb_id + 11 @@ -1862,11 +1859,10 @@ class ResourceAssignerTest(unittest.TestCase): self.resource_assigner._get_resource_estimates(self.specification_tree) def ra_notification_bus_send_called_with(self, content, subject): - found = False for call in self.ra_notification_bus_mock.send.call_args_list: - if call[0][0].subject == subject and call[0][0].body == content: - found = True - return found + if call[0][0].subject == subject and call[0][0].content == content: + return True + return False def test_do_assignment_notifies_bus_when_it_was_unable_to_schedule_Conflict(self): content = {'radb_id': self.task_id, 'otdb_id': self.task_otdb_id, 'mom_id': self.task_mom_id} @@ -1962,9 +1958,9 @@ class ResourceAssignerTest(unittest.TestCase): self.spec_mock.set_status.assert_called_with('scheduled') def assertBusNotificationAndLogging(self, content, subject): - self.assertTrue(self.ra_notification_bus_send_called_with(content, ra_notification_prefix + subject)) + self.assertTrue(self.ra_notification_bus_send_called_with(content, "%s.%s" %(DEFAULT_RA_NOTIFICATION_PREFIX, subject))) self.logger_mock.info.assert_any_call('Sending notification %s: %s' % - (subject, str(content).replace('\n', ' '))) + (subject, single_line_with_single_spaces(content))) def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification_with_mom_bug(self): exception_str = "Error something went wrong" diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulechecker.py index ebd217a7450fae8ac86b26cc94b5baa353d5e65f..aa3c5f271978c238a9e6afbd820ccb89c278eece 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulechecker.py @@ -44,7 +44,7 @@ class ScheduleCheckerTest(unittest.TestCase): thread_patcher.start() self.addCleanup(thread_patcher.stop) - self.rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC') + self.rarpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RADBRPC') self.addCleanup(self.rarpc_patcher.stop) self.rarpc_mock = self.rarpc_patcher.start() diff --git a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py index 0264f7192c2693da6bf927454090be63238623d1..d9231b77d4246b409e61163a664fe7506e97276d 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py +++ b/SAS/ResourceAssignment/ResourceAssigner/test/t_schedulers.py @@ -130,8 +130,6 @@ class BasicSchedulerTest(SchedulerTest): # Allocation must succeed and be committed self.assertTrue(allocation_successful) - self.assertTrue(scheduler.radb.committed) - self.assertFalse(scheduler.radb.rolled_back) # Claim must be present in database claims = self.radb.getResourceClaims(task_ids=task_id, extended=True) @@ -162,8 +160,6 @@ class BasicSchedulerTest(SchedulerTest): scheduler = self.new_scheduler(task_id, lambda _: estimates) (allocation_successful, changed_tasks) = scheduler.allocate_resources() - self.assertTrue(scheduler.radb.committed) - self.assertFalse(scheduler.radb.rolled_back) # Allocation must succeed self.assertTrue(allocation_successful) @@ -187,13 +183,9 @@ class BasicSchedulerTest(SchedulerTest): if self.__class__ == BasicSchedulerTest: # This inheritence of test is not ideal # Allocation must fail, and commit called so we get a conflicted state self.assertFalse(allocation_successful) - self.assertTrue(scheduler.radb.committed) - self.assertFalse(scheduler.radb.rolled_back) else: # Allocation must fail, and rollback called self.assertFalse(allocation_successful) - self.assertFalse(scheduler.radb.committed) - self.assertTrue(scheduler.radb.rolled_back) def test_schedule_two_tasks_too_large_task(self): """ Whether two tasks that fit individually but not together will be rejected by the scheduler. """ @@ -362,8 +354,6 @@ class StationSchedulerTest(BasicSchedulerTest): # Allocation must fail self.assertFalse(allocation_successful) self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) - self.assertFalse(scheduler.radb.committed) - self.assertTrue(scheduler.radb.rolled_back) def test_find_overlap_stations(self): @@ -394,8 +384,6 @@ class StationSchedulerTest(BasicSchedulerTest): # Allocation must fail self.assertFalse(allocation_successful) - self.assertFalse(scheduler.radb.committed) - self.assertTrue(scheduler.radb.rolled_back) def test_require_more_stations_than_available(self): """ Test whether requiring too many stations (than are available) fails. """ @@ -418,8 +406,6 @@ class StationSchedulerTest(BasicSchedulerTest): self.assertFalse(allocation_successful) self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) - self.assertFalse(scheduler.radb.committed) - self.assertTrue(scheduler.radb.rolled_back) def test_2obs_coexist(self): @@ -478,17 +464,14 @@ class PrioritySchedulerTest(StationSchedulerTest): # The PriorityScheduler must not regress on the StationScheduler, so we inherit all its tests def mock_momrpc(self): - class FakeMoMQueryService(object): - def get_project_priorities_for_objects(self, mom_ids): - # priority increments by 1000 ids - return {mom_id: mom_id // 1000 for mom_id in mom_ids} + def momrpc_mock_get_project_priorities_for_objects(mom_ids): + # priority increments by 1000 ids + return {mom_id: mom_id // 1000 for mom_id in mom_ids} - self.fake_momrpc = FakeMoMQueryService() - - momrpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.MoMQueryRPC') + momrpc_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.MoMQueryRPC.get_project_priorities_for_objects') self.addCleanup(momrpc_patcher.stop) self.momrpc_mock = momrpc_patcher.start() - self.momrpc_mock.return_value = self.fake_momrpc + self.momrpc_mock.side_effect = momrpc_mock_get_project_priorities_for_objects def mock_datetime(self): datetime_patcher = mock.patch('lofar.sas.resourceassignment.resourceassigner.schedulers.datetime') diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index 73e134175a133468a71482d3b4a82b0d014a4c13..3f22d8d35acbe6765da60e25bbddeb2c3299ac1e 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -40,13 +40,10 @@ class RADBError(Exception): class RADatabase(PostgresDatabaseConnection): def __init__(self, dbcreds: DBCredentials, - log_queries: bool=False, - auto_commit_selects: bool=True, num_connect_retries: int=5, connect_retry_interval: float=1.0): super().__init__(dbcreds=dbcreds, - log_queries=log_queries, - auto_commit_selects=auto_commit_selects, + auto_commit_selects=False, num_connect_retries=num_connect_retries, connect_retry_interval=connect_retry_interval) self._taskStatusName2IdCache = {} diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py index 39e8780c0c151ff8573d88da520289d6ba5c3773..89a69bac043124e497db465e8718f3ae08f761f9 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py @@ -48,7 +48,7 @@ class RADBPGListener(PostgresListener): self.event_bus = ToBus(exchange=exchange, broker=broker) - self.radb = RADatabase(dbcreds=dbcreds, log_queries=False) + self.radb = RADatabase(dbcreds=dbcreds) self.subscribe('task_update', self.onTaskUpdated) self.subscribe('task_insert', self.onTaskInserted) @@ -195,7 +195,6 @@ def main(): with RADBPGListener(exchange=options.exchange, dbcreds=dbcreds, broker=options.broker) as listener: - listener.radb.insertSpecificationAndTask(0,0,'approved', 'observation', datetime.utcnow(), datetime.utcnow(), '', 'CEP4') listener.waitWhileListening() if __name__ == '__main__': diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/radb_common_testing.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/radb_common_testing.py index 2fa1c5971945c05950426157a87dbf5da9a01840..7487aed9b10d20fb1bcf255bcb960e68e3180e0f 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/radb_common_testing.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/radb_common_testing.py @@ -70,7 +70,7 @@ class RADBCommonTest(unittest.TestCase): port = database_credentials.port) # set up radb python module - self.radb = RADatabase(database_credentials, log_queries = True) + self.radb = RADatabase(database_credentials) self.radb.connect() logger.info('...finished setting up test RA database') diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt index b33a0d32c8e16f3727c56053b0f05d0155f02284..03ff4dd8551be198f6d6941272b0e074bae927c1 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/CMakeLists.txt @@ -8,6 +8,7 @@ include(PythonInstall) set(_py_files __init__.py service.py + rpc.py config.py ) diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/rpc.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/rpc.py new file mode 100644 index 0000000000000000000000000000000000000000..50db47f1043d1afc4a3119e10e58b67a5b9b5b2b --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/rpc.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2017 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +import logging +from lofar.messaging import RPCClient, RPCClientContextManagerMixin, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT +from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_RESOURCEESTIMATOR_SERVICENAME + +''' Simple RPC client for ResourceEstimator Service +''' + +logger = logging.getLogger(__name__) + +class ResourceEstimatorRPC(RPCClientContextManagerMixin): + def __init__(self, rpc_client: RPCClient = None): + """Create an instance of the RADBRPC using the given RPCClient, + or if None given, to a default RPCClient connecting to the DEFAULT_RESOURCEESTIMATOR_SERVICENAME service""" + super().__init__() + self._rpc_client = rpc_client or RPCClient(service_name=DEFAULT_RESOURCEESTIMATOR_SERVICENAME) + + @staticmethod + def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int=DEFAULT_RPC_TIMEOUT): + """Create a ResourceEstimatorRPC connecting to the given exchange/broker on the default DEFAULT_RESOURCEESTIMATOR_SERVICENAME service""" + return ResourceEstimatorRPC(RPCClient(service_name=DEFAULT_RESOURCEESTIMATOR_SERVICENAME, exchange=exchange, broker=broker, timeout=timeout)) + + def get_estimated_resources(self, specification_tree: dict) -> dict: + return self._rpc_client.execute('get_estimated_resources') + diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py index c2436df6c7a7b1593fa4fef26ce51f3799a25b60..af277dee5943c1c812c93e770a2c592f6906e5c1 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py @@ -140,7 +140,7 @@ class ResourceEstimatorHandler(ServiceMessageHandler): logger.warning("ID %s is not a pipeline, observation or reservation." % otdb_id) return {'errors': ["ID %s is not a pipeline, observation or reservation." % otdb_id]} - def _get_estimated_resources(self, specification_tree): + def _get_estimated_resources(self, specification_tree: dict) -> dict: """ Input is like: {"otdb_id": otdb_id, "state": 'prescheduled', 'specification': ..., 'task_type': "pipeline", 'task_subtype': "long baseline pipeline", diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.py index 704ceb36784a8ddf84bc517eb8adde47a284cea6..721d57c3fbffb09c7e630d1d578136adf7751617 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/test/t_resource_estimator.py @@ -122,7 +122,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): resource_estimator_handler = ResourceEstimatorHandler() - resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) ObservationResourceEstimator_mock().verify_and_estimate.assert_called() @@ -140,7 +140,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): resource_estimator_handler = ResourceEstimatorHandler() - resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) ReservationResourceEstimator_mock().verify_and_estimate.assert_called() @@ -157,7 +157,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): specification_tree = self.generate_project_reservation_spec() resource_estimator_handler = ResourceEstimatorHandler() - resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) ReservationResourceEstimator_mock().verify_and_estimate.assert_called() @@ -174,7 +174,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): specification_tree = self.generate_calibration_pipeline_spec() resource_estimator_handler = ResourceEstimatorHandler() - resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) CalibrationPipelineResourceEstimator_mock().verify_and_estimate.assert_called() @@ -197,7 +197,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): specification_tree = self.generate_averaging_pipeline_spec() resource_estimator_handler = ResourceEstimatorHandler() - resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) CalibrationPipelineResourceEstimator_mock().verify_and_estimate.assert_called() @@ -232,7 +232,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): specification_tree['predecessors']) resource_estimator_handler = ResourceEstimatorHandler() - resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) ImagePipelineResourceEstimator_mock().verify_and_estimate.assert_called() @@ -241,7 +241,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): specification_tree = self.generate_imageing_pipeline_mss() resource_estimator_handler = ResourceEstimatorHandler() - resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) ImagePipelineResourceEstimator_mock().verify_and_estimate.assert_called() @@ -264,7 +264,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): specification_tree = self.generate_long_baseline_pipeline_spec() resource_estimator_handler = ResourceEstimatorHandler() - resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) LongBaselinePipelineResourceEstimator_mock().verify_and_estimate.assert_called() @@ -322,7 +322,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): specification_tree = self.generate_pulsar_pipeline_spec() resource_estimator_handler = ResourceEstimatorHandler() - resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) PulsarPipelineResourceEstimator_mock().verify_and_estimate.assert_called() @@ -348,7 +348,7 @@ class TestResourceEstimatorHandler(unittest.TestCase): resource_estimator_handler = ResourceEstimatorHandler() - estimate = resource_estimator_handler.handle_message({'specification_tree': specification_tree}) + estimate = resource_estimator_handler._get_estimated_resources(specification_tree=specification_tree) estimate_list = estimate['estimates'] diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/service.py b/SAS/ResourceAssignment/ResourceAssignmentService/service.py index e59436997bc489c99a1da08e1ecae4018f2d72d3..6d5742076a8f86d233a443c96c6f9cc765f43b02 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/service.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/service.py @@ -14,9 +14,9 @@ from lofar.common import dbcredentials logger = logging.getLogger(__name__) class RADBServiceMessageHandler(ServiceMessageHandler): - def __init__(self, dbcreds: dbcredentials.Credentials, log_queries: bool=False): + def __init__(self, dbcreds: dbcredentials.Credentials): super().__init__() - self.radb = radb.RADatabase(dbcreds=dbcreds, log_queries=log_queries) + self.radb = radb.RADatabase(dbcreds=dbcreds) def init_service_handler(self, service_name: str=DEFAULT_RADB_SERVICENAME): super().init_service_handler(service_name) @@ -399,7 +399,6 @@ def main(): description='runs the resourceassignment database service') parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') parser.add_option("-e", "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the broker, default: %default") - parser.add_option('-q', '--log-queries', dest='log_queries', action='store_true', help='log all pqsl queries') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') parser.add_option_group(dbcredentials.options_group(parser)) parser.set_defaults(dbcredentials="RADB") @@ -414,7 +413,6 @@ def main(): with createService(exchange=options.exchange, broker=options.broker, - log_queries=options.log_queries, dbcreds=dbcreds): waitForInterrupt() diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py index d4367721a751010a6e98dd1028826f52160289c9..d08b85918dce2db4550d34f52c6237fd0effb19c 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py @@ -3,68 +3,74 @@ import unittest import datetime import logging + +logger = logging.getLogger(__name__) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + from lofar.messaging import TemporaryExchange from lofar.sas.resourceassignment.resourceassignmentservice.service import createService -from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC +from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from unittest.mock import patch -with TemporaryExchange(__name__) as tmp_exchange: - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - logger = logging.getLogger(__name__) - - # the system under test is the service and the rpc, not the RADatabase - # so, patch (mock) the RADatabase class during these tests. - # when the service instantiates an RADatabase it will get the mocked class. - with patch('lofar.sas.resourceassignment.database.radb.RADatabase', autospec=True) as MockRADatabase: - mock = MockRADatabase.return_value - # modify the return values of the various RADatabase methods with pre-cooked answers - mock.getTaskStatuses.return_value = [{'id': 1, 'name': 'opened'}, {'id': 2, 'name': 'scheduled'}] - mock.getTaskTypes.return_value = [{'id': 0, 'name': 'OBSERVATION'}, {'id': 1, 'name': 'PIPELINE'}] - mock.getResourceClaimStatuses.return_value = [{'id': 0, 'name': 'TENTATIVE'},{'id': 1, 'name': 'CLAIMED'},{'id': 2, 'name': 'CONFLICT'}] - mock.getUnits.return_value = [{'units': 'rsp_channel_bit', 'id': 0},{'units': 'bytes', 'id': 1},{'units': 'rcu_board', 'id': 2},{'units': 'bits/second', 'id': 3},{'units': 'cores', 'id': 4}] - mock.getResourceTypes.return_value = [{'unit_id': 0, 'id': 0, 'unit': 'rsp_channel_bit', 'name': 'rsp'},{'unit_id': 1, 'id': 1, 'unit': 'bytes', 'name': 'tbb'},{'unit_id': 2, 'id': 2, 'unit': 'rcu_board', 'name': 'rcu'},{'unit_id': 3, 'id': 3, 'unit': 'bits/second', 'name': 'bandwidth'},{'unit_id': 4, 'id': 4, 'unit': 'cores', 'name': 'processor'},{'unit_id': 1, 'id': 5, 'unit': 'bytes', 'name': 'storage'}] - mock.getResourceGroupTypes.return_value = [{'id': 0, 'name': 'instrument'},{'id': 1, 'name': 'cluster'},{'id': 2, 'name': 'station_group'},{'id': 3, 'name': 'station'},{'id': 4, 'name': 'node_group'},{'id': 5, 'name': 'node'}] - mock.getResources.return_value = [{'type_id': 0, 'type': 'rsp', 'id': 0, 'unit': 'rsp_channel_bit', 'name': 'rsp'},{'type_id': 1, 'type': 'tbb', 'id': 1, 'unit': 'bytes', 'name': 'tbb'},{'type_id': 2, 'type': 'rcu', 'id': 2, 'unit': 'rcu_board', 'name': 'rcu'},{'type_id': 3, 'type': 'bandwidth', 'id': 3, 'unit': 'bits/second', 'name': 'bandwidth'}] - mock.getResourceGroups.return_value = [{'type_id': 0, 'type': 'instrument', 'id': 0, 'name': 'LOFAR'},{'type_id': 1, 'type': 'cluster', 'id': 1, 'name': 'CEP2'}] - mock.getTasks.return_value = [{'status': 'prepared', 'type_id': 1, 'status_id': 200, 'specification_id': 1, 'starttime': datetime.datetime(2015, 11, 30, 12, 0), 'mom_id': -1, 'endtime': datetime.datetime(2015, 11, 30, 15, 0), 'type': 'PIPELINE', 'id': 5, 'otdb_id': -1}] - mock.getTask.return_value = mock.getTasks.return_value[0] - mock.getTask.side_effect = lambda x: mock.getTasks.return_value[0] if x == 5 else None - mock.getResourceClaims.return_value = [{'username': 'paulus', 'status': 'CLAIMED', 'user_id': 1, 'task_id': 5, 'status_id': 1, 'resource_id': 1, 'session_id': 1, 'claim_size': 10, 'starttime': datetime.datetime(2015, 11, 30, 12, 0), 'endtime': datetime.datetime(2015, 11, 30, 12, 0), 'id': 5}] - - class Test1(unittest.TestCase): - '''Test''' - - def test(self): - '''basic test ''' - with RARPC(exchange=tmp_exchange.address, timeout=1) as rpc: - self.assertEqual(mock.getTaskStatuses.return_value, rpc.getTaskStatuses()) - self.assertEqual(mock.getTaskTypes.return_value, rpc.getTaskTypes()) - self.assertEqual(mock.getResourceClaimStatuses.return_value, rpc.getResourceClaimStatuses()) - self.assertEqual(mock.getUnits.return_value, rpc.getUnits()) - self.assertEqual(mock.getResourceTypes.return_value, rpc.getResourceTypes()) - self.assertEqual(mock.getResourceGroupTypes.return_value, rpc.getResourceGroupTypes()) - self.assertEqual(mock.getResources.return_value, rpc.getResources()) - self.assertEqual(mock.getResourceGroups.return_value, rpc.getResourceGroups()) - self.assertEqual(mock.getTasks.return_value, rpc.getTasks()) - self.assertEqual(mock.getResourceClaims.return_value, rpc.getResourceClaims()) - - #TODO: fix this test - #self.assertEqual(None, rpc.getTask(1)) - #self.assertEqual(mock.getTask.return_value, rpc.getTask(5)) - - # test non existing service method, should raise - with self.assertRaises(Exception) as e: - rpc.execute('foo') - - ## test method with wrong args - #with self.assertRaises(TypeError) as cm: - #rpc.rpc('GetTasks', timeout=1, fooarg='bar') - #self.assertTrue('got an unexpected keyword argument \'fooarg\'' in str(cm.exception)) - - # create and run the service - with createService(exchange=tmp_exchange.address): - - # and run all tests - unittest.main() + +class Test1(unittest.TestCase): + '''Test''' + + def test(self): + '''basic test ''' + + + with TemporaryExchange(__name__) as tmp_exchange: + + # the system under test is the service and the rpc, not the RADatabase + # so, patch (mock) the RADatabase class during these tests. + # when the service instantiates an RADatabase it will get the mocked class. + with patch('lofar.sas.resourceassignment.database.radb.RADatabase', autospec=True) as MockRADatabase: + mock = MockRADatabase.return_value + # modify the return values of the various RADatabase methods with pre-cooked answers + mock.getTaskStatuses.return_value = [{'id': 1, 'name': 'opened'}, {'id': 2, 'name': 'scheduled'}] + mock.getTaskTypes.return_value = [{'id': 0, 'name': 'OBSERVATION'}, {'id': 1, 'name': 'PIPELINE'}] + mock.getResourceClaimStatuses.return_value = [{'id': 0, 'name': 'TENTATIVE'},{'id': 1, 'name': 'CLAIMED'},{'id': 2, 'name': 'CONFLICT'}] + mock.getUnits.return_value = [{'units': 'rsp_channel_bit', 'id': 0},{'units': 'bytes', 'id': 1},{'units': 'rcu_board', 'id': 2},{'units': 'bits/second', 'id': 3},{'units': 'cores', 'id': 4}] + mock.getResourceTypes.return_value = [{'unit_id': 0, 'id': 0, 'unit': 'rsp_channel_bit', 'name': 'rsp'},{'unit_id': 1, 'id': 1, 'unit': 'bytes', 'name': 'tbb'},{'unit_id': 2, 'id': 2, 'unit': 'rcu_board', 'name': 'rcu'},{'unit_id': 3, 'id': 3, 'unit': 'bits/second', 'name': 'bandwidth'},{'unit_id': 4, 'id': 4, 'unit': 'cores', 'name': 'processor'},{'unit_id': 1, 'id': 5, 'unit': 'bytes', 'name': 'storage'}] + mock.getResourceGroupTypes.return_value = [{'id': 0, 'name': 'instrument'},{'id': 1, 'name': 'cluster'},{'id': 2, 'name': 'station_group'},{'id': 3, 'name': 'station'},{'id': 4, 'name': 'node_group'},{'id': 5, 'name': 'node'}] + mock.getResources.return_value = [{'type_id': 0, 'type': 'rsp', 'id': 0, 'unit': 'rsp_channel_bit', 'name': 'rsp'},{'type_id': 1, 'type': 'tbb', 'id': 1, 'unit': 'bytes', 'name': 'tbb'},{'type_id': 2, 'type': 'rcu', 'id': 2, 'unit': 'rcu_board', 'name': 'rcu'},{'type_id': 3, 'type': 'bandwidth', 'id': 3, 'unit': 'bits/second', 'name': 'bandwidth'}] + mock.getResourceGroups.return_value = [{'type_id': 0, 'type': 'instrument', 'id': 0, 'name': 'LOFAR'},{'type_id': 1, 'type': 'cluster', 'id': 1, 'name': 'CEP2'}] + mock.getTasks.return_value = [{'status': 'prepared', 'type_id': 1, 'status_id': 200, 'specification_id': 1, 'starttime': datetime.datetime(2015, 11, 30, 12, 0), 'mom_id': -1, 'endtime': datetime.datetime(2015, 11, 30, 15, 0), 'type': 'PIPELINE', 'id': 5, 'otdb_id': -1}] + mock.getTask.return_value = mock.getTasks.return_value[0] + mock.getTask.side_effect = lambda x: mock.getTasks.return_value[0] if x == 5 else None + mock.getResourceClaims.return_value = [{'username': 'paulus', 'status': 'CLAIMED', 'user_id': 1, 'task_id': 5, 'status_id': 1, 'resource_id': 1, 'session_id': 1, 'claim_size': 10, 'starttime': datetime.datetime(2015, 11, 30, 12, 0), 'endtime': datetime.datetime(2015, 11, 30, 12, 0), 'id': 5}] + + # create and run the service + with createService(exchange=tmp_exchange.address): + with RADBRPC.create(exchange=tmp_exchange.address, timeout=1) as rpc: + self.assertEqual(mock.getTaskStatuses.return_value, rpc.getTaskStatuses()) + self.assertEqual(mock.getTaskTypes.return_value, rpc.getTaskTypes()) + self.assertEqual(mock.getResourceClaimStatuses.return_value, rpc.getResourceClaimStatuses()) + self.assertEqual(mock.getUnits.return_value, rpc.getUnits()) + self.assertEqual(mock.getResourceTypes.return_value, rpc.getResourceTypes()) + self.assertEqual(mock.getResourceGroupTypes.return_value, rpc.getResourceGroupTypes()) + self.assertEqual(mock.getResources.return_value, rpc.getResources()) + self.assertEqual(mock.getResourceGroups.return_value, rpc.getResourceGroups()) + self.assertEqual(mock.getTasks.return_value, rpc.getTasks()) + self.assertEqual(mock.getResourceClaims.return_value, rpc.getResourceClaims()) + + #TODO: fix this test + #self.assertEqual(None, rpc.getTask(1)) + #self.assertEqual(mock.getTask.return_value, rpc.getTask(5)) + + # test non existing service method, should raise + with self.assertRaises(Exception) as e: + rpc.execute('foo') + + ## test method with wrong args + #with self.assertRaises(TypeError) as cm: + #rpc.rpc('GetTasks', timeout=1, fooarg='bar') + #self.assertTrue('got an unexpected keyword argument \'fooarg\'' in str(cm.exception)) + + +if __name__ == '__main__': + # run all tests + unittest.main() diff --git a/SAS/SpecificationServices/lib/specification_service.py b/SAS/SpecificationServices/lib/specification_service.py index 108bcccc4f9df4718dcaadc9cabdf2f089ea598d..42ca9d4ba47d3818ae8fc32be245ad247d73ef4e 100644 --- a/SAS/SpecificationServices/lib/specification_service.py +++ b/SAS/SpecificationServices/lib/specification_service.py @@ -245,7 +245,7 @@ class SpecificationHandler(ServiceMessageHandler): with self.validationrpc: response = self.validationrpc.validate_mom_specification(mom_xml) if not response["valid"]: - raise Exception("Invalid MoM specification: %s", response["error"]) + raise Exception("Invalid MoM specification: %s" % response["error"]) def _add_spec_to_mom(self, mom_xml): logger.info("about to send mom_xml: %s", mom_xml) diff --git a/SAS/SpecificationServices/test/t_specification_service.py b/SAS/SpecificationServices/test/t_specification_service.py index 273c0dd4e13a1b83a03a323e0891380aef47c5a8..bbbcb05e78b3d33c3faac468e636efcc4b974d99 100644 --- a/SAS/SpecificationServices/test/t_specification_service.py +++ b/SAS/SpecificationServices/test/t_specification_service.py @@ -493,37 +493,38 @@ class TestSpecificationHandler(unittest.TestCase): </spec:specification>''' def setUp(self): - validationrpc_patcher = mock.patch('lofar.specificationservices.specification_service.validationrpc') + validationrpc_patcher = mock.patch('lofar.specificationservices.specification_service.ValidationRPC') self.addCleanup(validationrpc_patcher.stop) self.validationrpc_mock = validationrpc_patcher.start() - + self.validationrpc_mock.create.side_effect = lambda **kwargs: self.validationrpc_mock self.validationrpc_mock.validate_mom_specification.return_value = {"valid": True} - momqueryrpc_patcher = mock.patch('lofar.specificationservices.specification_service.momqueryrpc') + momqueryrpc_patcher = mock.patch('lofar.specificationservices.specification_service.MoMQueryRPC') self.addCleanup(momqueryrpc_patcher.stop) self.momqueryrpc_mock = momqueryrpc_patcher.start() + self.momqueryrpc_mock.create.side_effect = lambda **kwargs: self.momqueryrpc_mock self.momqueryrpc_mock.folderExists.return_value = {"exists": False} self.momqueryrpc_mock.isProjectActive.return_value = {"active": True} - momimportxml_bus_patcher = mock.patch('lofar.specificationservices.specification_service.momimportxml_bus') + translationrpc_patcher = mock.patch('lofar.specificationservices.specification_service.TranslationRPC') + self.addCleanup(translationrpc_patcher.stop) + self.translationrpc_mock = translationrpc_patcher.start() + self.translationrpc_mock.create.side_effect = lambda **kwargs: self.translationrpc_mock + + momimportxml_bus_patcher = mock.patch('lofar.specificationservices.specification_service.ToBusOld') self.addCleanup(momimportxml_bus_patcher.stop) self.momimportxml_bus_mock = momimportxml_bus_patcher.start() - specificationtranslationrpc_patcher = \ - mock.patch('lofar.specificationservices.specification_service.specificationtranslationrpc') - self.addCleanup(specificationtranslationrpc_patcher.stop) - self.specificationtranslationrpc_mock = specificationtranslationrpc_patcher.start() - self.handler = SpecificationHandler() def test_add_specification_should_raise_exception_when_lofax_xml_is_invalid(self): - self.validationrpc_mock.validate_specification.return_value = {"valid": False, "error": "error message"} + self.validationrpc_mock.validate_mom_specification.return_value = {"valid": False, "error": "error message"} with self.assertRaises(Exception) as exception: self.handler.add_specification("user", self.xml) - self.assertEqual(str(exception.exception), "Invalid specification: error message") + self.assertEqual(str(exception.exception), "Invalid MoM specification: error message") def test_add_specification_should_raise_exception_when_spec_does_not_start_correctly(self): wrong_root_xml = "<xml></xml>" @@ -603,7 +604,7 @@ class TestSpecificationHandler(unittest.TestCase): def test_add_specification_should_send_correctly_translated_spec_to_mom(self): self.handler.add_specification("user", self.xml) - self.momimportxml_bus_mock.send.assert_called() + self.assertTrue(any('.send' in call[0] for call in self.momimportxml_bus_mock.mock_calls)) if __name__ == "__main__": unittest.main() diff --git a/SAS/SpecificationServices/test/t_translation_service.py b/SAS/SpecificationServices/test/t_translation_service.py index 5dfb923af6e560713151e6e7ce2b704d3b5c99d4..34eac974a55ada5070b47d698663ffa941f36193 100644 --- a/SAS/SpecificationServices/test/t_translation_service.py +++ b/SAS/SpecificationServices/test/t_translation_service.py @@ -75,14 +75,13 @@ class TestSpecificationTranslationHandler(unittest.TestCase): cls.expected_momxml_direct_type3 = testdir + "/t_translation_service.in_xml/direct_xml_translation_type3.xml" def setUp(self): - validationrpc_patcher = mock.patch('lofar.specificationservices.translation_service.validationrpc') + validationrpc_patcher = mock.patch('lofar.specificationservices.translation_service.ValidationRPC.validate_mom_specification') self.addCleanup(validationrpc_patcher.stop) self.validationrpc_mock = validationrpc_patcher.start() - - self.validationrpc_mock.validate_mom_specification.return_value = {"valid": True} + self.validationrpc_mock.return_value = {"valid": True} def test_specification_to_momspecification_should_raise_exception_if_momspec_is_invalid(self): - self.validationrpc_mock.validate_mom_specification.return_value = {"valid": False} + self.validationrpc_mock.return_value = {"valid": False} handler = SpecificationTranslationHandler() diff --git a/SAS/TriggerEmailService/Server/test/t_TriggerEmailService.py b/SAS/TriggerEmailService/Server/test/t_TriggerEmailService.py index b5cf0d50570d73fa1a575ba17fb5d6c50c034a05..bdee97c728f04168b78837fe17ce079384128b83 100755 --- a/SAS/TriggerEmailService/Server/test/t_TriggerEmailService.py +++ b/SAS/TriggerEmailService/Server/test/t_TriggerEmailService.py @@ -21,7 +21,7 @@ import unittest from unittest import mock import os -from lofar.sas.TriggerEmailService.TriggerEmailService import OTDBTriggerListener, TriggerNotificationListener, email, TriggerNotificationHandler +from lofar.sas.TriggerEmailService.TriggerEmailService import OTDBTriggerHandler, email, TriggerNotificationHandler class TestEmailing(unittest.TestCase): @@ -51,7 +51,7 @@ class TestEmailing(unittest.TestCase): self.assertNotIn("observer@astron.nl", smtp_mock.sendmail.call_args[0][1]) -class TestOTDBTriggerListener(unittest.TestCase): +class TestOTDBTriggerHandler(unittest.TestCase): project_name = "test_lofar" trigger_id = 1 obs_sas_id = 22 @@ -80,54 +80,50 @@ class TestOTDBTriggerListener(unittest.TestCase): self.addCleanup(logger_patcher.stop) self.logger_mock = logger_patcher.start() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.start_listening') - def test_start_listening_should_open_momquery_rpc(self, super_mock): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + def test_start_handling_should_open_momquery_rpc(self): + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.start_listening() + handler.start_handling() self.momqueryrpc_mock.open.assert_called() - super_mock.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.stop_listening') - def test_stop_listening_should_close_momquery_rpc(self, super_mock): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + def test_stop_handling_should_close_momquery_rpc(self): + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.stop_listening() + handler.stop_handling() self.momqueryrpc_mock.close.assert_called() - super_mock.assert_called() # Aborted def test_onObservationAborted_should_not_email_when_its_not_a_trigger(self): self.momqueryrpc_mock.get_trigger_id.return_value = {'trigger_id': None, 'status': "Error"} - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) self.email_mock.assert_not_called() def test_onObservationAborted_should_email_when_its_a_trigger(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) self.email_mock.assert_called() def test_onObservationAborted_should_set_correct_subject(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) self.assertIn(str(self.trigger_id), self.email_mock.call_args[0][1]) self.assertIn(self.project_name, self.email_mock.call_args[0][1]) def test_onObservationAborted_should_set_correct_body(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) self.assertIn(str(self.trigger_id), self.email_mock.call_args[0][2]) self.assertIn(self.project_name, self.email_mock.call_args[0][2]) @@ -140,31 +136,31 @@ class TestOTDBTriggerListener(unittest.TestCase): def test_onObservationScheduled_should_not_email_when_its_not_a_trigger(self): self.momqueryrpc_mock.get_trigger_id.return_value = {'trigger_id': None, 'status': "Error"} - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationScheduled(self.obs_sas_id, None) + handler.onObservationScheduled(self.obs_sas_id, None) self.email_mock.assert_not_called() def test_onObservationScheduled_should_email_when_its_a_trigger(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationScheduled(self.obs_sas_id, None) + handler.onObservationScheduled(self.obs_sas_id, None) self.email_mock.assert_called() def test_onObservationScheduled_should_set_correct_subject(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationScheduled(self.obs_sas_id, None) + handler.onObservationScheduled(self.obs_sas_id, None) self.assertIn(str(self.trigger_id), self.email_mock.call_args[0][1]) self.assertIn(self.project_name, self.email_mock.call_args[0][1]) def test_onObservationScheduled_should_set_correct_body(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationScheduled(self.obs_sas_id, None) + handler.onObservationScheduled(self.obs_sas_id, None) self.assertIn(str(self.trigger_id), self.email_mock.call_args[0][2]) self.assertIn(self.project_name, self.email_mock.call_args[0][2]) @@ -177,31 +173,31 @@ class TestOTDBTriggerListener(unittest.TestCase): def test_onObservationFinished_should_not_email_when_its_not_a_trigger(self): self.momqueryrpc_mock.get_trigger_id.return_value = {'trigger_id': None, 'status': "Error"} - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationFinished(self.obs_sas_id, None) + handler.onObservationFinished(self.obs_sas_id, None) self.email_mock.assert_not_called() def test_onObservationFinished_should_email_when_its_a_trigger(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationFinished(self.obs_sas_id, None) + handler.onObservationFinished(self.obs_sas_id, None) self.email_mock.assert_called() def test_onObservationFinished_should_set_correct_subject(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationFinished(self.obs_sas_id, None) + handler.onObservationFinished(self.obs_sas_id, None) self.assertIn(str(self.trigger_id), self.email_mock.call_args[0][1]) self.assertIn(self.project_name, self.email_mock.call_args[0][1]) def test_onObservationFinished_should_set_correct_body(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationFinished(self.obs_sas_id, None) + handler.onObservationFinished(self.obs_sas_id, None) self.assertIn(str(self.trigger_id), self.email_mock.call_args[0][2]) self.assertIn(self.project_name, self.email_mock.call_args[0][2]) @@ -214,31 +210,31 @@ class TestOTDBTriggerListener(unittest.TestCase): def test_onObservationConflict_should_not_email_when_its_not_a_trigger(self): self.momqueryrpc_mock.get_trigger_id.return_value = {'trigger_id': None, 'status': "Error"} - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationConflict(self.obs_sas_id, None) + handler.onObservationConflict(self.obs_sas_id, None) self.email_mock.assert_not_called() def test_onObservationConflict_should_email_when_its_a_trigger(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationConflict(self.obs_sas_id, None) + handler.onObservationConflict(self.obs_sas_id, None) self.email_mock.assert_called() def test_onObservationConflict_should_set_correct_subject(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationConflict(self.obs_sas_id, None) + handler.onObservationConflict(self.obs_sas_id, None) self.assertIn(str(self.trigger_id), self.email_mock.call_args[0][1]) self.assertIn(self.project_name, self.email_mock.call_args[0][1]) def test_onObservationConflict_should_set_correct_body(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationConflict(self.obs_sas_id, None) + handler.onObservationConflict(self.obs_sas_id, None) self.assertIn(str(self.trigger_id), self.email_mock.call_args[0][2]) self.assertIn(self.project_name, self.email_mock.call_args[0][2]) @@ -249,9 +245,9 @@ class TestOTDBTriggerListener(unittest.TestCase): def test_when_trigger_send_email_should_limit_the_amount_of_requests(self, _): self.momqueryrpc_mock.getMoMIdsForOTDBIds.return_value = {self.obs_sas_id: None} - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) self.assertEqual(10, self.momqueryrpc_mock.getMoMIdsForOTDBIds.call_count) @@ -259,9 +255,9 @@ class TestOTDBTriggerListener(unittest.TestCase): def test_when_trigger_send_email_should_log_when_no_mom_id_can_be_found(self, _): self.momqueryrpc_mock.getMoMIdsForOTDBIds.return_value = {self.obs_sas_id: None} - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) self.logger_mock.error.assert_any_call("Could not retrieve a mom_id for otdb_id: %s", self.obs_sas_id) @@ -269,24 +265,24 @@ class TestOTDBTriggerListener(unittest.TestCase): def test_when_trigger_send_email_should_wait_three_seconds_between_retries(self, sleep_mock): self.momqueryrpc_mock.getMoMIdsForOTDBIds.return_value = {self.obs_sas_id: None} - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) sleep_mock.assert_called_with(3) self.assertEqual(10, sleep_mock.call_count) def test_when_trigger_send_email_should_log_when_sending_email(self): - listener = OTDBTriggerListener(self.momqueryrpc_mock) + handler = OTDBTriggerHandler(self.momqueryrpc_mock) - listener.when_trigger_send_email(self.obs_sas_id, "", "") + handler.when_trigger_send_email(self.obs_sas_id, "", "") self.logger_mock.info.assert_any_call( "Emailing otdb_id: %s, mom_id: %s, trigger_id: %s, template_subject: %s, template_body: %s", self.obs_sas_id, self.obs_mom_id, self.trigger_id, "", "") -class TestTriggerNotificationListener(unittest.TestCase): +class TestTriggerNotificationHandler(unittest.TestCase): project_name = "test_lofar" project_mom_id = 33 trigger_id = 1 @@ -1357,23 +1353,19 @@ class TestTriggerNotificationListener(unittest.TestCase): self.dwell_message = mock.MagicMock() self.dwell_message.content = self.dwell_message_content - @mock.patch('lofar.messaging.messagebus.BusListener.start_listening') - def test_start_listening_should_open_momquery_rpc(self, super_mock): - listener = TriggerNotificationListener(self.momqueryrpc_mock) + def test_start_handling_should_open_momquery_rpc(self): + handler = TriggerNotificationHandler(self.momqueryrpc_mock) - listener.start_listening() + handler.start_handling() self.momqueryrpc_mock.open.assert_called() - super_mock.assert_called() - @mock.patch('lofar.messaging.messagebus.BusListener.stop_listening') - def test_stop_listening_should_close_momquery_rpc(self, super_mock): - listener = TriggerNotificationListener(self.momqueryrpc_mock) + def test_stop_handling_should_close_momquery_rpc(self): + handler = TriggerNotificationHandler(self.momqueryrpc_mock) - listener.stop_listening() + handler.stop_handling() self.momqueryrpc_mock.close.assert_called() - super_mock.assert_called() def test_handleMessage_should_email(self): handler = TriggerNotificationHandler(self.momqueryrpc_mock) diff --git a/SAS/TriggerServices/test/t_trigger_cancellation_service.py b/SAS/TriggerServices/test/t_trigger_cancellation_service.py index 39455d2e4214d3456ac0e85eb011ce83272ce1c8..4eb6217079c9b8a386456cd8341c0d70a29e6b9e 100755 --- a/SAS/TriggerServices/test/t_trigger_cancellation_service.py +++ b/SAS/TriggerServices/test/t_trigger_cancellation_service.py @@ -21,10 +21,10 @@ import unittest from unittest import mock import os -from lofar.triggerservices.trigger_cancellation_service import TriggerCancellationService +from lofar.triggerservices.trigger_cancellation_service import TriggerCancellationHandler -class TestTriggerCancellationService(unittest.TestCase): +class TestTriggerCancellationHandler(unittest.TestCase): project_name = "test_lofar" trigger_id = 1 obs_sas_id = 22 @@ -37,47 +37,43 @@ class TestTriggerCancellationService(unittest.TestCase): self.momqueryrpc_mock.get_trigger_id.return_value = {'trigger_id': self.trigger_id, 'status': "OK" } self.momqueryrpc_mock.getMoMIdsForOTDBIds.return_value = {self.obs_sas_id: self.obs_mom_id} - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.start_listening') - def test_start_listening_opens_momquery_rpc(self, super_mock): - listener = TriggerCancellationService(self.momqueryrpc_mock) + def test_start_listening_opens_momquery_rpc(self): + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.start_listening() + handler.start_handling() self.momqueryrpc_mock.open.assert_called() - super_mock.assert_called() - @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.stop_listening') - def test_stop_listening_closes_momquery_rpc(self, super_mock): - listener = TriggerCancellationService(self.momqueryrpc_mock) + def test_stop_listening_closes_momquery_rpc(self): + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.stop_listening() + handler.stop_handling() self.momqueryrpc_mock.close.assert_called() - super_mock.assert_called() # Aborted def test_onObservationAborted_does_not_call_cancel_trigger_when_its_not_a_trigger(self): self.momqueryrpc_mock.get_trigger_id.return_value = {'trigger_id': None, 'status': "Error"} - listener = TriggerCancellationService(self.momqueryrpc_mock) + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) self.momqueryrpc_mock.cancel_trigger.assert_not_called() def test_onObservationAborted_calls_cancel_trigger_with_correct_trigger(self): - listener = TriggerCancellationService(self.momqueryrpc_mock) + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) self.momqueryrpc_mock.cancel_trigger.assert_called() def test_onObservationAborted_uses_correct_triggerid_and_sets_correct_cancellation_reason(self): - listener = TriggerCancellationService(self.momqueryrpc_mock) + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.onObservationAborted(self.obs_sas_id, None) + handler.onObservationAborted(self.obs_sas_id, None) # correct otdb id is used to obtain trigger call_otdb_id = self.momqueryrpc_mock.getMoMIdsForOTDBIds.call_args[0][0] @@ -93,23 +89,23 @@ class TestTriggerCancellationService(unittest.TestCase): def test_onObservationError_does_not_call_cancel_trigger_when_its_not_a_trigger(self): self.momqueryrpc_mock.get_trigger_id.return_value = {'trigger_id': None, 'status': "Error"} - listener = TriggerCancellationService(self.momqueryrpc_mock) + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.onObservationError(self.obs_sas_id, None) + handler.onObservationError(self.obs_sas_id, None) self.momqueryrpc_mock.cancel_trigger.assert_not_called() def test_onObservationError_calls_cancel_trigger_when_its_a_trigger(self): - listener = TriggerCancellationService(self.momqueryrpc_mock) + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.onObservationError(self.obs_sas_id, None) + handler.onObservationError(self.obs_sas_id, None) self.momqueryrpc_mock.cancel_trigger.assert_called() def test_onObservationError_uses_correct_triggerid_and_sets_correct_cancellation_reason(self): - listener = TriggerCancellationService(self.momqueryrpc_mock) + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.onObservationError(self.obs_sas_id, None) + handler.onObservationError(self.obs_sas_id, None) # correct otdb id is used to obtain trigger call_otdb_id = self.momqueryrpc_mock.getMoMIdsForOTDBIds.call_args[0][0] @@ -125,23 +121,23 @@ class TestTriggerCancellationService(unittest.TestCase): def test_onObservationConflict_does_not_call_cancel_trigger_when_its_not_a_trigger(self): self.momqueryrpc_mock.get_trigger_id.return_value = {'trigger_id': None, 'status': "Error"} - listener = TriggerCancellationService(self.momqueryrpc_mock) + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.onObservationConflict(self.obs_sas_id, None) + handler.onObservationConflict(self.obs_sas_id, None) self.momqueryrpc_mock.cancel_trigger.assert_not_called() def test_onObservationConflict_calls_cancel_trigger_when_its_a_trigger(self): - listener = TriggerCancellationService(self.momqueryrpc_mock) + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.onObservationConflict(self.obs_sas_id, None) + handler.onObservationConflict(self.obs_sas_id, None) self.momqueryrpc_mock.cancel_trigger.assert_called() def test_onObservationConflict_uses_correct_triggerid_and_sets_correct_cancellation_reason(self): - listener = TriggerCancellationService(self.momqueryrpc_mock) + handler = TriggerCancellationHandler(self.momqueryrpc_mock) - listener.onObservationConflict(self.obs_sas_id, None) + handler.onObservationConflict(self.obs_sas_id, None) # correct otdb id is used to obtain trigger call_otdb_id = self.momqueryrpc_mock.getMoMIdsForOTDBIds.call_args[0][0]