Skip to content
Snippets Groups Projects
Commit d4f3f626 authored by Auke Klazema's avatar Auke Klazema
Browse files

SW-705: Convert TreeService to the new messaging system

parent 23f0a50e
No related branches found
No related tags found
No related merge requests found
......@@ -39,12 +39,12 @@ GetStations --- : Returns a list of the defined stations from the
SetProject --- : Creates or updates the information of a project/campaign.
"""
import sys, time, pg
import time
import pg
import logging
from datetime import datetime
from lofar.messaging.Service import *
from lofar.messaging.rpc_service import ServiceMessageHandler, Service
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.common.util import waitForInterrupt
from .config import DEFAULT_OTDB_SERVICENAME
QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError)
......@@ -57,14 +57,18 @@ VIC_TREE = 30
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
# Define our own exceptions
class FunctionError(Exception):
"Something when wrong during the execution of the function"
pass
class DatabaseError(Exception):
"Connection with the database could not be made"
pass
# Task Get IDs
def TaskGetIDs(input_dict, db_connection, return_tuple=True):
"""
......@@ -113,6 +117,7 @@ def TaskGetIDs(input_dict, db_connection, return_tuple=True):
return (None, otdb_id, mom_id) if return_tuple else [None, otdb_id, mom_id]
# TODO have this return a Dict
# Task Get Specification
def TaskGetSpecification(input_dict, db_connection):
"""
......@@ -160,6 +165,7 @@ def TaskGetSpecification(input_dict, db_connection):
answer_dict["tree"] = answer_list
return {'TaskSpecification':answer_dict}
# Task Get TreeInfo
def TaskGetTreeInfo(otdb_id, db_connection):
result = db_connection.query("""select momID, groupID, d_creation, modificationdate, state, starttime, stoptime, processType, processSubtype, description
......@@ -244,6 +250,7 @@ def TaskCreate(input_dict, db_connection):
# When we are here we always have a task, so do the key updates
return TaskSetSpecification({'OtdbID':otdb_id, 'Specification':input_dict['Specification']}, db_connection)
# Task Get State
def TaskGetStatus(otdb_id, db_connection):
result = db_connection.query("""select treestate.id, treestate.name from otdbtree
......@@ -254,6 +261,7 @@ def TaskGetStatus(otdb_id, db_connection):
return {'OtdbID':otdb_id, 'status_id': -1, 'status': 'unknown'}
# Task Set State
def TaskSetStatus(input_dict, db_connection):
"""
......@@ -505,6 +513,7 @@ def GetDefaultTemplates(input_dict, db_connection):
return { 'DefaultTemplates': Templates }
# Get Stations
def GetStations(input_dict, db_connection):
"""
......@@ -529,6 +538,7 @@ def GetStations(input_dict, db_connection):
return { 'Stations': Stations }
# Set Project
def SetProject(input_dict, db_connection):
"""
......@@ -564,7 +574,7 @@ def SetProject(input_dict, db_connection):
return { "projectID": project_id }
class PostgressMessageHandler(MessageHandlerInterface):
class PostgresMessageHandler(ServiceMessageHandler):
"""
Implements a generic message handlers for services that are tied to a postgres database.
kwargs must contain the keys:
......@@ -573,30 +583,13 @@ class PostgressMessageHandler(MessageHandlerInterface):
db_host <string> Name of the machine the database server is running
function <type> Function to call when a message is received on the message bus.
"""
def __init__(self, **kwargs):
super(PostgressMessageHandler, self).__init__()
self.dbcreds = kwargs.pop("dbcreds")
if len(kwargs):
raise AttributeError("Unknown keys in arguments of 'DatabaseTiedMessageHandler: %s" % kwargs)
def __init__(self, dbcreds):
super(PostgresMessageHandler, self).__init__()
self.dbcreds = dbcreds
self.connection = None
self.connected = False
self.service2MethodMap = {
"TaskGetSpecification": self._TaskGetSpecification,
"TaskCreate": self._TaskCreate,
"TaskGetStatus": self._TaskGetStatus,
"TaskSetStatus": self._TaskSetStatus,
"TaskSetSpecification": self._TaskSetSpecification,
"TaskPrepareForScheduling": self._TaskPrepareForScheduling,
"TaskGetIDs": self._TaskGetIDs,
"TaskDelete": self._TaskDelete,
"GetDefaultTemplates": self._GetDefaultTemplates,
"GetStations": self._GetStations,
"SetProject": self._SetProject,
"TaskGetTreeInfo": self._TaskGetTreeInfo
}
def prepare_receive(self):
def before_receive_message(self):
"Called in main processing loop just before a blocking wait for messages is done."
"Make sure we are connected with the database."
self.connected = (self.connection and self.connection.status == 1)
......@@ -611,94 +604,88 @@ class PostgressMessageHandler(MessageHandlerInterface):
time.sleep(5)
# The following functions are called from the Service code.
def _TaskGetSpecification(self, **kwargs):
def TaskGetSpecification(self, **kwargs):
logger.info("_TaskGetSpecification({0})".format(kwargs))
return TaskGetSpecification(kwargs, self.connection)
def _TaskCreate(self, **kwargs):
def TaskCreate(self, **kwargs):
logger.info("_TaskCreate({0})".format(kwargs))
return TaskCreate(kwargs, self.connection)
def _TaskGetStatus(self, **kwargs):
def TaskGetStatus(self, **kwargs):
logger.info("_TaskGetStatus({0})".format(kwargs))
return TaskGetStatus(kwargs.get('otdb_id'), self.connection)
def _TaskSetStatus(self, **kwargs):
def TaskSetStatus(self, **kwargs):
logger.info("_TaskSetStatus({0})".format(kwargs))
return TaskSetStatus(kwargs, self.connection)
def _TaskGetTreeInfo(self, **kwargs):
def TaskGetTreeInfo(self, **kwargs):
logger.info("_TaskGetTreeInfo({0})".format(kwargs))
return TaskGetTreeInfo(kwargs.get('otdb_id'), self.connection)
def _TaskSetSpecification(self, **kwargs):
def TaskSetSpecification(self, **kwargs):
logger.info("_TaskSetSpecification({0})".format(kwargs))
return TaskSetSpecification(kwargs, self.connection)
def _TaskPrepareForScheduling(self, **kwargs):
def TaskPrepareForScheduling(self, **kwargs):
logger.info("_TaskPrepareForScheduling({0})".format(kwargs))
return TaskPrepareForScheduling(kwargs, self.connection)
def _TaskGetIDs(self, **kwargs):
def TaskGetIDs(self, **kwargs):
logger.info("_TaskGetIDs({0})".format(kwargs))
return TaskGetIDs(kwargs, self.connection, return_tuple=False)
def _TaskDelete(self, **kwargs):
def TaskDelete(self, **kwargs):
logger.info("_TaskDelete({0})".format(kwargs))
return TaskDelete(kwargs, self.connection)
def _GetDefaultTemplates(self, **kwargs):
def GetDefaultTemplates(self, **kwargs):
logger.info("_GetDefaultTemplates()")
return GetDefaultTemplates(kwargs, self.connection)
def _GetStations(self, **kwargs):
def GetStations(self, **kwargs):
logger.info("_GetStations()")
return GetStations(kwargs, self.connection)
def _SetProject(self, **kwargs):
def SetProject(self, **kwargs):
logger.info("_SetProject({0})".format(kwargs))
return SetProject(kwargs, self.connection)
def create_service(busname, dbcreds, broker=None):
return Service(DEFAULT_OTDB_SERVICENAME,
PostgressMessageHandler,
busname=busname,
use_service_methods=True,
numthreads=1,
handler_args={"dbcreds" : dbcreds},
broker=broker,
verbose=True)
def create_service(dbcreds, broker=DEFAULT_BROKER, exchange=DEFAULT_BUSNAME):
return Service(service_name=DEFAULT_OTDB_SERVICENAME,
handler_type=PostgresMessageHandler, handler_kwargs={"dbcreds": dbcreds},
broker=broker, exchange=exchange)
def main():
from optparse import OptionParser
from lofar.common import dbcredentials
from lofar.common.util import waitForInterrupt
from lofar.messaging import setQpidLogLevel
# Check the invocation arguments
parser = OptionParser("%prog [options]")
parser.add_option('-q', '--broker', dest='broker', type='string',
default=None,
parser.add_option('-b', '--broker', dest='broker', type='string',
default=DEFAULT_BROKER,
help='Address of the qpid broker, default: localhost')
parser.add_option("-B", "--busname", dest="busname", type="string",
parser.add_option("-e", "--exchange", dest="exchange", type="string",
default=DEFAULT_BUSNAME,
help="Bus on which RPC commands are received. [default: %default]")
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
parser.add_option('-V', '--verbose', dest='verbose', action='store_true',
help='verbose logging')
# Add options of dbcredentials: --database, --host, ...
parser.add_option_group(dbcredentials.options_group(parser))
parser.set_defaults(dbcredentials="OTDB")
(options, args) = parser.parse_args()
setQpidLogLevel(logging.INFO)
dbcreds = dbcredentials.parse_options(options)
print("###dbcreds:", dbcreds)
with create_service(busname=options.busname,
dbcreds=dbcreds,
broker=options.broker):
with create_service(broker=options.broker, exchange=options.exchage, dbcreds=dbcreds):
waitForInterrupt()
logger.info("Stopped the OTDB services")
if __name__ == "__main__":
main()
......@@ -33,7 +33,7 @@ import testing.postgresql
import psycopg2
import subprocess
from lofar.messaging.messagebus import *
from lofar.messaging.RPC import *
from lofar.messaging.rpc_service import *
from lofar.sas.otdb.TreeService import create_service
from lofar.common.dbcredentials import Credentials
......@@ -72,116 +72,123 @@ try:
proc1.wait(timeout=60)
proc2.wait(timeout=60)
def do_rpc_catch_exception(exc_text, rpc_instance, arg_dict):
def do_rpc_catch_exception(exc_text, rpc_instance, method_name, arg_dict):
try:
print("** Executing {0}({1})...".format(rpc_instance.ServiceName,arg_dict))
(data, status) = (rpc_instance)(**arg_dict)
print("** Executing {0}({1})...".format(method_name, arg_dict))
rpc_instance.execute(method_name=method_name, **arg_dict)
raise Exception("Expected an exception {0}, didn't get any".format(exc_text))
except Exception:
print("Caught expected exception {0}".format(exc_text))
print("======")
def do_rpc(rpc_instance, arg_dict):
print("** Executing {0}({1})...".format(rpc_instance.ServiceName,arg_dict))
(data, status) = (rpc_instance)(**arg_dict)
if status != "OK":
raise Exception("Status returned is {0}".format(status))
# if isinstance(data, dict):
# for key in sorted(data):
# print "%s ==> %s" % (key, data[key])
# else:
print("result =", data)
def do_rpc(rpc_instance, method_name, arg_dict):
print("** Executing {0}({1})...".format(method_name, arg_dict))
answer = rpc_instance.execute(method_name=method_name, **arg_dict)
print("result =", answer)
print("======")
return data
with TemporaryQueue(__name__) as tmp_queue:
busname = tmp_queue.address
with create_service(busname, database_credentials):
with RPC("OTDBService.TaskGetIDs", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC:
# Existing: otdb_id:1099268, mom_id:353713
do_rpc (otdbRPC, {'OtdbID': 1099268, 'MomID': 353713 })
do_rpc (otdbRPC, {'OtdbID': 1099268, 'MomID': 5 })
do_rpc (otdbRPC, {'OtdbID': 1099268, 'MomID': None })
do_rpc (otdbRPC, {'OtdbID': 5, 'MomID': 353713 })
do_rpc_catch_exception('', otdbRPC, {'OtdbID': 5, 'MomID': 5 })
do_rpc_catch_exception('', otdbRPC, {'OtdbID': 5, 'MomID': None })
do_rpc (otdbRPC, {'OtdbID': None, 'MomID': 353713 })
do_rpc_catch_exception('', otdbRPC, {'OtdbID': None, 'MomID': 5 })
do_rpc_catch_exception('', otdbRPC, {'OtdbID': None, 'MomID': None })
with RPC("OTDBService.GetDefaultTemplates", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC:
do_rpc(otdbRPC,{})
with RPC("OTDBService.SetProject", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC:
do_rpc(otdbRPC,{'name':"Taka Tuka Land", "title":"Adventure movie", "pi":"Pippi", "co_i":"Mr.Nelson", "contact":"Witje"})
with RPC("OTDBService.TaskCreate", ForwardExceptions=True, busname=busname, timeout=10) as task_create:
do_rpc(task_create, {'OtdbID':1099268, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
do_rpc(task_create, {'MomID':353713, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
do_rpc_catch_exception('on non-exsisting campaign', task_create,
return answer
with TemporaryExchange(__name__) as tmp_exchange:
exchange = tmp_exchange.address
with create_service(exchange=exchange, dbcreds=database_credentials) as service:
with RPC(service_name=service.service_name, exchange=exchange, timeout=10) as otdbRPC: # Existing: otdb_id:1099268, mom_id:353713
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': 353713 })
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': 5 })
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': None })
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 5, 'MomID': 353713 })
do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': 5, 'MomID': 5 })
do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': 5, 'MomID': None })
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': None, 'MomID': 353713 })
do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': None, 'MomID': 5 })
do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': None, 'MomID': None })
do_rpc(otdbRPC, "GetDefaultTemplates", {})
do_rpc(otdbRPC, "SetProject",
{'name':"Taka Tuka Land", "title":"Adventure movie", "pi":"Pippi",
"co_i":"Mr.Nelson", "contact":"Witje"})
do_rpc(otdbRPC, "TaskCreate", {'OtdbID':1099268, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
do_rpc(otdbRPC, "TaskCreate", {'MomID':353713, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
do_rpc_catch_exception('on non-exsisting campaign', otdbRPC, "TaskCreate",
{'MomID':998877, 'TemplateName':'BeamObservation',
'CampaignName':'No such campaign', 'Specification': {'state':'finished'}})
do_rpc(task_create, {'MomID':998877, 'TemplateName':'BeamObservation',
do_rpc(otdbRPC, "TaskCreate", {'MomID':998877, 'TemplateName':'BeamObservation',
'CampaignName':'Taka Tuka Land', 'Specification': {'state':'finished'}})
data = do_rpc(task_create, {'MomID':12345, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
data = do_rpc(otdbRPC, "TaskCreate", {'MomID':12345, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
new_tree1 = data['MomID']
data = do_rpc(task_create, {'MomID':54321, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
data = do_rpc(otdbRPC, "TaskCreate", {'MomID':54321, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
new_tree2= data['MomID']
with RPC("OTDBService.TaskPrepareForScheduling", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC:
do_rpc(otdbRPC, {'MomID':new_tree1}) # template
do_rpc(otdbRPC, {'MomID':new_tree1}) # now a VIC tree
do_rpc(otdbRPC, {'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00', 'StopTime':'2016-03-01 12:34:56'})
do_rpc(otdbRPC, "TaskPrepareForScheduling", {'MomID':new_tree1}) # template
do_rpc(otdbRPC, "TaskPrepareForScheduling", {'MomID':new_tree1}) # now a VIC tree
do_rpc(otdbRPC, "TaskPrepareForScheduling",
{'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00',
'StopTime':'2016-03-01 12:34:56'})
do_rpc_catch_exception("on invalid stoptime", otdbRPC,
{'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00', 'StopTime':'2016'})
"TaskPrepareForScheduling",
{'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00',
'StopTime':'2016'})
with RPC("OTDBService.TaskDelete", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC:
do_rpc(otdbRPC, {'MomID':new_tree2})
do_rpc(otdbRPC, "TaskDelete", {'MomID':new_tree2})
with RPC("OTDBService.TaskGetSpecification", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC:
do_rpc(otdbRPC, {'OtdbID':1099269}) # PIC
do_rpc(otdbRPC, {'OtdbID':1099238}) # Template
do_rpc(otdbRPC, {'OtdbID':1099266}) # VIC
do_rpc_catch_exception('on non-existing treeID', otdbRPC, {'OtdbID':5}) # Non existing
do_rpc(otdbRPC, "TaskGetSpecification", {'OtdbID':1099269}) # PIC
do_rpc(otdbRPC, "TaskGetSpecification", {'OtdbID':1099238}) # Template
do_rpc(otdbRPC, "TaskGetSpecification", {'OtdbID':1099266}) # VIC
do_rpc_catch_exception('on non-existing treeID', otdbRPC,
"TaskGetSpecification", {'OtdbID':5}) # Non existing
with RPC("OTDBService.TaskSetStatus", ForwardExceptions=True, busname=busname, timeout=5) as status_update_command:
# PIC
do_rpc(status_update_command, {'OtdbID':1099269, 'NewStatus':'finished', 'UpdateTimestamps':True})
do_rpc(otdbRPC, "TaskSetStatus",
{'OtdbID':1099269, 'NewStatus':'finished', 'UpdateTimestamps':True})
# Template
do_rpc(status_update_command, {'OtdbID':1099238, 'NewStatus':'finished', 'UpdateTimestamps':True})
do_rpc(otdbRPC, "TaskSetStatus",
{'OtdbID':1099238, 'NewStatus':'finished', 'UpdateTimestamps':True})
# VIC
do_rpc(status_update_command, {'OtdbID':1099266, 'NewStatus':'finished', 'UpdateTimestamps':True})
do_rpc(otdbRPC, "TaskSetStatus",
{'OtdbID':1099266, 'NewStatus':'finished', 'UpdateTimestamps':True})
# Nonexisting tree
do_rpc_catch_exception('on invalid treeID',
status_update_command, {'OtdbID':10, 'NewStatus':'finished', 'UpdateTimestamps':True})
do_rpc_catch_exception('on invalid treeID', otdbRPC,
"TaskSetStatus",
{'OtdbID':10, 'NewStatus':'finished',
'UpdateTimestamps':True})
# VIC tree: invalid status
do_rpc_catch_exception('on invalid status',
status_update_command, {'OtdbID':1099266, 'NewStatus':'what_happend', 'UpdateTimestamps':True})
do_rpc_catch_exception('on invalid status', otdbRPC, "TaskSetStatus",
{'OtdbID':1099266, 'NewStatus':'what_happend',
'UpdateTimestamps':True})
# Set PIC back to active...
do_rpc(status_update_command, {'OtdbID':1099269, 'NewStatus':'active', 'UpdateTimestamps':True})
do_rpc(otdbRPC, "TaskSetStatus",
{'OtdbID':1099269, 'NewStatus':'active', 'UpdateTimestamps':True})
with RPC("OTDBService.GetStations", ForwardExceptions=True, busname=busname, timeout=10) as otdbRPC:
do_rpc(otdbRPC,{})
do_rpc(otdbRPC, "GetStations", {})
with RPC("OTDBService.TaskSetSpecification", ForwardExceptions=True, busname=busname, timeout=5) as key_update:
# VIC tree: valid
do_rpc(key_update, {'OtdbID':1099266,
'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost':'NameOfTestHost'}})
do_rpc(otdbRPC, "TaskSetSpecification",
{'OtdbID':1099266,
'Specification':
{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost':
'NameOfTestHost'}})
# Template tree: not supported yet
do_rpc(key_update, {'OtdbID':1099238,
do_rpc(otdbRPC, "TaskSetSpecification", {'OtdbID':1099238,
'Specification':{'LOFAR.ObsSW.Observation.Scheduler.priority':'0.1'}})
# PIC tree: not supported yet
do_rpc_catch_exception('on invalid treetype (PIC)', key_update,
{'OtdbID':1099269, 'Specification':{'LOFAR.PIC.Core.CS001.status_state':'50'}})
do_rpc_catch_exception('on invalid treetype (PIC)', otdbRPC,
"TaskSetSpecification",
{'OtdbID':1099269,
'Specification':{'LOFAR.PIC.Core.CS001.status_state':'50'}})
# Non exsisting tree
do_rpc_catch_exception('on invalid treeID', key_update, {'OtdbID':10,
'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost':'NameOfTestHost'}})
do_rpc_catch_exception('on invalid treeID', otdbRPC,
"TaskSetSpecification",
{'OtdbID':10,
'Specification':
{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost':'NameOfTestHost'}})
# VIC tree: wrong key
do_rpc_catch_exception('on invalid key', key_update, {'OtdbID':1099266,
do_rpc_catch_exception('on invalid key', otdbRPC, "TaskSetSpecification", {'OtdbID':1099266,
'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.NoSuchKey':'NameOfTestHost'}})
finally:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment