diff --git a/SAS/DataManagement/Cleanup/CleanupService/service.py b/SAS/DataManagement/Cleanup/CleanupService/service.py index 931a4fbf621cd78a82404f46af68a2d92ce9f145..ecee432fc8250291b24347a355665c7c89cafced 100644 --- a/SAS/DataManagement/Cleanup/CleanupService/service.py +++ b/SAS/DataManagement/Cleanup/CleanupService/service.py @@ -6,6 +6,7 @@ import logging import os.path import socket +import time import subprocess import time from datetime import datetime @@ -263,7 +264,7 @@ class CleanupHandler(MessageHandlerInterface): except Exception as e: logger.error(str(e)) - def _removePath(self, path): + def _removePath(self, path, do_recurse=True): logger.info("Remove path: %s" % (path,)) # do various sanity checking to prevent accidental deletes @@ -304,14 +305,29 @@ class CleanupHandler(MessageHandlerInterface): logger.warn(message) return {'deleted': True, 'message': message, 'path': path} - du_result = self._sqrpc.getDiskUsageForPath(path) + du_result = self._sqrpc.getDiskUsageForPath(path) if do_recurse else {} if du_result.get('found'): logger.info("Attempting to delete %s in %s", du_result.get('disk_usage_readable', '?B'), path) else: logger.info("Attempting to delete %s", path) - self._sendNotification(subject='PathDeleting', content={'path': path, 'size': du_result.get('disk_usage', 0) }) + if do_recurse: + # LustreFS on CEP4 like many small deletes better than one large tree delete + # so, recurse into the sub_directories, + # and take a small sleep in between so other processes (like observation datawriters) can access LustreFS + # (we've seen observation data loss when deleting large trees) + subdirs_result = self.path_resolver.getSubDirectories(path) + if subdirs_result.get('found') and subdirs_result.get('sub_directories'): + sub_directories = subdirs_result['sub_directories'] + + for subdir in sub_directories: + subdir_path = os.path.join(path, subdir) + self._removePath(subdir_path, do_recurse=False) #recurse only one level deep + time.sleep(0.01) + else: + self._sendNotification(subject='PathDeleting', content={'path': path, 'size': du_result.get('disk_usage', 0) }) + cmd = ['rm', '-rf', path] hostname = socket.gethostname() @@ -325,11 +341,15 @@ class CleanupHandler(MessageHandlerInterface): message = "Deleted %s in '%s'" % (du_result.get('disk_usage_readable', '?B'), path) logger.info(message) - self._sendNotification(subject='PathDeleted', content={'deleted': True, 'path': path, 'message':message, 'size': du_result.get('disk_usage', 0)}) + if do_recurse: + #only send notification if not recursing + self._sendNotification(subject='PathDeleted', content={'deleted': True, 'path': path, 'message':message, 'size': du_result.get('disk_usage', 0)}) return {'deleted': True, 'message': message, 'path': path, 'size': du_result.get('disk_usage', 0)} - self._sendNotification(subject='PathDeleted', content={'deleted': False, 'path': path, 'message':'Failed to delete (part of) %s' % path}) + if do_recurse: + #only send notification if not recursing + self._sendNotification(subject='PathDeleted', content={'deleted': False, 'path': path, 'message':'Failed to delete (part of) %s' % path}) #sanitize special chars err = err.decode('utf-8').encode('ascii', 'ignore') diff --git a/SAS/QPIDInfrastructure/bin/addtoQPIDDB.py b/SAS/QPIDInfrastructure/bin/addtoQPIDDB.py index de5793a340492284d3cc9de0c54014c3f6aed1c6..c153afaeb6fe38a986c82d37683160f5e3cd86eb 100755 --- a/SAS/QPIDInfrastructure/bin/addtoQPIDDB.py +++ b/SAS/QPIDInfrastructure/bin/addtoQPIDDB.py @@ -13,8 +13,9 @@ if __name__ == '__main__': parser.add_option('-f', '--federation', dest='federation' , type='string', default=None, help='Address of the federated broker') parser.add_option('-q', '--queue', dest='queue', type='string', default=None, help='Name of the queue on the broker') parser.add_option('-e', '--exchange', dest='exchange', type='string', default=None, help='Name of the exchange on the broker') - parser.add_option('-k', '--routingkey', dest='routingkey', type='string', default='#', help='Federation routing key') + parser.add_option('-k', '--routingkey', dest='routingkey', type='string', default='#', help='(Federation) routing key') parser.add_option('-d', '--dynamic', dest='dynamic', action="store_true", default=False, help='Create a dynamic route') + parser.add_option('--bind', dest='bind', action="store_true", default=False, help='bind given exchange (with -e) to given queue (with -q) with given routing key (with -k)') parser.add_option_group(dbcredentials.options_group(parser, "qpidinfra")) (options, args) = parser.parse_args() @@ -41,6 +42,18 @@ if __name__ == '__main__': QPIDinfra.addexchange(options.exchange) QPIDinfra.bindexchangetohost(options.exchange,options.broker) + if (options.bind): + if options.exchange == None or options.queue == None or options.broker == None: + print + print 'ERROR: When binding an exchange to a queue, you need to specify options: -e, -q, -k, -b' + print + parser.print_help() + sys.exit(1) + + #queue and exchange were already added to db above. + #here, just add the qpid-bind link + QPIDinfra.bindexchangetoqueueonhost(options.exchange,options.queue,options.broker,options.routingkey) + if (options.federation): QPIDinfra.addhost(options.federation) if (options.queue): diff --git a/SAS/QPIDInfrastructure/bin/configQPIDfromDB.py b/SAS/QPIDInfrastructure/bin/configQPIDfromDB.py index 6faa807dd8a2ba1be29d1f55b90d4ce3cac80cae..d8c0ba38bb5bcc023b5742e459faf72c5d18fbb8 100755 --- a/SAS/QPIDInfrastructure/bin/configQPIDfromDB.py +++ b/SAS/QPIDInfrastructure/bin/configQPIDfromDB.py @@ -24,10 +24,14 @@ def qpidQroute_add(settings): print ("qpid-route -d queue del %s %s '%s' '%s'" %(settings['tohost'],settings['fromhost'],settings['exchangename'],settings['queuename'])) print ("qpid-route -d queue add %s %s '%s' '%s'" %(settings['tohost'],settings['fromhost'],settings['exchangename'],settings['queuename'])) +def qpidconfig_add_binding(settings): + print ("qpid-config -d -b %s bind %s %s %s" %(settings['hostname'],settings['exchangename'],settings['queuename'],settings['routingkey'])) + dbcreds = dbcredentials.DBCredentials().get("qpidinfra") QPIDinfra = qpidinfra(dbcreds) QPIDinfra.perqueue(qpidconfig_add_queue) QPIDinfra.perexchange(qpidconfig_add_topic) QPIDinfra.perfederationexchange(qpidroute_add) QPIDinfra.perfederationqueue(qpidQroute_add) +QPIDinfra.perqpidbinding(qpidconfig_add_binding) diff --git a/SAS/QPIDInfrastructure/bin/populateDB.sh b/SAS/QPIDInfrastructure/bin/populateDB.sh index 752cacc240746bf572cd2c71e456f54bc8809910..55c1503e126e033892bb19bc6f0ef0352a69853e 100755 --- a/SAS/QPIDInfrastructure/bin/populateDB.sh +++ b/SAS/QPIDInfrastructure/bin/populateDB.sh @@ -36,6 +36,7 @@ if $PROD; then MCU=mcu001.control.lofar SCU=scu001.control.lofar SAS=sas001.control.lofar + LEXAR=lexar003.offline.lofar MOM_SYSTEM=lcs023.control.lofar MOM_INGEST=lcs029.control.lofar @@ -49,6 +50,7 @@ else MCU=mcu199.control.lofar SCU=scu199.control.lofar SAS=sas099.control.lofar + LEXAR=lexar003.offline.lofar MOM_SYSTEM=lcs028.control.lofar MOM_INGEST=lcs028.control.lofar @@ -143,6 +145,19 @@ addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.mom.bus --federation $MOM addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.mom.command --federation $MOM_SYSTEM addtoQPIDDB.py --broker $MOM_SYSTEM --exchange ${PREFIX}lofar.mom.notification --federation $SCU +# ----------------------------------------- +# Ingest +# ----------------------------------------- + +addtoQPIDDB.py --broker $LEXAR --exchange ${PREFIX}lofar.lta.ingest.command +addtoQPIDDB.py --broker $LEXAR --exchange ${PREFIX}lofar.lta.ingest.notification +addtoQPIDDB.py --broker $LEXAR --queue ${PREFIX}lofar.lta.ingest.jobs +addtoQPIDDB.py --broker $LEXAR --queue ${PREFIX}lofar.lta.ingest.jobs.for_transfer +addtoQPIDDB.py --broker $LEXAR --queue ${PREFIX}lofar.lta.ingest.notification.jobmanager +addtoQPIDDB.py --broker $LEXAR --bind --exchange ${PREFIX}lofar.lta.ingest.notification --queue ${PREFIX}lofar.lta.ingest.notification.momingestadapter --routingkey LTAIngest.# +addtoQPIDDB.py --broker $LEXAR --bind --exchange ${PREFIX}lofar.lta.ingest.notification --queue ${PREFIX}lofar.lta.ingest.notification.jobmanager --routingkey LTAIngest.# + + # ----------------------------------------- # ResourceAssignment # ----------------------------------------- @@ -156,6 +171,13 @@ addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.ssdb.notification addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.dm.command addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.dm.notification +# ----------------------------------------- +# Ingest -> ResourceAssignment +# ----------------------------------------- + +addtoQPIDDB.py --broker $LEXAR --exchange ${PREFIX}lofar.lta.ingest.notification --federation $SCU + + for head in head01.cep4.control.lofar do for cpu in $CEP4 diff --git a/SAS/QPIDInfrastructure/lib/QPIDDB.py b/SAS/QPIDInfrastructure/lib/QPIDDB.py index 87019b014f55f47e276cecdf070221daf3192b45..13035da5de6393738912648adced29f8dc7319f1 100755 --- a/SAS/QPIDInfrastructure/lib/QPIDDB.py +++ b/SAS/QPIDInfrastructure/lib/QPIDDB.py @@ -71,6 +71,18 @@ class qpidinfra: for item in ret: callback(item) + def perqpidbinding(self,callback): + """ Iterate over all exchange->queue bindings defined in the database. + """ + ret= self.db.doquery("""SELECT h.hostname as hostname, e.exchangename as exchangename, q.queuename as queuename, subject as routingkey + FROM queuelistener + INNER JOIN hosts h on fromhost=h.hostid + INNER JOIN exchanges e on eid=e.exchangeid + INNER JOIN queues q on qid=q.queueid + ;""") + for item in ret: + callback(item) + def gethostid(self,hostname): """ return the database id of the given hostname or 0 if non existant. example: @@ -235,6 +247,20 @@ class qpidinfra: print("Removing exchangeroute for key %s and exchange %s from host %s to host %s" %(routingkey,exchangekey,fromid,toid)) self.db.docommit("delete from exchangeroutes where erouteid=%d;" %(id)) + def getexchangetoqueuebinding(self,exchangeid,queueid,hostid,routingkey): + """ Retrieve the info on the exchange binding for the given exchangeid and queueid. + Returns 0 if none found. + """ + ret=self.db.doquery("select * from queuelistener where eid=%s and qid=%s and fromhost=%s and subject=\'%s\';" %(exchangeid,queueid,hostid,routingkey)) + if (ret==[]): + return 0 + return ret[0]['qlistenid'] + + def addexchangetoqueuebinding(self,exchangeid,queueid,hostid,routingkey='#'): + """ Add a qpid binding from the given exchangeid to the given queueid on the broker at hostid. + """ + if (self.getexchangetoqueuebinding(exchangeid,queueid,hostid,routingkey)==0): + self.db.docommit("insert into queuelistener (fromhost,eid,qid,subject) VALUES ( %s , %s , %s , \'%s\' ) ;" %(hostid,exchangeid,queueid,routingkey)) def bindqueuetohost(self,queue,host): """ Insert a binding in the database for queue on host. @@ -259,6 +285,24 @@ class qpidinfra: else: print("Exchange %s already binded with broker %s in database" %(exchange,host)) + def bindexchangetoqueueonhost(self,exchange,queue,host,routingkey='#'): + """ Insert a qpid-binding in the database from an exchange to a queue on host with the given routingkey. + Exchange, queue and/or host will be added to the database if needed. + """ + #add exchange, queue to db if needed, and hook to given host if needed. + self.bindexchangetohost(exchange, host) + self.bindqueuetohost(queue, host) + + #get the id's of exchange,queue,host (the add just returns the existing id's) + hostid=self.addhost(host,verbose=False) + exchangeid=self.addexchange(exchange,verbose=False) + queueid=self.addqueue(queue,verbose=False) + + #check if this qpid-route is already available, else add + if (self.getexchangetoqueuebinding(exchangeid,queueid,hostid,routingkey)==0): + self.addexchangetoqueuebinding(exchangeid,queueid,hostid,routingkey) + else: + print("Exchange \'%s\' to queue \'%s\' binding with routingkey \'%s\' on broker \'%s\' is already known in database" %(exchange,queue,routingkey,host)) def setqueueroute(self,queuename,fromname,toname,exchange): """ Insert a queue route in the database for queuename,fromname,toname,exchange. diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_database.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_database.sql index 2a4de6458cef3e54f845ae660a5cb90731476ef3..96d9c6941477f82208ba1630f2958a8b7de220c7 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_database.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_database.sql @@ -142,7 +142,7 @@ ALTER TABLE resource_allocation.specification OWNER TO resourceassignment; CREATE INDEX specification_cluster_idx - ON resource_allocation.specification; + ON resource_allocation.specification (cluster); CREATE INDEX specification_starttime_endtime_idx ON resource_allocation.specification (starttime DESC, endtime);