Skip to content
Snippets Groups Projects
Commit a723d2fe authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-699: adaptations to new messaging and rpc.py module

parent 5700a6e2
No related branches found
No related tags found
No related merge requests found
......@@ -19,8 +19,7 @@
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
from lofar.common.util import humanreadablesize
from lofar.messaging.messagebus import AbstractMessageHandler, BusListener
from lofar.messaging.messagebus import AbstractMessageHandler, BusListener, LofarMessage, EventMessage
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_PREFIX
......@@ -28,35 +27,37 @@ import logging
logger = logging.getLogger()
class TBBBusMessageHandler(AbstractMessageHandler):
class TBBEventMessageHandler(AbstractMessageHandler):
def __init__(self):
super(TBBBusMessageHandler, self).__init__()
self.subject_prefix = DEFAULT_TBB_NOTIFICATION_PREFIX # TODO: This used to be defined on the AbstractBusListener. Can be removed?
super().__init__()
def _handleMessage(self, msg):
def handle_message(self, msg: LofarMessage):
# try to handle an incoming message, and call the associated on<SomeMessage> method
try:
logger.info("on%s: %s", msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' '))
if msg.subject == '%sTBBServiceStarted' % self.subject_prefix:
self.onTBBServiceStarted(msg.content)
elif msg.subject == '%sTBBServiceStopped' % self.subject_prefix:
self.onTBBServiceStopped(msg.content)
elif msg.subject == '%sDataWritersStarting' % self.subject_prefix:
self.onDataWritersStarting(msg.content)
elif msg.subject == '%sDataWritersStarted' % self.subject_prefix:
self.onDataWritersStarted(msg.content)
elif msg.subject == '%sDataWritersFinished' % self.subject_prefix:
self.onDataWritersFinished(msg.content)
elif msg.subject == '%sDataWritersStopping' % self.subject_prefix:
self.onDataWritersStopping(msg.content)
elif msg.subject == '%sDataWritersStopped' % self.subject_prefix:
self.onDataWritersStopped(msg.content)
else:
logger.error("TBBBusListener.handleMessage: unknown subject: %s", msg.subject)
except Exception as e:
logger.exception("TBBBusListener.handleMessage: %s", e)
raise
if not isinstance(msg, EventMessage):
raise ValueError("%s: Ignoring non-EventMessage: %s" % (self.__class__.__name__, msg))
stripped_subject = msg.subject.replace("%s." % DEFAULT_TBB_NOTIFICATION_PREFIX, '')
logger.info("TBBEventMessageHandler.handleMessage on%s: %s" % (stripped_subject, str(msg.content).replace('\n', ' ')))
logger.info("on%s: %s", msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' '))
if stripped_subject == 'TBBServiceStarted':
self.onTBBServiceStarted(msg.content)
elif stripped_subject == 'TBBServiceStopped':
self.onTBBServiceStopped(msg.content)
elif stripped_subject == 'DataWritersStarting':
self.onDataWritersStarting(msg.content)
elif stripped_subject == 'DataWritersStarted':
self.onDataWritersStarted(msg.content)
elif stripped_subject == 'DataWritersFinished':
self.onDataWritersFinished(msg.content)
elif stripped_subject == 'DataWritersStopping':
self.onDataWritersStopping(msg.content)
elif stripped_subject == 'DataWritersStopped':
self.onDataWritersStopped(msg.content)
else:
raise ValueError("TBBEventMessageHandler.handleMessage: unknown subject: %s" % msg.subject)
def onTBBServiceStarted(self, msg_content):
'''onTBBServiceStarted is called upon receiving a TBBServiceStarted message.
......@@ -100,14 +101,17 @@ class TBBBusListener(BusListener):
If you want to implement your own behaviour, then derive a subclass of the OTDBEventMessageHandler, and inject that in this OTDBBusListener.
See example at the top of this file.
"""
def __init__(self, handler_type: TBBBusMessageHandler.__class__ = TBBBusMessageHandler, handler_kwargs: dict = None,
def __init__(self, handler_type: TBBEventMessageHandler.__class__ = TBBEventMessageHandler,
handler_kwargs: dict = None,
exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, num_threads: int = 1):
if not issubclass(handler_type, TBBBusMessageHandler):
if not issubclass(handler_type, TBBEventMessageHandler):
raise TypeError("handler_type should be a TBBBusMessagehandler subclass")
super(TBBBusListener, self).__init__(handler_type=handler_type, handler_kwargs=handler_kwargs,
exchange=exchange, routing_key="%s.#" % (DEFAULT_TBB_NOTIFICATION_PREFIX),
num_threads=num_threads, broker=broker)
super(TBBBusListener, self).__init__(handler_type=handler_type,
handler_kwargs=handler_kwargs,
exchange=exchange,
routing_key="%s.#" % (DEFAULT_TBB_NOTIFICATION_PREFIX),
num_threads=num_threads, broker=broker)
def main():
......
DEFAULT_TBB_SERVICENAME = 'TBBService'
DEFAULT_TBB_SERVICENAME = 'TBB.Service'
DEFAULT_TBB_NOTIFICATION_PREFIX = 'TBB.'
DEFAULT_TBB_NOTIFICATION_PREFIX = 'TBB.notification'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment