Skip to content
Snippets Groups Projects
Commit d8c779ea authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Merge branch 'SW-818' into 'LOFAR-Release-4_0'

SW-818: Resolve SW-818

See merge request !61
parents 91e0d851 e9dc69fb
No related branches found
No related tags found
2 merge requests!62Lofar release 4 0 into master,!61SW-818: Resolve SW-818
Showing
with 146 additions and 89 deletions
......@@ -142,6 +142,9 @@ def convert_slurm_nodes_string_to_node_number_list(slurm_string):
:param slurm_string: a string in 'slurm-like' node format, like cpu[01-03,11-12] or cpu01
:return: a list of node numbers (ints)
'''
if isinstance(slurm_string, bytes):
slurm_string = slurm_string.decode('utf-8')
result = []
stripped_slurm_string = slurm_string.strip()
left_bracket_idx = stripped_slurm_string.find('[')
......
......@@ -88,6 +88,30 @@ def execute_in_parallel_over_station_group(cmd, station_group='today', timeout=3
stations = get_current_stations(station_group=station_group, as_host_names=True)
return execute_in_parallel_over_stations(cmd=cmd, stations=stations, timeout=timeout, max_parallel=max_parallel)
def translate_user_station_string_into_station_list(user_station_string: str):
'''
try to deal with user input like 'cs001,cs001' or 'today' or ... etc
No guarantees! just best effort!
:param user_station_string: a string like 'cs001,cs001' or 'today' or ... etc
:return: a list of station names
'''
if isinstance(user_station_string, bytes):
user_station_string = user_station_string.decode('utf-8')
if not isinstance(user_station_string, str):
raise ValueError("cannot parse user_station_string")
if ',' in user_station_string:
return user_station_string.split(',')
# maybe 'stations' is a group. Do lookup.
current_stations = get_current_stations(user_station_string, as_host_names=False)
if current_stations:
return current_stations
# just treat the stations string as list of stations and hope for the best
return [user_station_string]
def get_current_stations(station_group='today', as_host_names=True):
'''
Wrapper function around the amazing lcurun and stations.txt operators system.
......
......@@ -7,5 +7,6 @@ from lofar.mac.tbb.tbb_freeze import parse_args
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
args = parse_args()
with TBBRPC() as rpc:
with TBBRPC.create() as rpc:
rpc.freeze_data(args.stations, args.dm, args.timesec, args.timensec)
......@@ -8,6 +8,7 @@ from lofar.mac.tbb.tbb_load_firmware import parse_args
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
args = parse_args()
with TBBRPC() as rpc:
with TBBRPC.create() as rpc:
rpc.switch_firmware(args.stations, args.mode)
......@@ -8,6 +8,7 @@ from lofar.mac.tbb.tbb_release_recording import parse_args
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
args = parse_args()
with TBBRPC() as rpc:
with TBBRPC.create() as rpc:
rpc.release_data(args.stations)
......@@ -7,6 +7,7 @@ from lofar.mac.tbb.tbb_restart_recording import parse_args
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
args = parse_args()
with TBBRPC() as rpc:
with TBBRPC.create() as rpc:
rpc.restart_recording(args.stations)
......@@ -7,5 +7,6 @@ from lofar.mac.tbb.tbb_set_storage import parse_args
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
args = parse_args()
with TBBRPC() as rpc:
with TBBRPC.create() as rpc:
rpc.set_storage(args.map)
......@@ -2,8 +2,7 @@
import logging
from optparse import OptionParser
from lofar.mac.tbbservice.config import DEFAULT_TBB_BUSNAME, DEFAULT_TBB_SERVICENAME, DEFAULT_BROKER
from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_BUSNAME
from lofar.mac.tbbservice.config import DEFAULT_BUSNAME, DEFAULT_BROKER
from lofar.mac.tbbservice.client import start_datawriters_and_wait_until_finished
if __name__ == '__main__':
......@@ -13,26 +12,13 @@ if __name__ == '__main__':
parser = OptionParser('%prog [options]',
description='issue a command to the tbb service to start the datawriters, and wait until they are done.')
parser.add_option('-o', '--output_path', dest='output_path', type='string', help='directory path to write the data file to.')
parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default')
parser.add_option("-b", "--tbb_service_busname", dest="tbb_service_busname", type="string",
default=DEFAULT_TBB_BUSNAME,
help="Name of the bus on which the tbb service listens. [default: %default]")
parser.add_option("-s", "--tbb_service_name", dest="tbb_service_name", type="string",
default=DEFAULT_TBB_SERVICENAME,
help="Name of the tbb service. [default: %default]")
parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string",
default=DEFAULT_TBB_NOTIFICATION_BUSNAME,
help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default')
parser.add_option("-d", "--duration", dest='num_samples_per_subband', required=True,
help="Expected duration in number of samples per subband.")
parser.add_option("-d", "--duration", dest='num_samples_per_subband', help="Expected duration in number of samples per subband.")
(options, args) = parser.parse_args()
if options.output_path is None or options.num_samples_per_subband is None:
parser.print_help()
exit(1)
start_datawriters_and_wait_until_finished(output_path=options.output_path,
num_samples_per_subband=options.num_samples_per_subband,
service_busname=options.tbb_service_busname,
service_name=options.tbb_service_name,
notification_busname=options.tbb_notification_busname,
broker=options.broker)
num_samples_per_subband=options.num_samples_per_subband)
......@@ -7,5 +7,6 @@ from lofar.mac.tbb.tbb_start_recording import parse_args
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
args = parse_args()
with TBBRPC() as rpc:
with TBBRPC.create() as rpc:
rpc.start_recording(args.stations, args.mode, args.subbands)
......@@ -2,30 +2,15 @@
import logging
from optparse import OptionParser
from lofar.mac.tbbservice.config import DEFAULT_TBB_BUSNAME, DEFAULT_TBB_SERVICENAME, DEFAULT_BROKER
from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_BUSNAME
from lofar.mac.tbbservice.config import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.mac.tbbservice.client import stop_datawriters_and_wait_until_finished
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Check the invocation arguments
parser = OptionParser('%prog [options]',
description='issue a command to the tbb service to stop the datawriters, and wait until they are done.')
parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default')
parser = OptionParser('%prog', description='issue a command to the tbb service to stop the datawriters, and wait until they are done.')
parser.add_option("-b", "--tbb_service_busname", dest="tbb_service_busname", type="string",
default=DEFAULT_TBB_BUSNAME,
help="Name of the bus on which the tbb service listens. [default: %default]")
parser.add_option("-s", "--tbb_service_name", dest="tbb_service_name", type="string",
default=DEFAULT_TBB_SERVICENAME,
help="Name of the tbb service. [default: %default]")
parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string",
default=DEFAULT_TBB_NOTIFICATION_BUSNAME,
help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default')
(options, args) = parser.parse_args()
stop_datawriters_and_wait_until_finished(service_busname=options.tbb_service_busname,
service_name=options.tbb_service_name,
notification_busname=options.tbb_notification_busname,
broker=options.broker)
stop_datawriters_and_wait_until_finished()
......@@ -7,6 +7,7 @@ from lofar.mac.tbb.tbb_upload_to_cep import parse_args
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
args = parse_args()
with TBBRPC() as rpc:
with TBBRPC.create() as rpc:
rpc.upload_data(args.stations, args.dm, args.start_time, args.duration, args.sub_bands, args.wait_time, args.boards)
#!/usr/bin/env python3
from threading import Event
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, BusListenerJanitor
from lofar.mac.tbbservice.client.tbbservice_rpc import TBBRPC
from lofar.mac.tbbservice.client.tbbbuslistener import TBBBusListener
from lofar.mac.tbbservice.client.tbbbuslistener import TBBBusListener, TBBEventMessageHandler
from lofar.common.util import single_line_with_single_spaces
class Waiter(TBBBusListener):
'''Helper class which overrides TBBBusListener.onDataWritersFinished
and waits for synchronization using threading Events'''
def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs):
'''Helper class waiting for the DataWritersFinished event'''
class WaiterTBBEventMessageHandler(TBBEventMessageHandler):
'''concrete implementation of the TBBEventMessageHandler,
setting the waiter's wait_event (threading.Event) to signal the waiter that waiting is over.'''
def __init__(self, waiter):
super().__init__()
self._waiter = waiter
def onDataWritersStarting(self, msg_content):
logger.info("received DataWritersStarting event: %s", single_line_with_single_spaces(str(msg_content)))
def onDataWritersStarted(self, msg_content):
logger.info("received DataWritersStarted event: %s", single_line_with_single_spaces(str(msg_content)))
def onDataWritersStopping(self, msg_content):
logger.info("received DataWritersStopping event: %s", single_line_with_single_spaces(str(msg_content)))
def onDataWritersStopped(self, msg_content):
logger.info("received DataWritersStopped event: %s", single_line_with_single_spaces(str(msg_content)))
def onDataWritersFinished(self, msg_content):
logger.info("received DataWritersFinished event: %s", single_line_with_single_spaces(str(msg_content)))
# signal that we're done waiting
self._waiter.wait_event.set()
def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
self.wait_event = Event()
super(Waiter, self).__init__(busname=busname, broker=broker, **kwargs)
super(Waiter, self).__init__(handler_type=Waiter.WaiterTBBEventMessageHandler,
handler_kwargs={'waiter': self},
exchange=exchange, broker=broker)
def onDataWritersFinished(self, msg_content):
# signal that we're done waiting
self.wait_event.set()
def wait(self, timeout: int=24*3600, log_interval: int=60):
'''wait until the DataWritersFinished event is received.
:param timeout: timeout in seconds
:param log_interval: interval in seconds to log a message that we are still waiting.
:raises TimeoutError after timeout seconds
'''
start = datetime.utcnow()
while datetime.utcnow() - start <= timedelta(seconds=timeout):
logger.info("waiting for DataWritersFinished event... timout in %s", timedelta(seconds=timeout)-(datetime.utcnow() - start))
if self.wait_event.wait(max(1, min(timeout, log_interval))):
return
def wait(self, timeout=24*3600):
# wait until onDataWritersFinished
self.wait_event.wait(timeout)
raise TimeoutError("Did not receive a DataWritersFinished event within %s seconds" %(timeout,))
def start_datawriters_and_wait_until_finished(output_path,
num_samples_per_subband,
busname=DEFAULT_BUSNAME,
timeout: int = 24 * 3600, log_interval: int = 60,
exchange=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER):
'''
convenience method which issues a start_datawriters command to the tbbservice, and waits until all writers are done
'''
with Waiter() as waiter:
with TBBRPC(busname=busname, broker=broker) as rpc:
with BusListenerJanitor(Waiter(exchange=exchange, broker=broker)) as waiter:
with TBBRPC.create(exchange=exchange, broker=broker) as rpc:
rpc.start_datawriters(output_path=output_path, num_samples_per_subband=num_samples_per_subband)
waiter.wait()
logger.info("it's ok to cancel this program. datawriters will continue in the background.")
waiter.wait(timeout, log_interval)
def stop_datawriters_and_wait_until_finished(busname=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER):
logger.info("Datawriters finished")
def stop_datawriters_and_wait_until_finished(timeout: int = 24 * 3600, log_interval: int = 60,
exchange=DEFAULT_BUSNAME,
broker=DEFAULT_BROKER):
'''
convenience method which issues a stop_datawriters command to the tbbservice, and waits until all writers are done
'''
with Waiter() as waiter:
with TBBRPC(busname=busname, broker=broker) as rpc:
with BusListenerJanitor(Waiter(exchange=exchange, broker=broker)) as waiter:
with TBBRPC.create(exchange=exchange, broker=broker) as rpc:
rpc.stop_datawriters()
waiter.wait()
logger.info("it's ok to cancel this program. datawriters will stop in the background.")
waiter.wait(timeout, log_interval)
logger.info("Datawriters stopped")
......@@ -22,6 +22,7 @@
from lofar.messaging.messagebus import AbstractMessageHandler, BusListener, LofarMessage, EventMessage
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_PREFIX
from lofar.common.util import single_line_with_single_spaces
import logging
logger = logging.getLogger()
......@@ -38,9 +39,7 @@ class TBBEventMessageHandler(AbstractMessageHandler):
stripped_subject = msg.subject.replace("%s." % DEFAULT_TBB_NOTIFICATION_PREFIX, '')
logger.info("TBBEventMessageHandler.handleMessage on%s: %s" % (stripped_subject, str(msg.content).replace('\n', ' ')))
logger.info("on%s: %s", msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' '))
logger.debug("TBBEventMessageHandler.handle_message: on%s content=%s", stripped_subject, single_line_with_single_spaces(str(msg.content)))
if stripped_subject == 'DataWritersStarting':
self.onDataWritersStarting(msg.content)
......@@ -82,10 +81,8 @@ class TBBEventMessageHandler(AbstractMessageHandler):
class TBBBusListener(BusListener):
"""The OTDBBusListener is a normal BusListener listening specifically to EventMessages with OTDB notification subjects.
It uses by default the OTDBEventMessageHandler to handle the EventMessages.
If you want to implement your own behaviour, then derive a subclass of the OTDBEventMessageHandler, and inject that in this OTDBBusListener.
See example at the top of this file.
"""The TBBBusListener is a normal BusListener listening specifically to EventMessages with TBB notification subjects.
It uses by default the TBBEventMessageHandler to handle the EventMessages.
"""
def __init__(self, handler_type: TBBEventMessageHandler.__class__ = TBBEventMessageHandler,
handler_kwargs: dict = None,
......
#!/usr/bin/env python3
import logging
from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT
from lofar.mac.tbbservice.config import DEFAULT_TBB_SERVICENAME
logger = logging.getLogger(__name__)
DEFAULT_TIMEOUT = 24*60*60, # tbb dumps can take a long time.... timeout of 24 hours is ok.
class TBBRPC(RPCClientContextManagerMixin):
def __init__(self, rpc_client: RPCClient = None):
super(TBBRPC, self).__init__()
self._rpc_client = rpc_client or RPCClient(DEFAULT_TBB_SERVICENAME)
@staticmethod
def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int = DEFAULT_TIMEOUT):
def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int = DEFAULT_RPC_TIMEOUT):
return TBBRPC(RPCClient(service_name=DEFAULT_TBB_SERVICENAME, exchange=exchange, broker=broker, timeout=timeout))
def start_datawriters(self, output_path, num_samples_per_subband, voevent_xml=""):
......@@ -82,7 +80,7 @@ if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.INFO)
import pprint
with TBBRPC() as rpc:
with TBBRPC.create() as rpc:
import time
from lofar.mac.tbb.tbb_util import expand_list
logger.info(rpc.start_datawriters(output_path=None, num_samples_per_subband=None))
......
......@@ -51,6 +51,7 @@ from lofar.common.lcu_utils import stationname2hostname
from lofar.mac.tbb.tbb_caltables import add_station_calibration_tables_h5_files_in_directory
from lofar.mac.tbb.tbb_cable_delays import add_dipole_cable_delays_h5_files_in_directory
from lofar.mac.tbbservice.server.tbbservice_config import *
from lofar.common.util import single_line_with_single_spaces
class TBBServiceMessageHandler(ServiceMessageHandler):
......@@ -61,10 +62,11 @@ class TBBServiceMessageHandler(ServiceMessageHandler):
self.procs = {}
def _send_event_message(self, subject, content):
prefixed_subject = '%s.%s' % (DEFAULT_TBB_NOTIFICATION_PREFIX, subject)
logger.info('Sending notification to %s with subject \'%s\' %s' % (self.exchange,
subject,
str(content).replace('\n', ' ')))
self.send(EventMessage(subject='%s%s' % (DEFAULT_TBB_NOTIFICATION_PREFIX, subject),
prefixed_subject,
single_line_with_single_spaces(str(content))))
self.send(EventMessage(subject=prefixed_subject,
content=content))
def _update_parset(self, parset, updates):
......@@ -105,6 +107,8 @@ class TBBServiceMessageHandler(ServiceMessageHandler):
:return: the parset of the running obs
"""
logger.info("determining running observation...")
# fetch parset of current active observation and save a modified version for the tbb writers
with RADBRPC.create(exchange=self.exchange, broker=self.broker) as rarpc:
running_observations = rarpc.getTasks(task_status='active', task_type='observation')
......@@ -115,6 +119,8 @@ class TBBServiceMessageHandler(ServiceMessageHandler):
# pick first TODO: determine actual relevant observation
otdb_id = running_observations[0]['otdb_id']
logger.info("running observation otdb_id=%s. Fetching parset...", otdb_id)
with OTDBRPC.create(exchange=self.exchange, broker=self.broker) as otdbrpc:
return parameterset(otdbrpc.taskGetSpecification(otdb_id=otdb_id)['specification'])
......@@ -499,7 +505,7 @@ def main():
handler_type=TBBServiceMessageHandler,
exchange=options.busname,
broker=options.broker,
num_threads=2):
num_threads=1):
logger.info('*****************************************')
logger.info('Started TBBService')
......
DEFAULT_TBB_SERVICENAME = 'TBB.Service'
from lofar.messaging.config import DEFAULT_BUSNAME, DEFAULT_BROKER
DEFAULT_TBB_SERVICENAME = 'TBB.Service'
DEFAULT_TBB_NOTIFICATION_PREFIX = 'TBB.notification'
......@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
from lofar.mac.tbb.tbb_config import lcurun_command, tbb_command
from lofar.mac.tbb.tbb_util import split_stations_by_boardnumber
from lofar.common.lcu_utils import execute_in_parallel_over_stations
from lofar.common.lcu_utils import execute_in_parallel_over_stations, translate_user_station_string_into_station_list
from lofar.common.subprocess_utils import wrap_composite_command
def freeze_tbb(stations, dm, timesec, timensec):
......@@ -26,8 +26,7 @@ def freeze_tbb(stations, dm, timesec, timensec):
:return:
"""
if isinstance(stations, str):
stations = stations.split(',')
stations = translate_user_station_string_into_station_list(stations)
logger.info('Freezing TBB boards for stations: %s', ', '.join(stations))
......@@ -91,6 +90,7 @@ def parse_args():
if args.dm is None:
logger.error("No dm provided")
parser.print_help()
exit(1)
return args
......
......@@ -11,7 +11,8 @@ import time
import subprocess
import logging
from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command
from lofar.common.lcu_utils import translate_user_station_string_into_station_list
from lofar.common.subprocess_utils import check_output_returning_strings
def load_tbb_firmware(stations, mode):
logging.info('Loading TBB firmware for mode \"%s\"' % (mode))
......@@ -26,6 +27,8 @@ def load_tbb_firmware(stations, mode):
else:
slot = 1
stations = translate_user_station_string_into_station_list(stations)
logging.info("It is assumed that the firmware for mode \"%s\" is in slot %d!" % (mode, slot))
relay = lcurun_command + [stations]
......@@ -47,7 +50,7 @@ def load_tbb_firmware(stations, mode):
cmd = [tbb_command, '--imageinfo=%s' % str(board)]
cmd = relay + cmd
logging.info('Executing %s' % ' '.join(cmd))
logging.info(subprocess.check_output(cmd))
logging.info(subprocess.check_output_returning_strings(cmd))
def parse_args():
......
......@@ -11,10 +11,12 @@ import time
import subprocess
import logging
from lofar.mac.tbb.tbb_config import lcurun_command, tbb_command
from lofar.common.lcu_utils import translate_user_station_string_into_station_list
def release_tbb(stations):
logging.info('Releasing TBB recording')
stations = translate_user_station_string_into_station_list(stations)
relay = lcurun_command + [stations]
cmd = relay + [tbb_command, '--free']
......
......@@ -12,11 +12,14 @@ import time
import subprocess
import logging
from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command
from lofar.common.lcu_utils import translate_user_station_string_into_station_list
def restart_tbb_recording(stations):
logging.info("Restarting TBB recording")
stations = translate_user_station_string_into_station_list(stations)
relay = lcurun_command + [stations]
cmd = relay + [tbb_command, "--record"]
logging.info("Executing %s" % " ".join(cmd))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment