diff --git a/.gitattributes b/.gitattributes index 46e2c9f47802d36bfcc02b7e91bb8bcb79ec43d9..34122aa261d0ef58cba8f670bcca9c0a39b2b3cc 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2149,6 +2149,7 @@ LTA/ltastorageoverview/bin/ltastorageoverviewwebservice.ini -text LTA/ltastorageoverview/doc/lta_storage_overview.md -text LTA/ltastorageoverview/lib/CMakeLists.txt -text LTA/ltastorageoverview/lib/__init__.py -text +LTA/ltastorageoverview/lib/ingesteventhandler.py -text LTA/ltastorageoverview/lib/ltaso/create_db_ltastorageoverview.sql -text LTA/ltastorageoverview/lib/report.py -text LTA/ltastorageoverview/lib/scraper.py -text @@ -2163,6 +2164,9 @@ LTA/ltastorageoverview/test/db_performance_test.py -text LTA/ltastorageoverview/test/integration_test_store.py -text LTA/ltastorageoverview/test/integration_test_store.run -text LTA/ltastorageoverview/test/integration_test_store.sh -text +LTA/ltastorageoverview/test/test_ingesteventhandler.py -text +LTA/ltastorageoverview/test/test_ingesteventhandler.run -text +LTA/ltastorageoverview/test/test_ingesteventhandler.sh -text LTA/ltastorageoverview/test/test_lso_webservice.py -text LTA/ltastorageoverview/test/test_lso_webservice.run -text LTA/ltastorageoverview/test/test_lso_webservice.sh -text diff --git a/LTA/ltastorageoverview/CMakeLists.txt b/LTA/ltastorageoverview/CMakeLists.txt index 4005698213f69912cbda6e544dc9140a93055f7b..3ec201ad5d387e976f5aef66eac0b95232206244 100644 --- a/LTA/ltastorageoverview/CMakeLists.txt +++ b/LTA/ltastorageoverview/CMakeLists.txt @@ -1,7 +1,7 @@ # $Id$ lofar_find_package(Python 2.6 REQUIRED) -lofar_package(ltastorageoverview 0.1 DEPENDS PyCommon) +lofar_package(ltastorageoverview 0.1 DEPENDS PyCommon LTAIngestClient) include(PythonInstall) diff --git a/LTA/ltastorageoverview/lib/CMakeLists.txt b/LTA/ltastorageoverview/lib/CMakeLists.txt index d9a9a63929c5e21b944c84c99068017f691bdd1f..8f6241c4ebb8b1670c670960273755e2c2c36aba 100644 --- a/LTA/ltastorageoverview/lib/CMakeLists.txt +++ b/LTA/ltastorageoverview/lib/CMakeLists.txt @@ -5,6 +5,7 @@ python_install( scraper.py store.py report.py + ingesteventhandler.py webservice/webservice.py webservice/__init__.py DESTINATION lofar/lta/ltastorageoverview) diff --git a/LTA/ltastorageoverview/lib/ingesteventhandler.py b/LTA/ltastorageoverview/lib/ingesteventhandler.py new file mode 100755 index 0000000000000000000000000000000000000000..6163f2e7021aa4c4dc5420756b5e94481714242e --- /dev/null +++ b/LTA/ltastorageoverview/lib/ingesteventhandler.py @@ -0,0 +1,128 @@ +# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +# $Id$ + +import logging +logger = logging.getLogger(__name__) + +from lofar.common import dbcredentials +from lofar.common.util import waitForInterrupt +from lofar.lta.ltastorageoverview import store +from lofar.lta.ltastorageoverview.scraper import VISIT_INTERVAL +from lofar.lta.ingest.common.srm import * +from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener +from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS +from lofar.lta.ingest.common.config import DEFAULT_BROKER +from lofar.messaging import adaptNameToEnvironment + +DEFAULT_INGEST_NOTIFICATION_QUEUE = adaptNameToEnvironment('lofar.lta.ingest.notification.for.ltastorageoverview') + + +class IngestEventHandler(IngestBusListener): + def __init__(self, dbcreds, + busname=DEFAULT_INGEST_NOTIFICATION_QUEUE, + subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, + broker=DEFAULT_BROKER): + self._dbcreds = dbcreds + super(IngestEventHandler, self).__init__(busname=busname, subjects=subjects, broker=broker) + + def onJobFinished(self, job_dict): + self._logJobNotification('finished', job_dict) + self._schedule_srmurl_for_visit(job_dict.get('srm_url')) + + def onTaskFinished(self, task_dict): + self._logJobNotification('task finised', task_dict) + self._schedule_srmurl_for_visit(task_dict.get('srm_url')) + + def _schedule_srmurl_for_visit(self, srm_url): + with store.LTAStorageDb(self._dbcreds) as db: + site = self._get_site_from_db(srm_url) + dir_path = get_dir_path_in_site(srm_url) + dir = db.directoryByName(dir_path, site['id']) + + if dir is None: + dir_id = self._insertMissingDirectoryTreeIfNeeded(srm_url) + else: + dir_id = dir.get('dir_id') + + if dir_id is not None: + self._markDirectoryForAVisit(dir_id) + + def _markDirectoryForAVisit(self, dir_id): + ''' + update the directory's last visit time to now-VISIT_INTERVAL, + so that it appears in the visitStats which are used by the scraper to determine the next directory to be visited. + :param int dir_id: the id of the directory + :return: None + ''' + with store.LTAStorageDb(self._dbcreds) as db: + return db.updateDirectoryLastVisitTime(dir_id, datetime.utcnow() - VISIT_INTERVAL) + + def _get_site_from_db(self, srm_url): + """ + find the site entry in the database for the given srm_url. + raises a lookup error if not found. + :param string srm_url: a valid srm url + :return: a site entry dict from the database + """ + site_url = get_site_surl(srm_url) + + # find site in db + with store.LTAStorageDb(self._dbcreds) as db: + site = next((s for s in db.sites() if s['url'] == site_url), None) + if site is None: + raise LookupError('Could not find site %s in database %s' % (site_url, self._dbcreds.database)) + return site + + def _insertMissingDirectoryTreeIfNeeded(self, srm_url): + with store.LTAStorageDb(self._dbcreds) as db: + # example url: srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/lc8_029/652884/L652884_SAP000_B000_P001_bf_e619e5da.tar + # or for a dir: srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/lc8_029/652884 + # site_url then becomes: srm://lofar-srm.fz-juelich.de:8443 + # dir_path then becomes: /pnfs/fz-juelich.de/data/lofar/ops/projects/lc8_029/652884 + site = self._get_site_from_db(srm_url) + site_url = site['url'] + dir_path = get_dir_path_in_site(srm_url) + + # for this site (which might have multiple root dirs), find the root_dir under which this dir_path belongs + root_dirs = db.rootDirectoriesForSite(site['id']) + parent_root_dir = next((rd for rd in root_dirs if dir_path.startswith(rd['dir_name'])), None) + + if parent_root_dir is None: + raise LookupError('Could not find parent root dir for site "%s" with root_dirs=%s for surl %s in database %s' + % (site['name'], root_dirs, srm_url, self._dbcreds.database)) + + # climb up the dir tree until a known dir is found in the database + climbing_dir_path = dir_path + db_dir = db.directoryByName(climbing_dir_path, site['id']) + missing_child_dirs = [] + while db_dir is None and parent_root_dir['dir_name'] != climbing_dir_path: + # climb up one dir, add lowest subdir as missing child + path_parts = climbing_dir_path.split('/') + missing_child_dirs.append(climbing_dir_path) + climbing_dir_path = '/'.join(path_parts[:-1]) + db_dir = db.directoryByName(climbing_dir_path, site['id']) + + # now we should have a known parent dir from the db. + # append the missing children in reverse order + # (from just under the known parent, down to the lowest missing subdir). + missing_childs_parent_dir_id = db_dir['dir_id'] + for missing_child_dir in reversed(missing_child_dirs): + missing_childs_parent_dir_id = db.insertSubDirectory(missing_child_dir, missing_childs_parent_dir_id) + + return missing_childs_parent_dir_id diff --git a/LTA/ltastorageoverview/test/CMakeLists.txt b/LTA/ltastorageoverview/test/CMakeLists.txt index b2142f8b4d86c0e5d7aa44c46a54c483b8e9b092..bb3b942d55057994bac70db523300c68aa3b57fe 100644 --- a/LTA/ltastorageoverview/test/CMakeLists.txt +++ b/LTA/ltastorageoverview/test/CMakeLists.txt @@ -4,5 +4,6 @@ include(LofarCTest) lofar_add_test(test_store) lofar_add_test(test_scraper) lofar_add_test(test_lso_webservice) +lofar_add_test(test_ingesteventhandler) lofar_add_test(integration_test_store) diff --git a/LTA/ltastorageoverview/test/test_ingesteventhandler.py b/LTA/ltastorageoverview/test/test_ingesteventhandler.py new file mode 100755 index 0000000000000000000000000000000000000000..a4ecff4337d6728143870439ca88e1c4ab3479bf --- /dev/null +++ b/LTA/ltastorageoverview/test/test_ingesteventhandler.py @@ -0,0 +1,302 @@ +#!/usr/bin/python + +# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +# $Id$ + +from datetime import datetime + +from common_test_ltastoragedb import * +from lofar.lta.ltastorageoverview import store +from lofar.lta.ltastorageoverview.ingesteventhandler import IngestEventHandler + +import logging +logger = logging.getLogger(__name__) + +class TestIngestEventHandler(CommonLTAStorageDbTest): + def setUp(self): + # allow superclass to setup empty database + super(TestIngestEventHandler, self).setUp() + + # fill empty database with simple sites and root dirs + with store.LTAStorageDb(self.dbcreds, True) as db: + db.insertSiteIfNotExists('siteA', 'srm://siteA.foo.bar:8443') + db.insertSiteIfNotExists('siteB', 'srm://siteB.foo.bar:8443') + + db.insertRootDirectory('siteA', '/root_dir_1') + db.insertRootDirectory('siteA', '/root_dir_2') + db.insertRootDirectory('siteA', '/long/path/to/root_dir_3') + db.insertRootDirectory('siteB', '/root_dir_1') + + self._markAllDirectoriesRecentlyVisited() + + def _markAllDirectoriesRecentlyVisited(self): + """pretend that all dirs were recently visited + """ + with store.LTAStorageDb(self.dbcreds, True) as db: + db.executeQuery('''update scraper.last_directory_visit + set visit_date=%s;''', (datetime.utcnow(), )) + db.commit() + + def test_01_schedule_srmurl_for_visit_unknown_site(self): + """ try to schedule some unknown site's surl. Should raise. + """ + with store.LTAStorageDb(self.dbcreds, True) as db: + handler = IngestEventHandler(dbcreds=self.dbcreds) + + with self.assertRaises(LookupError) as context: + surl = 'srm://foo.bar:1234/fdjsalfja5h43535h3oiu/5u905u3f' + handler._schedule_srmurl_for_visit(surl) + self.assertTrue('Could not find site' in context.exception.message) + + def test_02_markDirectoryForAVisit(self): + """ Test core method _markDirectoryForAVisit for all known root dirs. + Should set the last visit time for each dir way in the past. + """ + with store.LTAStorageDb(self.dbcreds, True) as db: + handler = IngestEventHandler(dbcreds=self.dbcreds) + now = datetime.utcnow() + + for site in db.sites(): + for root_dir in db.rootDirectoriesForSite(site['id']): + dir_id = root_dir['root_dir_id'] + # make sure the dir's last visit time is recent + db.updateDirectoryLastVisitTime(dir_id, now) + timestamp_before_mark = db.directoryLastVisitTime(dir_id) + self.assertEqual(now, timestamp_before_mark) + + # let the handler mark the dir for a next visit... + handler._markDirectoryForAVisit(dir_id) + + # by marking the dir for a next visit, the dir's last visit time is set way in the past. + timestamp_after_mark = db.directoryLastVisitTime(dir_id) + self.assertLess(timestamp_after_mark, timestamp_before_mark) + + def test_03_insertMissingDirectoryTreeIfNeeded(self): + """ Test core method _insertMissingDirectoryTreeIfNeeded for all known root dirs. + Should result in new directory entries in the database for the new sub directories only. + """ + with store.LTAStorageDb(self.dbcreds, True) as db: + handler = IngestEventHandler(dbcreds=self.dbcreds) + + for site in db.sites(): + for root_dir in db.rootDirectoriesForSite(site['id']): + site_surl = site['url'] + dir_path = root_dir['dir_name'] + surl = site_surl + dir_path + + # root dir should already exist + dir = db.directoryByName(dir_path, site['id']) + self.assertIsNotNone(dir) + + # let the handler insert the not-so-missing dirs. + # nothing should happen, because the root dir already exists + new_dir_id = handler._insertMissingDirectoryTreeIfNeeded(surl) + self.assertEqual(dir['dir_id'], new_dir_id) + + # now insert some new subdirs, with multiple levels. + for subdir_path in ['/foo', '/bar/xyz']: + dir_path = root_dir['dir_name'] + subdir_path + surl = site_surl + dir_path + # dir should not exist yet + self.assertIsNone(db.directoryByName(dir_path, site['id'])) + + # let the handler insert the missing dirs. + handler._insertMissingDirectoryTreeIfNeeded(surl) + + # dir should exist now + dir = db.directoryByName(dir_path, site['id']) + self.assertIsNotNone(dir) + + # check if new dir has expected root dir + parents = db.parentDirectories(dir['dir_id']) + self.assertEqual(root_dir['root_dir_id'], parents[0]['id']) + + def test_04_insertMissingDirectoryTreeIfNeeded_for_path_with_unknown_rootdir(self): + """ Test core method _insertMissingDirectoryTreeIfNeeded for a path with an unknown root dir + Should raise LookupError. + """ + with store.LTAStorageDb(self.dbcreds, True) as db: + handler = IngestEventHandler(dbcreds=self.dbcreds) + + for site in db.sites(): + with self.assertRaises(LookupError) as context: + surl = site['url'] + '/fdjsalfja5h43535h3oiu/5u905u3f' + handler._insertMissingDirectoryTreeIfNeeded(surl) + self.assertTrue('Could not find parent root dir' in context.exception.message) + + def test_05_schedule_srmurl_for_visit_for_root_dir(self): + """ Test higher level method _schedule_srmurl_for_visit for all known root dirs. + Should result in marking the dir matching the surl as being the dir which should be visited next. + """ + with store.LTAStorageDb(self.dbcreds, True) as db: + handler = IngestEventHandler(dbcreds=self.dbcreds) + + for site in db.sites(): + for root_dir in db.rootDirectoriesForSite(site['id']): + self._markAllDirectoriesRecentlyVisited() + now = datetime.utcnow() + + dir_id = root_dir['root_dir_id'] + surl = site['url'] + root_dir['dir_name'] + handler._schedule_srmurl_for_visit(surl) + + # surl was scheduled for a visit, so this dir should be the least_recent_visited_dir + site_visit_stats = db.visitStats(datetime.utcnow())[site['name']] + self.assertEqual(dir_id, site_visit_stats['least_recent_visited_dir_id']) + + # mimick a directory visit by the scraper, by setting the last visit time to now. + db.updateDirectoryLastVisitTime(dir_id, now) + + # we faked a visit, so this dir should not be the least_recent_visited_dir anymore + site_visit_stats = db.visitStats(now)[site['name']] + self.assertNotEqual(dir_id, site_visit_stats.get('least_recent_visited_dir_id')) + + def test_06_schedule_srmurl_for_visit_for_new_root_sub_dir(self): + """ Test higher level method _schedule_srmurl_for_visit for all new unknown subdirs of the known root dirs. + Should result in marking the dir matching the surl as being the dir which should be visited next. + """ + with store.LTAStorageDb(self.dbcreds, True) as db: + handler = IngestEventHandler(dbcreds=self.dbcreds) + + for site in db.sites(): + for root_dir in db.rootDirectoriesForSite(site['id']): + self._markAllDirectoriesRecentlyVisited() + now = datetime.utcnow() + + # create the subdir surl + sub_dir_name = '/foo' + sub_dir_path = root_dir['dir_name'] + sub_dir_name + surl = site['url'] + sub_dir_path + + # call the method under test + handler._schedule_srmurl_for_visit(surl) + + # surl was scheduled for a visit, all other dir's were marked as visited already... + # so there should be a new dir for this surl, and it should be the least_recent_visited_dir + site_visit_stats = db.visitStats(datetime.utcnow())[site['name']] + + least_recent_visited_dir_id = site_visit_stats.get('least_recent_visited_dir_id') + self.assertIsNotNone(least_recent_visited_dir_id) + + least_recent_visited_dir = db.directory(least_recent_visited_dir_id) + self.assertEqual(sub_dir_path, least_recent_visited_dir['dir_name']) + + # mimick a directory visit by the scraper, by setting the last visit time to now. + db.updateDirectoryLastVisitTime(least_recent_visited_dir_id, now) + + # we faked a visit, so this dir should not be the least_recent_visited_dir anymore + site_visit_stats = db.visitStats(now)[site['name']] + self.assertNotEqual(least_recent_visited_dir_id, site_visit_stats.get('least_recent_visited_dir_id')) + + def test_07_schedule_srmurl_for_visit_for_path_with_unknown_rootdir(self): + """ Test higher level method _schedule_srmurl_for_visit for a path with an unknown root dir + Should raise LookupError. + """ + with store.LTAStorageDb(self.dbcreds, True) as db: + handler = IngestEventHandler(dbcreds=self.dbcreds) + + for site in db.sites(): + with self.assertRaises(LookupError) as context: + surl = site['url'] + '/fdjsalfja5h43535h3oiu/5u905u3f' + handler._schedule_srmurl_for_visit(surl) + self.assertTrue('Could not find parent root dir' in context.exception.message) + + def test_08_integration_test_with_messagebus(self): + """ Full blown integration test listening for notifications on the bus, + and checking which dir is up for a visit next. + Needs a working local qpid broker. Test is skipped if qpid not available. + """ + try: + broker = None + connection = None + + import uuid + from threading import Event + from qpid.messaging import Connection, ConnectError + from qpidtoollibs import BrokerAgent + from lofar.messaging.messagebus import ToBus + from lofar.messaging.messages import EventMessage + from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_PREFIX + + # setup broker connection + connection = Connection.establish('127.0.0.1') + broker = BrokerAgent(connection) + + # add test service bus + busname = 'test-ingesteventhandler-%s' % (uuid.uuid1()) + broker.addExchange('topic', busname) + + sync_event = Event() + + class SyncedIngestEventHandler(IngestEventHandler): + """This derived IngestEventHandler behaves exactly like the normal + object under test IngestEventHandler, but it also sets a sync_event + to sync between the listener thread and this main test thread""" + def _handleMessage(self, msg): + super(SyncedIngestEventHandler, self)._handleMessage(msg) + sync_event.set() + + with SyncedIngestEventHandler(self.dbcreds, busname=busname): + with store.LTAStorageDb(self.dbcreds, True) as db: + for site in db.sites(): + for root_dir in db.rootDirectoriesForSite(site['id']): + self._markAllDirectoriesRecentlyVisited() + + # create the subdir surl + sub_dir_name = '/foo' + sub_dir_path = root_dir['dir_name'] + sub_dir_name + surl = site['url'] + sub_dir_path + + with ToBus(busname) as sender: + msg = EventMessage(context=DEFAULT_INGEST_NOTIFICATION_PREFIX+"TaskFinished", + content={'srm_url': surl}) + sender.send(msg) + + # wait for the handler to have processed the message + self.assertTrue(sync_event.wait(2)) + sync_event.clear() + + # surl should have been scheduled for a visit, all other dir's were marked as visited already... + # so there should be a new dir for this surl, and it should be the least_recent_visited_dir + site_visit_stats = db.visitStats(datetime.utcnow())[site['name']] + + least_recent_visited_dir_id = site_visit_stats.get('least_recent_visited_dir_id') + self.assertIsNotNone(least_recent_visited_dir_id) + + least_recent_visited_dir = db.directory(least_recent_visited_dir_id) + self.assertEqual(sub_dir_path, least_recent_visited_dir['dir_name']) + + except ImportError as e: + logger.warning("skipping test due to: %s", e) + except ConnectError as e: + logger.warning("skipping test due to: %s", e) + finally: + # cleanup test bus and exit + if broker: + broker.delExchange(busname) + if connection: + connection.close() + + +# run tests if main +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', + level=logging.INFO) + + unittest.main() diff --git a/LTA/ltastorageoverview/test/test_ingesteventhandler.run b/LTA/ltastorageoverview/test/test_ingesteventhandler.run new file mode 100755 index 0000000000000000000000000000000000000000..8b7d318ffffefcf07ed482a95250975ba792aa39 --- /dev/null +++ b/LTA/ltastorageoverview/test/test_ingesteventhandler.run @@ -0,0 +1,4 @@ +#!/bin/bash + +source python-coverage.sh +python_coverage_test "*ingesteventhandler*" test_ingesteventhandler.py diff --git a/LTA/ltastorageoverview/test/test_ingesteventhandler.sh b/LTA/ltastorageoverview/test/test_ingesteventhandler.sh new file mode 100755 index 0000000000000000000000000000000000000000..4f5d35a30389fe0c9cf66adc6add483dc6e32a0b --- /dev/null +++ b/LTA/ltastorageoverview/test/test_ingesteventhandler.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh test_ingesteventhandler diff --git a/SAS/QPIDInfrastructure/bin/populateDB.sh b/SAS/QPIDInfrastructure/bin/populateDB.sh index 8b3daa6410c58ee7926101bc0dd5ba18bc9424e9..f56d103e1ca47d2b12cdd1efe7d2d37c838d0286 100755 --- a/SAS/QPIDInfrastructure/bin/populateDB.sh +++ b/SAS/QPIDInfrastructure/bin/populateDB.sh @@ -219,13 +219,29 @@ addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.mac.notification addtoQPIDDB.py --broker $SCU --queue mom.importxml --federation $MOM_SYSTEM # ----------------------------------------- -# Ingest -> ResourceAssignment +# Ingest -> SCU # ----------------------------------------- addtoQPIDDB.py --broker $LEXAR --exchange ${PREFIX}lofar.lta.ingest.notification --federation $SCU + +# ----------------------------------------- +# Ingest -> ResourceAssignment @ SCU +# ----------------------------------------- + addtoQPIDDB.py --broker $SCU --queue ${PREFIX}lofar.lta.ingest.notification.autocleanupservice addtoQPIDDB.py --broker $SCU --bind --exchange ${PREFIX}lofar.lta.ingest.notification --queue ${PREFIX}lofar.lta.ingest.notification.autocleanupservice --routingkey LTAIngest.# +# ----------------------------------------- +# Ingest -> LTA-storage-overview @ SCU +# ----------------------------------------- + +addtoQPIDDB.py --broker $SCU --queue ${PREFIX}lofar.lta.ingest.notification.for.ltastorageoverview +addtoQPIDDB.py --broker $SCU --bind --exchange ${PREFIX}lofar.lta.ingest.notification --queue ${PREFIX}lofar.lta.ingest.notification.for.ltastorageoverview --routingkey LTAIngest.# + + +# ----------------------------------------- +# CEP4 cpu nodes +# ----------------------------------------- for head in head01.cep4.control.lofar do