Skip to content
Snippets Groups Projects
Commit f7b2be35 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-2827: removed obsolete package OTDB_Services

parent 936ffcfc
No related branches found
No related tags found
1 merge request!1192TMSS-2827
This commit is part of merge request !1192. Comments created here will be created in the context of that merge request.
Showing
with 0 additions and 1810 deletions
......@@ -6,7 +6,6 @@ lofar_add_package(OTDB_SQL OTDB/sql)
lofar_add_package(QPIDInfrastructure)
lofar_add_package(Scheduler)
lofar_add_package(SAS_Feedback Feedback_Service)
lofar_add_package(OTDB_Services)
lofar_add_package(XML_generator)
lofar_add_package(SpecificationServices)
lofar_add_package(XSD)
......
# $Id$
lofar_package(OTDB_Services 1.0 DEPENDS PyMessaging)
lofar_find_package(Python 3.4 REQUIRED)
include(PythonInstall)
#find_python_module(pg REQUIRED) # sudo aptitude install python3-pg
lofar_add_bin_scripts(
getOTDBParset
setOTDBTreeStatus
treeService
treeStatusEvents
setCorrelatorHeadNode
)
set(_py_files
config.py
otdbrpc.py
OTDBBusListener.py
TreeService.py
TreeStatusEvents.py
)
# supervisord config files
lofar_add_sysconf_files(
TreeService.ini
TreeStatusEvents.ini
OTDB_Services.ini
DESTINATION supervisord.d)
python_install(${_py_files} DESTINATION lofar/sas/otdb)
add_subdirectory(test)
[database:OTDB]
type = postgres
host = localhost
port = 0
user = postgres
database = TESTLOFAR_4
#!/usr/bin/env python3
# OTDBEventMessageHandler.py: OTDBEventMessageHandler listens on the lofar otdb message bus and calls (empty) on<SomeMessage> methods when such a message is received.
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: messagebus.py 1580 2015-09-30 14:18:57Z loose $
"""
OTDBEventMessageHandler listens on the lofar message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from OTDBEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in.
Here's a concrete example.
First implement your own MyOTDBMessageHandler which implements behaviour in onObservationStarted
>>> class MyOTDBMessageHandler(OTDBEventMessageHandler):
... def onObservationStarted(self, treeId, modificationTime):
... print("The observation with treeId %s started!" % treeId)
... # implement some more business logic here if you want to.
# and then use the MyOTDBMessageHandler in the OTDBBusListener.
# See TemporaryExchange documentation why we use that here in the example.
>>> with TemporaryExchange() as tmp_exchange:
... with OTDBBusListener(MyOTDBMessageHandler, exchange=tmp_exchange.address):
... # that's it, now the OTDBBusListener is listening,
... # and calling MyOTDBMessageHandler.onObservationStarted when a EventMessage for the OTDB ObservationStarted event comes in.
... # for this example, let's create such an event, so we see something happening.
... with tmp_exchange.create_tobus() as event_sender:
... event_sender.send(EventMessage(subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
... content={'state': 'active', 'treeID': 123}))
...
... # ... do some work ... simulate this by sleeping a little...
... # ...in the mean time, BusListener receives and handles the messages (on its own thread)
... from time import sleep
... sleep(0.25)
The observation with treeId 123 started!
"""
from lofar.messaging import BusListener, AbstractMessageHandler, EventMessage, TemporaryExchange
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class OTDBEventMessageHandler(AbstractMessageHandler):
"""Concrete implementation of an AbstractMessageHandler for handling OTDB EventMessage's,
mapping the events to onObservation<Event> methods.
Typical usage is to derive from this OTDBEventMessageHandler class
and implement one or more onObservation<Event> methods with your desired behaviour code.
See example at the top of this file.
"""
def handle_message(self, msg):
if not isinstance(msg, EventMessage):
raise ValueError("%s: Ignoring non-EventMessage: %s" % (self.__class__.__name__, msg))
logger.debug("OTDBEventMessageHandler.handleMessage: %s" %str(msg))
treeId = msg.content['treeID']
modificationTime = datetime.utcnow()
if 'time_of_change' in msg.content:
try:
if msg.content['time_of_change'][-7] == '.':
modificationTime = datetime.strptime(msg.content['time_of_change'], '%Y-%m-%dT%H:%M:%S.%f')
else:
modificationTime = datetime.strptime(msg.content['time_of_change'], '%Y-%m-%dT%H:%M:%S')
except:
pass
logger.info("%s otdb task status changed: otdb_id=%s status=%s" % (self, treeId, msg.content['state']))
if msg.content['state'] == 'described':
self.onObservationDescribed(treeId, modificationTime)
elif msg.content['state'] == 'prepared':
self.onObservationPrepared(treeId, modificationTime)
elif msg.content['state'] == 'approved':
self.onObservationApproved(treeId, modificationTime)
elif msg.content['state'] == 'on_hold':
self.onObservationOnHold(treeId, modificationTime)
elif msg.content['state'] == 'conflict':
self.onObservationConflict(treeId, modificationTime)
elif msg.content['state'] == 'prescheduled':
self.onObservationPrescheduled(treeId, modificationTime)
elif msg.content['state'] == 'scheduled':
self.onObservationScheduled(treeId, modificationTime)
elif msg.content['state'] == 'queued':
self.onObservationQueued(treeId, modificationTime)
elif msg.content['state'] == 'active':
self.onObservationStarted(treeId, modificationTime)
elif msg.content['state'] == 'completing':
self.onObservationCompleting(treeId, modificationTime)
elif msg.content['state'] == 'finished':
self.onObservationFinished(treeId, modificationTime)
elif msg.content['state'] == 'aborted':
self.onObservationAborted(treeId, modificationTime)
elif msg.content['state'] == 'obsolete':
self.onObservationObsolete(treeId, modificationTime)
elif msg.content['state'] == 'error':
self.onObservationError(treeId, modificationTime)
else:
logger.info("OTDBEventMessageHandler.handleMessage - handled unknown state: %s", msg.content['state'])
# apart from calling the above methods for known predefined states,
# also always call plain onObservationStatusChanged
# so subclasses can act on any status in this generic method.
self.onObservationStatusChanged(treeId, msg.content['state'], modificationTime)
def onObservationStatusChanged(self, treeId, new_status, modificationTime):
"""this method is called upon any OTDB status change event. Override it if you want to act on each status change."""
pass
def onObservationDescribed(self, treeId, modificationTime):
"""this method is called upon the ObservationDescribed status change event. Override it if you want to act on this status change."""
pass
def onObservationPrepared(self, treeId, modificationTime):
"""this method is called upon the ObservationPrepared status change event. Override it if you want to act on this status change."""
pass
def onObservationApproved(self, treeId, modificationTime):
"""this method is called upon the ObservationApproved status change event. Override it if you want to act on this status change."""
pass
def onObservationOnHold(self, treeId, modificationTime):
"""this method is called upon the ObservationOnHold status change event. Override it if you want to act on this status change."""
pass
def onObservationConflict(self, treeId, modificationTime):
"""this method is called upon the ObservationConflict status change event. Override it if you want to act on this status change."""
pass
def onObservationPrescheduled(self, treeId, modificationTime):
"""this method is called upon the ObservationPrescheduled status change event. Override it if you want to act on this status change."""
pass
def onObservationScheduled(self, treeId, modificationTime):
"""this method is called upon the ObservationScheduled status change event. Override it if you want to act on this status change."""
pass
def onObservationQueued(self, treeId, modificationTime):
"""this method is called upon the ObservationQueued status change event. Override it if you want to act on this status change."""
pass
def onObservationStarted(self, treeId, modificationTime):
"""this method is called upon the ObservationStarted status change event. Override it if you want to act on this status change."""
pass
def onObservationCompleting(self, treeId, modificationTime):
"""this method is called upon the ObservationCompleting status change event. Override it if you want to act on this status change."""
pass
def onObservationFinished(self, treeId, modificationTime):
"""this method is called upon the ObservationFinished status change event. Override it if you want to act on this status change."""
pass
def onObservationAborted(self, treeId, modificationTime):
"""this method is called upon the ObservationAborted status change event. Override it if you want to act on this status change."""
pass
def onObservationObsolete(self, treeId, modificationTime):
"""this method is called upon the ObservationObsolete status change event. Override it if you want to act on this status change."""
pass
def onObservationError(self, treeId, modificationTime):
"""this method is called upon the ObservationError status change event. Override it if you want to act on this status change."""
pass
class OTDBBusListener(BusListener):
"""The OTDBBusListener is a normal BusListener listening specifically to EventMessages with OTDB notification subjects.
You have to implement your own concrete subclass of the OTDBEventMessageHandler, and inject that in this OTDBBusListener.
See example at the top of this file.
"""
def __init__(self, handler_type: OTDBEventMessageHandler.__class__, handler_kwargs: dict = None,
exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, num_threads: int = 1):
if not issubclass(handler_type, OTDBEventMessageHandler):
raise TypeError("handler_type should be a OTDBEventMessageHandler subclass")
super(OTDBBusListener, self).__init__(handler_type=handler_type, handler_kwargs=handler_kwargs,
exchange=exchange, routing_key="%s.#" % (DEFAULT_OTDB_NOTIFICATION_SUBJECT),
num_threads=num_threads, broker=broker)
__all__ = ["OTDBEventMessageHandler", "OTDBBusListener"]
[group:OTDB_Services]
programs=TreeService,TreeStatusEvents
priority=100
[program:TreeService]
command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec treeService --dbcredentials=OTDB'
user=lofarsys
stopsignal=INT ; KeyboardInterrupt
stopasgroup=true ; bash does not propagate signals
stdout_logfile=%(program_name)s.log
redirect_stderr=true
stderr_logfile=NONE
stdout_logfile_maxbytes=0
This diff is collapsed.
[program:TreeStatusEvents]
command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec treeStatusEvents --dbcredentials=OTDB'
user=lofarsys
stopsignal=INT ; KeyboardInterrupt
stopasgroup=true ; bash does not propagate signals
stdout_logfile=%(program_name)s.log
redirect_stderr=true
stderr_logfile=NONE
stdout_logfile_maxbytes=0
#!/usr/bin/env python3
#coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: Backtrace.cc 31468 2015-04-13 23:26:52Z amesfoort $
"""
Daemon that watches the OTDB database for status changes of trees and publishes those on the messagebus.
"""
import os
import os.path
import sys, time, pg, datetime
import logging
from lofar.messaging import EventMessage, ToBus, DEFAULT_BUSNAME
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT
QUERY_EXCEPTIONS = (TypeError, ValueError, MemoryError, pg.ProgrammingError, pg.InternalError)
alive = False
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
# Define our own exceptions
class FunctionError(Exception):
"Something when wrong during the execution of the function"
pass
class DatabaseError(Exception):
"Connection with the database could not be made"
pass
# Task Specification Request
def PollForStatusChanges(start_time, otdb_connection):
"""
Function that asked the database for status changes in the given period
Input : start_time (datetime) - Oldest time of change to include in the selection.
The times must be specified in the format YYYY-Mon-DD HH24:MI:SS.US.
The selection delivers changes the match: startime <= time_of_change
Output: (list of tuples) - All status changes between the last polltime and the current time
Tuple = ( tree_id, new_state, time_of_change )
Exceptions:
ArgumentError: There is something wrong with the given input values.
FunctionError: An error occurred during the execution of the function.
The text of the exception explains what is wrong.
"""
# Try to get the specification information
record_list = []
try:
record_list = otdb_connection.query("select treeid,state,modtime,creation from getStateChanges('%s',NULL)" %
(start_time.strftime("%F %T.%f"),)).getresult()
except QUERY_EXCEPTIONS as exc_info:
raise FunctionError("Error while polling for state changes: %s"% exc_info)
return record_list
def signal_handler(signum, frame):
"Signal redirection to stop the daemon in a neat way."
logger.info("Stopping program")
global alive
alive = False
def main():
from optparse import OptionParser
from lofar.common import dbcredentials
import signal
# Check the invocation arguments
parser = OptionParser("%prog [options]")
parser.add_option("-B", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME,
help="Bus on which the status changes are published on. [default: %default]")
parser.add_option_group(dbcredentials.options_group(parser))
parser.set_defaults(dbcredentials="OTDB")
(options, args) = parser.parse_args()
dbcreds = dbcredentials.parse_options(options)
# Set signalhandler to stop the program in a neat way.
signal.signal(signal.SIGINT, signal_handler)
create_service(options.busname, dbcreds)
def create_service(busname, dbcreds, state_file_path='~/.lofar/otdb_treestatusevent_state'):
alive = True
connected = False
otdb_connection = None
with ToBus(busname) as send_bus:
while alive:
while alive and not connected:
# Connect to the database
try:
otdb_connection = pg.connect(**dbcreds.pg_connect_options())
connected = True
logger.info("Connected to database %s" % (dbcreds,))
# Get list of allowed tree states
allowed_states = {}
for (state_nr, name) in otdb_connection.query("select id,name from treestate").getresult():
allowed_states[state_nr] = name
except (TypeError, SyntaxError, pg.InternalError) as e:
connected = False
logger.error("Not connected to database %s, retry in 5 seconds: %s" % (dbcreds, e))
time.sleep(5)
# When we are connected we can poll the database
if connected:
# Get start_time (= creation time of last retrieved record if any)
try:
treestatuseventfilename = os.path.expanduser(state_file_path)
with open(treestatuseventfilename, 'r') as f:
line = f.readline()
if line.rfind('.') > 0:
start_time = datetime.datetime.strptime(line, "%Y-%m-%d %H:%M:%S.%f")
else:
start_time = datetime.datetime.strptime(line, "%Y-%m-%d %H:%M:%S")
except Exception as e:
logger.warning(e)
# start scanning from events since 'now'
# this timestamp will be stored in the treestatuseventfilename file
start_time = datetime.datetime.utcnow()
try:
logger.info("creating %s" % (treestatuseventfilename,))
if not os.path.exists(os.path.dirname(treestatuseventfilename)):
os.makedirs(os.path.dirname(treestatuseventfilename))
with open(treestatuseventfilename, 'w') as f:
f.write(start_time.strftime("%Y-%m-%d %H:%M:%S"))
except Exception as e:
logger.error(e)
try:
logger.debug("start_time=%s, polling database" % (start_time,))
record_list = PollForStatusChanges(start_time, otdb_connection)
for (treeid, state, modtime, creation) in record_list:
content = { "treeID" : treeid, "state" : allowed_states.get(state, "unknown_state"),
"time_of_change" : modtime }
msg = EventMessage(subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, content=content)
logger.info("sending message treeid %s state %s modtime %s" % (treeid, allowed_states.get(state, "unknown_state"), modtime))
send_bus.send(msg)
logger.debug("new start_time:=%s" % (creation,))
try:
with open(treestatuseventfilename, 'w') as f:
f.write(creation)
except Exception as e:
logger.error(e)
except FunctionError as exc_info:
logger.error(exc_info)
except Exception as e:
logger.error(e)
# Redetermine the database status.
connected = (otdb_connection and otdb_connection.status == 1)
time.sleep(2)
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# $Id$
DEFAULT_OTDB_SERVICENAME = 'OTDB.Service'
DEFAULT_OTDB_NOTIFICATION_SUBJECT = 'OTDB.notification.TaskStatus'
# OTDB Services {#otdb_services}
## GENERAL
The services in this package expose the detailed specifications of observations that are stored in the
[Observation Tree Database (OTDB)](@otdb). This allows other parts of the system to use them, which is
e.g. required for scheduling.
The package consists of the following components:
* The *TreeService* service, which serves RPC requests
* The *otdbrpc* client to interact with the TreeService
* Utilities that use otdbrpc (*getOTDBParset/setOTDBTreeStatus*)
* The *TreeStatusEvents* database watcher, which sends notification on database changes
* the *OTDBBusListener*, which can be used to listen on the notification bus in order to react to OTDB changes
### Author/Owner
- Adriaan Renting <renting@astron.nl>
- Jorrit Schaap <schaap@astron.nl>
- Jan David Mol <mol@astron.nl>
### Overview
- *Add a diagram*
- *Add a link to the overview diagram*
- *Add a link in the overview diagram to link back to this documentation*.
- - -
## DEVELOPMENT
### Analyses
*Add non-technical information and functional considerations here, like user requirements and links to minutes of
meetings with stakeholders.*
### Design
*Add technical considerations and design choices here*
### Source Code
- [Source](https://git.astron.nl/ro/lofar/-/tree/master/SAS/OTDB_Services/)
- *Add a link to (generated?) source code documentation.*
### Testing
Unit testing:
* ctest -V -R t_TreeService
* ctest -V -R t_TreeStatusEvents
- *How do you run integration tests?*
- *Add a link to Jenkins jobs (if available)*
### Build & Deploy
- cmake -DBUILD_PACKAGES=OTDB_Services ../..
- *Add a link to Jenkins jobs (if available)*
- - -
## OPERATIONS
### Configuration
- /SAS/OTDB_Services/config.py
- /etc/supervisord.d/OTDB_Services.ini
- /etc/supervisord.d/TreeService.ini
- /etc/supervisord.d/TreeStatusEvents.ini
### Log Files
- *Where are the log files?*
### Runtime
- *Where does it run? (which user@machine)*
- *How do I run it? (user documentation? examples? commandline parameters?)*
- *Other considerations? (what happens elsewhere when I start or stop it?)*
### Interfaces (API)
- The otdbrpc (the RPC client) writes to `lofar.otdb.command`
- The TreeService (the RPC server) reads from to `lofar.otdb.command`
- The TreeStatusEvents database watcher writes to `lofar.otdb.notification`
- The OTDBBusListener service reads from `lofar.otdb.notification`
- The getOTDBParset/setOTDBTreeStatus make use of the otdbrpc
### Files/Databases
- *Which databases are used?*
- *Which files are used?*
### Dependencies
- *To/from other applications?*
- *Files?*
- *Network locations?*
- *Other?*
### Security
- *Special privileges needed?*
- *User login?*
- *Certificates needed?*
- *Other considerations?*
- - -
## ADDITIONAL INFORMATION
### User Documentation
*e.g. Please refer to URL X for the User Documentation*
### Operations Documentation
*e.g. Please refer to URL X for Operations Documentation*
/**
\ingroup SAS
\defgroup OTDBServices OTDBServices
\ref otdb_services
OTDB_Services contains a collection of services to interact with the observation tree database (OTDB).
*/
#!/usr/bin/env python3
#coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
"""
"""
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
if __name__ == "__main__":
from optparse import OptionParser
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
# Check the invocation arguments
parser = OptionParser("%prog -o obsid [options]")
parser.add_option('-b', '--broker', dest = 'broker', type = 'string', default = DEFAULT_BROKER, help = 'Address of the broker, default: %default')
parser.add_option('-e', '--exchange', dest = 'exchange', type = 'string', default = DEFAULT_BUSNAME, help = 'Name of the bus exchange on the broker, default: %s' % DEFAULT_BUSNAME)
parser.add_option("-o", "--obsid", dest="obsid", type="int", default=0,
help="Observation/tree ID to get parset of")
(options, args) = parser.parse_args()
if not options.exchange or not options.obsid:
parser.print_help()
sys.exit(1)
with OTDBRPC.create(exchange=options.exchange) as otdbrpc:
parset = otdbrpc.taskGetSpecification(otdb_id=options.obsid)["specification"]
for key in sorted(parset.keys()):
print("%s = %s" % (key,parset[key]))
#!/usr/bin/env python3
import logging
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT
from lofar.messaging.rpc import RPCClient, RPCClientContextManagerMixin
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICENAME
''' Simple RPC client for Service lofarbus.*Z
'''
logger = logging.getLogger(__name__)
class OTDBPRCException(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
return "OTDBPRCException: " + str(self.message)
class OTDBRPC(RPCClientContextManagerMixin):
def __init__(self, rpc_client: RPCClient = None):
"""Create an instance of the IngestRPC using the given RPCClient,
or if None given, to a default RPCClient connecting to the DEFAULT_OTDB_SERVICENAME service"""
super().__init__()
self._rpc_client = rpc_client or RPCClient(service_name=DEFAULT_OTDB_SERVICENAME)
@staticmethod
def create(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, timeout=DEFAULT_RPC_TIMEOUT):
"""Create a OTDBRPC connecting to the given exchange/broker on the default DEFAULT_OTDB_SERVICENAME service"""
return OTDBRPC(RPCClient(service_name=DEFAULT_OTDB_SERVICENAME,
exchange=exchange, broker=broker, timeout=timeout))
def taskGetIDs(self, otdb_id=None, mom_id=None):
if otdb_id:
answer = self._rpc_client.execute('TaskGetIDs', OtdbID=otdb_id, return_tuple=False)
elif mom_id:
answer = self._rpc_client.execute('TaskGetIDs', MomID=mom_id, return_tuple=False)
else:
raise OTDBPRCException("TaskGetIDs was called without OTDB or Mom ID")
if not answer:
raise OTDBPRCException("TaskGetIDs returned an empty dict")
return {"tree_type": answer[0], "otdb_id": answer[1], "mom_id": answer[2]}
def taskGetSpecification(self, otdb_id=None, mom_id=None):
if otdb_id:
answer = self._rpc_client.execute('TaskGetSpecification', OtdbID=otdb_id)
elif mom_id:
answer = self._rpc_client.execute('TaskGetSpecification', MomID=mom_id)
else:
raise OTDBPRCException("TaskGetSpecification was called without OTDB or Mom ID")
if not answer["TaskSpecification"]:
raise OTDBPRCException("TaskGetSpecification returned an empty dict")
return {"specification": answer["TaskSpecification"]}
def taskCreate(self, otdb_id=None, mom_id=None, template_name="", campaign_name="", specification={}):
if otdb_id: ##Can this ever be called with a otdb_id?
answer = self._rpc_client.execute('TaskCreate', OtdbID=otdb_id, TemplateName=template_name, CampaignName=campaign_name, Specification=specification)
elif mom_id:
answer = self._rpc_client.execute('TaskCreate', MomID=mom_id, TemplateName=template_name, CampaignName=campaign_name, Specification=specification)
else:
raise OTDBPRCException("TaskCreate was called without OTDB or Mom ID")
if not answer["Success"]:
raise OTDBPRCException("TaskCreate failed for MoM ID %i" % (mom_id,))
return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]}
def taskGetTreeInfo(self, otdb_id):
info = self._rpc_client.execute('TaskGetTreeInfo', otdb_id=otdb_id)
return info
def taskGetStatus(self, otdb_id):
return self._rpc_client.execute('TaskGetStatus', otdb_id=otdb_id)['status']
def taskSetStatus(self, otdb_id=None, new_status="", update_timestamps=True):
answer = self._rpc_client.execute('TaskSetStatus', OtdbID=otdb_id, NewStatus=new_status, UpdateTimestamps=update_timestamps)
if not answer["Success"]:
raise OTDBPRCException("TaskSetStatus failed for %i" % (otdb_id,))
return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]}
def taskSetSpecification(self, otdb_id=None, specification={}):
answer = self._rpc_client.execute('TaskSetSpecification', OtdbID=otdb_id, Specification=specification)
if "Errors" in answer:
for key, problem in answer["Errors"].items():
logger.warning("TaskSetSpecification for %i failed to set key %s because of %s" % (otdb_id, key, problem))
raise OTDBPRCException("TaskSetSpecification failed to set all keys for %i" % (otdb_id,))
return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]}
def taskPrepareForScheduling(self, otdb_id=None, starttime="", endtime=""):
answer = self._rpc_client.execute('TaskPrepareForScheduling', OtdbID= otdb_id, StartTime=starttime, StopTime=endtime)
return {"otdb_id": answer["OtdbID"]}
def taskDelete(self, otdb_id=None):
answer = self._rpc_client.execute('TaskDelete', OtdbID=otdb_id)
if not answer["Success"]:
logger.warning("TaskDelete failed for %i" % (otdb_id,)) ##Probably was already deleted?
return {"mom_id": answer["MomID"], "otdb_id": answer["OtdbID"]}
def getDefaultTemplates(self):
answer = self._rpc_client.execute('GetDefaultTemplates')
if not answer["DefaultTemplates"]:
raise OTDBPRCException("GetDefaultTemplates returned an empty dict")
return {"default_templates": answer["DefaultTemplates"]}
def getStations(self):
answer = self._rpc_client.execute('GetStations')
if not answer["Stations"]:
raise OTDBPRCException("GetStations returned an empty dict")
return {"stations": answer["Stations"]}
def setProject(self, name=None, title="", pi="", co_i="", contact=""):
if not name:
raise OTDBPRCException("SetProject was called with an empty project")
answer = self._rpc_client.execute('SetProject', name=name, pi=pi, co_i=co_i, contact=contact)
if not answer["projectID"]:
raise OTDBPRCException("SetProject failed for %s" % (name,))
return {"project_id": answer["projectID"]}
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
with OTDBRPC.create() as rpc:
print(rpc.taskGetStatus(452728))
#!/usr/bin/env python
# coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: getOTDBParset 34753 2016-06-20 10:43:42Z schaap $
"""
"""
from lofar.sas.otdb.otdbrpc import OTDBRPC
if __name__ == "__main__":
from optparse import OptionParser
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.WARNING)
logger = logging.getLogger(__name__)
# Check the invocation arguments
parser = OptionParser("%prog -o obsid [--cobalt1 or --cobalt2]")
parser.add_option("-o", "--obsid", dest="obsid", type="int", default=0,
help="Observation ID to change the correlator headnode for.")
parser.add_option("-1", "--cobalt1", dest="cobalt1", action='store_true',
help="Change the correlator headnode the given observation to cobalt1")
parser.add_option("-2", "--cobalt2", dest="cobalt2", action='store_true',
help="Change the correlator headnode the given observation to cobalt2")
(options, args) = parser.parse_args()
if not options.obsid or (not options.cobalt1 and
not options.cobalt2) or (options.cobalt1 and options.cobalt2):
parser.print_help()
sys.exit(1)
with OTDBRPC() as otdbrpc:
if options.cobalt1:
otdbrpc.taskSetSpecification(options.obsid, {'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.CorrAppl._hostname': 'cbmmaster',
'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.CorrAppl.CorrProc._hostname': 'cbmmaster'})
elif options.cobalt2:
otdbrpc.taskSetSpecification(options.obsid, {'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.CorrAppl._hostname': 'cbm2head.control.lofar',
'LOFAR.ObsSW.Observation.ObservationControl.OnlineControl.CorrAppl.CorrProc._hostname': 'cbm2head.control.lofar'})
parset = otdbrpc.taskGetSpecification(otdb_id=options.obsid)["specification"]
print('New keys for ', options.obsid)
for key in ['ObsSW.Observation.ObservationControl.OnlineControl.CorrAppl._hostname',
'ObsSW.Observation.ObservationControl.OnlineControl.CorrAppl.CorrProc._hostname']:
print("%s = %s" % (key, parset[key]))
#!/usr/bin/env python3
# coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id$
"""
"""
from lofar.messaging import DEFAULT_BUSNAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
if __name__ == "__main__":
from optparse import OptionParser
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.WARNING)
logger = logging.getLogger(__name__)
# Check the invocation arguments
parser = OptionParser("%prog -o obsid -s status [options]")
parser.add_option("-B", "--exchange", dest="exchange", type="string",
default=DEFAULT_BUSNAME,
help="exchange on which OTDB commands are sent")
parser.add_option("-o", "--obsid", dest="obsid", type="int", default=0,
help="Observation/tree ID to set status for")
parser.add_option("-s", "--status", dest="status", type="string", default="",
help="New status")
(options, args) = parser.parse_args()
if not options.exchange or not options.obsid or not options.status:
parser.print_help()
sys.exit(1)
with OTDBRPC.create(exchange=options.exchange) as otdbrpc:
otdbrpc.taskSetStatus(otdb_id=options.obsid, new_status=options.status)
print(otdbrpc.taskGetStatus(otdb_id=options.obsid))
# $Id: CMakeLists.txt 1576 2015-09-29 15:22:28Z loose $
if(BUILD_TESTING)
include(LofarCTest)
include(FindPythonModule)
find_python_module(dateutil)
include(PythonInstall)
python_install(otdb_common_testing.py DESTINATION lofar/sas/otdb/testing)
lofar_add_test(t_TreeService)
lofar_add_test(t_TreeStatusEvents)
endif()
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id$
import subprocess
import os
import logging
logger = logging.getLogger(__name__)
from lofar.common.testing.postgres import PostgresTestDatabaseInstance
class OTDBTestInstance(PostgresTestDatabaseInstance):
'''Helper class which uses the OTDBCommonTestMixin without a unittest.TestCase to setup/teardown a test OTDB instance'''
def __init__(self, gzipped_schema_dump_filename):
super().__init__()
self.gzipped_schema_dump_filename = gzipped_schema_dump_filename
def apply_database_schema(self):
logger.info('applying OTDB sql schema to %s', self.dbcreds)
cmd1 = ['gzip', '-dc', self.gzipped_schema_dump_filename]
cmd2 = ['psql', '-U', self.dbcreds.user, '-h', self.dbcreds.host,
'-p', str(self.dbcreds.port), self.dbcreds.database]
logger.info('executing: %s', ' '.join(cmd1))
logger.info('executing: %s', ' '.join(cmd2))
proc1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
proc2 = subprocess.Popen(cmd2, stdin=proc1.stdout)
proc1.wait(timeout=60)
proc2.wait(timeout=60)
if proc1.returncode != 0:
raise RuntimeError("Could not execute cmd: '%s' error=%s" % (' '.join(cmd1), proc1.stderr))
if proc2.returncode != 0:
raise RuntimeError("Could not execute cmd: '%s' error=%s" % (' '.join(cmd2), proc2.stderr))
File deleted
#!/usr/bin/env python3
#coding: iso-8859-15
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: Backtrace.cc 31468 2015-04-13 23:26:52Z amesfoort $
"""
RPC functions that allow access to (VIC) trees in OTDB.
TaskSpecificationRequest: get the specification(parset) of a tree as dict.
KeyUpdateCommand : function to update the value of multiple (existing) keys.
StatusUpdateCommand : finction to update the status of a tree.
"""
try:
from lofar.sas.otdb.TreeService import create_service
except ModuleNotFoundError as e:
print(str(e), "Skipping Test.")
exit(3)
from lofar.messaging import TemporaryExchange, RPCClient, BusListenerJanitor
from lofar.sas.otdb.testing.otdb_common_testing import OTDBTestInstance
import unittest
from lofar.common.test_utils import integration_test
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
@integration_test
class TreeServiceTest(unittest.TestCase):
def test(self):
with OTDBTestInstance('t_TreeService.in.unittest_db.dump.gz') as test_db:
def do_rpc_catch_exception(exc_text, rpc_instance, method_name, arg_dict):
try:
print("** Executing {0}({1})...".format(method_name, arg_dict))
rpc_instance.execute(method_name=method_name, **arg_dict)
raise Exception("Expected an exception {0}, didn't get any".format(exc_text))
except Exception:
print("Caught expected exception {0}".format(exc_text))
print("======")
def do_rpc(rpc_instance, method_name, arg_dict):
print("** Executing {0}({1})...".format(method_name, arg_dict))
answer = rpc_instance.execute(method_name=method_name, **arg_dict)
print("result =", answer)
print("======")
return answer
with TemporaryExchange(__name__) as tmp_exchange:
exchange = tmp_exchange.address
with BusListenerJanitor(create_service(exchange=exchange, dbcreds=test_db.dbcreds)) as service:
with RPCClient(service_name=service.service_name, exchange=exchange, timeout=10) as otdbRPC: # Existing: otdb_id:1099268, mom_id:353713
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': 353713 })
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': 5 })
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 1099268, 'MomID': None })
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': 5, 'MomID': 353713 })
do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': 5, 'MomID': 5 })
do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': 5, 'MomID': None })
do_rpc(otdbRPC, "TaskGetIDs", {'OtdbID': None, 'MomID': 353713 })
do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': None, 'MomID': 5 })
do_rpc_catch_exception('', otdbRPC, "TaskGetIDs", {'OtdbID': None, 'MomID': None })
do_rpc(otdbRPC, "GetDefaultTemplates", {})
do_rpc(otdbRPC, "SetProject",
{'name':"Taka Tuka Land", "title":"Adventure movie", "pi":"Pippi",
"co_i":"Mr.Nelson", "contact":"Witje"})
do_rpc(otdbRPC, "TaskCreate", {'OtdbID':1099268, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
do_rpc(otdbRPC, "TaskCreate", {'MomID':353713, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
do_rpc_catch_exception('on non-exsisting campaign', otdbRPC, "TaskCreate",
{'MomID':998877, 'TemplateName':'BeamObservation',
'CampaignName':'No such campaign', 'Specification': {'state':'finished'}})
do_rpc(otdbRPC, "TaskCreate", {'MomID':998877, 'TemplateName':'BeamObservation',
'CampaignName':'Taka Tuka Land', 'Specification': {'state':'finished'}})
data = do_rpc(otdbRPC, "TaskCreate", {'MomID':12345, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
new_tree1 = data['MomID']
data = do_rpc(otdbRPC, "TaskCreate", {'MomID':54321, 'TemplateName':'BeamObservation', 'Specification': {'state':'finished'}})
new_tree2= data['MomID']
do_rpc(otdbRPC, "TaskPrepareForScheduling", {'MomID':new_tree1}) # template
do_rpc(otdbRPC, "TaskPrepareForScheduling", {'MomID':new_tree1}) # now a VIC tree
do_rpc(otdbRPC, "TaskPrepareForScheduling",
{'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00',
'StopTime':'2016-03-01 12:34:56'})
do_rpc_catch_exception("on invalid stoptime", otdbRPC,
"TaskPrepareForScheduling",
{'MomID':new_tree1, 'StartTime':'2016-03-01 12:00:00',
'StopTime':'2016'})
do_rpc(otdbRPC, "TaskDelete", {'MomID':new_tree2})
do_rpc(otdbRPC, "TaskGetSpecification", {'OtdbID':1099269}) # PIC
do_rpc(otdbRPC, "TaskGetSpecification", {'OtdbID':1099238}) # Template
do_rpc(otdbRPC, "TaskGetSpecification", {'OtdbID':1099266}) # VIC
do_rpc_catch_exception('on non-existing treeID', otdbRPC,
"TaskGetSpecification", {'OtdbID':5}) # Non existing
# PIC
do_rpc(otdbRPC, "TaskSetStatus",
{'OtdbID':1099269, 'NewStatus':'finished', 'UpdateTimestamps':True})
# Template
do_rpc(otdbRPC, "TaskSetStatus",
{'OtdbID':1099238, 'NewStatus':'finished', 'UpdateTimestamps':True})
# VIC
do_rpc(otdbRPC, "TaskSetStatus",
{'OtdbID':1099266, 'NewStatus':'finished', 'UpdateTimestamps':True})
# Nonexisting tree
do_rpc_catch_exception('on invalid treeID', otdbRPC,
"TaskSetStatus",
{'OtdbID':10, 'NewStatus':'finished',
'UpdateTimestamps':True})
# VIC tree: invalid status
do_rpc_catch_exception('on invalid status', otdbRPC, "TaskSetStatus",
{'OtdbID':1099266, 'NewStatus':'what_happend',
'UpdateTimestamps':True})
# Set PIC back to active...
do_rpc(otdbRPC, "TaskSetStatus",
{'OtdbID':1099269, 'NewStatus':'active', 'UpdateTimestamps':True})
do_rpc(otdbRPC, "GetStations", {})
# VIC tree: valid
do_rpc(otdbRPC, "TaskSetSpecification",
{'OtdbID':1099266,
'Specification':
{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost':
'NameOfTestHost'}})
# Template tree: not supported yet
do_rpc(otdbRPC, "TaskSetSpecification", {'OtdbID':1099238,
'Specification':{'LOFAR.ObsSW.Observation.Scheduler.priority':'0.1'}})
# PIC tree: not supported yet
do_rpc_catch_exception('on invalid treetype (PIC)', otdbRPC,
"TaskSetSpecification",
{'OtdbID':1099269,
'Specification':{'LOFAR.PIC.Core.CS001.status_state':'50'}})
# Non exsisting tree
do_rpc_catch_exception('on invalid treeID', otdbRPC,
"TaskSetSpecification",
{'OtdbID':10,
'Specification':
{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.pythonHost':'NameOfTestHost'}})
# VIC tree: wrong key
do_rpc_catch_exception('on invalid key', otdbRPC, "TaskSetSpecification", {'OtdbID':1099266,
'Specification':{'LOFAR.ObsSW.Observation.ObservationControl.PythonControl.NoSuchKey':'NameOfTestHost'}})
if __name__ == "__main__":
logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s', level = logging.INFO)
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment