diff --git a/SAS/OTDB_Services/TreeService.py b/SAS/OTDB_Services/TreeService.py index 86ce8f988e1c024702c660d53c9eabab033bfecc..f9c8b33f438efdb2e9dc7d4a6b6f791cd27cf7df 100755 --- a/SAS/OTDB_Services/TreeService.py +++ b/SAS/OTDB_Services/TreeService.py @@ -39,12 +39,12 @@ GetStations --- : Returns a list of the defined stations from the SetProject --- : Creates or updates the information of a project/campaign. """ -import sys, time, pg +import time +import pg import logging from datetime import datetime -from lofar.messaging.Service import * +from lofar.messaging.rpc_service import ServiceMessageHandler, Service from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.common.util import waitForInterrupt from .config import DEFAULT_OTDB_SERVICENAME QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError) @@ -57,14 +57,18 @@ VIC_TREE = 30 logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) + # Define our own exceptions class FunctionError(Exception): "Something when wrong during the execution of the function" pass + + class DatabaseError(Exception): "Connection with the database could not be made" pass + # Task Get IDs def TaskGetIDs(input_dict, db_connection, return_tuple=True): """ @@ -113,6 +117,7 @@ def TaskGetIDs(input_dict, db_connection, return_tuple=True): return (None, otdb_id, mom_id) if return_tuple else [None, otdb_id, mom_id] # TODO have this return a Dict + # Task Get Specification def TaskGetSpecification(input_dict, db_connection): """ @@ -160,6 +165,7 @@ def TaskGetSpecification(input_dict, db_connection): answer_dict["tree"] = answer_list return {'TaskSpecification':answer_dict} + # Task Get TreeInfo def TaskGetTreeInfo(otdb_id, db_connection): result = db_connection.query("""select momID, groupID, d_creation, modificationdate, state, starttime, stoptime, processType, processSubtype, description @@ -244,6 +250,7 @@ def TaskCreate(input_dict, db_connection): # When we are here we always have a task, so do the key updates return TaskSetSpecification({'OtdbID':otdb_id, 'Specification':input_dict['Specification']}, db_connection) + # Task Get State def TaskGetStatus(otdb_id, db_connection): result = db_connection.query("""select treestate.id, treestate.name from otdbtree @@ -254,6 +261,7 @@ def TaskGetStatus(otdb_id, db_connection): return {'OtdbID':otdb_id, 'status_id': -1, 'status': 'unknown'} + # Task Set State def TaskSetStatus(input_dict, db_connection): """ @@ -505,6 +513,7 @@ def GetDefaultTemplates(input_dict, db_connection): return { 'DefaultTemplates': Templates } + # Get Stations def GetStations(input_dict, db_connection): """ @@ -529,6 +538,7 @@ def GetStations(input_dict, db_connection): return { 'Stations': Stations } + # Set Project def SetProject(input_dict, db_connection): """ @@ -564,7 +574,7 @@ def SetProject(input_dict, db_connection): return { "projectID": project_id } -class PostgressMessageHandler(MessageHandlerInterface): +class PostgresMessageHandler(ServiceMessageHandler): """ Implements a generic message handlers for services that are tied to a postgres database. kwargs must contain the keys: @@ -573,30 +583,13 @@ class PostgressMessageHandler(MessageHandlerInterface): db_host <string> Name of the machine the database server is running function <type> Function to call when a message is received on the message bus. """ - def __init__(self, **kwargs): - super(PostgressMessageHandler, self).__init__() - self.dbcreds = kwargs.pop("dbcreds") - if len(kwargs): - raise AttributeError("Unknown keys in arguments of 'DatabaseTiedMessageHandler: %s" % kwargs) + def __init__(self, dbcreds): + super(PostgresMessageHandler, self).__init__() + self.dbcreds = dbcreds self.connection = None self.connected = False - self.service2MethodMap = { - "TaskGetSpecification": self._TaskGetSpecification, - "TaskCreate": self._TaskCreate, - "TaskGetStatus": self._TaskGetStatus, - "TaskSetStatus": self._TaskSetStatus, - "TaskSetSpecification": self._TaskSetSpecification, - "TaskPrepareForScheduling": self._TaskPrepareForScheduling, - "TaskGetIDs": self._TaskGetIDs, - "TaskDelete": self._TaskDelete, - "GetDefaultTemplates": self._GetDefaultTemplates, - "GetStations": self._GetStations, - "SetProject": self._SetProject, - "TaskGetTreeInfo": self._TaskGetTreeInfo - } - - def prepare_receive(self): + def before_receive_message(self): "Called in main processing loop just before a blocking wait for messages is done." "Make sure we are connected with the database." self.connected = (self.connection and self.connection.status == 1) @@ -611,94 +604,88 @@ class PostgressMessageHandler(MessageHandlerInterface): time.sleep(5) # The following functions are called from the Service code. - def _TaskGetSpecification(self, **kwargs): + def TaskGetSpecification(self, **kwargs): logger.info("_TaskGetSpecification({0})".format(kwargs)) return TaskGetSpecification(kwargs, self.connection) - def _TaskCreate(self, **kwargs): + def TaskCreate(self, **kwargs): logger.info("_TaskCreate({0})".format(kwargs)) return TaskCreate(kwargs, self.connection) - def _TaskGetStatus(self, **kwargs): + def TaskGetStatus(self, **kwargs): logger.info("_TaskGetStatus({0})".format(kwargs)) return TaskGetStatus(kwargs.get('otdb_id'), self.connection) - def _TaskSetStatus(self, **kwargs): + def TaskSetStatus(self, **kwargs): logger.info("_TaskSetStatus({0})".format(kwargs)) return TaskSetStatus(kwargs, self.connection) - def _TaskGetTreeInfo(self, **kwargs): + def TaskGetTreeInfo(self, **kwargs): logger.info("_TaskGetTreeInfo({0})".format(kwargs)) return TaskGetTreeInfo(kwargs.get('otdb_id'), self.connection) - def _TaskSetSpecification(self, **kwargs): + def TaskSetSpecification(self, **kwargs): logger.info("_TaskSetSpecification({0})".format(kwargs)) return TaskSetSpecification(kwargs, self.connection) - def _TaskPrepareForScheduling(self, **kwargs): + def TaskPrepareForScheduling(self, **kwargs): logger.info("_TaskPrepareForScheduling({0})".format(kwargs)) return TaskPrepareForScheduling(kwargs, self.connection) - def _TaskGetIDs(self, **kwargs): + def TaskGetIDs(self, **kwargs): logger.info("_TaskGetIDs({0})".format(kwargs)) return TaskGetIDs(kwargs, self.connection, return_tuple=False) - def _TaskDelete(self, **kwargs): + def TaskDelete(self, **kwargs): logger.info("_TaskDelete({0})".format(kwargs)) return TaskDelete(kwargs, self.connection) - def _GetDefaultTemplates(self, **kwargs): + def GetDefaultTemplates(self, **kwargs): logger.info("_GetDefaultTemplates()") return GetDefaultTemplates(kwargs, self.connection) - def _GetStations(self, **kwargs): + def GetStations(self, **kwargs): logger.info("_GetStations()") return GetStations(kwargs, self.connection) - def _SetProject(self, **kwargs): + def SetProject(self, **kwargs): logger.info("_SetProject({0})".format(kwargs)) return SetProject(kwargs, self.connection) -def create_service(busname, dbcreds, broker=None): - return Service(DEFAULT_OTDB_SERVICENAME, - PostgressMessageHandler, - busname=busname, - use_service_methods=True, - numthreads=1, - handler_args={"dbcreds" : dbcreds}, - broker=broker, - verbose=True) + +def create_service(dbcreds, broker=DEFAULT_BROKER, exchange=DEFAULT_BUSNAME): + return Service(service_name=DEFAULT_OTDB_SERVICENAME, + handler_type=PostgresMessageHandler, handler_kwargs={"dbcreds": dbcreds}, + broker=broker, exchange=exchange) def main(): from optparse import OptionParser from lofar.common import dbcredentials from lofar.common.util import waitForInterrupt - from lofar.messaging import setQpidLogLevel # Check the invocation arguments parser = OptionParser("%prog [options]") - parser.add_option('-q', '--broker', dest='broker', type='string', - default=None, + parser.add_option('-b', '--broker', dest='broker', type='string', + default=DEFAULT_BROKER, help='Address of the qpid broker, default: localhost') - parser.add_option("-B", "--busname", dest="busname", type="string", + parser.add_option("-e", "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="Bus on which RPC commands are received. [default: %default]") - parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') + parser.add_option('-V', '--verbose', dest='verbose', action='store_true', + help='verbose logging') # Add options of dbcredentials: --database, --host, ... parser.add_option_group(dbcredentials.options_group(parser)) parser.set_defaults(dbcredentials="OTDB") (options, args) = parser.parse_args() - setQpidLogLevel(logging.INFO) dbcreds = dbcredentials.parse_options(options) print("###dbcreds:", dbcreds) - with create_service(busname=options.busname, - dbcreds=dbcreds, - broker=options.broker): + with create_service(broker=options.broker, exchange=options.exchage, dbcreds=dbcreds): waitForInterrupt() logger.info("Stopped the OTDB services") + if __name__ == "__main__": main() diff --git a/SAS/OTDB_Services/test/t_TreeService.py b/SAS/OTDB_Services/test/t_TreeService.py index b5a1800f317892d3c42523f2e394fac955f0950f..7719639f17211d11d9ed6e66b3f1adf73b8d9fc2 100644 --- a/SAS/OTDB_Services/test/t_TreeService.py +++ b/SAS/OTDB_Services/test/t_TreeService.py @@ -33,7 +33,7 @@ import testing.postgresql import psycopg2 import subprocess from lofar.messaging.messagebus import * -from lofar.messaging.RPC import * +from lofar.messaging.rpc_service import * from lofar.sas.otdb.TreeService import create_service from lofar.common.dbcredentials import Credentials @@ -72,116 +72,123 @@ try: proc1.wait(timeout=60) proc2.wait(timeout=60) - def do_rpc_catch_exception(exc_text, rpc_instance, arg_dict): + def do_rpc_catch_exception(exc_text, rpc_instance, method_name, arg_dict): try: - print("** Executing {0}({1})...".format(rpc_instance.ServiceName,arg_dict)) - (data, status) = (rpc_instance)(**arg_dict) + print("** Executing {0}({1})...".format(method_name, arg_dict)) + rpc_instance.execute(method_name=method_name, **arg_dict) raise Exception("Expected an exception {0}, didn't get any".format(exc_text)) except Exception: print("Caught expected exception {0}".format(exc_text)) print("======") - def do_rpc(rpc_instance, arg_dict): - print("** Executing {0}({1})...".format(rpc_instance.ServiceName,arg_dict)) - (data, status) = (rpc_instance)(**arg_dict) - if status != "OK": - raise Exception("Status returned is {0}".format(status)) - # if isinstance(data, dict): - # for key in sorted(data): - # print "%s ==> %s" % (key, data[key]) - # else: - print("result =", data) + def do_rpc(rpc_instance, method_name, arg_dict): + print("** Executing {0}({1})...".format(method_name, arg_dict)) + answer = rpc_instance.execute(method_name=method_name, **arg_dict) + print("result =", answer) print("======") - return data - - with TemporaryQueue(__name__) as tmp_queue: - busname = tmp_queue.address - - with create_service(busname, database_credentials): - with RPC("OTDBService.TaskGetIDs", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC: - # Existing: otdb_id:1099268, mom_id:353713 - do_rpc (otdbRPC, {'OtdbID': 1099268, 'MomID': 353713 }) - do_rpc (otdbRPC, {'OtdbID': 1099268, 'MomID': 5 }) - do_rpc (otdbRPC, {'OtdbID': 1099268, 'MomID': None }) - do_rpc (otdbRPC, {'OtdbID': 5, 'MomID': 353713 }) - do_rpc_catch_exception('', otdbRPC, {'OtdbID': 5, 'MomID': 5 }) - do_rpc_catch_exception('', otdbRPC, {'OtdbID': 5, 'MomID': None }) - do_rpc (otdbRPC, {'OtdbID': None, 'MomID': 353713 }) - do_rpc_catch_exception('', otdbRPC, {'OtdbID': None, 'MomID': 5 }) - do_rpc_catch_exception('', otdbRPC, {'OtdbID': None, 'MomID': None }) - - with RPC("OTDBService.GetDefaultTemplates", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC: - do_rpc(otdbRPC,{}) - - with RPC("OTDBService.SetProject", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC: - do_rpc(otdbRPC,{'name':"Taka Tuka Land", "title":"Adventure movie", "pi":"Pippi", "co_i":"Mr.Nelson", "contact":"Witje"}) - - with RPC("OTDBService.TaskCreate", ForwardExceptions=True, busname=busname, timeout=10) as task_create: - do_rpc(task_create, {'OtdbID':1099268, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}}) - do_rpc(task_create, {'MomID':353713, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}}) - do_rpc_catch_exception('on non-exsisting campaign', task_create, + return answer + + with TemporaryExchange(__name__) as tmp_exchange: + exchange = tmp_exchange.address + + with create_service(exchange=exchange, dbcreds=database_credentials) as service: + + with RPC(service_name=service.service_name, exchange=exchange, timeout=10) as otdbRPC: # Existing: otdb_id:1099268, mom_id:353713 + do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': 353713 }) + do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': 5 }) + do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': None }) + do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 5, 'MomID': 353713 }) + do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': 5, 'MomID': 5 }) + do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': 5, 'MomID': None }) + do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': None, 'MomID': 353713 }) + do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': None, 'MomID': 5 }) + do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': None, 'MomID': None }) + + do_rpc(otdbRPC, "GetDefaultTemplates", {}) + + do_rpc(otdbRPC, "SetProject", + {'name':"Taka Tuka Land", "title":"Adventure movie", "pi":"Pippi", + "co_i":"Mr.Nelson", "contact":"Witje"}) + + do_rpc(otdbRPC, "TaskCreate", {'OtdbID':1099268, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}}) + do_rpc(otdbRPC, "TaskCreate", {'MomID':353713, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}}) + do_rpc_catch_exception('on non-exsisting campaign', otdbRPC, "TaskCreate", {'MomID':998877, 'TemplateName':'BeamObservation', 'CampaignName':'No such campaign', 'Specification': {'state':'finished'}}) - do_rpc(task_create, {'MomID':998877, 'TemplateName':'BeamObservation', + do_rpc(otdbRPC, "TaskCreate", {'MomID':998877, 'TemplateName':'BeamObservation', 'CampaignName':'Taka Tuka Land', 'Specification': {'state':'finished'}}) - data = do_rpc(task_create, {'MomID':12345, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}}) + data = do_rpc(otdbRPC, "TaskCreate", {'MomID':12345, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}}) new_tree1 = data['MomID'] - data = do_rpc(task_create, {'MomID':54321, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}}) + data = do_rpc(otdbRPC, "TaskCreate", {'MomID':54321, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}}) new_tree2= data['MomID'] - with RPC("OTDBService.TaskPrepareForScheduling", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC: - do_rpc(otdbRPC, {'MomID':new_tree1}) # template - do_rpc(otdbRPC, {'MomID':new_tree1}) # now a VIC tree - do_rpc(otdbRPC, {'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00', 'StopTime':'2016-03-01 12:34:56'}) + do_rpc(otdbRPC, "TaskPrepareForScheduling", {'MomID':new_tree1}) # template + do_rpc(otdbRPC, "TaskPrepareForScheduling", {'MomID':new_tree1}) # now a VIC tree + do_rpc(otdbRPC, "TaskPrepareForScheduling", + {'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00', + 'StopTime':'2016-03-01 12:34:56'}) do_rpc_catch_exception("on invalid stoptime", otdbRPC, - {'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00', 'StopTime':'2016'}) + "TaskPrepareForScheduling", + {'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00', + 'StopTime':'2016'}) - with RPC("OTDBService.TaskDelete", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC: - do_rpc(otdbRPC, {'MomID':new_tree2}) + do_rpc(otdbRPC, "TaskDelete", {'MomID':new_tree2}) - with RPC("OTDBService.TaskGetSpecification", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC: - do_rpc(otdbRPC, {'OtdbID':1099269}) # PIC - do_rpc(otdbRPC, {'OtdbID':1099238}) # Template - do_rpc(otdbRPC, {'OtdbID':1099266}) # VIC - do_rpc_catch_exception('on non-existing treeID', otdbRPC, {'OtdbID':5}) # Non existing + do_rpc(otdbRPC, "TaskGetSpecification", {'OtdbID':1099269}) # PIC + do_rpc(otdbRPC, "TaskGetSpecification", {'OtdbID':1099238}) # Template + do_rpc(otdbRPC, "TaskGetSpecification", {'OtdbID':1099266}) # VIC + do_rpc_catch_exception('on non-existing treeID', otdbRPC, + "TaskGetSpecification", {'OtdbID':5}) # Non existing - with RPC("OTDBService.TaskSetStatus", ForwardExceptions=True, busname=busname, timeout=5) as status_update_command: # PIC - do_rpc(status_update_command, {'OtdbID':1099269, 'NewStatus':'finished', 'UpdateTimestamps':True}) + do_rpc(otdbRPC, "TaskSetStatus", + {'OtdbID':1099269, 'NewStatus':'finished', 'UpdateTimestamps':True}) # Template - do_rpc(status_update_command, {'OtdbID':1099238, 'NewStatus':'finished', 'UpdateTimestamps':True}) + do_rpc(otdbRPC, "TaskSetStatus", + {'OtdbID':1099238, 'NewStatus':'finished', 'UpdateTimestamps':True}) # VIC - do_rpc(status_update_command, {'OtdbID':1099266, 'NewStatus':'finished', 'UpdateTimestamps':True}) + do_rpc(otdbRPC, "TaskSetStatus", + {'OtdbID':1099266, 'NewStatus':'finished', 'UpdateTimestamps':True}) # Nonexisting tree - do_rpc_catch_exception('on invalid treeID', - status_update_command, {'OtdbID':10, 'NewStatus':'finished', 'UpdateTimestamps':True}) + do_rpc_catch_exception('on invalid treeID', otdbRPC, + "TaskSetStatus", + {'OtdbID':10, 'NewStatus':'finished', + 'UpdateTimestamps':True}) # VIC tree: invalid status - do_rpc_catch_exception('on invalid status', - status_update_command, {'OtdbID':1099266, 'NewStatus':'what_happend', 'UpdateTimestamps':True}) + do_rpc_catch_exception('on invalid status', otdbRPC, "TaskSetStatus", + {'OtdbID':1099266, 'NewStatus':'what_happend', + 'UpdateTimestamps':True}) # Set PIC back to active... - do_rpc(status_update_command, {'OtdbID':1099269, 'NewStatus':'active', 'UpdateTimestamps':True}) + do_rpc(otdbRPC, "TaskSetStatus", + {'OtdbID':1099269, 'NewStatus':'active', 'UpdateTimestamps':True}) - with RPC("OTDBService.GetStations", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC: - do_rpc(otdbRPC,{}) + do_rpc(otdbRPC, "GetStations", {}) - with RPC("OTDBService.TaskSetSpecification", ForwardExceptions=True, busname=busname, timeout=5) as key_update: # VIC tree: valid - do_rpc(key_update, {'OtdbID':1099266, - 'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost':'NameOfTestHost'}}) + do_rpc(otdbRPC, "TaskSetSpecification", + {'OtdbID':1099266, + 'Specification': + {'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost': + 'NameOfTestHost'}}) # Template tree: not supported yet - do_rpc(key_update, {'OtdbID':1099238, + do_rpc(otdbRPC, "TaskSetSpecification", {'OtdbID':1099238, 'Specification':{'LOFAR.ObsSW.Observation.Scheduler.priority':'0.1'}}) # PIC tree: not supported yet - do_rpc_catch_exception('on invalid treetype (PIC)', key_update, - {'OtdbID':1099269, 'Specification':{'LOFAR.PIC.Core.CS001.status_state':'50'}}) + do_rpc_catch_exception('on invalid treetype (PIC)', otdbRPC, + "TaskSetSpecification", + {'OtdbID':1099269, + 'Specification':{'LOFAR.PIC.Core.CS001.status_state':'50'}}) # Non exsisting tree - do_rpc_catch_exception('on invalid treeID', key_update, {'OtdbID':10, - 'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost':'NameOfTestHost'}}) + do_rpc_catch_exception('on invalid treeID', otdbRPC, + "TaskSetSpecification", + {'OtdbID':10, + 'Specification': + {'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost':'NameOfTestHost'}}) # VIC tree: wrong key - do_rpc_catch_exception('on invalid key', key_update, {'OtdbID':1099266, + do_rpc_catch_exception('on invalid key', otdbRPC, "TaskSetSpecification", {'OtdbID':1099266, 'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.NoSuchKey':'NameOfTestHost'}}) finally: