Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Service.py 7.10 KiB
#!/usr/bin/python
# Service.py: Service definition for the lofar.messaging module.
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#

from lofar.messaging.messagebus import ToBus,FromBus
from lofar.messaging.messages import EventMessage,ServiceMessage
import threading
import time
import uuid
import sys
import traceback

# create service:
class Service():
   """
   Service class for registering python functions with a Service name on a messgage bus.
   """
   def __init__(self,BusName,ServiceName,ServiceHandler,options=None,exclusive=True,numthreads=1,Verbose=False):
      self.BusName=BusName
      self.ServiceName=ServiceName
      self.ServiceHandler=ServiceHandler
      self.connected=False
      self.running=False
      self.exclusive=exclusive
      self.link_uuid=str(uuid.uuid4())
      self._numthreads=numthreads
      self.Verbose=Verbose
      self.options={}
      self.options["capacity"]=numthreads*20
      if (self.exclusive==True):
         self.options["link"]="{name:\""+self.link_uuid+"\", x-bindings:[{key:" + self.ServiceName + ", arguments: {\"qpid.exclusive-binding\":True}}]}"
      if (isinstance(options,dict)):
         for key,val in options.iter_items():
            self.options[key]=val
      if (self.BusName!=None):
         self.Listen=FromBus(self.BusName+"/"+self.ServiceName,options=self.options)
         self.Reply=ToBus(self.BusName)
      else:
         self.Listen=FromBus(self.ServiceName,options=self.options)
         self.Reply=self.replyto

   def _debug(self,txt):
      if (self.Verbose==True):
         print(txt)

   def StartListening(self,numthreads=None):
      if (numthreads!=None):
         self._numthreads=numthreads
      self.connected=True
      self.running=True
      self._tr=[]
      self.counter=[]
      for i in range(self._numthreads):
             self._tr.append(threading.Thread(target=self.loop,args=[i]))
             self.counter.append(0)
             self._tr[i].start()

   def __enter__(self):
      if (isinstance(self.Listen,FromBus)):
         self.Listen.open()
      if (isinstance(self.Reply,ToBus)):
         self.Reply.open()
      return self

   def __exit__(self, exc_type, exc_val, exc_tb):
      self.StopListening()

   def loop(self,index):
      print( "Thread %d START Listening for messages on Bus %s and service name %s." %(index,self.BusName,self.ServiceName)) 
      while self.running:
         msg=None
         try:
           # get the next message
           msg=self.Listen.receive(1)
         except Exception as e:
           print e

         try:
           if (isinstance(msg,ServiceMessage)):
             # Initial status is unknown
             status="unknown"
             backtrace=None
             errtxt=None

             # Keep track of number of processed messages
             self.counter[index]+=1
             replymessage=""
             # Execute the service handler function and send reply back to client
             try:
                self._debug("Running handler")
                replymessage=self.ServiceHandler(msg.content)
                self._debug("finished handler")
                status="OK"
                self._debug(status)
             except Exception as e:
                # Any thrown exceptions either Service exception or unhandled exception
                # during the execution of the service handler is caught here.
                self._debug("handling exception")
                exc_type, exc_value, exc_traceback = sys.exc_info()
                backtrace=traceback.format_exception(exc_type, exc_value, exc_traceback)
                errtxt=backtrace[-1]
                self._debug(backtrace)
                del backtrace[1]
                del backtrace[0]
                del backtrace[-1]
                status="ERROR"
                backtrace= ''.join(backtrace).encode('latin-1').decode('unicode_escape')
                self._debug(backtrace)
                if self.Verbose==True:
                  print status
                  print errtxt
                  print backtrace
                replymessage=None


             self._debug("Done call")
             # Compile Event message from reply and status.
             ToSend=EventMessage(replymessage)
             ToSend.status=status
             if (errtxt!=None):
                ToSend.errmsg=errtxt
             if (backtrace!=None):
                ToSend.backtrace=backtrace

             # ensure to deliver at the destination in the reply_to field
             ToSend.subject=msg.reply_to

             # show the message content if required by the Verbose flag.
             if (self.Verbose==True):
               msg.show()
               ToSend.show()

             # send the result to the RPC client
             if (isinstance(self.Reply,ToBus)):
               self.Reply.send(ToSend)
             else:
               dest=ToBus(self.Reply)
               with dest:
                  dest.send(ToSend)

             # acknowledge the message to the messaging subsystem
             self.Listen.ack(msg)
           else:
             # Report if message wasn't a ServiceMessage.
             if (msg!=None):
                print "Received wrong messagetype %s, ServiceMessage expected." %(str(type(msg)))

         except Exception as e:
            # Unknown problem in the library. Report this and continue.
            excinfo = sys.exc_info()
            print "ERROR during processing of incoming message."
            traceback.print_exception(*excinfo)
            print "Thread %d: Resuming listening on bus %s for service %s" %(index,self.BusName,self.ServiceName)

      print("Thread %d STOPPED Listening for messages on Bus %s and service name %s and %d processed." %(index,self.BusName,self.ServiceName,self.counter[index]))

   def StopListening(self):
      # stop all running threads
      if (self.running):
           self.running=False
           for i in range(self._numthreads):
             self._tr[i].join()
      # possibly doubly defined..
      if (self.connected):
           self.connected=False
           if (isinstance(self.Listen,FromBus)):
              self.Listen.close()
           if (isinstance(self.Reply,ToBus)):
              self.Reply.close()

   def WaitForInterrupt(self):
      looping=True
      while looping:
         try:
           time.sleep(100)
         except (KeyboardInterrupt): 
           looping=False
           print("Keyboard interrupt received.")