diff --git a/.gitattributes b/.gitattributes index b194049473b4396470ae895de845eb4081004386..de7cb545634a50b919f902e7bc63e69b9f457cff 100644 --- a/.gitattributes +++ b/.gitattributes @@ -4819,6 +4819,8 @@ SAS/OTDB/src/setStatus.conf -text SAS/OTDB/test/tBrokenHardware.cc -text SAS/OTDB/test/tMetadata.cc -text SAS/OTDB/test/tQueryPIC.cc -text +SAS/OTDB_Services/TreeService.py -text +SAS/OTDB_Services/TreeStatusEvents.py -text SAS/Scheduler/src/.default_settings.set -text SAS/Scheduler/src/LOFAR_libScheduler.pro -text SAS/Scheduler/src/conflictdialog.ui -text diff --git a/LCS/Messaging/python/messaging/Service.py b/LCS/Messaging/python/messaging/Service.py index 0d333881851ad7673d768117f82bb05d78094489..0c9171bb1847722f883bc2a7d758333d3d778522 100644 --- a/LCS/Messaging/python/messaging/Service.py +++ b/LCS/Messaging/python/messaging/Service.py @@ -43,10 +43,10 @@ class MessageHandlerInterface(object): handler = <from_MessageHandlerInterface_derived_class>(HandlerArguments) handler.before_main_loop() while alive: - handler.loop_before_receive() + handler.in_loop_before_receive() msg = wait for messages() handler.handle_message(msg) - handler.loop_after_handling() + handler.in_loop_after_handling() handler.after_main_loop() """ def __init__(self, **kwargs): @@ -56,15 +56,15 @@ class MessageHandlerInterface(object): "Called before main processing loop is entered." pass - def loop_before_receive(self): + def in_loop_before_receive(self): "Called in main processing loop just before a blocking wait for messages is done." pass def handle_message(self, msg): "Function the should handle the received message and return a result." - pass + raise Exception("OOPS! YOU ENDED UP IN THE MESSAGE HANDLER OF THE ABSTRACT BASE CLASS!") - def loop_after_handling(self): + def in_loop_after_handling(self): "Called in the main loop after the result was send back to the requester." pass @@ -118,7 +118,7 @@ class Service(object): self.startonwith = kwargs.pop("startonwith", False) self.handler_args = kwargs.pop("handler_args", None) if len(kwargs): - raise ArgumentError("Unexpected argument passed to Serice class: %s", kwargs) + raise AttributeError("Unexpected argument passed to Serice class: %s", kwargs) # Set appropriate flags for exclusive binding if self.exclusive is True: @@ -136,7 +136,7 @@ class Service(object): self.service_handler = MessageHandlerInterface() self.service_handler.handle_message = servicehandler else: - self.service_handler = servicehandler(self.handler_args) + self.service_handler = servicehandler(**self.handler_args) if not isinstance(self.service_handler, MessageHandlerInterface): raise TypeError("Servicehandler argument must by a function or a derived class from MessageHandlerInterface.") @@ -270,9 +270,9 @@ class Service(object): while self.running: try: - self.service_handler.loop_before_receive() + self.service_handler.in_loop_before_receive() except Exception as e: - logger.error("loop_before_receive() failed with %s", e) + logger.error("in_loop_before_receive() failed with %s", e) continue try: @@ -303,9 +303,9 @@ class Service(object): self.okcounter[index] += 1 self.Listen.ack(msg) try: - self.service_handler.loop_after_handling() + self.service_handler.in_loop_after_handling() except Exception as e: - logger.error("loop_after_handling() failed with %s", e) + logger.error("in_loop_after_handling() failed with %s", e) continue except Exception as e: diff --git a/SAS/OTDB_Services/TreeService.py b/SAS/OTDB_Services/TreeService.py new file mode 100644 index 0000000000000000000000000000000000000000..77b1212238e7db7cf73b5857dfda95874ad41fe4 --- /dev/null +++ b/SAS/OTDB_Services/TreeService.py @@ -0,0 +1,316 @@ +#!/usr/bin/python +#coding: iso-8859-15 +# +# Copyright (C) 2015 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id: Backtrace.cc 31468 2015-04-13 23:26:52Z amesfoort $ +""" +RPC functions that allow access to (VIC) trees in OTDB. + +TaskSpecificationRequest: get the specification(parset) of a tree as dict. +KeyUpdateCommand : function to update the value of multiple (existing) keys. +StatusUpdateCommand : finction to update the status of a tree. +""" + +import os,sys,time,pg +import logging +from optparse import OptionParser +from lofar.messaging.Service import * +from lofar.messaging.RPC import * + +QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError) + +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(__name__) + +# Define our own exceptions +class FunctionError(Exception): + "Something when wrong during the execution of the function" + pass +class DatabaseError(Exception): + "Connection with the database could not be made" + pass + +# Task Specification Request +def TaskSpecificationRequest(input_dict, db_connection): + """ + RPC function that retrieves the task specification from a tree. + + Input : OtdbID (integer) - ID of the tree to retrieve the specifications of + Output: (dict) - The 'parset' of the tree + + Exceptions: + AttributeError: There is something wrong with the given input values. + FunctionError: An error occurred during the execution of the function. + The text of the exception explains what is wrong. + """ + # Check the input + if not isinstance(input_dict, dict): + raise AttributeError("TaskSpecificationRequest: Expected a dict as input") + try: + tree_id = input_dict['OtdbID'] + except KeyError, info: + raise AttributeError("TaskSpecificationRequest: Key %s is missing in the input" % info) + + # Try to get the specification information + try: + logger.debug("TaskSpecificationRequest:%s" % input_dict) + top_node = db_connection.query("select nodeid from getTopNode('%s')" % tree_id).getresult()[0][0] + treeinfo = db_connection.query("select exportTree(1, '%s', '%s')" % (tree_id, top_node)).getresult()[0][0] + except QUERY_EXCEPTIONS, exc_info: + raise FunctionError("Error while requesting specs of tree %d: %s"% (tree_id, exc_info)) + # convert the single string into the dict + answer = {} + for line in treeinfo.split('\n'): + try: + (key, value) = line.split("=", 1) + answer[key] = value + except ValueError: + pass + return answer + +# Status Update Command +def StatusUpdateCommand(input_dict): + """ + RPC function to update the status of a tree. + + Input : OtdbID (integer) - ID of the tree to change the status of. + NewStatus (string) - The new status of the tree. The following values are allowed: + described, prepared, approved, on_hold, conflict, prescheduled, scheduled, queued, + active, completing, finished, aborted, error, obsolete + UpdateTimestamps (boolean) - Optional parameter to also update the timestamp of the metadata of the + tree when the status of the tree is changed into 'active', 'finished' or 'aborted'. Resp. starttime + or endtime. Default this option is ON. + Output: (boolean) - Reflects the successful update of the status. + + Exceptions: + AttributeError: There is something wrong with the given input values. + FunctionError: An error occurred during the execution of the function. + The text of the exception explains what is wrong. + """ + global otdb_connection + # Check input + if not isinstance(input_dict, dict): + raise AttributeError("StatusUpdateCommand: Expected a dict as input") + try: + tree_id = input_dict['OtdbID'] + new_status = input_dict['NewStatus'] + update_times = True + if input_dict.has_key("UpdateTimestamps"): + update_times = bool(input_dict["UpdateTimestamps"]) + except KeyError, info: + raise AttributeError("StatusUpdateCommand: Key %s is missing in the input" % info) + + # Get list of allowed tree states + allowed_states = {} + try: + for (id,name) in otdb_connection.query("select id,name from treestate").getresult(): + allowed_states[name] = id + except QUERY_EXCEPTIONS, exc_info: + raise FunctionError("Error while getting allowed states of tree %d: %s" % (tree_id, exc_info)) + + # Check value of new_status argument + if not new_status in allowed_states: + raise FunctionError("The newstatus(=%s) for tree %d must have one of the following values:%s" % + (new_status, tree_id, allowed_states.keys())) + + # Finally try to change the status + try: + success = (otdb_connection.query("select setTreeState(1, %d, %d::INT2,%s)" % + (tree_id, allowed_states[new_status], str(update_times))).getresult()[0][0] == 't') + except QUERY_EXCEPTIONS, exc_info: + raise FunctionError("Error while setting the status of tree %d: %s" % (tree_id, exc_info)) + return success + + +# Key Update Command +def KeyUpdateCommand(input_dict): + """ + RPC function to update the values of a tree. + + Input : OtdbID (integer) - ID of the tree to change the status of. + Updates (dict) - The key-value pairs that must be updated. + Output: (dict) + 'Errors' (dict) Refects the problems that occured {'key':'problem'} + Field is empty if all fields could be updated. + Exceptions: + AttributeError: There is something wrong with the given input values. + FunctionError: An error occurred during the execution of the function. + The text of the exception explains what is wrong. + """ + global otdb_connection + # Check input + if not isinstance(input_dict, dict): + raise AttributeError("Expected a dict as input") + try: + tree_id = input_dict['OtdbID'] + update_list = input_dict['Updates'] + except KeyError, info: + raise AttributeError("KeyUpdateCommand: Key %s is missing in the input" % info) + if not isinstance(tree_id, int): + raise AttributeError("KeyUpdateCommand (tree=%d): Field 'OtdbID' must be of type 'integer'" % tree_id) + if not isinstance(update_list, dict): + raise AttributeError("KeyUpdateCommand (tree=%d): Field 'Updates' must be of type 'dict'" % tree_id) + + # Finally try to update all keys + errors = {} + for (key,value) in update_list.iteritems(): + try: + record_list = (otdb_connection.query("select nodeid,instances,limits from getvhitemlist (%d, '%s')" % + (tree_id, key))).getresult() + if len(record_list) == 0: + errors[key] = "Not found for tree %d" % tree_id + continue + if len(record_list) > 1: + errors[key] = "Not a unique key, found %d occurrences for tree %d" % (len(record_list), tree_id) + continue + # When one record was found record_list is a list with a single tuple (nodeid, instances, current_value) + node_id = record_list[0][0] + instances = record_list[0][1] + result = ((otdb_connection.query("select updateVTnode(1,%d,%d,%d::INT2,'%s')" % + (tree_id, node_id, instances, value))).getresult()[0][0] == 't') + print "%s: %s ==> %s" % (key, record_list[0][2], value) + except QUERY_EXCEPTIONS, exc: + errors[key] = str(exc) + + return errors + +class PostgressMessageHandlerInterface(MessageHandlerInterface): + """ + Implements a generic message handlers for services that are tied to a postgres database. + kwargs must contain the keys: + database <string> Name of the database to connect to + db_user <string> Name of the user used for logging into the database + db_host <string> Name of the machine the database server is running + function <type> Function to call when a message is received on the message bus. + """ + def __init__(self, **kwargs): + super(PostgressMessageHandlerInterface, self).__init__() + self.database = kwargs.pop("database") + self.db_user = kwargs.pop("db_user", "postgres") + self.db_host = kwargs.pop("db_host", "localhost") + if len(kwargs): + raise AttributeError("Unknown keys in arguments of 'DatabaseTiedMessageHandler: %s" % kwargs) + self.connection = None + self.connected = False + + def in_loop_before_receive(self): + "Called in main processing loop just before a blocking wait for messages is done." + "Make sure we are connected with the database." + self.connected = (self.connection and self.connection.status == 1) + while not self.connected: + try: + self.connection = pg.connect(user=self.db_user, host=self.db_host, dbname=self.database) + self.connected = True + logger.info("Connected to database %s on host %s" % (dbname, host)) + except (TypeError, SyntaxError, pg.InternalError): + self.connected = False + logger.error("Not connected to database %s on host %s (anymore), retry in 5 seconds" + % (dbname, host)) + time.sleep(5) + +class PostgressTaskSpecificationRequest(PostgressMessageHandlerInterface): + """ + Embedding of the TaskSpecificationRequest function in the postgress service class. + """ + def __init__(self, **kwargs): + super(PostgressTaskSpecificationRequest, self).__init__(**kwargs) + + def handle_message(self, msg): + return TaskSpecificationRequest(msg, self.connection) + + +class PostgressStatusUpdateCommand(PostgressMessageHandlerInterface): + """ + Embedding of the TaskSpecificationRequest function in the postgress service class. + """ + def __init__(self, **kwargs): + super(PostgressStatusUpdateCommand, self).__init__(**kwargs) + + def handle_message(self, msg): + return StatusUpdateCommand(msg, self.connection) + + +class PostgressKeyUpdateCommand(PostgressMessageHandlerInterface): + """ + Embedding of the TaskSpecificationRequest function in the postgress service class. + """ + def __init__(self, **kwargs): + super(PostgressKeyUpdateCommand, self).__init__(**kwargs) + + def handle_message(self, msg): + return KeyUpdateCommand(msg, self.connection) + + +if __name__ == "__main__": + """ + Daemon that sets-up a set of servicess for the OTDB database. + """ + # Check the invocation arguments + parser = OptionParser("%prog [options]") + parser.add_option("-D", "--database", dest="dbName", type="string", default="", + help="Name of the database") + parser.add_option("-H", "--hostname", dest="dbHost", type="string", default="sasdb", + help="Hostname of database server") + (options, args) = parser.parse_args() + + if not options.dbName: + print "Missing database name" + parser.print_help() + sys.exit(0) + + if not options.dbHost: + print "Missing database server name" + parser.print_help() + sys.exit(0) + + def do_specification_rpc(function, args): + try: + (data,status) = function(args) + if data: + for (key,value) in data.iteritems(): + print "%s ==> %s" % (key, value) + else: + print status + except Exception as e: + print e + print "################" + + busname = sys.argv[1] if len(sys.argv) > 1 else "simpletest" + + serv1 = Service(busname, "TaskSpecification", PostgressTaskSpecificationRequest, + numthreads=1, startonwith=True, + handler_args = {"database" : options.dbName, "db_host" : options.dbHost}) + serv2 = Service(busname, "StatusUpdateCmd", PostgressStatusUpdateCommand, + numthreads=1, startonwith=True, + handler_args = {"database" : options.dbName, "db_host" : options.dbHost}) + serv3 = Service(busname, "KeyUpdateCmd", PostgressKeyUpdateCommand, + numthreads=1, startonwith=True, + handler_args = {"database" : options.dbName, "db_host" : options.dbHost}) + + with serv1,serv2,serv3: + with RPC(busname, "TaskSpecification", ForwardExceptions=True) as task_spec_request: + do_specification_rpc(task_spec_request, {'OtdbID':63370}) + do_specification_rpc(task_spec_request, {'OtdbID':82111}) + do_specification_rpc(task_spec_request, {'OtdbID':146300}) + +# print StatusUpdateCommand({'OtdbID':146300, 'NewStatus':'finished', 'UpdateTimestamps':True}) +# print KeyUpdateCommand({'OtdbID':63370, 'Updates':{'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl._hostname':'CCU099ABC'}}) + + diff --git a/SAS/OTDB_Services/TreeStatusEvents.py b/SAS/OTDB_Services/TreeStatusEvents.py new file mode 100644 index 0000000000000000000000000000000000000000..1fb08b33fe6f339b6f5fb0fb575e8ac6bbbe9dd2 --- /dev/null +++ b/SAS/OTDB_Services/TreeStatusEvents.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python +#coding: iso-8859-15 +# +# Copyright (C) 2015 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id: Backtrace.cc 31468 2015-04-13 23:26:52Z amesfoort $ +""" +Daemon that watches the OTDB database for status changes of trees and publishes those on the messagebus. +""" + +import os,sys,time,pg, signal +from optparse import OptionParser + +QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError) +Alive = False + +# Define our own exceptions +class FunctionError(Exception): + "Something when wrong during the execution of the function" + pass +class DatabaseError(Exception): + "Connection with the database could not be made" + pass + +# Task Specification Request +def PollForStatusChanges(start_time, end_time, otdb_connection): + """ + Function that asked the database for status changes in the given period + + Input : start_time (string) - Oldest time of change to include in the selection. + end_time (string) - Most recent time of change to include in the selection + The times must be specified in the format YYYY-Mon-DD HH24:MI:SS.US. + The selection delivers changes the match: startime <= time_of_change < end_time + + Output: (list of tuples) - All status changes between the last polltime and the current time + Tuple = ( tree_id, new_state, time_of_change ) + + Exceptions: + ArgumentError: There is something wrong with the given input values. + FunctionError: An error occurred during the execution of the function. + The text of the exception explains what is wrong. + """ + # Try to get the specification information + record_list = [] + try: + record_list = otdb_connection.query("select treeid,state,modtime,creation from getStateChanges('%s','%s')" % + (start_time, end_time)).getresult() + except QUERY_EXCEPTIONS, exc_info: + raise FunctionError("Error while polling for state changes: %s"% exc_info) + return record_list + +def signal_handler(signum, frame): + "Signal redirection to stop the daemon in a neat way." + print "Stopping program" + global Alive + Alive = False + + +if __name__ == "__main__": + """ + Daemon that sets-up a set of servicess for the OTDB database. + """ + # Check the invocation arguments + parser = OptionParser("%prog [options]") + parser.add_option("-D", "--database", dest="dbName", type="string", default="", + help="Name of the database") + parser.add_option("-H", "--hostname", dest="dbHost", type="string", default="sasdb", + help="Hostname of database server") + (options, args) = parser.parse_args() + + if not options.dbName: + print "Missing database name" + parser.print_help() + sys.exit(0) + + if not options.dbHost: + print "Missing database server name" + parser.print_help() + sys.exit(0) + + # Set signalhandler to stop the program in a neat way. + signal.signal(signal.SIGINT, signal_handler) + + Alive = True + connected = False + otdb_connection = None + while Alive: + while Alive and not connected: + # Connect to the database + try: + otdb_connection = pg.connect(user="postgres", host=options.dbHost, dbname=options.dbName) + connected = True + except (TypeError, SyntaxError, pg.InternalError): + connected = False + print "DatabaseError: Connection to database could not be made, reconnect attempt in 5 seconds" + time.sleep(5) + + # When we are connected we can poll the database + if connected: + # Get start_time (= creation time of last retrieved record if any) + start_time = '' + try: + start_time = open('time_save.txt', 'rb').read() + except IOError: + start_time = "2015-01-01 00:00:00.00" + print "start_time=", start_time + + try: + record_list = PollForStatusChanges(start_time, "now", otdb_connection) + except FunctionError, exc_info: + print exc_info + else: + for (treeid, state, modtime, creation) in record_list: + print treeid, state, modtime, creation + open('time_save.txt', 'wb').write(creation) + start_time = creation + print "===" + + # Redetermine the database status. + connected = (otdb_connection and otdb_connection.status == 1) + + time.sleep(2) +