Skip to content
Snippets Groups Projects
Commit adaba9fc authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #10057: merged branch ^/branches/LOFAR-Release-2_19 back into trunk

parents 8eb07601 44ef166e
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
import logging import logging
import os.path import os.path
import socket import socket
import time
import subprocess import subprocess
import time import time
from datetime import datetime from datetime import datetime
...@@ -263,7 +264,7 @@ class CleanupHandler(MessageHandlerInterface): ...@@ -263,7 +264,7 @@ class CleanupHandler(MessageHandlerInterface):
except Exception as e: except Exception as e:
logger.error(str(e)) logger.error(str(e))
def _removePath(self, path): def _removePath(self, path, do_recurse=True):
logger.info("Remove path: %s" % (path,)) logger.info("Remove path: %s" % (path,))
# do various sanity checking to prevent accidental deletes # do various sanity checking to prevent accidental deletes
...@@ -304,14 +305,29 @@ class CleanupHandler(MessageHandlerInterface): ...@@ -304,14 +305,29 @@ class CleanupHandler(MessageHandlerInterface):
logger.warn(message) logger.warn(message)
return {'deleted': True, 'message': message, 'path': path} return {'deleted': True, 'message': message, 'path': path}
du_result = self._sqrpc.getDiskUsageForPath(path) du_result = self._sqrpc.getDiskUsageForPath(path) if do_recurse else {}
if du_result.get('found'): if du_result.get('found'):
logger.info("Attempting to delete %s in %s", du_result.get('disk_usage_readable', '?B'), path) logger.info("Attempting to delete %s in %s", du_result.get('disk_usage_readable', '?B'), path)
else: else:
logger.info("Attempting to delete %s", path) logger.info("Attempting to delete %s", path)
self._sendNotification(subject='PathDeleting', content={'path': path, 'size': du_result.get('disk_usage', 0) }) if do_recurse:
# LustreFS on CEP4 like many small deletes better than one large tree delete
# so, recurse into the sub_directories,
# and take a small sleep in between so other processes (like observation datawriters) can access LustreFS
# (we've seen observation data loss when deleting large trees)
subdirs_result = self.path_resolver.getSubDirectories(path)
if subdirs_result.get('found') and subdirs_result.get('sub_directories'):
sub_directories = subdirs_result['sub_directories']
for subdir in sub_directories:
subdir_path = os.path.join(path, subdir)
self._removePath(subdir_path, do_recurse=False) #recurse only one level deep
time.sleep(0.01)
else:
self._sendNotification(subject='PathDeleting', content={'path': path, 'size': du_result.get('disk_usage', 0) })
cmd = ['rm', '-rf', path] cmd = ['rm', '-rf', path]
hostname = socket.gethostname() hostname = socket.gethostname()
...@@ -325,11 +341,15 @@ class CleanupHandler(MessageHandlerInterface): ...@@ -325,11 +341,15 @@ class CleanupHandler(MessageHandlerInterface):
message = "Deleted %s in '%s'" % (du_result.get('disk_usage_readable', '?B'), path) message = "Deleted %s in '%s'" % (du_result.get('disk_usage_readable', '?B'), path)
logger.info(message) logger.info(message)
self._sendNotification(subject='PathDeleted', content={'deleted': True, 'path': path, 'message':message, 'size': du_result.get('disk_usage', 0)}) if do_recurse:
#only send notification if not recursing
self._sendNotification(subject='PathDeleted', content={'deleted': True, 'path': path, 'message':message, 'size': du_result.get('disk_usage', 0)})
return {'deleted': True, 'message': message, 'path': path, 'size': du_result.get('disk_usage', 0)} return {'deleted': True, 'message': message, 'path': path, 'size': du_result.get('disk_usage', 0)}
self._sendNotification(subject='PathDeleted', content={'deleted': False, 'path': path, 'message':'Failed to delete (part of) %s' % path}) if do_recurse:
#only send notification if not recursing
self._sendNotification(subject='PathDeleted', content={'deleted': False, 'path': path, 'message':'Failed to delete (part of) %s' % path})
#sanitize special chars #sanitize special chars
err = err.decode('utf-8').encode('ascii', 'ignore') err = err.decode('utf-8').encode('ascii', 'ignore')
......
...@@ -13,8 +13,9 @@ if __name__ == '__main__': ...@@ -13,8 +13,9 @@ if __name__ == '__main__':
parser.add_option('-f', '--federation', dest='federation' , type='string', default=None, help='Address of the federated broker') parser.add_option('-f', '--federation', dest='federation' , type='string', default=None, help='Address of the federated broker')
parser.add_option('-q', '--queue', dest='queue', type='string', default=None, help='Name of the queue on the broker') parser.add_option('-q', '--queue', dest='queue', type='string', default=None, help='Name of the queue on the broker')
parser.add_option('-e', '--exchange', dest='exchange', type='string', default=None, help='Name of the exchange on the broker') parser.add_option('-e', '--exchange', dest='exchange', type='string', default=None, help='Name of the exchange on the broker')
parser.add_option('-k', '--routingkey', dest='routingkey', type='string', default='#', help='Federation routing key') parser.add_option('-k', '--routingkey', dest='routingkey', type='string', default='#', help='(Federation) routing key')
parser.add_option('-d', '--dynamic', dest='dynamic', action="store_true", default=False, help='Create a dynamic route') parser.add_option('-d', '--dynamic', dest='dynamic', action="store_true", default=False, help='Create a dynamic route')
parser.add_option('--bind', dest='bind', action="store_true", default=False, help='bind given exchange (with -e) to given queue (with -q) with given routing key (with -k)')
parser.add_option_group(dbcredentials.options_group(parser, "qpidinfra")) parser.add_option_group(dbcredentials.options_group(parser, "qpidinfra"))
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
...@@ -41,6 +42,18 @@ if __name__ == '__main__': ...@@ -41,6 +42,18 @@ if __name__ == '__main__':
QPIDinfra.addexchange(options.exchange) QPIDinfra.addexchange(options.exchange)
QPIDinfra.bindexchangetohost(options.exchange,options.broker) QPIDinfra.bindexchangetohost(options.exchange,options.broker)
if (options.bind):
if options.exchange == None or options.queue == None or options.broker == None:
print
print 'ERROR: When binding an exchange to a queue, you need to specify options: -e, -q, -k, -b'
print
parser.print_help()
sys.exit(1)
#queue and exchange were already added to db above.
#here, just add the qpid-bind link
QPIDinfra.bindexchangetoqueueonhost(options.exchange,options.queue,options.broker,options.routingkey)
if (options.federation): if (options.federation):
QPIDinfra.addhost(options.federation) QPIDinfra.addhost(options.federation)
if (options.queue): if (options.queue):
......
...@@ -24,10 +24,14 @@ def qpidQroute_add(settings): ...@@ -24,10 +24,14 @@ def qpidQroute_add(settings):
print ("qpid-route -d queue del %s %s '%s' '%s'" %(settings['tohost'],settings['fromhost'],settings['exchangename'],settings['queuename'])) print ("qpid-route -d queue del %s %s '%s' '%s'" %(settings['tohost'],settings['fromhost'],settings['exchangename'],settings['queuename']))
print ("qpid-route -d queue add %s %s '%s' '%s'" %(settings['tohost'],settings['fromhost'],settings['exchangename'],settings['queuename'])) print ("qpid-route -d queue add %s %s '%s' '%s'" %(settings['tohost'],settings['fromhost'],settings['exchangename'],settings['queuename']))
def qpidconfig_add_binding(settings):
print ("qpid-config -d -b %s bind %s %s %s" %(settings['hostname'],settings['exchangename'],settings['queuename'],settings['routingkey']))
dbcreds = dbcredentials.DBCredentials().get("qpidinfra") dbcreds = dbcredentials.DBCredentials().get("qpidinfra")
QPIDinfra = qpidinfra(dbcreds) QPIDinfra = qpidinfra(dbcreds)
QPIDinfra.perqueue(qpidconfig_add_queue) QPIDinfra.perqueue(qpidconfig_add_queue)
QPIDinfra.perexchange(qpidconfig_add_topic) QPIDinfra.perexchange(qpidconfig_add_topic)
QPIDinfra.perfederationexchange(qpidroute_add) QPIDinfra.perfederationexchange(qpidroute_add)
QPIDinfra.perfederationqueue(qpidQroute_add) QPIDinfra.perfederationqueue(qpidQroute_add)
QPIDinfra.perqpidbinding(qpidconfig_add_binding)
...@@ -36,6 +36,7 @@ if $PROD; then ...@@ -36,6 +36,7 @@ if $PROD; then
MCU=mcu001.control.lofar MCU=mcu001.control.lofar
SCU=scu001.control.lofar SCU=scu001.control.lofar
SAS=sas001.control.lofar SAS=sas001.control.lofar
LEXAR=lexar003.offline.lofar
MOM_SYSTEM=lcs023.control.lofar MOM_SYSTEM=lcs023.control.lofar
MOM_INGEST=lcs029.control.lofar MOM_INGEST=lcs029.control.lofar
...@@ -49,6 +50,7 @@ else ...@@ -49,6 +50,7 @@ else
MCU=mcu199.control.lofar MCU=mcu199.control.lofar
SCU=scu199.control.lofar SCU=scu199.control.lofar
SAS=sas099.control.lofar SAS=sas099.control.lofar
LEXAR=lexar003.offline.lofar
MOM_SYSTEM=lcs028.control.lofar MOM_SYSTEM=lcs028.control.lofar
MOM_INGEST=lcs028.control.lofar MOM_INGEST=lcs028.control.lofar
...@@ -143,6 +145,19 @@ addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.mom.bus --federation $MOM ...@@ -143,6 +145,19 @@ addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.mom.bus --federation $MOM
addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.mom.command --federation $MOM_SYSTEM addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.mom.command --federation $MOM_SYSTEM
addtoQPIDDB.py --broker $MOM_SYSTEM --exchange ${PREFIX}lofar.mom.notification --federation $SCU addtoQPIDDB.py --broker $MOM_SYSTEM --exchange ${PREFIX}lofar.mom.notification --federation $SCU
# -----------------------------------------
# Ingest
# -----------------------------------------
addtoQPIDDB.py --broker $LEXAR --exchange ${PREFIX}lofar.lta.ingest.command
addtoQPIDDB.py --broker $LEXAR --exchange ${PREFIX}lofar.lta.ingest.notification
addtoQPIDDB.py --broker $LEXAR --queue ${PREFIX}lofar.lta.ingest.jobs
addtoQPIDDB.py --broker $LEXAR --queue ${PREFIX}lofar.lta.ingest.jobs.for_transfer
addtoQPIDDB.py --broker $LEXAR --queue ${PREFIX}lofar.lta.ingest.notification.jobmanager
addtoQPIDDB.py --broker $LEXAR --bind --exchange ${PREFIX}lofar.lta.ingest.notification --queue ${PREFIX}lofar.lta.ingest.notification.momingestadapter --routingkey LTAIngest.#
addtoQPIDDB.py --broker $LEXAR --bind --exchange ${PREFIX}lofar.lta.ingest.notification --queue ${PREFIX}lofar.lta.ingest.notification.jobmanager --routingkey LTAIngest.#
# ----------------------------------------- # -----------------------------------------
# ResourceAssignment # ResourceAssignment
# ----------------------------------------- # -----------------------------------------
...@@ -156,6 +171,13 @@ addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.ssdb.notification ...@@ -156,6 +171,13 @@ addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.ssdb.notification
addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.dm.command addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.dm.command
addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.dm.notification addtoQPIDDB.py --broker $SCU --exchange ${PREFIX}lofar.dm.notification
# -----------------------------------------
# Ingest -> ResourceAssignment
# -----------------------------------------
addtoQPIDDB.py --broker $LEXAR --exchange ${PREFIX}lofar.lta.ingest.notification --federation $SCU
for head in head01.cep4.control.lofar for head in head01.cep4.control.lofar
do do
for cpu in $CEP4 for cpu in $CEP4
......
...@@ -71,6 +71,18 @@ class qpidinfra: ...@@ -71,6 +71,18 @@ class qpidinfra:
for item in ret: for item in ret:
callback(item) callback(item)
def perqpidbinding(self,callback):
""" Iterate over all exchange->queue bindings defined in the database.
"""
ret= self.db.doquery("""SELECT h.hostname as hostname, e.exchangename as exchangename, q.queuename as queuename, subject as routingkey
FROM queuelistener
INNER JOIN hosts h on fromhost=h.hostid
INNER JOIN exchanges e on eid=e.exchangeid
INNER JOIN queues q on qid=q.queueid
;""")
for item in ret:
callback(item)
def gethostid(self,hostname): def gethostid(self,hostname):
""" return the database id of the given hostname or 0 if non existant. """ return the database id of the given hostname or 0 if non existant.
example: example:
...@@ -235,6 +247,20 @@ class qpidinfra: ...@@ -235,6 +247,20 @@ class qpidinfra:
print("Removing exchangeroute for key %s and exchange %s from host %s to host %s" %(routingkey,exchangekey,fromid,toid)) print("Removing exchangeroute for key %s and exchange %s from host %s to host %s" %(routingkey,exchangekey,fromid,toid))
self.db.docommit("delete from exchangeroutes where erouteid=%d;" %(id)) self.db.docommit("delete from exchangeroutes where erouteid=%d;" %(id))
def getexchangetoqueuebinding(self,exchangeid,queueid,hostid,routingkey):
""" Retrieve the info on the exchange binding for the given exchangeid and queueid.
Returns 0 if none found.
"""
ret=self.db.doquery("select * from queuelistener where eid=%s and qid=%s and fromhost=%s and subject=\'%s\';" %(exchangeid,queueid,hostid,routingkey))
if (ret==[]):
return 0
return ret[0]['qlistenid']
def addexchangetoqueuebinding(self,exchangeid,queueid,hostid,routingkey='#'):
""" Add a qpid binding from the given exchangeid to the given queueid on the broker at hostid.
"""
if (self.getexchangetoqueuebinding(exchangeid,queueid,hostid,routingkey)==0):
self.db.docommit("insert into queuelistener (fromhost,eid,qid,subject) VALUES ( %s , %s , %s , \'%s\' ) ;" %(hostid,exchangeid,queueid,routingkey))
def bindqueuetohost(self,queue,host): def bindqueuetohost(self,queue,host):
""" Insert a binding in the database for queue on host. """ Insert a binding in the database for queue on host.
...@@ -259,6 +285,24 @@ class qpidinfra: ...@@ -259,6 +285,24 @@ class qpidinfra:
else: else:
print("Exchange %s already binded with broker %s in database" %(exchange,host)) print("Exchange %s already binded with broker %s in database" %(exchange,host))
def bindexchangetoqueueonhost(self,exchange,queue,host,routingkey='#'):
""" Insert a qpid-binding in the database from an exchange to a queue on host with the given routingkey.
Exchange, queue and/or host will be added to the database if needed.
"""
#add exchange, queue to db if needed, and hook to given host if needed.
self.bindexchangetohost(exchange, host)
self.bindqueuetohost(queue, host)
#get the id's of exchange,queue,host (the add just returns the existing id's)
hostid=self.addhost(host,verbose=False)
exchangeid=self.addexchange(exchange,verbose=False)
queueid=self.addqueue(queue,verbose=False)
#check if this qpid-route is already available, else add
if (self.getexchangetoqueuebinding(exchangeid,queueid,hostid,routingkey)==0):
self.addexchangetoqueuebinding(exchangeid,queueid,hostid,routingkey)
else:
print("Exchange \'%s\' to queue \'%s\' binding with routingkey \'%s\' on broker \'%s\' is already known in database" %(exchange,queue,routingkey,host))
def setqueueroute(self,queuename,fromname,toname,exchange): def setqueueroute(self,queuename,fromname,toname,exchange):
""" Insert a queue route in the database for queuename,fromname,toname,exchange. """ Insert a queue route in the database for queuename,fromname,toname,exchange.
......
...@@ -142,7 +142,7 @@ ALTER TABLE resource_allocation.specification ...@@ -142,7 +142,7 @@ ALTER TABLE resource_allocation.specification
OWNER TO resourceassignment; OWNER TO resourceassignment;
CREATE INDEX specification_cluster_idx CREATE INDEX specification_cluster_idx
ON resource_allocation.specification; ON resource_allocation.specification (cluster);
CREATE INDEX specification_starttime_endtime_idx CREATE INDEX specification_starttime_endtime_idx
ON resource_allocation.specification (starttime DESC, endtime); ON resource_allocation.specification (starttime DESC, endtime);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment