Skip to content
Snippets Groups Projects
Commit 8e1b6066 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: added --notification_busname and --notification_subject options...

Task #8887: added --notification_busname and --notification_subject options for SpecifiedTaskListener
parent a03740d5
No related branches found
No related tags found
No related merge requests found
......@@ -30,6 +30,7 @@ to assign resources to these tasks.
import qpid.messaging
import logging
from datetime import datetime
import pprint
from lofar.sas.resourceassignment.rataskspecified.RABusListener import RATaskSpecifiedBusListener
from lofar.messaging.RPC import RPC
......@@ -40,7 +41,6 @@ from lofar.sas.resourceassignment.resourceassigner.config import RATASKSPECIFIED
logger = logging.getLogger(__name__)
class SpecifiedTaskListener(RATaskSpecifiedBusListener):
def __init__(self,
busname=RATASKSPECIFIED_NOTIFICATION_BUSNAME,
......@@ -60,9 +60,19 @@ class SpecifiedTaskListener(RATaskSpecifiedBusListener):
super(SpecifiedTaskListener, self).__init__(busname=busname, subject=subject, broker=broker, **kwargs)
def onTaskSpecified(self, sasId, modificationTime, resourceIndicators):
logger.info('onTaskSpecified: sasId=%s' % sasId)
__all__ = ["SpecifiedTaskListener"]
logger.info('onTaskSpecified: sasId=%s resourceIndicators==%s' % (sasId, pprint.pformat(resourceIndicators)))
#cluster = parseSpecification(resourceIndicators)
#needed = getNeededResouces(resourceIndicators)
#available = getAvailableResources(cluster)
#if checkResources(needed, available):
#result = claimResources(needed)
#if result.success:
#commitResources(result.id)
##SetTaskToSCHEDULED(Task.)
#else:
##SetTaskToCONFLICT(Task.)
#pass
def parseSpecification(specification):
default = "CEP2"
......@@ -73,7 +83,7 @@ def getNeededResouces(specification):
# Used settings
ServiceName = "ToUpper"
BusName = "simpletest"
# Initialize a Remote Procedure Call object
with RPC(BusName,ServiceName) as remote_fn:
replymessage, status = remote_fn("Hello World ToUpper.")
......@@ -83,7 +93,7 @@ def getAvailableResources(cluster):
# Used settings
ServiceName = "GetServerState"
BusName = "simpletest"
groupnames = []
available = []
with RPC(BusName, ServiceName) as GetServerState:
......@@ -112,18 +122,8 @@ def claimResources(needed):
def commitResources(result_id):
pass
def onTaskSpecified(treeId, modificationTime, specification):
cluster = parseSpecification(specification)
needed = getNeededResouces(specification)
available = getAvailableResources(cluster)
if checkResources(needed, available):
result = claimResources(needed)
if result.success:
commitResources(result.id)
#SetTaskToSCHEDULED(Task.)
else:
#SetTaskToCONFLICT(Task.)
pass
__all__ = ["SpecifiedTaskListener"]
def main():
from optparse import OptionParser
......@@ -136,6 +136,8 @@ def main():
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME)
parser.add_option("-n", "--notification_busname", dest="notification_busname", type="string", default=RATASKSPECIFIED_NOTIFICATION_BUSNAME, help="Name of the notification bus on which taskspecified messages are published, default: %s" % RATASKSPECIFIED_NOTIFICATION_BUSNAME)
parser.add_option("-m", "--notification_subject", dest="notification_subject", type="string", default=RATASKSPECIFIED_NOTIFICATIONNAME, help="Subject of the published taskspecified messages to listen for, default: %s" % RATASKSPECIFIED_NOTIFICATIONNAME)
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
......@@ -143,7 +145,9 @@ def main():
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with SpecifiedTaskListener() as listener:
with SpecifiedTaskListener(busname=options.notification_busname,
subject=options.notification_subject,
broker=options.broker) as listener:
waitForInterrupt()
if __name__ == '__main__':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment