From c1a43df9610a8c3f0c3a56de1ae5e55c5852070e Mon Sep 17 00:00:00 2001
From: Ruud Overeem <overeem@astron.nl>
Date: Mon, 2 Nov 2015 16:00:28 +0000
Subject: [PATCH] Task #8531: TreeService should work now. It's a lot of effort
 to make a decent testdatabase. :-( TreeStatusEvent also runs and produces
 message. Here also the remaining work is in the test setup.

---
 .gitattributes                          |  1 +
 SAS/OTDB_Services/TreeService.py        | 58 +++++++----------
 SAS/OTDB_Services/TreeStatusEvents.py   | 84 ++++++++++++++-----------
 SAS/OTDB_Services/test/t_TreeService.py | 65 +++++++++++++++++++
 4 files changed, 135 insertions(+), 73 deletions(-)
 create mode 100644 SAS/OTDB_Services/test/t_TreeService.py

diff --git a/.gitattributes b/.gitattributes
index de7cb545634..1e07452a502 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -4821,6 +4821,7 @@ SAS/OTDB/test/tMetadata.cc -text
 SAS/OTDB/test/tQueryPIC.cc -text
 SAS/OTDB_Services/TreeService.py -text
 SAS/OTDB_Services/TreeStatusEvents.py -text
+SAS/OTDB_Services/test/t_TreeService.py -text
 SAS/Scheduler/src/.default_settings.set -text
 SAS/Scheduler/src/LOFAR_libScheduler.pro -text
 SAS/Scheduler/src/conflictdialog.ui -text
diff --git a/SAS/OTDB_Services/TreeService.py b/SAS/OTDB_Services/TreeService.py
index ce27dc471fd..97b74625392 100644
--- a/SAS/OTDB_Services/TreeService.py
+++ b/SAS/OTDB_Services/TreeService.py
@@ -70,7 +70,7 @@ def TaskSpecificationRequest(input_dict, db_connection):
 
     # Try to get the specification information
     try:
-        logger.debug("TaskSpecificationRequest:%s" % input_dict)
+        logger.info("TaskSpecificationRequest:%s" % input_dict)
         top_node = db_connection.query("select nodeid from getTopNode('%s')" % tree_id).getresult()[0][0]
         treeinfo = db_connection.query("select exportTree(1, '%s', '%s')" % (tree_id, top_node)).getresult()[0][0]
     except QUERY_EXCEPTIONS, exc_info:
@@ -86,7 +86,7 @@ def TaskSpecificationRequest(input_dict, db_connection):
     return answer
 
 # Status Update Command
-def StatusUpdateCommand(input_dict):
+def StatusUpdateCommand(input_dict, db_connection):
     """
     RPC function to update the status of a tree.
 
@@ -104,7 +104,6 @@ def StatusUpdateCommand(input_dict):
     FunctionError: An error occurred during the execution of the function. 
                    The text of the exception explains what is wrong.
     """
-    global otdb_connection
     # Check input
     if not isinstance(input_dict, dict):
         raise AttributeError("StatusUpdateCommand: Expected a dict as input")
@@ -114,13 +113,14 @@ def StatusUpdateCommand(input_dict):
         update_times = True
         if input_dict.has_key("UpdateTimestamps"):
             update_times = bool(input_dict["UpdateTimestamps"])
+        logger.info("StatusUpdateCommand(%s,%s,%s)" % (tree_id, new_status, update_times))
     except KeyError, info:
         raise AttributeError("StatusUpdateCommand: Key %s is missing in the input" % info)
 
     # Get list of allowed tree states
     allowed_states = {}
     try:
-        for (id,name) in otdb_connection.query("select id,name from treestate").getresult():
+        for (id,name) in db_connection.query("select id,name from treestate").getresult():
             allowed_states[name] = id
     except QUERY_EXCEPTIONS, exc_info:
         raise FunctionError("Error while getting allowed states of tree %d: %s" % (tree_id, exc_info))
@@ -132,7 +132,7 @@ def StatusUpdateCommand(input_dict):
 
     # Finally try to change the status
     try:
-        success = (otdb_connection.query("select setTreeState(1, %d, %d::INT2,%s)" % 
+        success = (db_connection.query("select setTreeState(1, %d, %d::INT2,%s)" % 
             (tree_id, allowed_states[new_status], str(update_times))).getresult()[0][0] == 't')
     except QUERY_EXCEPTIONS, exc_info:
         raise FunctionError("Error while setting the status of tree %d: %s" % (tree_id, exc_info))
@@ -140,7 +140,7 @@ def StatusUpdateCommand(input_dict):
 
 
 # Key Update Command
-def KeyUpdateCommand(input_dict):
+def KeyUpdateCommand(input_dict, db_connection):
     """
     RPC function to update the values of a tree.
 
@@ -154,7 +154,6 @@ def KeyUpdateCommand(input_dict):
     FunctionError: An error occurred during the execution of the function. 
                    The text of the exception explains what is wrong.
     """
-    global otdb_connection
     # Check input
     if not isinstance(input_dict, dict):
         raise AttributeError("Expected a dict as input")
@@ -167,12 +166,13 @@ def KeyUpdateCommand(input_dict):
         raise AttributeError("KeyUpdateCommand (tree=%d): Field 'OtdbID' must be of type 'integer'" % tree_id)
     if not isinstance(update_list, dict):
         raise AttributeError("KeyUpdateCommand (tree=%d): Field 'Updates' must be of type 'dict'" % tree_id)
+    logger.info("KeyUpdateCommand for tree: %d", tree_id)
 
     # Finally try to update all keys
     errors = {}
     for (key,value) in update_list.iteritems():
         try:
-            record_list = (otdb_connection.query("select nodeid,instances,limits from getvhitemlist (%d, '%s')" % 
+            record_list = (db_connection.query("select nodeid,instances,limits from getvhitemlist (%d, '%s')" % 
                            (tree_id, key))).getresult()
             if len(record_list) == 0:
                 errors[key] = "Not found for tree %d" % tree_id
@@ -183,7 +183,7 @@ def KeyUpdateCommand(input_dict):
             # When one record was found record_list is a list with a single tuple (nodeid, instances, current_value)
             node_id   = record_list[0][0]
             instances = record_list[0][1]
-            result = ((otdb_connection.query("select updateVTnode(1,%d,%d,%d::INT2,'%s')" % 
+            result = ((db_connection.query("select updateVTnode(1,%d,%d,%d::INT2,'%s')" % 
                        (tree_id, node_id, instances, value))).getresult()[0][0] == 't')
             print "%s: %s ==> %s" % (key, record_list[0][2], value)
         except QUERY_EXCEPTIONS, exc:
@@ -218,11 +218,11 @@ class PostgressMessageHandlerInterface(MessageHandlerInterface):
             try:
                 self.connection = pg.connect(user=self.db_user, host=self.db_host, dbname=self.database)
                 self.connected = True
-                logger.info("Connected to database %s on host %s" % (dbname, host))
+                logger.info("Connected to database %s on host %s" % (self.database, self.db_host))
             except (TypeError, SyntaxError, pg.InternalError):
                 self.connected = False
                 logger.error("Not connected to database %s on host %s (anymore), retry in 5 seconds" 
-                             % (dbname, host))
+                             % (self.database, self.db_host))
                 time.sleep(5)
 
 class PostgressTaskSpecificationRequest(PostgressMessageHandlerInterface):
@@ -280,37 +280,21 @@ if __name__ == "__main__":
         parser.print_help()
         sys.exit(0)
 
-    def do_specification_rpc(function, args):
-        try:
-            (data,status) = function(args)
-            if data:
-                for (key,value) in  data.iteritems():
-                    print "%s ==> %s" % (key, value)
-            else:
-                print status
-        except Exception as e:
-            print e
-        print "################"
-
     busname = sys.argv[1] if len(sys.argv) > 1 else "simpletest"
 
-    serv1 = Service(busname, "TaskSpecification", PostgressTaskSpecificationRequest, 
-                    numthreads=1, startonwith=True,
+    serv1 = Service("TaskSpecification", PostgressTaskSpecificationRequest, 
+                    busname=busname, numthreads=1, startonwith=True,
                     handler_args = {"database" : options.dbName, "db_host" : options.dbHost})
-    serv2 = Service(busname, "StatusUpdateCmd",   PostgressStatusUpdateCommand,
-                    numthreads=1, startonwith=True,
+    serv2 = Service("StatusUpdateCmd",   PostgressStatusUpdateCommand,
+                    busname=busname, numthreads=1, startonwith=True,
                     handler_args = {"database" : options.dbName, "db_host" : options.dbHost})
-    serv3 = Service(busname, "KeyUpdateCmd",      PostgressKeyUpdateCommand,
-                    numthreads=1, startonwith=True,
+    serv3 = Service("KeyUpdateCmd",      PostgressKeyUpdateCommand,
+                    busname=busname, numthreads=1, startonwith=True,
                     handler_args = {"database" : options.dbName, "db_host" : options.dbHost})
 
     with serv1,serv2,serv3:
-        with RPC(busname, "TaskSpecification", ForwardExceptions=True) as task_spec_request:
-            do_specification_rpc(task_spec_request, {'OtdbID':63370})
-            do_specification_rpc(task_spec_request, {'OtdbID':82111})
-            do_specification_rpc(task_spec_request, {'OtdbID':146300})
-
-#    print StatusUpdateCommand({'OtdbID':146300, 'NewStatus':'finished', 'UpdateTimestamps':True})
-#    print KeyUpdateCommand({'OtdbID':63370, 'Updates':{'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl._hostname':'CCU099ABC'}})
-   
+        logger.info("Started the OTDB services")
+        serv3.wait_for_interrupt()
+
+    logger.info("Stopped the OTDB services")
 
diff --git a/SAS/OTDB_Services/TreeStatusEvents.py b/SAS/OTDB_Services/TreeStatusEvents.py
index 1fb08b33fe6..d46fc63aa06 100644
--- a/SAS/OTDB_Services/TreeStatusEvents.py
+++ b/SAS/OTDB_Services/TreeStatusEvents.py
@@ -26,6 +26,7 @@ Daemon that watches the OTDB database for status changes of trees and publishes
 
 import os,sys,time,pg, signal
 from optparse import OptionParser
+from lofar.messaging import EventMessage,ToBus
 
 QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError)
 Alive = False
@@ -82,6 +83,8 @@ if __name__ == "__main__":
                       help="Name of the database")
     parser.add_option("-H", "--hostname", dest="dbHost", type="string", default="sasdb", 
                       help="Hostname of database server")
+    parser.add_option("-B", "--busname", dest="busname", type="string", default="", 
+                      help="Busname or queue-name the status changes are published on")
     (options, args) = parser.parse_args()
 
     if not options.dbName:
@@ -94,46 +97,55 @@ if __name__ == "__main__":
         parser.print_help()
         sys.exit(0)
 
+    if not options.busname:
+        print "Missing busname"
+        parser.print_help()
+        sys.exit(0)
+
     # Set signalhandler to stop the program in a neat way.
     signal.signal(signal.SIGINT, signal_handler)
 
     Alive = True
     connected = False
     otdb_connection = None
-    while Alive:
-        while Alive and not connected:
-            # Connect to the database
-            try:
-                otdb_connection = pg.connect(user="postgres", host=options.dbHost, dbname=options.dbName)
-                connected = True
-            except (TypeError, SyntaxError, pg.InternalError):
-                connected = False
-                print "DatabaseError: Connection to database could not be made, reconnect attempt in 5 seconds"
-                time.sleep(5)
-             
-        # When we are connected we can poll the database
-        if connected:
-            # Get start_time (= creation time of last retrieved record if any)
-            start_time = ''
-            try:
-                start_time = open('time_save.txt', 'rb').read()
-            except IOError:
-                start_time = "2015-01-01 00:00:00.00"
-            print "start_time=", start_time
-
-            try:
-                record_list = PollForStatusChanges(start_time, "now", otdb_connection)
-            except FunctionError, exc_info:
-                print exc_info
-            else:
-                for (treeid, state, modtime, creation) in record_list:
-                    print treeid, state, modtime, creation
-                    open('time_save.txt', 'wb').write(creation)
-                    start_time = creation
-                print "==="
-
-            # Redetermine the database status.
-            connected = (otdb_connection and otdb_connection.status == 1)
-
-            time.sleep(2)
+    with ToBus(options.busname) as send_bus:
+        while Alive:
+            while Alive and not connected:
+                # Connect to the database
+                try:
+                    otdb_connection = pg.connect(user="postgres", host=options.dbHost, dbname=options.dbName)
+                    connected = True
+                except (TypeError, SyntaxError, pg.InternalError):
+                    connected = False
+                    print "DatabaseError: Connection to database could not be made, reconnect attempt in 5 seconds"
+                    time.sleep(5)
+                 
+            # When we are connected we can poll the database
+            if connected:
+                # Get start_time (= creation time of last retrieved record if any)
+                start_time = ''
+                try:
+                    start_time = open('time_save.txt', 'rb').read()
+                except IOError:
+                    start_time = "2015-01-01 00:00:00.00"
+                print "start_time=", start_time
+    
+                try:
+                    record_list = PollForStatusChanges(start_time, "now", otdb_connection)
+                except FunctionError, exc_info:
+                    print exc_info
+                else:
+                    for (treeid, state, modtime, creation) in record_list:
+                        content = { "treeID" : treeid, "state" : state, "time_of_change" : modtime }
+                        msg = EventMessage(content)
+                        print treeid, state, modtime, creation
+                        send_bus.send(msg)
+                        open('time_save.txt', 'wb').write(creation)
+                        start_time = creation
+                    print "==="
+
+                # Redetermine the database status.
+                connected = (otdb_connection and otdb_connection.status == 1)
+
+                time.sleep(2)
 
diff --git a/SAS/OTDB_Services/test/t_TreeService.py b/SAS/OTDB_Services/test/t_TreeService.py
new file mode 100644
index 00000000000..c6b78e39f37
--- /dev/null
+++ b/SAS/OTDB_Services/test/t_TreeService.py
@@ -0,0 +1,65 @@
+#!/usr/bin/python
+#coding: iso-8859-15
+#
+# Copyright (C) 2015
+# ASTRON (Netherlands Institute for Radio Astronomy)
+# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
+#
+# This file is part of the LOFAR software suite.
+# The LOFAR software suite is free software: you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# The LOFAR software suite is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+#
+# $Id: Backtrace.cc 31468 2015-04-13 23:26:52Z amesfoort $
+"""
+RPC functions that allow access to (VIC) trees in OTDB.
+
+TaskSpecificationRequest: get the specification(parset) of a tree as dict.
+KeyUpdateCommand        : function to update the value of multiple (existing) keys.
+StatusUpdateCommand     : finction to update the status of a tree.
+"""
+
+import sys
+import logging
+from lofar.messaging.RPC import *
+
+logging.basicConfig(stream=sys.stdout, level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def do_rpc(rpc_instance, arg_dict):
+    try:
+        (data,status) = (rpc_instance)(arg_dict)
+        if (status != "OK"):
+            raise Exception("Status returned is %s" % status)
+        print type(data)
+        for (key,value) in data.iteritems():
+            print "%s ==> %s" % (key, value)
+    except OverflowError as e:
+        pass
+    print "======"
+
+if __name__ == "__main__":
+    busname = sys.argv[1] if len(sys.argv) > 1 else "simpletest"
+
+#    with RPC("TaskSpecification", ForwardExceptions=True, busname=busname, timeout=2) as task_spec_request:
+#        do_rpc(task_spec_request, {'OtdbID':63370})
+#        do_rpc(task_spec_request, {'OtdbID':146300})
+#        do_rpc(task_spec_request, {'OtdbID':82111})
+
+#    print StatusUpdateCmd({'OtdbID':146300, 'NewStatus':'finished', 'UpdateTimestamps':True})
+
+    with RPC("KeyUpdateCmd", ForwardExceptions=True, busname=busname, timeout=2) as key_update:
+        print key_update({'OtdbID':63370, 
+                          'Updates':{'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl._hostname':'CCU099ABC'}})
+
+   
+
-- 
GitLab