# tbbservice.py (21.69 KiB) -- authored by Jörn Künsemöller
# Code owners: assign users and groups as approvers for specific file changes.
#!/usr/bin/env python
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import os
from socket import gethostname
from optparse import OptionParser
from subprocess import Popen, PIPE
from datetime import datetime, timedelta
from time import sleep
import logging
logger = logging.getLogger()
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.messaging.messages import EventMessage
from lofar.messaging.messagebus import ToBus
from lofar.common.util import waitForInterrupt
from lofar.mac.tbbservice.config import *
from lofar.common.cep4_utils import *
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.mac.tbb.tbb_load_firmware import load_tbb_firmware
from lofar.mac.tbb.tbb_freeze import freeze_tbb
from lofar.mac.tbb.tbb_release_recording import release_tbb
from lofar.mac.tbb.tbb_set_storage import set_tbb_storage
from lofar.mac.tbb.tbb_start_recording import start_tbb
from lofar.mac.tbb.tbb_upload_to_cep import upload_tbb_data
from lofar.mac.tbb.tbb_util import parse_parset_from_voevent
class TBBServiceMessageHandler(MessageHandlerInterface):
    """
    MessageHandlerInterface implementation for the tbb service.

    It connects the message handling done by Service to the methods of the
    TBBControlService instance it is given: each supported message command
    is mapped onto the method of the same name on that instance.
    """

    def __init__(self, tbb_control_service, **kwargs):
        """
        :param tbb_control_service: the object whose methods are called for the incoming commands
        :param kwargs: passed on to MessageHandlerInterface; 'use_service_methods'
                       and 'numthreads' are always overwritten here
        """
        self._tbb_control_service = tbb_control_service

        # these two settings are forced, whatever the caller supplied
        kwargs.update(use_service_methods=True, numthreads=2)

        # let the MessageHandlerInterface initialise itself with the (adjusted) kwargs
        super(TBBServiceMessageHandler, self).__init__(**kwargs)

        # every command name is served by the equally named method of the control service
        command_names = ('start_datawriters',
                         'stop_datawriters',
                         'switch_firmware',
                         'start_recording',
                         'release_data',
                         'restart_recording',
                         'upload_data',
                         'freeze_data',
                         'set_storage')
        controller = self._tbb_control_service
        self.service2MethodMap = {name: getattr(controller, name) for name in command_names}
class TBBControlService:
    """
    Control service for the TBB datawriters and the TBB recording commands.

    The public methods of this class are exposed as service methods through a
    TBBServiceMessageHandler, and notifications about what this service is doing
    are sent on the event bus.

    Use an instance as a context manager, or call open() and close() explicitly,
    to manage the connections of the event bus and the service.
    """

    def __init__(self,
                 busname=DEFAULT_TBB_BUSNAME,
                 servicename=DEFAULT_TBB_SERVICENAME,
                 notification_busname=DEFAULT_TBB_NOTIFICATION_BUSNAME,
                 broker=DEFAULT_BROKER):
        """
        :param busname: name of the bus on which the service listens
        :param servicename: name of the service
        :param notification_busname: name of the bus on which the notifications are sent
        :param broker: broker address, used for both the event bus and the service
        """
        # the event_bus is used to send out notifications about what this TBBControlService is doing
        self.event_bus = ToBus(notification_busname, broker=broker)

        # create a service to receive and handle the messages
        # supply 'self' as argument to the TBBServiceMessageHandler,
        # so the methods in this TBBControlService can be called when a service method is called.
        self.service = Service(servicename,
                               TBBServiceMessageHandler,
                               busname=busname,
                               broker=broker,
                               handler_args={'tbb_control_service': self},
                               use_service_methods=True)

        # Keep the running datawriter process information in this dict (node -> Popen object).
        self.procs = {}

    def __enter__(self):
        """enter context, open connections"""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """leave context, close connections"""
        self.close()

    def open(self):
        """open the qpid connections for the event bus and the service, and notify that the service started"""
        self.event_bus.open()
        self.service.start_listening()
        self._send_event_message('TBBServiceStarted', 'tbb service started at %s on host %s. rpc-service is listening on %s' % (
            datetime.utcnow(), gethostname(), self.service.address))

    def close(self):
        """notify that the service stopped, and close the qpid connections for the event bus and the service"""
        self.service.stop_listening()
        # the notification has to go out before the event bus itself is closed
        self._send_event_message('TBBServiceStopped', 'tbb service stopped at %s on host %s. stopped rpc-service on %s' % (
            datetime.utcnow(), gethostname(), self.service.address))
        self.event_bus.close()

    def _send_event_message(self, subject, content):
        """
        Log and send a notification on the event bus.
        :param subject: subject of the notification; DEFAULT_TBB_NOTIFICATION_PREFIX is prepended to it
        :param content: content of the notification
        """
        logger.info('Sending notification to %s with subject \'%s\' %s' % (self.event_bus.address,
                                                                            subject,
                                                                            str(content).replace('\n', ' ')))
        self.event_bus.send(EventMessage(context='%s%s' % (DEFAULT_TBB_NOTIFICATION_PREFIX, subject),
                                         content=content))

    def _update_parset(self, parset, updates):
        """
        Update the values in the parset with the values of the updates dict.
        All parset keys that contain the update key are updated, so this matches irrespective of prefixes.
        If no existing entry is found, the update key and value are added to the parset.
        :param parset: The parset dict to update (it is modified in place)
        :param updates: A second parset with the updates
        :return: The updated parset
        """
        # Note: this could be a simple dict update, if we don't have to check for substrings.
        # But I assume this is required to allow prefixes or sth like that.
        # Note: ICD states that non-filled-in parameters should/can be present, and should contain 0 by default.
        # todo: parse voevent to gather all info we want.
        for update_key, update_value in updates.items():
            # collect the matching keys first, so the parset is not touched while it is being iterated
            matching_keys = [key for key in list(parset.keys()) if update_key in key]
            for key in matching_keys:
                parset[key] = update_value
            if not matching_keys:
                parset[update_key] = update_value
        return parset

    def _get_parset_of_running_obs(self):
        """
        determine running observation from RA, then query its parset from OTDB.
        :return: the parset of the running obs
        :raises RuntimeError: if there is no active observation
        """
        # fetch parset of current active observation and save a modified version for the tbb writers
        with RARPC(broker=self.event_bus.broker) as rarpc:
            running_observations = rarpc.getTasks(task_status='active', task_type='observation')

        if not running_observations:
            raise RuntimeError("No active observation found. Cannot create parset for TBB writer.")

        # pick first TODO: determine actual relevant observation
        otdb_id = running_observations[0]['otdb_id']

        with OTDBRPC(broker=self.event_bus.broker) as otdbrpc:
            parset = otdbrpc.taskGetSpecification(otdb_id=otdb_id)['specification']

        logger.info(type(parset))
        return parset

    def prepare_alert_tbb_parset(self, voevent=""):
        """
        Create a parset from running obs and voevent xml, write to file
        :param voevent: voevent xml string; it is stored in the parset and parsed for further values
        :return: path of the parset file
        :raises RuntimeError: if the antennaArray of the running observation is missing or is not an
                              HBA one, or if the parset file could not be written
        """
        # TODO, replace with actual path
        parset_path = '/data/scratch/tbb_alert_%s.parset' % datetime.utcnow().strftime("%Y%m%d%H%M%S")

        # get base parset from otdb
        parset = self._get_parset_of_running_obs()

        # sanity check
        # a missing antennaArray key is reported with the same RuntimeError as a non-HBA one,
        # instead of failing with a TypeError on the membership test against None.
        antenna_array = parset.get('ObsSW.Observation.antennaArray')
        if not antenna_array or 'HBA' not in antenna_array:
            raise RuntimeError('Current antennaArray is %s. Not starting TBB datawriters.' % (
                antenna_array,))

        # update parset with TBB subband mode keys and default values
        defaults_parset = {
            'Observation.TBB.TBBsetting.operatingMode': '2',
            'Observation.TBB.TBBsetting.subbandList': '[10..489]',
            'Observation.TBB.TBBsetting.triggerDispersionMeasure': 0,
            'Observation.TBB.TBBsetting.triggerDispersionMeasureUnit': "pc cm^-3",
            'Observation.TBB.TBBsetting.time': 0,  # -> time.time() ???
            'Observation.TBB.TBBsetting.sampleNumber': 0,
            'Observation.TBB.TBBsetting.fitDirectionCoordinateSystem': 0,
            'Observation.TBB.TBBsetting.fitDirectionAngle1': 0,
            'Observation.TBB.TBBsetting.fitDirectionAngle2': 0,
            'Observation.TBB.TBBsetting.fitDirectionDistance': 0,
            'Observation.TBB.TBBsetting.fitDirectionVariance': 0,
            'Observation.TBB.TBBsetting.referenceFrequency': 0,
            'Observation.TBB.TBBsetting.observatoryCoordinates': 0,
            'Observation.TBB.TBBsetting.observatoryCoordinatesCoordinateSystem': 0,
            'Observation.TBB.TBBsetting.triggerId': 0,
            'Observation.TBB.TBBsetting.additionalInfo': voevent,
            # older keys we probably still want to fill here:
            'Observation.TBB.TBBsetting.triggerType': 'Unknown',
            'Observation.TBB.TBBsetting.triggerVersion': 0
        }
        self._update_parset(parset, defaults_parset)

        # update parset with values from received voevent
        voevent_parset = parse_parset_from_voevent(voevent)
        self._update_parset(parset, voevent_parset)

        # write parset to file and return path
        logger.info('saving ttbwriter parset in %s', parset_path)
        parset_text = '\n'.join("%s=%s" % (k, v) for k, v in parset.items())
        logger.info(parset_text)

        # NOTE(review): the parset text (which contains the received voevent) is placed between
        # double quotes without any escaping, and the '>' redirect is presumably interpreted by
        # a shell on the head node. A voevent containing quotes or other shell meta characters
        # could break or alter this command -- consider proper quoting or feeding it via stdin.
        cmd = ['echo', '"%s"' % (parset_text,), ">", parset_path]
        cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
        proc = Popen(cmd)
        proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError("Could not write tbb alert parset in %s" % (parset_path,))

        logger.info('saved ttbwriter parset in %s', parset_path)
        return parset_path

    def start_datawriters(self, voevent=""):
        """
        start the tbb datawriters and notify when they have been started.
        This method does not wait for the datawriters to finish, see wait_for_datawriters.
        :param voevent: voevent xml string (for metadata)
        """
        self._send_event_message('DataWritersStarting', {})

        parset_path = self.prepare_alert_tbb_parset(voevent)

        # this is a list of ports for the dutch stations only
        # TODO: make dynamic, depending on which stations are actually used
        ports = ','.join(str(x) for x in range(31664, 31670))

        available_nodes = get_cep4_available_cpu_nodes_sorted_ascending_by_load(0.25)

        # TODO: what should be do if datawriters are already running? kill/wait for them first?
        self.procs = {}

        for node in available_nodes:
            cmd = ['TBB_Writer',
                   '-s', parset_path,
                   '-p', ports,
                   '-k', '0',
                   '-t', '60',
                   '-o', '/data/projects/Commissioning2018/tbb/']  # TODO: make outputpath configurable

            # wrap the command in a cep4 docker ssh call
            # TODO: used docker image should be configurable
            cmd = wrap_command_for_docker(cmd, 'lofar-outputproc', 'TBB-Task11328', added_privileges=True)
            cmd = wrap_command_in_cep4_cpu_node_ssh_call(cmd, node, via_head=True)

            logger.info('starting datawriter on node %s, executing: %s', node, ' '.join(cmd))
            self.procs[node] = Popen(cmd)

        self._send_event_message('DataWritersStarted', {})

    def wait_for_datawriters(self, timeout=24 * 3600):
        """
        Monitor started procs until they are all done or timeout seconds have
        passed. A 'DataWritersFinished' notification is sent when done.
        :param timeout: Timeout in seconds until data writers will be
                        forcefully killed. The default is 24 hours.
        """
        start = datetime.utcnow()
        # elapsed time is a timedelta, and a timedelta cannot be compared to a plain number of seconds
        max_wait = timedelta(seconds=timeout)

        while self.procs:
            logger.info('waiting for %d datawriters to finish...', len(self.procs))
            finished_procs = {node: proc for node, proc in self.procs.items()
                              if proc.poll() is not None}

            if finished_procs:
                for node, proc in finished_procs.items():
                    logger.info('datawriter on node %s finished with exitcode=%d', node, proc.returncode)
                    del self.procs[node]
            else:
                sleep(1.0)

            # only datawriters that are still running can time out
            if self.procs and datetime.utcnow() - start >= max_wait:
                logger.warning('timeout while waiting for %d more datawriters...', len(self.procs))
                # stop_datawriters ends with a call to wait_for_datawriters, which already sends
                # the 'DataWritersFinished' notification. Return here, so it is not sent twice.
                self.stop_datawriters()
                return

        self._send_event_message('DataWritersFinished', {})

    def stop_datawriters(self):
        """Stop TBB datawriters running on CEP4 and notify when done"""
        self._send_event_message('DataWritersStopping', {})

        # loop over a copy of the items, because entries are removed from self.procs in the loop
        for node, proc in list(self.procs.items()):
            logger.warning('killing datawriter on node: %s', node)
            proc.kill()
            del self.procs[node]

        self._send_event_message('DataWritersStopped', {})

        # also call 'normal' wait_for_datawriters method,
        # which has no datawriter to wait for anymore...
        # but as a result, it sends a 'DataWritersFinished' event.
        self.wait_for_datawriters()

    def switch_firmware(self, stations, mode):
        """
        Command TBBs to switch to one of two firmwares: sub-band mode or raw voltage mode.
        :param stations: string - Only TBBs of these stations will be commanded to switch firmware.
        :param mode: string - The firmware mode to switch to: either \"subband\" or \"rawvoltage\".
        """
        log_message = "Switching TBB firmware on stations %s to \"%s\"" % (stations, mode)
        logger.info(log_message + "...")
        load_tbb_firmware(stations, mode)
        logger.info(log_message + "done.")

    def start_recording(self, stations, mode, subbands):
        """
        Command TBBs in ALERT mode to start data recording.
        :param stations: string - Only TBBs of these stations will be commanded to start data recording.
        :param mode: string - Parameter to set-up data recording in either \"subband\" or \"rawvoltage\" mode.
        :param subbands: string - The list of sub-bands that the RSP will be set-up for.
        """
        log_message = "Starting the TBB data recording for mode %s on stations %s in sub-bands %s" % (mode, stations, subbands)
        logger.info(log_message + "...")
        start_tbb(stations, mode, subbands)
        logger.info(log_message + "done.")

    def freeze_data(self, stations, dm, timesec, timensec):
        """
        Command TBBs in ALERT mode to freeze the contents of their memory. This stops recording but does not discard the already recorded data.
        :param stations: string - Only TBBs of these stations will freeze their memory contents.
        :param dm: float - The dispersion measure that is to be set in the TBBs.
        :param timesec: int- The start time of the data recording in seconds since 1970-01-01T00.00.00. This is the integer part of the seconds.
        :param timensec: int - The start time of the data recording in seconds since 1970-01-01T00.00.00. This is the fractional part as integer in nanoseconds.
        """
        log_message = "Freezing the TBB data on stations %s. DM = %s, time = %f" % (stations, dm, (float(timesec) + (1e-9 * int(timensec))))
        logger.info(log_message + "...")
        freeze_tbb(stations, dm, timesec, timensec)
        logger.info(log_message + "done.")

    def upload_data(self, stations, dm, start_time, duration, sub_bands, wait_time, boards):
        """
        Command TBBs in ALERT mode to upload part or all of their memory to CEP.
        :param stations: string - Only TBBs of these stations will upload their data to CEP.
        :param dm: float - The dispersion measure that was set during data recording.
        :param start_time: float - Designates the start of the recorded data in the TBB memory which will be uploaded to CEP. Earlier data in TBB memory will not be uploaded. 0.0s is 1970-01-01T00.00.00.
        :param duration: float - The time span for which the data will be uploaded.
        :param sub_bands: string - The list of sub-bands that will be uploaded.
        :param wait_time: float - The time that has to be waited before another sub-band upload can be commanded.
        :param boards: string - Only these boards will be commanded to upload the spectral data to CEP.
        """
        # note: dm, start_time, duration and wait_time have to be numbers, because of the %f formatting
        log_message = "Uploading the TBB data on stations %s, " \
                      "boards = %s, " \
                      "sub-bands = %s, " \
                      "dm = %f, " \
                      "start_time = %f, " \
                      "duration = %f, " \
                      "wait time = %f" % (stations, boards, sub_bands, dm, start_time, duration, wait_time)
        logger.info(log_message + "...")
        upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, boards)
        logger.info(log_message + "done.")

    def release_data(self, stations):
        """
        Command TBBs in ALERT mode to release data recording.
        :param stations: string - Only TBBs of these stations will be commanded to stop data recording.
        """
        # use a real one-element tuple, so the formatting also works when stations itself is a tuple
        log_message = "Releasing the TBB data recording on stations %s" % (stations,)
        logger.info(log_message + "...")
        release_tbb(stations)
        logger.info(log_message + "done.")

    def restart_recording(self, stations, mode, subbands):
        """
        Command TBBs in ALERT mode to re-start data recording. This function just calls release_data and start_recording.
        :param stations: string - Only TBBs of these stations will be commanded to re-start data recording.
        :param mode: string - Parameter to set-up data recording in either \"subband\" or \"rawvoltage\" mode.
        :param subbands: string - The list of sub-bands that the RSP will be set-up for.
        """
        log_message = "Restarting the TBB data recording for mode %s " \
                      "on stations %s in subbands %s" % (mode, stations, subbands)
        logger.info(log_message + "...")
        self.release_data(stations)
        self.start_recording(stations, mode, subbands)
        logger.info(log_message + "done.")

    def set_storage(self, map):
        """
        Command TBBs in ALERT mode to stream data to specific CEP nodes.
        :param map: dict - containing a one on one mapping of station LCUs to nodes
        """
        # the parameter name shadows the builtin 'map'; it is kept, because it is part of the
        # interface of this service method.
        # use a real one-element tuple, so the whole mapping is always formatted as one value
        log_message = "Setting the storage nodes where TBB data is received to: %s" % (map,)
        logger.info(log_message + "...")
        set_tbb_storage(map)
        logger.info(log_message + "done.")
def main():
    '''
    Entry point of the tbb service program.

    Parses the cmdline options, starts a TBBControlService with them,
    and then blocks until the program is stopped by an interrupt.
    '''
    # make sure we run in UTC timezone
    os.environ['TZ'] = 'UTC'

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # Check the invocation arguments
    option_parser = OptionParser('%prog [options]',
                                 description='run the tbb service, which can be used to e.g. start the datawriters, start/stop tbb recordings, or start streaming to CEP.')

    # all options are plain strings: (flags, remaining add_option settings)
    option_specs = (
        (('-q', '--broker'),
         dict(dest='broker',
              default=DEFAULT_BROKER,
              help='Address of the qpid broker, default: %default')),
        (("-b", "--tbb_service_busname"),
         dict(dest="tbb_service_busname",
              default=DEFAULT_TBB_BUSNAME,
              help="Name of the bus on which the tbb service listens. [default: %default]")),
        (("-s", "--tbb_service_name"),
         dict(dest="tbb_service_name",
              default=DEFAULT_TBB_SERVICENAME,
              help="Name of the tbb service. [default: %default]")),
        (("-n", "--tbb_notification_busname"),
         dict(dest="tbb_notification_busname",
              default=DEFAULT_TBB_NOTIFICATION_BUSNAME,
              help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default')),
    )
    for flags, settings in option_specs:
        option_parser.add_option(*flags, type='string', **settings)

    opts, _ = option_parser.parse_args()

    service_kwargs = {'busname': opts.tbb_service_busname,
                      'servicename': opts.tbb_service_name,
                      'notification_busname': opts.tbb_notification_busname,
                      'broker': opts.broker}

    with TBBControlService(**service_kwargs):
        banner = '*****************************************'
        for line in (banner, 'Started TBBService', banner):
            logger.info(line)

        # Note: I removed the automatic start of datawriters that was issued here.
        # (Does not seem to be a thing the service is supposed to do by default?)
        waitForInterrupt()
# run the service only when this file is executed as a program, not when it is imported
if "__main__" == __name__:
    main()