diff --git a/LCS/PyCommon/cep4_utils.py b/LCS/PyCommon/cep4_utils.py index 3bb3168638fc5719fa04231b3caa795ebf26d1f0..d8360a805b62b66bbc279aac4843c94025040b97 100644 --- a/LCS/PyCommon/cep4_utils.py +++ b/LCS/PyCommon/cep4_utils.py @@ -142,6 +142,9 @@ def convert_slurm_nodes_string_to_node_number_list(slurm_string): :param slurm_string: a string in 'slurm-like' node format, like cpu[01-03,11-12] or cpu01 :return: a list of node numbers (ints) ''' + if isinstance(slurm_string, bytes): + slurm_string = slurm_string.decode('utf-8') + result = [] stripped_slurm_string = slurm_string.strip() left_bracket_idx = stripped_slurm_string.find('[') diff --git a/LCS/PyCommon/lcu_utils.py b/LCS/PyCommon/lcu_utils.py index e4e3abe0c7373142f9c9cf1ce1c30a51b25a9c37..97916ce208fc9799cc0e2459a2844fb9fc0fbee4 100644 --- a/LCS/PyCommon/lcu_utils.py +++ b/LCS/PyCommon/lcu_utils.py @@ -88,6 +88,30 @@ def execute_in_parallel_over_station_group(cmd, station_group='today', timeout=3 stations = get_current_stations(station_group=station_group, as_host_names=True) return execute_in_parallel_over_stations(cmd=cmd, stations=stations, timeout=timeout, max_parallel=max_parallel) +def translate_user_station_string_into_station_list(user_station_string: str): + ''' + try to deal with user input like 'cs001,cs001' or 'today' or ... etc + No guarantees! just best effort! + :param user_station_string: a string like 'cs001,cs001' or 'today' or ... etc + :return: a list of station names + ''' + if isinstance(user_station_string, bytes): + user_station_string = user_station_string.decode('utf-8') + + if not isinstance(user_station_string, str): + raise ValueError("cannot parse user_station_string") + + if ',' in user_station_string: + return user_station_string.split(',') + + # maybe 'stations' is a group. Do lookup. + current_stations = get_current_stations(user_station_string, as_host_names=False) + if current_stations: + return current_stations + + # just treat the stations string as list of stations and hope for the best + return [user_station_string] + def get_current_stations(station_group='today', as_host_names=True): ''' Wrapper function around the amazing lcurun and stations.txt operators system. diff --git a/MAC/Services/TBB/TBBClient/bin/tbbservice_freeze b/MAC/Services/TBB/TBBClient/bin/tbbservice_freeze index 1a92292a9b94f2d6b377e5d5e80fd406e5131a39..a6cf8e03e226612d1f6fa5b905da23e9242e33a0 100755 --- a/MAC/Services/TBB/TBBClient/bin/tbbservice_freeze +++ b/MAC/Services/TBB/TBBClient/bin/tbbservice_freeze @@ -7,5 +7,6 @@ from lofar.mac.tbb.tbb_freeze import parse_args if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) args = parse_args() - with TBBRPC() as rpc: + + with TBBRPC.create() as rpc: rpc.freeze_data(args.stations, args.dm, args.timesec, args.timensec) diff --git a/MAC/Services/TBB/TBBClient/bin/tbbservice_load_firmware b/MAC/Services/TBB/TBBClient/bin/tbbservice_load_firmware index 55d418a1b8fd079bd0305ef112772046b4724a8d..c2ff68b1d40570e7e2c10e0e7717c80c64ec8d3c 100755 --- a/MAC/Services/TBB/TBBClient/bin/tbbservice_load_firmware +++ b/MAC/Services/TBB/TBBClient/bin/tbbservice_load_firmware @@ -8,6 +8,7 @@ from lofar.mac.tbb.tbb_load_firmware import parse_args if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) args = parse_args() - with TBBRPC() as rpc: + + with TBBRPC.create() as rpc: rpc.switch_firmware(args.stations, args.mode) diff --git a/MAC/Services/TBB/TBBClient/bin/tbbservice_release_recording b/MAC/Services/TBB/TBBClient/bin/tbbservice_release_recording index 3d25cc0d20ca962bc5d72612558e1265c2633876..217696defcacf9936ed538d54f160ac7880f1702 100755 --- a/MAC/Services/TBB/TBBClient/bin/tbbservice_release_recording +++ b/MAC/Services/TBB/TBBClient/bin/tbbservice_release_recording @@ -8,6 +8,7 @@ from lofar.mac.tbb.tbb_release_recording import parse_args if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) args = parse_args() - with TBBRPC() as rpc: + + with TBBRPC.create() as rpc: rpc.release_data(args.stations) diff --git a/MAC/Services/TBB/TBBClient/bin/tbbservice_restart_recording b/MAC/Services/TBB/TBBClient/bin/tbbservice_restart_recording index 1b4aeaa647ac78aa53676e095fe1b24e2c37e586..195e7328ae38bee9dabf6ddbc3cc97da0b8b19a0 100755 --- a/MAC/Services/TBB/TBBClient/bin/tbbservice_restart_recording +++ b/MAC/Services/TBB/TBBClient/bin/tbbservice_restart_recording @@ -7,6 +7,7 @@ from lofar.mac.tbb.tbb_restart_recording import parse_args if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) args = parse_args() - with TBBRPC() as rpc: + + with TBBRPC.create() as rpc: rpc.restart_recording(args.stations) diff --git a/MAC/Services/TBB/TBBClient/bin/tbbservice_set_storage b/MAC/Services/TBB/TBBClient/bin/tbbservice_set_storage index 249d7ac078f19f7376d519317b93ecbeb958592e..952b58e2488b8258ff9b0b9e107d81d07a653272 100755 --- a/MAC/Services/TBB/TBBClient/bin/tbbservice_set_storage +++ b/MAC/Services/TBB/TBBClient/bin/tbbservice_set_storage @@ -7,5 +7,6 @@ from lofar.mac.tbb.tbb_set_storage import parse_args if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) args = parse_args() - with TBBRPC() as rpc: + + with TBBRPC.create() as rpc: rpc.set_storage(args.map) diff --git a/MAC/Services/TBB/TBBClient/bin/tbbservice_start_datawriters b/MAC/Services/TBB/TBBClient/bin/tbbservice_start_datawriters index 968cd6c7ebe8d57ce009024d92697f4a5ccc51bd..420aa3405e58ed98f6385c4f7871ab4f5e30e4c0 100755 --- a/MAC/Services/TBB/TBBClient/bin/tbbservice_start_datawriters +++ b/MAC/Services/TBB/TBBClient/bin/tbbservice_start_datawriters @@ -2,8 +2,7 @@ import logging from optparse import OptionParser -from lofar.mac.tbbservice.config import DEFAULT_TBB_BUSNAME, DEFAULT_TBB_SERVICENAME, DEFAULT_BROKER -from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_BUSNAME +from lofar.mac.tbbservice.config import DEFAULT_BUSNAME, DEFAULT_BROKER from lofar.mac.tbbservice.client import start_datawriters_and_wait_until_finished if __name__ == '__main__': @@ -13,26 +12,13 @@ if __name__ == '__main__': parser = OptionParser('%prog [options]', description='issue a command to the tbb service to start the datawriters, and wait until they are done.') parser.add_option('-o', '--output_path', dest='output_path', type='string', help='directory path to write the data file to.') - - parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') - - parser.add_option("-b", "--tbb_service_busname", dest="tbb_service_busname", type="string", - default=DEFAULT_TBB_BUSNAME, - help="Name of the bus on which the tbb service listens. [default: %default]") - parser.add_option("-s", "--tbb_service_name", dest="tbb_service_name", type="string", - default=DEFAULT_TBB_SERVICENAME, - help="Name of the tbb service. [default: %default]") - parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string", - default=DEFAULT_TBB_NOTIFICATION_BUSNAME, - help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default') - parser.add_option("-d", "--duration", dest='num_samples_per_subband', required=True, - help="Expected duration in number of samples per subband.") + parser.add_option("-d", "--duration", dest='num_samples_per_subband', help="Expected duration in number of samples per subband.") (options, args) = parser.parse_args() + if options.output_path is None or options.num_samples_per_subband is None: + parser.print_help() + exit(1) + start_datawriters_and_wait_until_finished(output_path=options.output_path, - num_samples_per_subband=options.num_samples_per_subband, - service_busname=options.tbb_service_busname, - service_name=options.tbb_service_name, - notification_busname=options.tbb_notification_busname, - broker=options.broker) + num_samples_per_subband=options.num_samples_per_subband) diff --git a/MAC/Services/TBB/TBBClient/bin/tbbservice_start_recording b/MAC/Services/TBB/TBBClient/bin/tbbservice_start_recording index 26d5a324a271c09474a3fffb37f392ab11556718..83ebb8ac0f40d32756e3c6a1852c0c9430e0b38d 100755 --- a/MAC/Services/TBB/TBBClient/bin/tbbservice_start_recording +++ b/MAC/Services/TBB/TBBClient/bin/tbbservice_start_recording @@ -7,5 +7,6 @@ from lofar.mac.tbb.tbb_start_recording import parse_args if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) args = parse_args() - with TBBRPC() as rpc: + + with TBBRPC.create() as rpc: rpc.start_recording(args.stations, args.mode, args.subbands) diff --git a/MAC/Services/TBB/TBBClient/bin/tbbservice_stop_datawriters b/MAC/Services/TBB/TBBClient/bin/tbbservice_stop_datawriters index 0ae0a3ed99f5ac1c3bef443e0d223746964dfa9b..f1568b73c43b8e61d2715130026e9c1ba15ed936 100755 --- a/MAC/Services/TBB/TBBClient/bin/tbbservice_stop_datawriters +++ b/MAC/Services/TBB/TBBClient/bin/tbbservice_stop_datawriters @@ -2,30 +2,15 @@ import logging from optparse import OptionParser -from lofar.mac.tbbservice.config import DEFAULT_TBB_BUSNAME, DEFAULT_TBB_SERVICENAME, DEFAULT_BROKER -from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_BUSNAME +from lofar.mac.tbbservice.config import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.mac.tbbservice.client import stop_datawriters_and_wait_until_finished if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) # Check the invocation arguments - parser = OptionParser('%prog [options]', - description='issue a command to the tbb service to stop the datawriters, and wait until they are done.') - parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the qpid broker, default: %default') + parser = OptionParser('%prog', description='issue a command to the tbb service to stop the datawriters, and wait until they are done.') - parser.add_option("-b", "--tbb_service_busname", dest="tbb_service_busname", type="string", - default=DEFAULT_TBB_BUSNAME, - help="Name of the bus on which the tbb service listens. [default: %default]") - parser.add_option("-s", "--tbb_service_name", dest="tbb_service_name", type="string", - default=DEFAULT_TBB_SERVICENAME, - help="Name of the tbb service. [default: %default]") - parser.add_option("-n", "--tbb_notification_busname", dest="tbb_notification_busname", type="string", - default=DEFAULT_TBB_NOTIFICATION_BUSNAME, - help='Name of the notification bus exchange on the qpid broker on which the tbb notifications are published, default: %default') (options, args) = parser.parse_args() - stop_datawriters_and_wait_until_finished(service_busname=options.tbb_service_busname, - service_name=options.tbb_service_name, - notification_busname=options.tbb_notification_busname, - broker=options.broker) + stop_datawriters_and_wait_until_finished() diff --git a/MAC/Services/TBB/TBBClient/bin/tbbservice_upload_to_cep b/MAC/Services/TBB/TBBClient/bin/tbbservice_upload_to_cep index 692e0b0efee8803d98780dd0d5125b80be42c416..59d545d549102281152ecd2674de1f345d1df60a 100755 --- a/MAC/Services/TBB/TBBClient/bin/tbbservice_upload_to_cep +++ b/MAC/Services/TBB/TBBClient/bin/tbbservice_upload_to_cep @@ -7,6 +7,7 @@ from lofar.mac.tbb.tbb_upload_to_cep import parse_args if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) args = parse_args() - with TBBRPC() as rpc: + + with TBBRPC.create() as rpc: rpc.upload_data(args.stations, args.dm, args.start_time, args.duration, args.sub_bands, args.wait_time, args.boards) diff --git a/MAC/Services/TBB/TBBClient/lib/__init__.py b/MAC/Services/TBB/TBBClient/lib/__init__.py index 15440435e7c21d3c801a20a0598c33faa871ea8c..912380e96e3adef66dacab970cfc768441cb9904 100644 --- a/MAC/Services/TBB/TBBClient/lib/__init__.py +++ b/MAC/Services/TBB/TBBClient/lib/__init__.py @@ -1,47 +1,89 @@ #!/usr/bin/env python3 from threading import Event +from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, BusListenerJanitor from lofar.mac.tbbservice.client.tbbservice_rpc import TBBRPC -from lofar.mac.tbbservice.client.tbbbuslistener import TBBBusListener +from lofar.mac.tbbservice.client.tbbbuslistener import TBBBusListener, TBBEventMessageHandler +from lofar.common.util import single_line_with_single_spaces class Waiter(TBBBusListener): - '''Helper class which overrides TBBBusListener.onDataWritersFinished - and waits for synchronization using threading Events''' - def __init__(self, busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, **kwargs): + '''Helper class waiting for the DataWritersFinished event''' + + class WaiterTBBEventMessageHandler(TBBEventMessageHandler): + '''concrete implementation of the TBBEventMessageHandler, + setting the waiter's wait_event (threading.Event) to signal the waiter that waiting is over.''' + def __init__(self, waiter): + super().__init__() + self._waiter = waiter + + def onDataWritersStarting(self, msg_content): + logger.info("received DataWritersStarting event: %s", single_line_with_single_spaces(str(msg_content))) + + def onDataWritersStarted(self, msg_content): + logger.info("received DataWritersStarted event: %s", single_line_with_single_spaces(str(msg_content))) + + def onDataWritersStopping(self, msg_content): + logger.info("received DataWritersStopping event: %s", single_line_with_single_spaces(str(msg_content))) + + def onDataWritersStopped(self, msg_content): + logger.info("received DataWritersStopped event: %s", single_line_with_single_spaces(str(msg_content))) + + def onDataWritersFinished(self, msg_content): + logger.info("received DataWritersFinished event: %s", single_line_with_single_spaces(str(msg_content))) + # signal that we're done waiting + self._waiter.wait_event.set() + + def __init__(self, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): self.wait_event = Event() - super(Waiter, self).__init__(busname=busname, broker=broker, **kwargs) + super(Waiter, self).__init__(handler_type=Waiter.WaiterTBBEventMessageHandler, + handler_kwargs={'waiter': self}, + exchange=exchange, broker=broker) - def onDataWritersFinished(self, msg_content): - # signal that we're done waiting - self.wait_event.set() + def wait(self, timeout: int=24*3600, log_interval: int=60): + '''wait until the DataWritersFinished event is received. + :param timeout: timeout in seconds + :param log_interval: interval in seconds to log a message that we are still waiting. + :raises TimeoutError after timeout seconds + ''' + start = datetime.utcnow() + while datetime.utcnow() - start <= timedelta(seconds=timeout): + logger.info("waiting for DataWritersFinished event... timout in %s", timedelta(seconds=timeout)-(datetime.utcnow() - start)) + if self.wait_event.wait(max(1, min(timeout, log_interval))): + return - def wait(self, timeout=24*3600): - # wait until onDataWritersFinished - self.wait_event.wait(timeout) + raise TimeoutError("Did not receive a DataWritersFinished event within %s seconds" %(timeout,)) def start_datawriters_and_wait_until_finished(output_path, num_samples_per_subband, - busname=DEFAULT_BUSNAME, + timeout: int = 24 * 3600, log_interval: int = 60, + exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): ''' convenience method which issues a start_datawriters command to the tbbservice, and waits until all writers are done ''' - with Waiter() as waiter: - with TBBRPC(busname=busname, broker=broker) as rpc: + with BusListenerJanitor(Waiter(exchange=exchange, broker=broker)) as waiter: + with TBBRPC.create(exchange=exchange, broker=broker) as rpc: rpc.start_datawriters(output_path=output_path, num_samples_per_subband=num_samples_per_subband) - waiter.wait() + logger.info("it's ok to cancel this program. datawriters will continue in the background.") + waiter.wait(timeout, log_interval) -def stop_datawriters_and_wait_until_finished(busname=DEFAULT_BUSNAME, - broker=DEFAULT_BROKER): + logger.info("Datawriters finished") + +def stop_datawriters_and_wait_until_finished(timeout: int = 24 * 3600, log_interval: int = 60, + exchange=DEFAULT_BUSNAME, + broker=DEFAULT_BROKER): ''' convenience method which issues a stop_datawriters command to the tbbservice, and waits until all writers are done ''' - with Waiter() as waiter: - with TBBRPC(busname=busname, broker=broker) as rpc: + with BusListenerJanitor(Waiter(exchange=exchange, broker=broker)) as waiter: + with TBBRPC.create(exchange=exchange, broker=broker) as rpc: rpc.stop_datawriters() - waiter.wait() + logger.info("it's ok to cancel this program. datawriters will stop in the background.") + waiter.wait(timeout, log_interval) + + logger.info("Datawriters stopped") diff --git a/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py index 88d87de05d5d198056f3b92df5b48e325c8856b9..59f570a6463005e46061b78d5fbc382ef5ddecde 100644 --- a/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py +++ b/MAC/Services/TBB/TBBClient/lib/tbbbuslistener.py @@ -22,6 +22,7 @@ from lofar.messaging.messagebus import AbstractMessageHandler, BusListener, LofarMessage, EventMessage from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.mac.tbbservice.config import DEFAULT_TBB_NOTIFICATION_PREFIX +from lofar.common.util import single_line_with_single_spaces import logging logger = logging.getLogger() @@ -38,9 +39,7 @@ class TBBEventMessageHandler(AbstractMessageHandler): stripped_subject = msg.subject.replace("%s." % DEFAULT_TBB_NOTIFICATION_PREFIX, '') - logger.info("TBBEventMessageHandler.handleMessage on%s: %s" % (stripped_subject, str(msg.content).replace('\n', ' '))) - - logger.info("on%s: %s", msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' ')) + logger.debug("TBBEventMessageHandler.handle_message: on%s content=%s", stripped_subject, single_line_with_single_spaces(str(msg.content))) if stripped_subject == 'DataWritersStarting': self.onDataWritersStarting(msg.content) @@ -82,10 +81,8 @@ class TBBEventMessageHandler(AbstractMessageHandler): class TBBBusListener(BusListener): - """The OTDBBusListener is a normal BusListener listening specifically to EventMessages with OTDB notification subjects. - It uses by default the OTDBEventMessageHandler to handle the EventMessages. - If you want to implement your own behaviour, then derive a subclass of the OTDBEventMessageHandler, and inject that in this OTDBBusListener. - See example at the top of this file. + """The TBBBusListener is a normal BusListener listening specifically to EventMessages with TBB notification subjects. + It uses by default the TBBEventMessageHandler to handle the EventMessages. """ def __init__(self, handler_type: TBBEventMessageHandler.__class__ = TBBEventMessageHandler, handler_kwargs: dict = None, diff --git a/MAC/Services/TBB/TBBClient/lib/tbbservice_rpc.py b/MAC/Services/TBB/TBBClient/lib/tbbservice_rpc.py index 7cc00db36344b43c6aee1d76978a3be13033732e..4c5c2e665fc19412a8fbed93b5494d0ce33fe356 100644 --- a/MAC/Services/TBB/TBBClient/lib/tbbservice_rpc.py +++ b/MAC/Services/TBB/TBBClient/lib/tbbservice_rpc.py @@ -1,20 +1,18 @@ #!/usr/bin/env python3 import logging -from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import RPCClientContextManagerMixin, RPCClient, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT from lofar.mac.tbbservice.config import DEFAULT_TBB_SERVICENAME logger = logging.getLogger(__name__) -DEFAULT_TIMEOUT = 24*60*60, # tbb dumps can take a long time.... timeout of 24 hours is ok. - class TBBRPC(RPCClientContextManagerMixin): def __init__(self, rpc_client: RPCClient = None): super(TBBRPC, self).__init__() self._rpc_client = rpc_client or RPCClient(DEFAULT_TBB_SERVICENAME) @staticmethod - def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int = DEFAULT_TIMEOUT): + def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int = DEFAULT_RPC_TIMEOUT): return TBBRPC(RPCClient(service_name=DEFAULT_TBB_SERVICENAME, exchange=exchange, broker=broker, timeout=timeout)) def start_datawriters(self, output_path, num_samples_per_subband, voevent_xml=""): @@ -82,7 +80,7 @@ if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) import pprint - with TBBRPC() as rpc: + with TBBRPC.create() as rpc: import time from lofar.mac.tbb.tbb_util import expand_list logger.info(rpc.start_datawriters(output_path=None, num_samples_per_subband=None)) diff --git a/MAC/Services/TBB/TBBServer/lib/tbbservice.py b/MAC/Services/TBB/TBBServer/lib/tbbservice.py index adf2edd7204acb6786fbaf6b796e026916b252af..83f996d297f83af3fceec355854d657964b65b37 100644 --- a/MAC/Services/TBB/TBBServer/lib/tbbservice.py +++ b/MAC/Services/TBB/TBBServer/lib/tbbservice.py @@ -51,6 +51,7 @@ from lofar.common.lcu_utils import stationname2hostname from lofar.mac.tbb.tbb_caltables import add_station_calibration_tables_h5_files_in_directory from lofar.mac.tbb.tbb_cable_delays import add_dipole_cable_delays_h5_files_in_directory from lofar.mac.tbbservice.server.tbbservice_config import * +from lofar.common.util import single_line_with_single_spaces class TBBServiceMessageHandler(ServiceMessageHandler): @@ -61,10 +62,11 @@ class TBBServiceMessageHandler(ServiceMessageHandler): self.procs = {} def _send_event_message(self, subject, content): + prefixed_subject = '%s.%s' % (DEFAULT_TBB_NOTIFICATION_PREFIX, subject) logger.info('Sending notification to %s with subject \'%s\' %s' % (self.exchange, - subject, - str(content).replace('\n', ' '))) - self.send(EventMessage(subject='%s%s' % (DEFAULT_TBB_NOTIFICATION_PREFIX, subject), + prefixed_subject, + single_line_with_single_spaces(str(content)))) + self.send(EventMessage(subject=prefixed_subject, content=content)) def _update_parset(self, parset, updates): @@ -105,6 +107,8 @@ class TBBServiceMessageHandler(ServiceMessageHandler): :return: the parset of the running obs """ + logger.info("determining running observation...") + # fetch parset of current active observation and save a modified version for the tbb writers with RADBRPC.create(exchange=self.exchange, broker=self.broker) as rarpc: running_observations = rarpc.getTasks(task_status='active', task_type='observation') @@ -115,6 +119,8 @@ class TBBServiceMessageHandler(ServiceMessageHandler): # pick first TODO: determine actual relevant observation otdb_id = running_observations[0]['otdb_id'] + logger.info("running observation otdb_id=%s. Fetching parset...", otdb_id) + with OTDBRPC.create(exchange=self.exchange, broker=self.broker) as otdbrpc: return parameterset(otdbrpc.taskGetSpecification(otdb_id=otdb_id)['specification']) @@ -499,7 +505,7 @@ def main(): handler_type=TBBServiceMessageHandler, exchange=options.busname, broker=options.broker, - num_threads=2): + num_threads=1): logger.info('*****************************************') logger.info('Started TBBService') diff --git a/MAC/Services/TBB/TBBServiceCommon/lib/config.py b/MAC/Services/TBB/TBBServiceCommon/lib/config.py index ed968623e16b466c4c6d6a45fcb6024b8abeba6f..9f86642cb0e30253baf6abadd9e6478bcd8d9533 100644 --- a/MAC/Services/TBB/TBBServiceCommon/lib/config.py +++ b/MAC/Services/TBB/TBBServiceCommon/lib/config.py @@ -1,3 +1,4 @@ -DEFAULT_TBB_SERVICENAME = 'TBB.Service' +from lofar.messaging.config import DEFAULT_BUSNAME, DEFAULT_BROKER +DEFAULT_TBB_SERVICENAME = 'TBB.Service' DEFAULT_TBB_NOTIFICATION_PREFIX = 'TBB.notification' diff --git a/MAC/TBB/lib/tbb_freeze.py b/MAC/TBB/lib/tbb_freeze.py index 815426bac1c2391fb7937176631f11dd8fa2722a..e9edd9be0bd07e793d55e066bd55366eea075626 100755 --- a/MAC/TBB/lib/tbb_freeze.py +++ b/MAC/TBB/lib/tbb_freeze.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) from lofar.mac.tbb.tbb_config import lcurun_command, tbb_command from lofar.mac.tbb.tbb_util import split_stations_by_boardnumber -from lofar.common.lcu_utils import execute_in_parallel_over_stations +from lofar.common.lcu_utils import execute_in_parallel_over_stations, translate_user_station_string_into_station_list from lofar.common.subprocess_utils import wrap_composite_command def freeze_tbb(stations, dm, timesec, timensec): @@ -26,8 +26,7 @@ def freeze_tbb(stations, dm, timesec, timensec): :return: """ - if isinstance(stations, str): - stations = stations.split(',') + stations = translate_user_station_string_into_station_list(stations) logger.info('Freezing TBB boards for stations: %s', ', '.join(stations)) @@ -91,6 +90,7 @@ def parse_args(): if args.dm is None: logger.error("No dm provided") + parser.print_help() exit(1) return args diff --git a/MAC/TBB/lib/tbb_load_firmware.py b/MAC/TBB/lib/tbb_load_firmware.py index 65e9e2ba355b6427266117a12cba90b6426e613f..d0778847648a123d43008c00873f2cce7ed62cdf 100755 --- a/MAC/TBB/lib/tbb_load_firmware.py +++ b/MAC/TBB/lib/tbb_load_firmware.py @@ -11,7 +11,8 @@ import time import subprocess import logging from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command - +from lofar.common.lcu_utils import translate_user_station_string_into_station_list +from lofar.common.subprocess_utils import check_output_returning_strings def load_tbb_firmware(stations, mode): logging.info('Loading TBB firmware for mode \"%s\"' % (mode)) @@ -26,6 +27,8 @@ def load_tbb_firmware(stations, mode): else: slot = 1 + stations = translate_user_station_string_into_station_list(stations) + logging.info("It is assumed that the firmware for mode \"%s\" is in slot %d!" % (mode, slot)) relay = lcurun_command + [stations] @@ -47,7 +50,7 @@ def load_tbb_firmware(stations, mode): cmd = [tbb_command, '--imageinfo=%s' % str(board)] cmd = relay + cmd logging.info('Executing %s' % ' '.join(cmd)) - logging.info(subprocess.check_output(cmd)) + logging.info(subprocess.check_output_returning_strings(cmd)) def parse_args(): diff --git a/MAC/TBB/lib/tbb_release_recording.py b/MAC/TBB/lib/tbb_release_recording.py index 7763247134e088cef6e2c8f17e179056ec160660..8e7435259a1c81a25ed4e20024e43e5077998d40 100755 --- a/MAC/TBB/lib/tbb_release_recording.py +++ b/MAC/TBB/lib/tbb_release_recording.py @@ -11,10 +11,12 @@ import time import subprocess import logging from lofar.mac.tbb.tbb_config import lcurun_command, tbb_command +from lofar.common.lcu_utils import translate_user_station_string_into_station_list def release_tbb(stations): logging.info('Releasing TBB recording') + stations = translate_user_station_string_into_station_list(stations) relay = lcurun_command + [stations] cmd = relay + [tbb_command, '--free'] diff --git a/MAC/TBB/lib/tbb_restart_recording.py b/MAC/TBB/lib/tbb_restart_recording.py index 9f1b869b245ce75358d5f19a296e5daf102ecd88..4dd5f2e2b8bac2a4835a6fcdb3d180fdb5c50420 100755 --- a/MAC/TBB/lib/tbb_restart_recording.py +++ b/MAC/TBB/lib/tbb_restart_recording.py @@ -12,11 +12,14 @@ import time import subprocess import logging from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command +from lofar.common.lcu_utils import translate_user_station_string_into_station_list def restart_tbb_recording(stations): logging.info("Restarting TBB recording") + stations = translate_user_station_string_into_station_list(stations) + relay = lcurun_command + [stations] cmd = relay + [tbb_command, "--record"] logging.info("Executing %s" % " ".join(cmd)) diff --git a/MAC/TBB/lib/tbb_set_storage.py b/MAC/TBB/lib/tbb_set_storage.py index 85f90740b5483c2c815a1eea47fdabfe58901fae..60f620661feba61b07f84f5f0c845900f1e1e55c 100755 --- a/MAC/TBB/lib/tbb_set_storage.py +++ b/MAC/TBB/lib/tbb_set_storage.py @@ -11,11 +11,14 @@ import time import subprocess import logging from lofar.mac.tbb.tbb_config import lcurun_command, tbb_command +from lofar.common.lcu_utils import translate_user_station_string_into_station_list def set_tbb_storage(map): logging.info('Setting TBB storage nodes') for stations, node in map.items(): + stations = translate_user_station_string_into_station_list(stations) + relay = lcurun_command + [stations] cmds = [ @@ -53,6 +56,7 @@ def create_mapping(stations, nodes): :param nodes: list of nodes :return: dict mapping stations to nodes, e.g. {station1: node1, station2: node2} """ + stations = translate_user_station_string_into_station_list(stations) # zip truncates to shortest list, so make sure there are enough nodes, then map each station to a node logging.info("Mapping stations %s on %s nodes " % (stations, nodes)) diff --git a/MAC/TBB/lib/tbb_start_recording.py b/MAC/TBB/lib/tbb_start_recording.py index 118edde5098ba82e23ae41fc7228f771654e7a3b..61877a912ffb9b7f13bfd7eea65412529bd1831a 100755 --- a/MAC/TBB/lib/tbb_start_recording.py +++ b/MAC/TBB/lib/tbb_start_recording.py @@ -11,6 +11,7 @@ import time import subprocess import logging from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command, rsp_command +from lofar.common.lcu_utils import translate_user_station_string_into_station_list def start_tbb(stations, mode, subbands): @@ -31,6 +32,8 @@ def start_tbb(stations, mode, subbands): [tbb_command, '--record'] ] + stations = translate_user_station_string_into_station_list(stations) + relay = lcurun_command + [stations] for cmd in cmds: diff --git a/MAC/TBB/lib/tbb_upload_to_cep.py b/MAC/TBB/lib/tbb_upload_to_cep.py index dace5fd8360d315abb5fd65498a292fb42b39c32..dd8fb859e4411d37bf13f7e986db381b3c9f4f9f 100755 --- a/MAC/TBB/lib/tbb_upload_to_cep.py +++ b/MAC/TBB/lib/tbb_upload_to_cep.py @@ -13,6 +13,7 @@ import logging from lofar.mac.tbb.tbb_config import * from lofar.mac.tbb.tbb_util import split_stations_by_boardnumber, expand_list, calculate_adjusted_start_time, wrap_remote_composite_command from lofar.common.lcu_utils import execute_in_parallel_over_stations +from lofar.common.lcu_utils import translate_user_station_string_into_station_list def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, boards): """ @@ -27,6 +28,8 @@ def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, bo """ logging.info("Uploadind TBB data...") + stations = translate_user_station_string_into_station_list(stations) + # determine number of tbb boards per station: stationlists = split_stations_by_boardnumber(stations) diff --git a/MAC/TBB/lib/tbb_util.py b/MAC/TBB/lib/tbb_util.py index cb8efe23f9e29ea40c54479e99e643e18c59ee6f..a4aef9348fdb58c277fac5d7baac53a056a7a11f 100644 --- a/MAC/TBB/lib/tbb_util.py +++ b/MAC/TBB/lib/tbb_util.py @@ -1,7 +1,8 @@ from lofar.mac.tbb.tbb_config import * from lxml import etree from lofar.common.cep4_utils import * -from subprocess import check_output +from lofar.common.subprocess_utils import check_output_returning_strings + import logging logger = logging.getLogger(__name__) @@ -22,7 +23,7 @@ def get_cpu_nodes_running_tbb_datawriter_via_slurm(): cmd = '''sacct --name=run-tbbwriter --user=lofarsys -o nodelist%128 -n -s R -X''' cmd = wrap_command_in_cep4_head_node_ssh_call(cmd) logger.debug('executing command: %s', ' '.join(cmd)) - out = check_output(cmd).strip() + out = check_output_returning_strings(cmd).strip() logger.debug(' out: %s', out) return convert_slurm_nodes_string_to_node_number_list(out) diff --git a/SAS/TriggerServices/lib/config.py b/SAS/TriggerServices/lib/config.py index db920545c16e01dba4d1546078a809fd42ae3a59..df69c858fc417836fc88910c682500dba5ac0475 100644 --- a/SAS/TriggerServices/lib/config.py +++ b/SAS/TriggerServices/lib/config.py @@ -8,18 +8,16 @@ from lofar.mac.tbb.tbb_util import expand_list TRIGGER_SERVICENAME = "triggerservice" # VO-Events -# todo: configure broker host/port and filter_for Package_Type integer as provided for ALERT -ALERT_BROKER_HOST = '127.0.0.1' # arts041.apertif +ALERT_BROKER_HOST = '10.87.15.250' # phobos.nfra.nl. ALERT_BROKER_PORT = 8099 ALERT_PACKET_TYPE_FILTER = None # list of int or None for all DEFAULT_TBB_CEP_NODES = None # list of nodes to dump to, e.g. ["cpu%s" % str(num).zfill(2) for num in expand_list("01-50")], or None for all available DEFAULT_TBB_SUBBANDS = expand_list("10-496") # The subbands to dump. Note: When starting the recording (tbbservice_start_recording), the subband range HAS to cover 487 subbands (typically 10-496) -DEFAULT_TBB_STATIONS = ['cs004'] # ['rs409'] # ['cs001','cs002','cs003','cs004','cs005','cs006','cs007','cs011','cs013','cs017','cs021','cs024','cs026','cs028','cs030','cs031','cs032','cs101','cs103','cs201','cs301','cs302','cs401','cs501','rs106','rs205','rs208','rs210','rs305','rs306','rs307','rs310','rs406','rs407','rs409','rs503','rs508','rs509'] # List of stations to include in tbb dump (filtered for those who are actually observing at event ToA) +DEFAULT_TBB_STATIONS = ['cs001','cs002','cs003','cs004','cs005','cs006','cs007','cs011','cs013','cs017','cs021','cs024','cs026','cs028','cs030','cs031','cs032','cs101','cs103','cs201','cs301','cs302','cs401','cs501','rs106','rs205','rs208','rs210','rs305','rs306','rs307','rs310','rs406','rs407','rs409','rs503','rs508','rs509'] # List of stations to include in tbb dump (filtered for those who are actually observing at event ToA) DEFAULT_TBB_PROJECT = "COM_ALERT" DEFAULT_TBB_ALERT_MODE = "subband" DEFAULT_TBB_BOARDS = expand_list("0-5") -DEFAULT_TBB_DUMP_DURATION = 0.1 # todo! -> should be 5.0 in production +DEFAULT_TBB_DUMP_DURATION = 5.0 # should be 5.0 in production -STOPTIME_DELAY = 0.1 # stop this much after the actual stoptime to make sure the data is not stopped too early. This should be zero, especially when we want to dump the full 5 seconds, but for shorter dump durations, we can give some wiggle room. diff --git a/SAS/TriggerServices/lib/trigger_service.py b/SAS/TriggerServices/lib/trigger_service.py index f7152f0fdd4d443fb4e8be8dc2538052d120ec33..20494437fe704317f4789eddac4dcda959aba04a 100644 --- a/SAS/TriggerServices/lib/trigger_service.py +++ b/SAS/TriggerServices/lib/trigger_service.py @@ -290,10 +290,10 @@ class ALERTHandler(VOEventListenerInterface): # do a fast direct freeze call here, so the boards still contain data for this event. # if we freeze via rpc/service calls, that takes time, so we might loose precious data from the buffers. - freeze_tbb(lcu_str, dm, stoptime_sec + STOPTIME_DELAY, stoptime_nsec) + freeze_tbb(lcu_str, dm, stoptime_sec, stoptime_nsec) # initiate the dumping via an rpc call to the tbbservice which takes care of all bookkeeping. - with TBBRPC() as rpc: + with TBBRPC.create() as rpc: rpc.do_tbb_subband_dump(starttime, duration, dm, DEFAULT_TBB_PROJECT, triggerid, available_stations, DEFAULT_TBB_SUBBANDS, DEFAULT_TBB_BOARDS, DEFAULT_TBB_CEP_NODES, voevent_xml, stoptime=stoptime) else: raise Exception('ALERT event %s rejected by science pre-flight checks!' % triggerid) diff --git a/SAS/TriggerServices/lib/trigger_service_rpc.py b/SAS/TriggerServices/lib/trigger_service_rpc.py index 834d9d454a8aa524609108448837dd3fc58369fd..9eda7b421ba9889de4d3f9d3ad7cfa62a523b662 100644 --- a/SAS/TriggerServices/lib/trigger_service_rpc.py +++ b/SAS/TriggerServices/lib/trigger_service_rpc.py @@ -1,5 +1,5 @@ -from lofar.messaging import RPCClient, RPCClientContextManagerMixin, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_BUS_TIMEOUT +from lofar.messaging import RPCClient, RPCClientContextManagerMixin, DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_RPC_TIMEOUT from .config import TRIGGER_SERVICENAME import logging logger = logging.getLogger(__file__) @@ -12,7 +12,7 @@ class TriggerRPC(RPCClientContextManagerMixin): self._rpc_client = rpc_client or RPCClient(service_name=TRIGGER_SERVICENAME) @staticmethod - def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int = DEFAULT_BUS_TIMEOUT): + def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int=DEFAULT_RPC_TIMEOUT): """Create a TriggerRPC connecting to the given exchange/broker on the default TRIGGER_SERVICENAME service""" return TriggerRPC(RPCClient(service_name=TRIGGER_SERVICENAME, exchange=exchange, broker=broker, timeout=timeout))