Newer
Older
Ruud Overeem
committed
#!/usr/bin/env python
#coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: Backtrace.cc 31468 2015-04-13 23:26:52Z amesfoort $
"""
Daemon that watches the OTDB database for status changes of trees and publishes those on the messagebus.
"""

Jan David Mol
committed
import sys, time, pg, datetime
import logging
from lofar.messaging import EventMessage, ToBus
Ruud Overeem
committed
QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError)
Ruud Overeem
committed
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
Ruud Overeem
committed
# Define our own exceptions
class FunctionError(Exception):
"Something when wrong during the execution of the function"
pass
class DatabaseError(Exception):
"Connection with the database could not be made"
pass
# Task Specification Request
def PollForStatusChanges(start_time, end_time, otdb_connection):
"""
Function that asked the database for status changes in the given period

Jan David Mol
committed
Input : start_time (datetime) - Oldest time of change to include in the selection.
end_time (datetime) - Most recent time of change to include in the selection
Ruud Overeem
committed
The times must be specified in the format YYYY-Mon-DD HH24:MI:SS.US.
The selection delivers changes the match: startime <= time_of_change < end_time
Output: (list of tuples) - All status changes between the last polltime and the current time
Tuple = ( tree_id, new_state, time_of_change )
Exceptions:
ArgumentError: There is something wrong with the given input values.
FunctionError: An error occurred during the execution of the function.
Ruud Overeem
committed
The text of the exception explains what is wrong.
"""
# Try to get the specification information
record_list = []
try:
record_list = otdb_connection.query("select treeid,state,modtime,creation from getStateChanges('%s','%s')" %

Jan David Mol
committed
(start_time.strftime("%F %T.%f"), end_time.strftime("%F %T.%f"))).getresult()
Ruud Overeem
committed
except QUERY_EXCEPTIONS, exc_info:
raise FunctionError("Error while polling for state changes: %s"% exc_info)
return record_list
def signal_handler(signum, frame):
"Signal redirection to stop the daemon in a neat way."
logger.info("Stopping program")
Ruud Overeem
committed
if __name__ == "__main__":
from optparse import OptionParser
from lofar.common import dbcredentials
import signal
Ruud Overeem
committed
# Check the invocation arguments
parser = OptionParser("%prog [options]")
parser.add_option("-B", "--busname", dest="busname", type="string", default="",
Ruud Overeem
committed
help="Busname or queue-name the status changes are published on")
parser.add_option_group(dbcredentials.options_group(parser))
Ruud Overeem
committed
(options, args) = parser.parse_args()

Jan David Mol
committed
dbcreds = dbcredentials.parse_options(options)
Ruud Overeem
committed
Ruud Overeem
committed
if not options.busname:
print "Missing busname"
parser.print_help()

Jan David Mol
committed
sys.exit(1)
Ruud Overeem
committed
Ruud Overeem
committed
# Set signalhandler to stop the program in a neat way.
signal.signal(signal.SIGINT, signal_handler)
Ruud Overeem
committed
connected = False
otdb_connection = None
Ruud Overeem
committed
with ToBus(options.busname) as send_bus:
while alive:
while alive and not connected:
Ruud Overeem
committed
# Connect to the database
try:

Jan David Mol
committed
otdb_connection = pg.connect(**dbcreds.pg_connect_options())
Ruud Overeem
committed
connected = True

Jan David Mol
committed
logger.info("Connected to database %s" % (dbcreds,))
Ruud Overeem
committed
# Get list of allowed tree states
allowed_states = {}
for (state_nr, name) in otdb_connection.query("select id,name from treestate").getresult():
allowed_states[state_nr] = name
except (TypeError, SyntaxError, pg.InternalError), e:
Ruud Overeem
committed
connected = False

Jan David Mol
committed
logger.error("Not connected to database %s, retry in 5 seconds: %s" % (dbcreds, e))
Ruud Overeem
committed
time.sleep(5)
Ruud Overeem
committed
# When we are connected we can poll the database
if connected:
# Get start_time (= creation time of last retrieved record if any)
try:
Ruud Overeem
committed
start_time = otdb_connection.query("select treestatusevent from otdb_admin").getresult()[0][0]

Jan David Mol
committed
start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f")
Ruud Overeem
committed
except IndexError, QUERY_EXCEPTIONS:

Jan David Mol
committed
start_time = datetime.datetime.datetime(2015, 1, 1)
Ruud Overeem
committed
try:
logger.info("start_time=%s, polling database" % (start_time,))

Jan David Mol
committed
record_list = PollForStatusChanges(start_time, datetime.datetime.utcnow(), otdb_connection)
Ruud Overeem
committed
except FunctionError, exc_info:
logger.error(exc_info)
Ruud Overeem
committed
else:
for (treeid, state, modtime, creation) in record_list:

Jan David Mol
committed
content = { "treeID" : treeid, "state" : allowed_states.get(state, "unknown_state"),
Ruud Overeem
committed
"time_of_change" : modtime }
Ruud Overeem
committed
msg = EventMessage(context="otdb.treestatus", content=content)

Jan David Mol
committed
logger.info("sending message treeid %s state %s modtime %s" % (treeid, allowed_states.get(state, "unknown_state"), modtime))
Ruud Overeem
committed
send_bus.send(msg)
Ruud Overeem
committed
start_time = creation
logger.info("start_time:=%s" % (start_time,))
otdb_connection.query("update otdb_admin set treestatusevent = '%s'" % start_time)
Ruud Overeem
committed
# Redetermine the database status.
connected = (otdb_connection and otdb_connection.status == 1)
time.sleep(2)