Skip to content
Snippets Groups Projects
Commit efca724f authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: cleaned up config mess of busnames and subjects

parent c7056388
No related branches found
No related tags found
No related merge requests found
......@@ -29,6 +29,7 @@ import os.path
import sys, time, pg, datetime
import logging
from lofar.messaging import EventMessage, ToBus
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError)
alive = False
......@@ -81,22 +82,16 @@ if __name__ == "__main__":
from optparse import OptionParser
from lofar.common import dbcredentials
import signal
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME as DEFAULT_NOTIFICATION_BUSNAME
# Check the invocation arguments
parser = OptionParser("%prog [options]")
parser.add_option("-B", "--busname", dest="busname", type="string", default=DEFAULT_NOTIFICATION_BUSNAME,
parser.add_option("-B", "--busname", dest="busname", type="string", default=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
help="Busname or queue-name the status changes are published on. [default: %default]")
parser.add_option_group(dbcredentials.options_group(parser))
(options, args) = parser.parse_args()
dbcreds = dbcredentials.parse_options(options)
if not options.busname:
print "Missing busname"
parser.print_help()
sys.exit(1)
# Set signalhandler to stop the program in a neat way.
signal.signal(signal.SIGINT, signal_handler)
......@@ -155,7 +150,7 @@ if __name__ == "__main__":
for (treeid, state, modtime, creation) in record_list:
content = { "treeID" : treeid, "state" : allowed_states.get(state, "unknown_state"),
"time_of_change" : modtime }
msg = EventMessage(context="otdb.treestatus", content=content)
msg = EventMessage(context=DEFAULT_OTDB_NOTIFICATION_SUBJECT, content=content)
logger.info("sending message treeid %s state %s modtime %s" % (treeid, allowed_states.get(state, "unknown_state"), modtime))
send_bus.send(msg)
......
......@@ -5,4 +5,4 @@ DEFAULT_BUSNAME = 'lofar.otdb.specification'
DEFAULT_SERVICENAME = 'OTDBService'
DEFAULT_OTDB_NOTIFICATION_BUSNAME='lofar.otdb.status'
DEFAULT_RA_NOTIFICATION_BUSNAME='lofar.ra.notification'
DEFAULT_OTDB_NOTIFICATION_SUBJECT='TaskStatus'
......@@ -31,7 +31,9 @@ if __name__ == "__main__":
import logging
import sys
from optparse import OptionParser
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT
from lofar.common.util import waitForInterrupt
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
......@@ -40,14 +42,18 @@ if __name__ == "__main__":
parser = OptionParser("%prog -O otdb_bus -B my_bus [options]")
parser.add_option("-O", "--otdb_bus", dest="otdb_busname", type="string", default=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
help="Bus or queue OTDB publishes status changes on. [default: %default]")
parser.add_option("-B", "--my_bus", dest="my_busname", type="string", default=DEFAULT_RA_NOTIFICATION_BUSNAME,
parser.add_option("-B", "--notification_bus", dest="notification_bus", type="string",
default=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
help="Bus or queue we publish resource requests on. [default: %default]")
(options, args) = parser.parse_args()
if not options.otdb_busname or not options.my_busname:
if not options.otdb_busname or not options.notification_bus:
parser.print_help()
sys.exit(1)
with RATaskSpecified("OTDB.TaskSpecified", otdb_busname=options.otdb_busname, my_busname=options.my_busname) as jts:
with RATaskSpecified(otdb_notification_busname=options.otdb_busname,
otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
notification_busname=options.notification_bus,
notification_subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT):
waitForInterrupt()
......@@ -29,7 +29,7 @@ from lofar.messaging import FromBus, ToBus, RPC, EventMessage
from lofar.parameterset import PyParameterValue
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.common.util import waitForInterrupt
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
import logging
logger = logging.getLogger(__name__)
......@@ -156,11 +156,16 @@ def resourceIndicatorsFromParset( parset ):
return subset
class RATaskSpecified(OTDBBusListener):
def __init__(self, servicename, otdb_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, my_busname=DEFAULT_RA_NOTIFICATION_BUSNAME, **kwargs):
super(RATaskSpecified, self).__init__(busname=otdb_busname, subject="TaskStatus", **kwargs)
def __init__(self,
otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME,
otdb_notification_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
notification_busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
notification_subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
**kwargs):
super(RATaskSpecified, self).__init__(busname=otdb_busname, subject=otdb_notification_subject, **kwargs)
self.parset_rpc = RPC(service="TaskSpecification", busname=otdb_busname)
self.send_bus = ToBus("%s/%s" % (my_busname, servicename))
self.send_bus = ToBus("%s/%s" % (notification_busname, notification_subject))
def start_listening(self, **kwargs):
self.parset_rpc.open()
......
......@@ -3,10 +3,3 @@
DEFAULT_BUSNAME = 'lofar.ra.command'
DEFAULT_SERVICENAME = 'RAService'
try:
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_NOTIFICATION_BUSNAME as RATASKSPECIFIED_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.rataskspecified.config import RATASKSPECIFIED_NOTIFICATIONNAME as RATASKSPECIFIED_NOTIFICATIONNAME
except ImportError:
RATASKSPECIFIED_NOTIFICATION_BUSNAME = 'lofar.ra.notification'
RATASKSPECIFIED_NOTIFICATIONNAME = 'OTDB.TaskSpecified'
......@@ -36,15 +36,16 @@ from lofar.sas.resourceassignment.rataskspecified.RABusListener import RATaskSpe
from lofar.messaging.RPC import RPC, RPCException
import lofar.sas.resourceassignment.resourceassignmentservice.rpc as rarpc
from lofar.sas.resourceassignment.resourceassigner.config import RATASKSPECIFIED_NOTIFICATION_BUSNAME, RATASKSPECIFIED_NOTIFICATIONNAME
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.rataskspecified.config import DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT
from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner
logger = logging.getLogger(__name__)
class SpecifiedTaskListener(RATaskSpecifiedBusListener):
def __init__(self,
busname=RATASKSPECIFIED_NOTIFICATION_BUSNAME,
subject=RATASKSPECIFIED_NOTIFICATIONNAME,
busname=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
subject=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
broker=None,
assigner=None,
**kwargs):
......@@ -87,15 +88,33 @@ def main():
# Check the invocation arguments
parser = OptionParser("%prog [options]",
description='runs the resourceassigner service')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("--notification_busname", dest="notification_busname", type="string", default=RATASKSPECIFIED_NOTIFICATION_BUSNAME, help="Name of the notification bus on which taskspecified messages are published, default: %s" % RATASKSPECIFIED_NOTIFICATION_BUSNAME)
parser.add_option("--notification_subject", dest="notification_subject", type="string", default=RATASKSPECIFIED_NOTIFICATIONNAME, help="Subject of the published taskspecified messages to listen for, default: %s" % RATASKSPECIFIED_NOTIFICATIONNAME)
parser.add_option("--radb_busname", dest="radb_busname", type="string", default=RADB_BUSNAME, help="Name of the bus on which the radb service listens, default: %s" % RADB_BUSNAME)
parser.add_option("--radb_servicename", dest="radb_servicename", type="string", default=RADB_SERVICENAME, help="Name of the radb service, default: %s" % RADB_SERVICENAME)
parser.add_option("--re_busname", dest="re_busname", type="string", default=RE_BUSNAME, help="Name of the bus on which the resource estimator service listens, default: %s" % RE_BUSNAME)
parser.add_option("--re_servicename", dest="re_servicename", type="string", default=RE_SERVICENAME, help="Name of the resource estimator service, default: %s" % RE_SERVICENAME)
parser.add_option("--ssdb_busname", dest="ssdb_busname", type="string", default=SSDB_BUSNAME, help="Name of the bus on which the ssdb service listens, default: %s" % SSDB_BUSNAME)
parser.add_option("--ssdb_servicename", dest="ssdb_servicename", type="string", default=SSDB_SERVICENAME, help="Name of the ssdb service, default: %s" % SSDB_SERVICENAME)
parser.add_option('-q', '--broker', dest='broker', type='string',
default=None,
help='Address of the qpid broker, default: localhost')
parser.add_option("--notification_busname", dest="notification_busname", type="string",
default=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_BUSNAME,
help="Name of the notification bus on which taskspecified messages are published. [default: %default]")
parser.add_option("--notification_subject", dest="notification_subject", type="string",
default=DEFAULT_RA_TASK_SPECIFIED_NOTIFICATION_SUBJECT,
help="Subject of the published taskspecified messages to listen for. [default: %default]")
parser.add_option("--radb_busname", dest="radb_busname", type="string",
default=RADB_BUSNAME,
help="Name of the bus on which the radb service listens. [default: %default]")
parser.add_option("--radb_servicename", dest="radb_servicename", type="string",
default=RADB_SERVICENAME,
help="Name of the radb service. [default: %default]")
parser.add_option("--re_busname", dest="re_busname", type="string",
default=RE_BUSNAME,
help="Name of the bus on which the resource estimator service listens. [default: %default]")
parser.add_option("--re_servicename", dest="re_servicename", type="string",
default=RE_SERVICENAME,
help="Name of the resource estimator service. [default: %default]")
parser.add_option("--ssdb_busname", dest="ssdb_busname", type="string",
default=SSDB_BUSNAME,
help="Name of the bus on which the ssdb service listens. [default: %default]")
parser.add_option("--ssdb_servicename", dest="ssdb_servicename", type="string",
default=SSDB_SERVICENAME,
help="Name of the ssdb service. [default: %default]")
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment