diff --git a/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py index b795c18ca09e2e585254333691440af99aeeb835..ffea01f23055802d2a803d62b1173140b0db2464 100644 --- a/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py +++ b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py @@ -19,8 +19,7 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # -from lofar.common.util import humanreadablesize -from lofar.messaging.messagebus import AbstractMessageHandler, BusListener +from lofar.messaging.messagebus import AbstractMessageHandler, BusListener, LofarMessage, EventMessage from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_PREFIX @@ -28,35 +27,37 @@ import logging logger = logging.getLogger() -class TBBBusMessageHandler(AbstractMessageHandler): +class TBBEventMessageHandler(AbstractMessageHandler): def __init__(self): - super(TBBBusMessageHandler, self).__init__() - self.subject_prefix = DEFAULT_TBB_NOTIFICATION_PREFIX # TODO: This used to be defined on the AbstractBusListener. Can be removed? + super().__init__() - def _handleMessage(self, msg): + def handle_message(self, msg: LofarMessage): # try to handle an incoming message, and call the associated on<SomeMessage> method - try: - logger.info("on%s: %s", msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' ')) - - if msg.subject == '%sTBBServiceStarted' % self.subject_prefix: - self.onTBBServiceStarted(msg.content) - elif msg.subject == '%sTBBServiceStopped' % self.subject_prefix: - self.onTBBServiceStopped(msg.content) - elif msg.subject == '%sDataWritersStarting' % self.subject_prefix: - self.onDataWritersStarting(msg.content) - elif msg.subject == '%sDataWritersStarted' % self.subject_prefix: - self.onDataWritersStarted(msg.content) - elif msg.subject == '%sDataWritersFinished' % self.subject_prefix: - self.onDataWritersFinished(msg.content) - elif msg.subject == '%sDataWritersStopping' % self.subject_prefix: - self.onDataWritersStopping(msg.content) - elif msg.subject == '%sDataWritersStopped' % self.subject_prefix: - self.onDataWritersStopped(msg.content) - else: - logger.error("TBBBusListener.handleMessage: unknown subject: %s", msg.subject) - except Exception as e: - logger.exception("TBBBusListener.handleMessage: %s", e) - raise + if not isinstance(msg, EventMessage): + raise ValueError("%s: Ignoring non-EventMessage: %s" % (self.__class__.__name__, msg)) + + stripped_subject = msg.subject.replace("%s." % DEFAULT_TBB_NOTIFICATION_PREFIX, '') + + logger.info("TBBEventMessageHandler.handleMessage on%s: %s" % (stripped_subject, str(msg.content).replace('\n', ' '))) + + logger.info("on%s: %s", msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' ')) + + if stripped_subject == 'TBBServiceStarted': + self.onTBBServiceStarted(msg.content) + elif stripped_subject == 'TBBServiceStopped': + self.onTBBServiceStopped(msg.content) + elif stripped_subject == 'DataWritersStarting': + self.onDataWritersStarting(msg.content) + elif stripped_subject == 'DataWritersStarted': + self.onDataWritersStarted(msg.content) + elif stripped_subject == 'DataWritersFinished': + self.onDataWritersFinished(msg.content) + elif stripped_subject == 'DataWritersStopping': + self.onDataWritersStopping(msg.content) + elif stripped_subject == 'DataWritersStopped': + self.onDataWritersStopped(msg.content) + else: + raise ValueError("TBBEventMessageHandler.handleMessage: unknown subject: %s" % msg.subject) def onTBBServiceStarted(self, msg_content): '''onTBBServiceStarted is called upon receiving a TBBServiceStarted message. @@ -100,14 +101,17 @@ class TBBBusListener(BusListener): If you want to implement your own behaviour, then derive a subclass of the OTDBEventMessageHandler, and inject that in this OTDBBusListener. See example at the top of this file. """ - def __init__(self, handler_type: TBBBusMessageHandler.__class__ = TBBBusMessageHandler, handler_kwargs: dict = None, + def __init__(self, handler_type: TBBEventMessageHandler.__class__ = TBBEventMessageHandler, + handler_kwargs: dict = None, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, num_threads: int = 1): - if not issubclass(handler_type, TBBBusMessageHandler): + if not issubclass(handler_type, TBBEventMessageHandler): raise TypeError("handler_type should be a TBBBusMessagehandler subclass") - super(TBBBusListener, self).__init__(handler_type=handler_type, handler_kwargs=handler_kwargs, - exchange=exchange, routing_key="%s.#" % (DEFAULT_TBB_NOTIFICATION_PREFIX), - num_threads=num_threads, broker=broker) + super(TBBBusListener, self).__init__(handler_type=handler_type, + handler_kwargs=handler_kwargs, + exchange=exchange, + routing_key="%s.#" % (DEFAULT_TBB_NOTIFICATION_PREFIX), + num_threads=num_threads, broker=broker) def main(): diff --git a/MAC/Services/TBB/TBBServiceCommon/lib/config.py b/MAC/Services/TBB/TBBServiceCommon/lib/config.py index db8658ca8ae452c970d493936aa40fbddac80d58..ed968623e16b466c4c6d6a45fcb6024b8abeba6f 100644 --- a/MAC/Services/TBB/TBBServiceCommon/lib/config.py +++ b/MAC/Services/TBB/TBBServiceCommon/lib/config.py @@ -1,3 +1,3 @@ -DEFAULT_TBB_SERVICENAME = 'TBBService' +DEFAULT_TBB_SERVICENAME = 'TBB.Service' -DEFAULT_TBB_NOTIFICATION_PREFIX = 'TBB.' +DEFAULT_TBB_NOTIFICATION_PREFIX = 'TBB.notification'