Skip to content
Snippets Groups Projects
Commit 769ca07a authored by Ruud Overeem's avatar Ruud Overeem
Browse files

Task #8531: Converted the TreeService to work with the MessageHandlerInterface class.

TreeService still needs some cleaning up since it also contains some test. These tests need to be move to a seperate test program.
parent 7fc83570
No related branches found
No related tags found
No related merge requests found
...@@ -4819,6 +4819,8 @@ SAS/OTDB/src/setStatus.conf -text ...@@ -4819,6 +4819,8 @@ SAS/OTDB/src/setStatus.conf -text
SAS/OTDB/test/tBrokenHardware.cc -text SAS/OTDB/test/tBrokenHardware.cc -text
SAS/OTDB/test/tMetadata.cc -text SAS/OTDB/test/tMetadata.cc -text
SAS/OTDB/test/tQueryPIC.cc -text SAS/OTDB/test/tQueryPIC.cc -text
SAS/OTDB_Services/TreeService.py -text
SAS/OTDB_Services/TreeStatusEvents.py -text
SAS/Scheduler/src/.default_settings.set -text SAS/Scheduler/src/.default_settings.set -text
SAS/Scheduler/src/LOFAR_libScheduler.pro -text SAS/Scheduler/src/LOFAR_libScheduler.pro -text
SAS/Scheduler/src/conflictdialog.ui -text SAS/Scheduler/src/conflictdialog.ui -text
......
...@@ -43,10 +43,10 @@ class MessageHandlerInterface(object): ...@@ -43,10 +43,10 @@ class MessageHandlerInterface(object):
handler = <from_MessageHandlerInterface_derived_class>(HandlerArguments) handler = <from_MessageHandlerInterface_derived_class>(HandlerArguments)
handler.before_main_loop() handler.before_main_loop()
while alive: while alive:
handler.loop_before_receive() handler.in_loop_before_receive()
msg = wait for messages() msg = wait for messages()
handler.handle_message(msg) handler.handle_message(msg)
handler.loop_after_handling() handler.in_loop_after_handling()
handler.after_main_loop() handler.after_main_loop()
""" """
def __init__(self, **kwargs): def __init__(self, **kwargs):
...@@ -56,15 +56,15 @@ class MessageHandlerInterface(object): ...@@ -56,15 +56,15 @@ class MessageHandlerInterface(object):
"Called before main processing loop is entered." "Called before main processing loop is entered."
pass pass
def loop_before_receive(self): def in_loop_before_receive(self):
"Called in main processing loop just before a blocking wait for messages is done." "Called in main processing loop just before a blocking wait for messages is done."
pass pass
def handle_message(self, msg): def handle_message(self, msg):
"Function the should handle the received message and return a result." "Function the should handle the received message and return a result."
pass raise Exception("OOPS! YOU ENDED UP IN THE MESSAGE HANDLER OF THE ABSTRACT BASE CLASS!")
def loop_after_handling(self): def in_loop_after_handling(self):
"Called in the main loop after the result was send back to the requester." "Called in the main loop after the result was send back to the requester."
pass pass
...@@ -118,7 +118,7 @@ class Service(object): ...@@ -118,7 +118,7 @@ class Service(object):
self.startonwith = kwargs.pop("startonwith", False) self.startonwith = kwargs.pop("startonwith", False)
self.handler_args = kwargs.pop("handler_args", None) self.handler_args = kwargs.pop("handler_args", None)
if len(kwargs): if len(kwargs):
raise ArgumentError("Unexpected argument passed to Serice class: %s", kwargs) raise AttributeError("Unexpected argument passed to Serice class: %s", kwargs)
# Set appropriate flags for exclusive binding # Set appropriate flags for exclusive binding
if self.exclusive is True: if self.exclusive is True:
...@@ -136,7 +136,7 @@ class Service(object): ...@@ -136,7 +136,7 @@ class Service(object):
self.service_handler = MessageHandlerInterface() self.service_handler = MessageHandlerInterface()
self.service_handler.handle_message = servicehandler self.service_handler.handle_message = servicehandler
else: else:
self.service_handler = servicehandler(self.handler_args) self.service_handler = servicehandler(**self.handler_args)
if not isinstance(self.service_handler, MessageHandlerInterface): if not isinstance(self.service_handler, MessageHandlerInterface):
raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.") raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.")
...@@ -270,9 +270,9 @@ class Service(object): ...@@ -270,9 +270,9 @@ class Service(object):
while self.running: while self.running:
try: try:
self.service_handler.loop_before_receive() self.service_handler.in_loop_before_receive()
except Exception as e: except Exception as e:
logger.error("loop_before_receive() failed with %s", e) logger.error("in_loop_before_receive() failed with %s", e)
continue continue
try: try:
...@@ -303,9 +303,9 @@ class Service(object): ...@@ -303,9 +303,9 @@ class Service(object):
self.okcounter[index] += 1 self.okcounter[index] += 1
self.Listen.ack(msg) self.Listen.ack(msg)
try: try:
self.service_handler.loop_after_handling() self.service_handler.in_loop_after_handling()
except Exception as e: except Exception as e:
logger.error("loop_after_handling() failed with %s", e) logger.error("in_loop_after_handling() failed with %s", e)
continue continue
except Exception as e: except Exception as e:
......
#!/usr/bin/python
#coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: Backtrace.cc 31468 2015-04-13 23:26:52Z amesfoort $
"""
RPC functions that allow access to (VIC) trees in OTDB.
TaskSpecificationRequest: get the specification(parset) of a tree as dict.
KeyUpdateCommand : function to update the value of multiple (existing) keys.
StatusUpdateCommand : finction to update the status of a tree.
"""
import os,sys,time,pg
import logging
from optparse import OptionParser
from lofar.messaging.Service import *
from lofar.messaging.RPC import *
QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
# Define our own exceptions
class FunctionError(Exception):
"Something when wrong during the execution of the function"
pass
class DatabaseError(Exception):
"Connection with the database could not be made"
pass
# Task Specification Request
def TaskSpecificationRequest(input_dict, db_connection):
"""
RPC function that retrieves the task specification from a tree.
Input : OtdbID (integer) - ID of the tree to retrieve the specifications of
Output: (dict) - The 'parset' of the tree
Exceptions:
AttributeError: There is something wrong with the given input values.
FunctionError: An error occurred during the execution of the function.
The text of the exception explains what is wrong.
"""
# Check the input
if not isinstance(input_dict, dict):
raise AttributeError("TaskSpecificationRequest: Expected a dict as input")
try:
tree_id = input_dict['OtdbID']
except KeyError, info:
raise AttributeError("TaskSpecificationRequest: Key %s is missing in the input" % info)
# Try to get the specification information
try:
logger.debug("TaskSpecificationRequest:%s" % input_dict)
top_node = db_connection.query("select nodeid from getTopNode('%s')" % tree_id).getresult()[0][0]
treeinfo = db_connection.query("select exportTree(1, '%s', '%s')" % (tree_id, top_node)).getresult()[0][0]
except QUERY_EXCEPTIONS, exc_info:
raise FunctionError("Error while requesting specs of tree %d: %s"% (tree_id, exc_info))
# convert the single string into the dict
answer = {}
for line in treeinfo.split('\n'):
try:
(key, value) = line.split("=", 1)
answer[key] = value
except ValueError:
pass
return answer
# Status Update Command
def StatusUpdateCommand(input_dict):
"""
RPC function to update the status of a tree.
Input : OtdbID (integer) - ID of the tree to change the status of.
NewStatus (string) - The new status of the tree. The following values are allowed:
described, prepared, approved, on_hold, conflict, prescheduled, scheduled, queued,
active, completing, finished, aborted, error, obsolete
UpdateTimestamps (boolean) - Optional parameter to also update the timestamp of the metadata of the
tree when the status of the tree is changed into 'active', 'finished' or 'aborted'. Resp. starttime
or endtime. Default this option is ON.
Output: (boolean) - Reflects the successful update of the status.
Exceptions:
AttributeError: There is something wrong with the given input values.
FunctionError: An error occurred during the execution of the function.
The text of the exception explains what is wrong.
"""
global otdb_connection
# Check input
if not isinstance(input_dict, dict):
raise AttributeError("StatusUpdateCommand: Expected a dict as input")
try:
tree_id = input_dict['OtdbID']
new_status = input_dict['NewStatus']
update_times = True
if input_dict.has_key("UpdateTimestamps"):
update_times = bool(input_dict["UpdateTimestamps"])
except KeyError, info:
raise AttributeError("StatusUpdateCommand: Key %s is missing in the input" % info)
# Get list of allowed tree states
allowed_states = {}
try:
for (id,name) in otdb_connection.query("select id,name from treestate").getresult():
allowed_states[name] = id
except QUERY_EXCEPTIONS, exc_info:
raise FunctionError("Error while getting allowed states of tree %d: %s" % (tree_id, exc_info))
# Check value of new_status argument
if not new_status in allowed_states:
raise FunctionError("The newstatus(=%s) for tree %d must have one of the following values:%s" %
(new_status, tree_id, allowed_states.keys()))
# Finally try to change the status
try:
success = (otdb_connection.query("select setTreeState(1, %d, %d::INT2,%s)" %
(tree_id, allowed_states[new_status], str(update_times))).getresult()[0][0] == 't')
except QUERY_EXCEPTIONS, exc_info:
raise FunctionError("Error while setting the status of tree %d: %s" % (tree_id, exc_info))
return success
# Key Update Command
def KeyUpdateCommand(input_dict):
"""
RPC function to update the values of a tree.
Input : OtdbID (integer) - ID of the tree to change the status of.
Updates (dict) - The key-value pairs that must be updated.
Output: (dict)
'Errors' (dict) Refects the problems that occured {'key':'problem'}
Field is empty if all fields could be updated.
Exceptions:
AttributeError: There is something wrong with the given input values.
FunctionError: An error occurred during the execution of the function.
The text of the exception explains what is wrong.
"""
global otdb_connection
# Check input
if not isinstance(input_dict, dict):
raise AttributeError("Expected a dict as input")
try:
tree_id = input_dict['OtdbID']
update_list = input_dict['Updates']
except KeyError, info:
raise AttributeError("KeyUpdateCommand: Key %s is missing in the input" % info)
if not isinstance(tree_id, int):
raise AttributeError("KeyUpdateCommand (tree=%d): Field 'OtdbID' must be of type 'integer'" % tree_id)
if not isinstance(update_list, dict):
raise AttributeError("KeyUpdateCommand (tree=%d): Field 'Updates' must be of type 'dict'" % tree_id)
# Finally try to update all keys
errors = {}
for (key,value) in update_list.iteritems():
try:
record_list = (otdb_connection.query("select nodeid,instances,limits from getvhitemlist (%d, '%s')" %
(tree_id, key))).getresult()
if len(record_list) == 0:
errors[key] = "Not found for tree %d" % tree_id
continue
if len(record_list) > 1:
errors[key] = "Not a unique key, found %d occurrences for tree %d" % (len(record_list), tree_id)
continue
# When one record was found record_list is a list with a single tuple (nodeid, instances, current_value)
node_id = record_list[0][0]
instances = record_list[0][1]
result = ((otdb_connection.query("select updateVTnode(1,%d,%d,%d::INT2,'%s')" %
(tree_id, node_id, instances, value))).getresult()[0][0] == 't')
print "%s: %s ==> %s" % (key, record_list[0][2], value)
except QUERY_EXCEPTIONS, exc:
errors[key] = str(exc)
return errors
class PostgressMessageHandlerInterface(MessageHandlerInterface):
"""
Implements a generic message handlers for services that are tied to a postgres database.
kwargs must contain the keys:
database <string> Name of the database to connect to
db_user <string> Name of the user used for logging into the database
db_host <string> Name of the machine the database server is running
function <type> Function to call when a message is received on the message bus.
"""
def __init__(self, **kwargs):
super(PostgressMessageHandlerInterface, self).__init__()
self.database = kwargs.pop("database")
self.db_user = kwargs.pop("db_user", "postgres")
self.db_host = kwargs.pop("db_host", "localhost")
if len(kwargs):
raise AttributeError("Unknown keys in arguments of 'DatabaseTiedMessageHandler: %s" % kwargs)
self.connection = None
self.connected = False
def in_loop_before_receive(self):
"Called in main processing loop just before a blocking wait for messages is done."
"Make sure we are connected with the database."
self.connected = (self.connection and self.connection.status == 1)
while not self.connected:
try:
self.connection = pg.connect(user=self.db_user, host=self.db_host, dbname=self.database)
self.connected = True
logger.info("Connected to database %s on host %s" % (dbname, host))
except (TypeError, SyntaxError, pg.InternalError):
self.connected = False
logger.error("Not connected to database %s on host %s (anymore), retry in 5 seconds"
% (dbname, host))
time.sleep(5)
class PostgressTaskSpecificationRequest(PostgressMessageHandlerInterface):
"""
Embedding of the TaskSpecificationRequest function in the postgress service class.
"""
def __init__(self, **kwargs):
super(PostgressTaskSpecificationRequest, self).__init__(**kwargs)
def handle_message(self, msg):
return TaskSpecificationRequest(msg, self.connection)
class PostgressStatusUpdateCommand(PostgressMessageHandlerInterface):
"""
Embedding of the TaskSpecificationRequest function in the postgress service class.
"""
def __init__(self, **kwargs):
super(PostgressStatusUpdateCommand, self).__init__(**kwargs)
def handle_message(self, msg):
return StatusUpdateCommand(msg, self.connection)
class PostgressKeyUpdateCommand(PostgressMessageHandlerInterface):
"""
Embedding of the TaskSpecificationRequest function in the postgress service class.
"""
def __init__(self, **kwargs):
super(PostgressKeyUpdateCommand, self).__init__(**kwargs)
def handle_message(self, msg):
return KeyUpdateCommand(msg, self.connection)
if __name__ == "__main__":
"""
Daemon that sets-up a set of servicess for the OTDB database.
"""
# Check the invocation arguments
parser = OptionParser("%prog [options]")
parser.add_option("-D", "--database", dest="dbName", type="string", default="",
help="Name of the database")
parser.add_option("-H", "--hostname", dest="dbHost", type="string", default="sasdb",
help="Hostname of database server")
(options, args) = parser.parse_args()
if not options.dbName:
print "Missing database name"
parser.print_help()
sys.exit(0)
if not options.dbHost:
print "Missing database server name"
parser.print_help()
sys.exit(0)
def do_specification_rpc(function, args):
try:
(data,status) = function(args)
if data:
for (key,value) in data.iteritems():
print "%s ==> %s" % (key, value)
else:
print status
except Exception as e:
print e
print "################"
busname = sys.argv[1] if len(sys.argv) > 1 else "simpletest"
serv1 = Service(busname, "TaskSpecification", PostgressTaskSpecificationRequest,
numthreads=1, startonwith=True,
handler_args = {"database" : options.dbName, "db_host" : options.dbHost})
serv2 = Service(busname, "StatusUpdateCmd", PostgressStatusUpdateCommand,
numthreads=1, startonwith=True,
handler_args = {"database" : options.dbName, "db_host" : options.dbHost})
serv3 = Service(busname, "KeyUpdateCmd", PostgressKeyUpdateCommand,
numthreads=1, startonwith=True,
handler_args = {"database" : options.dbName, "db_host" : options.dbHost})
with serv1,serv2,serv3:
with RPC(busname, "TaskSpecification", ForwardExceptions=True) as task_spec_request:
do_specification_rpc(task_spec_request, {'OtdbID':63370})
do_specification_rpc(task_spec_request, {'OtdbID':82111})
do_specification_rpc(task_spec_request, {'OtdbID':146300})
# print StatusUpdateCommand({'OtdbID':146300, 'NewStatus':'finished', 'UpdateTimestamps':True})
# print KeyUpdateCommand({'OtdbID':63370, 'Updates':{'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl._hostname':'CCU099ABC'}})
#!/usr/bin/env python
#coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: Backtrace.cc 31468 2015-04-13 23:26:52Z amesfoort $
"""
Daemon that watches the OTDB database for status changes of trees and publishes those on the messagebus.
"""
import os,sys,time,pg, signal
from optparse import OptionParser
QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError)
Alive = False
# Define our own exceptions
class FunctionError(Exception):
"Something when wrong during the execution of the function"
pass
class DatabaseError(Exception):
"Connection with the database could not be made"
pass
# Task Specification Request
def PollForStatusChanges(start_time, end_time, otdb_connection):
"""
Function that asked the database for status changes in the given period
Input : start_time (string) - Oldest time of change to include in the selection.
end_time (string) - Most recent time of change to include in the selection
The times must be specified in the format YYYY-Mon-DD HH24:MI:SS.US.
The selection delivers changes the match: startime <= time_of_change < end_time
Output: (list of tuples) - All status changes between the last polltime and the current time
Tuple = ( tree_id, new_state, time_of_change )
Exceptions:
ArgumentError: There is something wrong with the given input values.
FunctionError: An error occurred during the execution of the function.
The text of the exception explains what is wrong.
"""
# Try to get the specification information
record_list = []
try:
record_list = otdb_connection.query("select treeid,state,modtime,creation from getStateChanges('%s','%s')" %
(start_time, end_time)).getresult()
except QUERY_EXCEPTIONS, exc_info:
raise FunctionError("Error while polling for state changes: %s"% exc_info)
return record_list
def signal_handler(signum, frame):
"Signal redirection to stop the daemon in a neat way."
print "Stopping program"
global Alive
Alive = False
if __name__ == "__main__":
"""
Daemon that sets-up a set of servicess for the OTDB database.
"""
# Check the invocation arguments
parser = OptionParser("%prog [options]")
parser.add_option("-D", "--database", dest="dbName", type="string", default="",
help="Name of the database")
parser.add_option("-H", "--hostname", dest="dbHost", type="string", default="sasdb",
help="Hostname of database server")
(options, args) = parser.parse_args()
if not options.dbName:
print "Missing database name"
parser.print_help()
sys.exit(0)
if not options.dbHost:
print "Missing database server name"
parser.print_help()
sys.exit(0)
# Set signalhandler to stop the program in a neat way.
signal.signal(signal.SIGINT, signal_handler)
Alive = True
connected = False
otdb_connection = None
while Alive:
while Alive and not connected:
# Connect to the database
try:
otdb_connection = pg.connect(user="postgres", host=options.dbHost, dbname=options.dbName)
connected = True
except (TypeError, SyntaxError, pg.InternalError):
connected = False
print "DatabaseError: Connection to database could not be made, reconnect attempt in 5 seconds"
time.sleep(5)
# When we are connected we can poll the database
if connected:
# Get start_time (= creation time of last retrieved record if any)
start_time = ''
try:
start_time = open('time_save.txt', 'rb').read()
except IOError:
start_time = "2015-01-01 00:00:00.00"
print "start_time=", start_time
try:
record_list = PollForStatusChanges(start_time, "now", otdb_connection)
except FunctionError, exc_info:
print exc_info
else:
for (treeid, state, modtime, creation) in record_list:
print treeid, state, modtime, creation
open('time_save.txt', 'wb').write(creation)
start_time = creation
print "==="
# Redetermine the database status.
connected = (otdb_connection and otdb_connection.status == 1)
time.sleep(2)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment