Skip to content
Snippets Groups Projects
Commit c1a43df9 authored by Ruud Overeem's avatar Ruud Overeem
Browse files

Task #8531: TreeService should work now. It's a lot of effort to make a decent testdatabase. :-(

TreeStatusEvent also runs and produces message. Here also the remaining work is in the test setup.
parent 8320773f
No related branches found
No related tags found
No related merge requests found
...@@ -4821,6 +4821,7 @@ SAS/OTDB/test/tMetadata.cc -text ...@@ -4821,6 +4821,7 @@ SAS/OTDB/test/tMetadata.cc -text
SAS/OTDB/test/tQueryPIC.cc -text SAS/OTDB/test/tQueryPIC.cc -text
SAS/OTDB_Services/TreeService.py -text SAS/OTDB_Services/TreeService.py -text
SAS/OTDB_Services/TreeStatusEvents.py -text SAS/OTDB_Services/TreeStatusEvents.py -text
SAS/OTDB_Services/test/t_TreeService.py -text
SAS/Scheduler/src/.default_settings.set -text SAS/Scheduler/src/.default_settings.set -text
SAS/Scheduler/src/LOFAR_libScheduler.pro -text SAS/Scheduler/src/LOFAR_libScheduler.pro -text
SAS/Scheduler/src/conflictdialog.ui -text SAS/Scheduler/src/conflictdialog.ui -text
......
...@@ -70,7 +70,7 @@ def TaskSpecificationRequest(input_dict, db_connection): ...@@ -70,7 +70,7 @@ def TaskSpecificationRequest(input_dict, db_connection):
# Try to get the specification information # Try to get the specification information
try: try:
logger.debug("TaskSpecificationRequest:%s" % input_dict) logger.info("TaskSpecificationRequest:%s" % input_dict)
top_node = db_connection.query("select nodeid from getTopNode('%s')" % tree_id).getresult()[0][0] top_node = db_connection.query("select nodeid from getTopNode('%s')" % tree_id).getresult()[0][0]
treeinfo = db_connection.query("select exportTree(1, '%s', '%s')" % (tree_id, top_node)).getresult()[0][0] treeinfo = db_connection.query("select exportTree(1, '%s', '%s')" % (tree_id, top_node)).getresult()[0][0]
except QUERY_EXCEPTIONS, exc_info: except QUERY_EXCEPTIONS, exc_info:
...@@ -86,7 +86,7 @@ def TaskSpecificationRequest(input_dict, db_connection): ...@@ -86,7 +86,7 @@ def TaskSpecificationRequest(input_dict, db_connection):
return answer return answer
# Status Update Command # Status Update Command
def StatusUpdateCommand(input_dict): def StatusUpdateCommand(input_dict, db_connection):
""" """
RPC function to update the status of a tree. RPC function to update the status of a tree.
...@@ -104,7 +104,6 @@ def StatusUpdateCommand(input_dict): ...@@ -104,7 +104,6 @@ def StatusUpdateCommand(input_dict):
FunctionError: An error occurred during the execution of the function. FunctionError: An error occurred during the execution of the function.
The text of the exception explains what is wrong. The text of the exception explains what is wrong.
""" """
global otdb_connection
# Check input # Check input
if not isinstance(input_dict, dict): if not isinstance(input_dict, dict):
raise AttributeError("StatusUpdateCommand: Expected a dict as input") raise AttributeError("StatusUpdateCommand: Expected a dict as input")
...@@ -114,13 +113,14 @@ def StatusUpdateCommand(input_dict): ...@@ -114,13 +113,14 @@ def StatusUpdateCommand(input_dict):
update_times = True update_times = True
if input_dict.has_key("UpdateTimestamps"): if input_dict.has_key("UpdateTimestamps"):
update_times = bool(input_dict["UpdateTimestamps"]) update_times = bool(input_dict["UpdateTimestamps"])
logger.info("StatusUpdateCommand(%s,%s,%s)" % (tree_id, new_status, update_times))
except KeyError, info: except KeyError, info:
raise AttributeError("StatusUpdateCommand: Key %s is missing in the input" % info) raise AttributeError("StatusUpdateCommand: Key %s is missing in the input" % info)
# Get list of allowed tree states # Get list of allowed tree states
allowed_states = {} allowed_states = {}
try: try:
for (id,name) in otdb_connection.query("select id,name from treestate").getresult(): for (id,name) in db_connection.query("select id,name from treestate").getresult():
allowed_states[name] = id allowed_states[name] = id
except QUERY_EXCEPTIONS, exc_info: except QUERY_EXCEPTIONS, exc_info:
raise FunctionError("Error while getting allowed states of tree %d: %s" % (tree_id, exc_info)) raise FunctionError("Error while getting allowed states of tree %d: %s" % (tree_id, exc_info))
...@@ -132,7 +132,7 @@ def StatusUpdateCommand(input_dict): ...@@ -132,7 +132,7 @@ def StatusUpdateCommand(input_dict):
# Finally try to change the status # Finally try to change the status
try: try:
success = (otdb_connection.query("select setTreeState(1, %d, %d::INT2,%s)" % success = (db_connection.query("select setTreeState(1, %d, %d::INT2,%s)" %
(tree_id, allowed_states[new_status], str(update_times))).getresult()[0][0] == 't') (tree_id, allowed_states[new_status], str(update_times))).getresult()[0][0] == 't')
except QUERY_EXCEPTIONS, exc_info: except QUERY_EXCEPTIONS, exc_info:
raise FunctionError("Error while setting the status of tree %d: %s" % (tree_id, exc_info)) raise FunctionError("Error while setting the status of tree %d: %s" % (tree_id, exc_info))
...@@ -140,7 +140,7 @@ def StatusUpdateCommand(input_dict): ...@@ -140,7 +140,7 @@ def StatusUpdateCommand(input_dict):
# Key Update Command # Key Update Command
def KeyUpdateCommand(input_dict): def KeyUpdateCommand(input_dict, db_connection):
""" """
RPC function to update the values of a tree. RPC function to update the values of a tree.
...@@ -154,7 +154,6 @@ def KeyUpdateCommand(input_dict): ...@@ -154,7 +154,6 @@ def KeyUpdateCommand(input_dict):
FunctionError: An error occurred during the execution of the function. FunctionError: An error occurred during the execution of the function.
The text of the exception explains what is wrong. The text of the exception explains what is wrong.
""" """
global otdb_connection
# Check input # Check input
if not isinstance(input_dict, dict): if not isinstance(input_dict, dict):
raise AttributeError("Expected a dict as input") raise AttributeError("Expected a dict as input")
...@@ -167,12 +166,13 @@ def KeyUpdateCommand(input_dict): ...@@ -167,12 +166,13 @@ def KeyUpdateCommand(input_dict):
raise AttributeError("KeyUpdateCommand (tree=%d): Field 'OtdbID' must be of type 'integer'" % tree_id) raise AttributeError("KeyUpdateCommand (tree=%d): Field 'OtdbID' must be of type 'integer'" % tree_id)
if not isinstance(update_list, dict): if not isinstance(update_list, dict):
raise AttributeError("KeyUpdateCommand (tree=%d): Field 'Updates' must be of type 'dict'" % tree_id) raise AttributeError("KeyUpdateCommand (tree=%d): Field 'Updates' must be of type 'dict'" % tree_id)
logger.info("KeyUpdateCommand for tree: %d", tree_id)
# Finally try to update all keys # Finally try to update all keys
errors = {} errors = {}
for (key,value) in update_list.iteritems(): for (key,value) in update_list.iteritems():
try: try:
record_list = (otdb_connection.query("select nodeid,instances,limits from getvhitemlist (%d, '%s')" % record_list = (db_connection.query("select nodeid,instances,limits from getvhitemlist (%d, '%s')" %
(tree_id, key))).getresult() (tree_id, key))).getresult()
if len(record_list) == 0: if len(record_list) == 0:
errors[key] = "Not found for tree %d" % tree_id errors[key] = "Not found for tree %d" % tree_id
...@@ -183,7 +183,7 @@ def KeyUpdateCommand(input_dict): ...@@ -183,7 +183,7 @@ def KeyUpdateCommand(input_dict):
# When one record was found record_list is a list with a single tuple (nodeid, instances, current_value) # When one record was found record_list is a list with a single tuple (nodeid, instances, current_value)
node_id = record_list[0][0] node_id = record_list[0][0]
instances = record_list[0][1] instances = record_list[0][1]
result = ((otdb_connection.query("select updateVTnode(1,%d,%d,%d::INT2,'%s')" % result = ((db_connection.query("select updateVTnode(1,%d,%d,%d::INT2,'%s')" %
(tree_id, node_id, instances, value))).getresult()[0][0] == 't') (tree_id, node_id, instances, value))).getresult()[0][0] == 't')
print "%s: %s ==> %s" % (key, record_list[0][2], value) print "%s: %s ==> %s" % (key, record_list[0][2], value)
except QUERY_EXCEPTIONS, exc: except QUERY_EXCEPTIONS, exc:
...@@ -218,11 +218,11 @@ class PostgressMessageHandlerInterface(MessageHandlerInterface): ...@@ -218,11 +218,11 @@ class PostgressMessageHandlerInterface(MessageHandlerInterface):
try: try:
self.connection = pg.connect(user=self.db_user, host=self.db_host, dbname=self.database) self.connection = pg.connect(user=self.db_user, host=self.db_host, dbname=self.database)
self.connected = True self.connected = True
logger.info("Connected to database %s on host %s" % (dbname, host)) logger.info("Connected to database %s on host %s" % (self.database, self.db_host))
except (TypeError, SyntaxError, pg.InternalError): except (TypeError, SyntaxError, pg.InternalError):
self.connected = False self.connected = False
logger.error("Not connected to database %s on host %s (anymore), retry in 5 seconds" logger.error("Not connected to database %s on host %s (anymore), retry in 5 seconds"
% (dbname, host)) % (self.database, self.db_host))
time.sleep(5) time.sleep(5)
class PostgressTaskSpecificationRequest(PostgressMessageHandlerInterface): class PostgressTaskSpecificationRequest(PostgressMessageHandlerInterface):
...@@ -280,37 +280,21 @@ if __name__ == "__main__": ...@@ -280,37 +280,21 @@ if __name__ == "__main__":
parser.print_help() parser.print_help()
sys.exit(0) sys.exit(0)
def do_specification_rpc(function, args):
try:
(data,status) = function(args)
if data:
for (key,value) in data.iteritems():
print "%s ==> %s" % (key, value)
else:
print status
except Exception as e:
print e
print "################"
busname = sys.argv[1] if len(sys.argv) > 1 else "simpletest" busname = sys.argv[1] if len(sys.argv) > 1 else "simpletest"
serv1 = Service(busname, "TaskSpecification", PostgressTaskSpecificationRequest, serv1 = Service("TaskSpecification", PostgressTaskSpecificationRequest,
numthreads=1, startonwith=True, busname=busname, numthreads=1, startonwith=True,
handler_args = {"database" : options.dbName, "db_host" : options.dbHost}) handler_args = {"database" : options.dbName, "db_host" : options.dbHost})
serv2 = Service(busname, "StatusUpdateCmd", PostgressStatusUpdateCommand, serv2 = Service("StatusUpdateCmd", PostgressStatusUpdateCommand,
numthreads=1, startonwith=True, busname=busname, numthreads=1, startonwith=True,
handler_args = {"database" : options.dbName, "db_host" : options.dbHost}) handler_args = {"database" : options.dbName, "db_host" : options.dbHost})
serv3 = Service(busname, "KeyUpdateCmd", PostgressKeyUpdateCommand, serv3 = Service("KeyUpdateCmd", PostgressKeyUpdateCommand,
numthreads=1, startonwith=True, busname=busname, numthreads=1, startonwith=True,
handler_args = {"database" : options.dbName, "db_host" : options.dbHost}) handler_args = {"database" : options.dbName, "db_host" : options.dbHost})
with serv1,serv2,serv3: with serv1,serv2,serv3:
with RPC(busname, "TaskSpecification", ForwardExceptions=True) as task_spec_request: logger.info("Started the OTDB services")
do_specification_rpc(task_spec_request, {'OtdbID':63370}) serv3.wait_for_interrupt()
do_specification_rpc(task_spec_request, {'OtdbID':82111})
do_specification_rpc(task_spec_request, {'OtdbID':146300}) logger.info("Stopped the OTDB services")
# print StatusUpdateCommand({'OtdbID':146300, 'NewStatus':'finished', 'UpdateTimestamps':True})
# print KeyUpdateCommand({'OtdbID':63370, 'Updates':{'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl._hostname':'CCU099ABC'}})
...@@ -26,6 +26,7 @@ Daemon that watches the OTDB database for status changes of trees and publishes ...@@ -26,6 +26,7 @@ Daemon that watches the OTDB database for status changes of trees and publishes
import os,sys,time,pg, signal import os,sys,time,pg, signal
from optparse import OptionParser from optparse import OptionParser
from lofar.messaging import EventMessage,ToBus
QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError) QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError)
Alive = False Alive = False
...@@ -82,6 +83,8 @@ if __name__ == "__main__": ...@@ -82,6 +83,8 @@ if __name__ == "__main__":
help="Name of the database") help="Name of the database")
parser.add_option("-H", "--hostname", dest="dbHost", type="string", default="sasdb", parser.add_option("-H", "--hostname", dest="dbHost", type="string", default="sasdb",
help="Hostname of database server") help="Hostname of database server")
parser.add_option("-B", "--busname", dest="busname", type="string", default="",
help="Busname or queue-name the status changes are published on")
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
if not options.dbName: if not options.dbName:
...@@ -94,46 +97,55 @@ if __name__ == "__main__": ...@@ -94,46 +97,55 @@ if __name__ == "__main__":
parser.print_help() parser.print_help()
sys.exit(0) sys.exit(0)
if not options.busname:
print "Missing busname"
parser.print_help()
sys.exit(0)
# Set signalhandler to stop the program in a neat way. # Set signalhandler to stop the program in a neat way.
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
Alive = True Alive = True
connected = False connected = False
otdb_connection = None otdb_connection = None
while Alive: with ToBus(options.busname) as send_bus:
while Alive and not connected: while Alive:
# Connect to the database while Alive and not connected:
try: # Connect to the database
otdb_connection = pg.connect(user="postgres", host=options.dbHost, dbname=options.dbName) try:
connected = True otdb_connection = pg.connect(user="postgres", host=options.dbHost, dbname=options.dbName)
except (TypeError, SyntaxError, pg.InternalError): connected = True
connected = False except (TypeError, SyntaxError, pg.InternalError):
print "DatabaseError: Connection to database could not be made, reconnect attempt in 5 seconds" connected = False
time.sleep(5) print "DatabaseError: Connection to database could not be made, reconnect attempt in 5 seconds"
time.sleep(5)
# When we are connected we can poll the database
if connected: # When we are connected we can poll the database
# Get start_time (= creation time of last retrieved record if any) if connected:
start_time = '' # Get start_time (= creation time of last retrieved record if any)
try: start_time = ''
start_time = open('time_save.txt', 'rb').read() try:
except IOError: start_time = open('time_save.txt', 'rb').read()
start_time = "2015-01-01 00:00:00.00" except IOError:
print "start_time=", start_time start_time = "2015-01-01 00:00:00.00"
print "start_time=", start_time
try:
record_list = PollForStatusChanges(start_time, "now", otdb_connection) try:
except FunctionError, exc_info: record_list = PollForStatusChanges(start_time, "now", otdb_connection)
print exc_info except FunctionError, exc_info:
else: print exc_info
for (treeid, state, modtime, creation) in record_list: else:
print treeid, state, modtime, creation for (treeid, state, modtime, creation) in record_list:
open('time_save.txt', 'wb').write(creation) content = { "treeID" : treeid, "state" : state, "time_of_change" : modtime }
start_time = creation msg = EventMessage(content)
print "===" print treeid, state, modtime, creation
send_bus.send(msg)
# Redetermine the database status. open('time_save.txt', 'wb').write(creation)
connected = (otdb_connection and otdb_connection.status == 1) start_time = creation
print "==="
time.sleep(2)
# Redetermine the database status.
connected = (otdb_connection and otdb_connection.status == 1)
time.sleep(2)
#!/usr/bin/python
#coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: Backtrace.cc 31468 2015-04-13 23:26:52Z amesfoort $
"""
RPC functions that allow access to (VIC) trees in OTDB.
TaskSpecificationRequest: get the specification(parset) of a tree as dict.
KeyUpdateCommand : function to update the value of multiple (existing) keys.
StatusUpdateCommand : finction to update the status of a tree.
"""
import sys
import logging
from lofar.messaging.RPC import *
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
def do_rpc(rpc_instance, arg_dict):
try:
(data,status) = (rpc_instance)(arg_dict)
if (status != "OK"):
raise Exception("Status returned is %s" % status)
print type(data)
for (key,value) in data.iteritems():
print "%s ==> %s" % (key, value)
except OverflowError as e:
pass
print "======"
if __name__ == "__main__":
busname = sys.argv[1] if len(sys.argv) > 1 else "simpletest"
# with RPC("TaskSpecification", ForwardExceptions=True, busname=busname, timeout=2) as task_spec_request:
# do_rpc(task_spec_request, {'OtdbID':63370})
# do_rpc(task_spec_request, {'OtdbID':146300})
# do_rpc(task_spec_request, {'OtdbID':82111})
# print StatusUpdateCmd({'OtdbID':146300, 'NewStatus':'finished', 'UpdateTimestamps':True})
with RPC("KeyUpdateCmd", ForwardExceptions=True, busname=busname, timeout=2) as key_update:
print key_update({'OtdbID':63370,
'Updates':{'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl._hostname':'CCU099ABC'}})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment