Skip to content
Snippets Groups Projects
Select Git revision
  • 57901e6460ca0b5e1a53db82e2440bcba6c118c2
  • main default protected
  • dev
3 results

imcal.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    msgbus.py 5.81 KiB
    #!/usr/bin/python
    # Copyright (C) 2012-2013  ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    #
    # id.. TDB
    
    try:
      import qpid.messaging as messaging
      MESSAGING_ENABLED = True
    except ImportError:
      import noqpidfallback as messaging
      MESSAGING_ENABLED = False
    
    import os
    import signal
    import logging
    import lofar.messagebus.message as message
    import atexit
    
    # Default connection settings; candidates for a configuration file.
    broker = "127.0.0.1"
    options = "create:never"

    # Set up logging. Until a Python logging framework is available,
    # all logging initialisation has to happen here.
    logging.basicConfig(
        format='%(asctime)s %(levelname)s %(message)s',
        level=logging.INFO)
    logger = logging.getLogger("MessageBus")
    
    class BusException(Exception):
        """Error raised by the bus classes in place of qpid messaging errors."""
    
    class Session:
        def __init__(self, broker):
            self.closed = False
    
            logger.info("[Bus] Connecting to broker %s", broker)
            self.connection = messaging.Connection(broker)
            self.connection.reconnect = True
            logger.info("[Bus] Connected to broker %s", broker)
    
            try:
                self.connection.open()
                self.session = self.connection.session() 
            except messaging.MessagingError, m:
                raise BusException(m)
    
            # NOTE: We cannuot use:
            #  __del__: its broken (does not always get called, destruction order is unpredictable)
            #  with:    not supported in python 2.4, does not work well on arrays of objects
            #  weakref: dpes not guarantee to be called (depends on gc)
            #
            # Note that this atexit call will prevent self from being destructed until the end of the program,
            # since a reference will be retained
            atexit.register(self.close)
    
        def close(self):
            if self.closed:
                return
    
            self.closed = True
    
            # NOTE: session.close() freezes under certain error conditions,
            # f.e. opening a receiver on a non-existing queue.
            # This seems to happen whenever a Python exception was thrown
            # by the qpid wrapper.
            #
            # This especially happens if we would put this code in __del__.
            # Note that we cannot use __enter__ and __exit__ either due to
            # ccu001/ccu099 still carrying python 2.4.
            #
            # See https://issues.apache.org/jira/browse/QPID-6402
            #
            # We set a timeout to prevent freezing, which obviously leads
            # to data loss if the stall was legit.
            try:
                self.connection.close(5.0)
            except messaging.exceptions.Timeout, t:
                logger.error("[Bus] Could not close connection: %s", t)
    
        def __enter__(self):
            return self
    
        def __exit__(self, type, value, traceback):
            self.close()
            return False
    
        def address(self, queue, options):
            return "%s%s; {%s}" % (self._queue_prefix(), queue, options)
    
        def _queue_prefix(self):
          return os.environ.get("QUEUE_PREFIX", "")
    
    class ToBus(Session):
        def __init__(self, queue, options=options, broker=broker):
            Session.__init__(self, broker)
            self.queue = queue
    
            try:
                self.sender = self.session.sender(self.address(queue, options))
            except messaging.MessagingError, m:
                raise BusException(m)
    
        def send(self, msg):
            try:
                logger.info("[ToBus] Sending message to queue %s", self.queue)
    
                try:
                  # Send Message or MessageContent object
                  self.sender.send(msg.qpidMsg())
                except AttributeError:
                  # Send string or messaging.Message object
                  self.sender.send(msg)
    
                logger.info("[ToBus] Message sent to queue %s", self.queue)
            except messaging.SessionError, m:
                raise BusException(m)
    
    class FromBus(Session):
        def __init__(self, queue, options=options, broker=broker):
            Session.__init__(self, broker)
    
            self.add_queue(queue, options)
    
        def add_queue(self, queue, options=options):
            try:
                receiver = self.session.receiver(self.address(queue, options))
    
                # Need capacity >=1 for 'self.session.next_receiver' to function across multiple queues
                receiver.capacity = 1 #32
            except messaging.MessagingError, m:
                raise BusException(m)
    
        def get(self, timeout=None):
            msg = None
    
            logger.info("[FromBus] Waiting for message")
            try:
                receiver = self.session.next_receiver(timeout)
                if receiver != None:
                    logger.info("[FromBus] Message available on queue %s", receiver.source)
                    msg = receiver.fetch() # receiver.get() is better, but requires qpid 0.31+
                    if msg is None:
                        logger.error("[FromBus] Could not retrieve available message on queue %s", receiver.source)
                    else:
                        logger.info("[FromBus] Message received on queue %s", receiver.source)
            except messaging.exceptions.Empty, e:
                return None
    
            if msg is None:
              return None
            else:
              return message.Message(qpidMsg=msg)
    
        def ack(self, msg):
            self.session.acknowledge(msg.qpidMsg())
            logging.info("[FromBus] Message ACK'ed");