Skip to content
Snippets Groups Projects
Commit 804c45de authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #8888: Changed OTDB TreeStatusEvents and RATaskSpecified services to use...

Task #8888: Changed OTDB TreeStatusEvents and RATaskSpecified services to use native datetime() types across the qpid bus
parent ada6e1b7
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,7 @@ logger = logging.getLogger(__name__)
class OTDBBusListener(AbstractBusListener):
def __init__(self, busname='lofar.otdb.status', subject='otdb.treestatus', broker=None, **kwargs):
def __init__(self, busname='lofar.otdb.notification', subject='otdb.treestatus', broker=None, **kwargs):
"""
OTDBBusListener listens on the lofar otdb message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from OTDBBusListener and implement the specific on<SomeMessage> methods that you are interested in.
......@@ -56,12 +56,7 @@ class OTDBBusListener(AbstractBusListener):
logger.debug("OTDBBusListener.handleMessage: %s" %str(msg))
treeId = msg.content['treeID']
modificationTime = datetime.utcnow()
if 'time_of_change' in msg.content:
try:
modificationTime = datetime.strptime(msg.content['time_of_change'], '%Y-%m-%d %H:%M:%S.%f')
except ValueError as e:
logger.error('could not parse time_of_change %s : %s' % (msg.content['time_of_change'], e))
modificationTime = msg.content['time_of_change'].datetime()
if msg.content['state'] == 'described':
self.onObservationDescribed(treeId, modificationTime)
......
......@@ -24,7 +24,7 @@
Daemon that watches the OTDB database for status changes of trees and publishes those on the messagebus.
"""
import sys, time, pg
import sys, time, pg, datetime
import logging
from lofar.messaging import EventMessage, ToBus
......@@ -47,8 +47,8 @@ def PollForStatusChanges(start_time, end_time, otdb_connection):
"""
Function that asked the database for status changes in the given period
Input : start_time (string) - Oldest time of change to include in the selection.
end_time (string) - Most recent time of change to include in the selection
Input : start_time (datetime) - Oldest time of change to include in the selection.
end_time (datetime) - Most recent time of change to include in the selection
The times must be specified in the format YYYY-Mon-DD HH24:MI:SS.US.
The selection delivers changes the match: startime <= time_of_change < end_time
......@@ -64,7 +64,7 @@ def PollForStatusChanges(start_time, end_time, otdb_connection):
record_list = []
try:
record_list = otdb_connection.query("select treeid,state,modtime,creation from getStateChanges('%s','%s')" %
(start_time, end_time)).getresult()
(start_time.strftime("%F %T.%f"), end_time.strftime("%F %T.%f"))).getresult()
except QUERY_EXCEPTIONS, exc_info:
raise FunctionError("Error while polling for state changes: %s"% exc_info)
return record_list
......@@ -122,23 +122,23 @@ if __name__ == "__main__":
# When we are connected we can poll the database
if connected:
# Get start_time (= creation time of last retrieved record if any)
start_time = ''
try:
start_time = otdb_connection.query("select treestatusevent from otdb_admin").getresult()[0][0]
start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f")
except IndexError, QUERY_EXCEPTIONS:
start_time = "2015-01-01 00:00:00.00"
start_time = datetime.datetime.datetime(2015, 1, 1)
try:
logger.info("start_time=%s, polling database" % (start_time,))
record_list = PollForStatusChanges(start_time, "now", otdb_connection)
record_list = PollForStatusChanges(start_time, datetime.datetime.utcnow(), otdb_connection)
except FunctionError, exc_info:
logger.error(exc_info)
else:
for (treeid, state, modtime, creation) in record_list:
content = { "treeID" : treeid, "state" : allowed_states.get(state, "unknwon_state"),
content = { "treeID" : treeid, "state" : allowed_states.get(state, "unknown_state"),
"time_of_change" : modtime }
msg = EventMessage(context="otdb.treestatus", content=content)
logger.info("sending message treeid %s state %s modtime %s" % (treeid, allowed_states.get(state, "unknwon_state"), modtime))
logger.info("sending message treeid %s state %s modtime %s" % (treeid, allowed_states.get(state, "unknown_state"), modtime))
send_bus.send(msg)
start_time = creation
......
......@@ -56,13 +56,7 @@ class RATaskSpecifiedBusListener(AbstractBusListener):
logger.debug("RABusListener.handleMessage: %s" %str(msg))
sasId = msg.content['sasID']
modificationTime = datetime.utcnow()
if 'time_of_change' in msg.content:
try:
modificationTime = datetime.strptime(msg.content['time_of_change'], '%Y-%m-%d %H:%M:%S.%f')
except ValueError as e:
logger.error('could not parse time_of_change %s : %s' % (msg.content['time_of_change'], e))
modificationTime = msg.content['time_of_change'].datetime()
resource_indicators = msg.content['resource_indicators']
self.onTaskSpecified(sasId, modificationTime, resource_indicators)
......
......@@ -216,7 +216,7 @@ class RATaskSpecified(OTDBBusListener):
result = {
"sasID": main_obsID,
"state": "prescheduled",
"time_of_change": modificationTime.strftime('%F %T.%f'),
"time_of_change": modificationTime,
"resource_indicators": resourceIndicators,
}
......@@ -224,22 +224,5 @@ class RATaskSpecified(OTDBBusListener):
msg = EventMessage(content=result)
self.send_bus.send(msg)
if __name__ == "__main__":
import sys
from optparse import OptionParser
# Check the invocation arguments
parser = OptionParser("%prog -O otdb_bus -B my_bus [options]")
parser.add_option("-O", "--otdb_bus", dest="otdb_busname", type="string", default="lofar.otdb.notification",
help="Bus or queue OTDB operates on")
parser.add_option("-B", "--my_bus", dest="my_busname", type="string", default="lofar.ra.notification",
help="Bus or queue we publish resource requests on")
(options, args) = parser.parse_args()
if not options.statusbus or not options.parsetbus or not options.busname:
parser.print_help()
sys.exit(1)
with RATaskSpecified("OTDB.TaskSpecified", otdb_busname=options.otdb_busname, my_busname=options.my_busname) as jts:
waitForInterrupt()
logger.info("Result sent")
......@@ -12,6 +12,7 @@ from lofar.messaging import EventMessage, Service
import unittest
from glob import glob
import uuid
import datetime
from threading import Condition, Lock
import logging
......@@ -152,7 +153,7 @@ class TestService(unittest.TestCase):
msg = EventMessage(content={
"treeID": 3,
"state": "prescheduled",
"time_of_change": "2016-01-01 00:00:00.00",
"time_of_change": datetime.datetime(2016,1,1),
})
tb.send(msg)
......@@ -183,7 +184,7 @@ class TestService(unittest.TestCase):
msg = EventMessage(content={
"treeID": 1,
"state": "prescheduled",
"time_of_change": "2016-01-01 00:00:00.00",
"time_of_change": datetime.datetime(2016,1,1),
})
tb.send(msg)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment