diff --git a/.gitattributes b/.gitattributes index f2131fcfdfe3ab43c12cb330caf1d483e836bffc..7df6163e244021f0cc89742ccadf02136d4c6ff9 100644 --- a/.gitattributes +++ b/.gitattributes @@ -4960,6 +4960,7 @@ SAS/OTDB_Services/test/t_TreeStatusEvents.sh -text svneol=unset#application/x-sh SAS/OTDB_Services/test/unittest_db.dump.gz -text svneol=unset#application/x-gzip SAS/ResourceAssignment/CMakeLists.txt -text SAS/ResourceAssignment/QPIDDatabase/bin/addtoQPIDDB.py -text +SAS/ResourceAssignment/QPIDDatabase/bin/cep4_config.sh -text SAS/ResourceAssignment/QPIDDatabase/bin/configQPIDfromDB.py -text SAS/ResourceAssignment/QPIDDatabase/bin/gatherfrombrokers.sh -text SAS/ResourceAssignment/QPIDDatabase/bin/qpidinfra_dump.sql -text diff --git a/SAS/ResourceAssignment/QPIDDatabase/bin/addtoQPIDDB.py b/SAS/ResourceAssignment/QPIDDatabase/bin/addtoQPIDDB.py index 9c9c9d3895f8c102978a20bcf14dfbc25ad07bfd..31b4d8287470b59955f0c0868725da4fef57190e 100755 --- a/SAS/ResourceAssignment/QPIDDatabase/bin/addtoQPIDDB.py +++ b/SAS/ResourceAssignment/QPIDDatabase/bin/addtoQPIDDB.py @@ -46,12 +46,18 @@ if __name__ == '__main__': QPIDinfra.addhost(options.federation) if (options.queue): QPIDinfra.addqueue(options.queue) # should be superfluous + ecxchange='' + if (options.exchange): + QPIDinfra.addexchange(options.exchange) + exchange=options.exchange + QPIDinfra.bindqueuetohost(options.queue,options.federation) - QPIDinfra.setqueueroute(options.queue,options.broker,options.federation) - - if (options.exchange): - QPIDinfra.addexchange(options.exchange) # should be superfluous - QPIDinfra.bindexchangetohost(options.exchange,options.federation) - QPIDinfra.setexchangeroute(options.exchange,options.routingkey,options.broker,options.federation) - + QPIDinfra.setqueueroute(options.queue,options.broker,options.federation,exchange) + else: + if (options.exchange): + QPIDinfra.addexchange(options.exchange) # should be superfluous + QPIDinfra.bindexchangetohost(options.exchange,options.federation) + QPIDinfra.setexchangeroute(options.exchange,options.routingkey,options.broker,options.federation) + else: + raise Exception("federation can only be setup with a queue or an exchange") diff --git a/SAS/ResourceAssignment/QPIDDatabase/bin/cep4_config.sh b/SAS/ResourceAssignment/QPIDDatabase/bin/cep4_config.sh new file mode 100755 index 0000000000000000000000000000000000000000..901b43b95a3e674bb2f55a7b4d5ee4b465cb67c2 --- /dev/null +++ b/SAS/ResourceAssignment/QPIDDatabase/bin/cep4_config.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +for tnode in head{01..02}.cep4.lofar +do +for fnode in cpu{01..50}.cep4.lofar +do + ./addtoQPIDDB.py -b $fnode -q lofar.task.feedback.dataproducts -f $tnode -e lofar.default.bus +done +./addtoQPIDDB.py -b$tnode -q lofar.task.feedback.dataproducts -f ccu001.control.lofar -e lofar.default.bus +done + + diff --git a/SAS/ResourceAssignment/QPIDDatabase/bin/configQPIDfromDB.py b/SAS/ResourceAssignment/QPIDDatabase/bin/configQPIDfromDB.py index 4e7184bd7c333af8e9297c2fe0001d442e7f4379..02a9937d37be709f55d0980bac8885268bdbe9de 100755 --- a/SAS/ResourceAssignment/QPIDDatabase/bin/configQPIDfromDB.py +++ b/SAS/ResourceAssignment/QPIDDatabase/bin/configQPIDfromDB.py @@ -5,17 +5,17 @@ from QPIDDB import qpidinfra -def qpidconfig_add_queue(host,queue): - print ("qpid-config -b %s add queue %s --durable" %(host,queue)) +def qpidconfig_add_queue(settings): + print ("qpid-config -b %s add queue %s --durable" %(settings['hostname'],settings['queuename'])) -def qpidconfig_add_topic(host,exchange): - print ("qpid-config -b %s add exchange topic %s --durable" %(host,exchange)) +def qpidconfig_add_topic(settings): + print ("qpid-config -b %s add exchange topic %s --durable" %(settings['hostname'],settings['exchangename'])) -def qpidroute_add(fromhost,tohost,exchange,routingkey): - print ("qpid-route -d route add %s %s %s \'%s\' " %(tohost,fromhost,exchange,routingkey)) +def qpidroute_add(settings): + print ("qpid-route -d route add %s %s %s \'%s\' " %(settings['tohost'],settings['fromhost'],settings['exchangename'],settings['routingkey'])) -def qpidQroute_add(fromhost,tohost,queue): - print ("qpid-route -d queue add %s %s %s amq.direct" %(tohost,fromhost,queue)) +def qpidQroute_add(settings): + print ("qpid-route -d queue add %s %s %s '%s'" %(settings['tohost'],settings['fromhost'],settings['queuename'],settings['exchangename'])) QPIDinfra = qpidinfra() QPIDinfra.perqueue(qpidconfig_add_queue) @@ -23,7 +23,3 @@ QPIDinfra.perexchange(qpidconfig_add_topic) QPIDinfra.perfederationexchange(qpidroute_add) QPIDinfra.perfederationqueue(qpidQroute_add) - - - - diff --git a/SAS/ResourceAssignment/QPIDDatabase/bin/route_to_struct.py b/SAS/ResourceAssignment/QPIDDatabase/bin/route_to_struct.py index e092756eab8bc012c9798711690631000d58a7d4..634b1cafaed31677119312ced550ca2a6920cfe0 100755 --- a/SAS/ResourceAssignment/QPIDDatabase/bin/route_to_struct.py +++ b/SAS/ResourceAssignment/QPIDDatabase/bin/route_to_struct.py @@ -31,6 +31,10 @@ def to_hostname(s): return fqdn +def to_exchangename(s): + exchangename=s.split('=')[1].split(')')[0] + print(" found exchangename '%s'" %(exchangename)) + return exchangename print (" Num lines %d " %(numlines)) @@ -49,12 +53,17 @@ if (offset!=numlines): s = tosearch[offset].split(' ') if ( len(s) ==5 ): # valid description hosta=to_hostname(s[2]) + exchangename=to_exchangename(s[2]) + if (exchangename == ''): + exchangename='lofar.default.bus' queuename=s[4].split('=')[1].split(')')[0] hostb=to_hostname(s[4]) #.split(':')[0].split('.') if (s[3]=='<='): + todb.bindexchangetohost(exchangename,hosta) + todb.bindexchangetohost(exchangename,hostb) todb.bindqueuetohost(queuename,hosta) todb.bindqueuetohost(queuename,hostb) - todb.setqueueroute(queuename,hostb,hosta) + todb.setqueueroute(queuename,hostb,hosta,exchangename) print ("# queue %s from %s to %s" %(queuename,hostb,hosta)) if (s[3]=='=>'): todb.bindqueuetohost(queuename,hosta) diff --git a/SAS/ResourceAssignment/QPIDDatabase/lib/QPIDDB.py b/SAS/ResourceAssignment/QPIDDatabase/lib/QPIDDB.py index 7a85e8ec0ea6861994e048b1422063b0e975b91d..de8362f52e08f423751daf7ded8c5e8b43cf0695 100755 --- a/SAS/ResourceAssignment/QPIDDatabase/lib/QPIDDB.py +++ b/SAS/ResourceAssignment/QPIDDatabase/lib/QPIDDB.py @@ -34,24 +34,24 @@ class qpidinfra: def perqueue(self,callback): ret=self.doquery("select hostname,queuename from persistentqueues INNER join hosts on (hid=hostid) INNER JOIN queues on (qid=queueid);") for item in ret: - callback(item['hostname'],item['queuename']) + callback(item) def perexchange(self,callback): ret= self.doquery("select hostname,exchangename from persistentexchanges INNER join hosts on (hid=hostid) INNER JOIN exchanges on (eid=exchangeid);") for item in ret: - callback(item['hostname'],item['exchangename']) + callback(item) def perfederationexchange(self,callback): # cur.execute("select h1.hostname as fromhost ,h2.hostname as tohost , exchangename , keyname from exchangeroutes JOIN hosts as h1 on (fromhost=h1.hostid) JOIN hosts as h2 on (tohost=h2.hostid) JOIN exchanges on (exchangeid=eid) JOIN routingkeys on (keyid=kid);") - ret=self.doquery("select h1.hostname as fromhost ,h2.hostname as tohost , exchangename from exchangeroutes JOIN hosts as h1 on (fromhost=h1.hostid) JOIN hosts as h2 on (tohost=h2.hostid) JOIN exchanges on (exchangeid=eid);") + ret=self.doquery("select h1.hostname as fromhost ,h2.hostname as tohost , exchangename , dynamic , routingkey from exchangeroutes JOIN hosts as h1 on (fromhost=h1.hostid) JOIN hosts as h2 on (tohost=h2.hostid) JOIN exchanges on (exchangeid=eid);") for item in ret: - callback(item['fromhost'],item['tohost'],item['exchangename'],'#') #item['keyname']) + callback(item) def perfederationqueue(self,callback): - ret=self.doquery("select h1.hostname as fromhost ,h2.hostname as tohost , queuename from queueroutes JOIN hosts as h1 on (fromhost=h1.hostid) JOIN hosts as h2 on (tohost=h2.hostid) JOIN queues on (queueid=qid);") + ret=self.doquery("select h1.hostname as fromhost ,h2.hostname as tohost , queuename, exchangename from queueroutes JOIN hosts as h1 on (fromhost=h1.hostid) JOIN hosts as h2 on (tohost=h2.hostid) JOIN queues on (queueid=qid) JOIN exchanges on (exchangeid=eid);") for item in ret: - callback(item['fromhost'],item['tohost'],item['queuename']) + callback(item) def gethostid(self,hostname): tmp=self.doquery("select * from hosts where hostname='%s';" %(hostname)) @@ -71,32 +71,41 @@ class qpidinfra: return 0 return tmp[0]['exchangeid'] - def addhost(self,hostname): + def addhost(self,hostname,verbose=True): id=self.gethostid(hostname) if (id!=0): - print ("Host %s already available in database with id = %d" %(hostname,id)) - return + if verbose: + print ("Host %s already available in database with id = %d" %(hostname,id)) + return id self.docommit("insert into hosts (hostname) VALUES ('%s');" %(hostname)) + print (" added host %s to DB" %(hostname)) + return self.gethostid(hostname) - def addqueue(self,queue): + def addqueue(self,queue, verbose=True): id=self.getqueueid(queue) if (id!=0): - print ("Queue %s already available in database with id = %d" %(queue,id)) - return + if verbose: + print ("Queue %s already available in database with id = %d" %(queue,id)) + return id self.docommit("insert into queues (queuename) VALUES ('%s');" %(queue)) + print (" added queue %s to DB" %(queue)) + return self.getqueueid(queue) - def addexchange(self,exchange): + def addexchange(self,exchange, verbose=True): id=self.getexchangeid(exchange) - if (id!=0): - print ("Exchange %s already available in database with id = %d" %(exchange,id)) - return + if (id): + if verbose: + print ("Exchange %s already available in database with id = %d" %(exchange,id)) + return id self.docommit("insert into exchanges (exchangename) VALUES ('%s');" %(exchange)) + print(" added exchange %s to DB " %(exchange)) + return self.getexchangeid(exchange) def getqueuebinding(self,queueid,hostid): @@ -127,9 +136,9 @@ class qpidinfra: return ret[0]['qrouteid'] - def addqueueroute(self,queueid,fromid,toid): + def addqueueroute(self,queueid,fromid,toid,exchangeid): if (self.getqueueroute(queueid,fromid,toid)==0): - self.docommit("insert into queueroutes (qid,fromhost,tohost) VALUES ( %s , %s , %s );" %(queueid,fromid,toid)) + self.docommit("insert into queueroutes (qid,fromhost,tohost,eid) VALUES ( %s , %s , %s, %s );" %(queueid,fromid,toid,exchangeid)) def getexchangeroute(self,exchangeid,routingkey,fromid,toid): ret=self.doquery("select * from exchangeroutes where eid=%s and fromhost=%s and tohost=%s and routingkey='%s';" %(exchangeid,fromid,toid,routingkey)) @@ -137,22 +146,14 @@ class qpidinfra: return 0; return ret[0]['erouteid'] - def addexchangeroute(self,exchangeid,routingkey,fromid,toid): + def addexchangeroute(self,exchangeid,routingkey,fromid,toid,dynamic=False): if (getexchangeroute(self,exchangeid,routingkey,fromid,toid)==0): - self.docommit("insert into exchangeroutes (eid,fromhost,tohost,routingkey);" %(exchangeid,fromid,toid,routingkey)) + self.docommit("insert into exchangeroutes (eid,fromhost,tohost,routingkey,dynamic);" %(exchangeid,fromid,toid,routingkey,dynamic)) def bindqueuetohost(self,queue,host): - hostid=self.gethostid(host) - if (hostid==0): - self.addhost(host) - hostid=self.gethostid(host) - - queueid=self.getqueueid(queue) - if (queueid==0): - self.addqueue(queue) - queueid=self.getqueueid(queue) - + hostid=self.addhost(host) + queueid=self.addqueue(queue) bindid=self.getqueuebinding(queueid,hostid) if (bindid==0): # not found self.addqueuebinding(queueid,hostid) @@ -160,33 +161,25 @@ class qpidinfra: print ("Queue %s already binded with broker %s in database" %(queue,host)) def bindexchangetohost(self,exchange,host): - hostid=self.gethostid(host) - if (hostid==0): - self.addhost(host) - hostid=self.gethostid(host) - - exchangeid=self.getexchangeid(exchange) - if (exchangeid==0): - self.addexchange(exchange) - exchangeid=self.getexchangeid(exchange) - - + hostid=self.addhost(host,verbose=False) + exchangeid=self.addexchange(exchange,verbose=False) if (self.getexchangebinding(exchangeid,hostid)==0): self.addexchangebinding(exchangeid,hostid) else: print("Exchange %s already binded with broker %s in database" %(exchange,host)) - def setqueueroute(self,queuename,fromname,toname): - fromid=self.gethostid(fromname) - toid=self.gethostid(toname) - queueid=self.getqueueid(queuename) - self.addqueueroute(queueid,fromid,toid) + def setqueueroute(self,queuename,fromname,toname,exchange): + fromid = self.addhost(fromname) + toid = self.addhost(toname) + queueid = self.addqueue(queuename) + exchangeid = self.addexchange(exchange) + self.addqueueroute(queueid,fromid,toid,exchangeid) - def setexchangeroute(self,exchangename,routingkey,fromname,toname): - exchangeid=self.getexchangeid(exchangename) - fromid=self.gethostid(fromname) - toid=self.gethostid(toname) - self.addexchangeroute(exchangeid,routingkey,fromid,toid) + def setexchangeroute(self,exchangename,routingkey,fromname,toname,dynamic=False): + exchangeid = self.addexchange(exchangename) + fromid = self.addhost(fromname) + toid = self.addhost(toname) + self.addexchangeroute(exchangeid,routingkey,fromid,toid,dynamic) diff --git a/SAS/ResourceAssignment/QPIDDatabase/sql/qpidinfradb.sql b/SAS/ResourceAssignment/QPIDDatabase/sql/qpidinfradb.sql index a632cfc8d39489a113fdfd571c9973ab9e1b3026..3fd83791efe2a8b7b2a28a61e6ab1de600ab8590 100644 --- a/SAS/ResourceAssignment/QPIDDatabase/sql/qpidinfradb.sql +++ b/SAS/ResourceAssignment/QPIDDatabase/sql/qpidinfradb.sql @@ -42,6 +42,7 @@ CREATE TABLE queueroutes( fromhost bigint NOT NULL, tohost bigint NOT NULL, qid bigint NOT NULL, + eid bigint NOT NULL, PRIMARY KEY (qrouteid) ); CREATE TABLE exchangeroutes( @@ -49,7 +50,8 @@ CREATE TABLE exchangeroutes( fromhost bigint NOT NULL, tohost bigint NOT NULL, eid bigint NOT NULL, - routingkey varchar(512) NOT NULL, + dynamic bool default false, + routingkey varchar(512) default '#', PRIMARY KEY (erouteid) ); CREATE TABLE queuelistener(