From 572c5cec21dcddbb64f33b81da254d004657438a Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Fri, 1 Feb 2019 16:10:22 +0000 Subject: [PATCH] SW-488: commissioning fixes from test with Joern Sander Thomas and me --- LCS/PyCommon/subprocess_utils.py | 2 +- MAC/Services/TBB/TBBServer/lib/tbbservice.py | 36 +++++++++--- MAC/TBB/lib/tbb_upload_to_cep.py | 58 ++++++++++++-------- 3 files changed, 65 insertions(+), 31 deletions(-) diff --git a/LCS/PyCommon/subprocess_utils.py b/LCS/PyCommon/subprocess_utils.py index fac7bd84c55..3fa36b70b56 100644 --- a/LCS/PyCommon/subprocess_utils.py +++ b/LCS/PyCommon/subprocess_utils.py @@ -71,7 +71,7 @@ def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32): # log results of commands for cmd_list, result in zip(cmd_lists, results): - logger.debug("Results for cmd: %s\n returncode=%s\n stdout=%s\n stderr=%s", + logger.info("Results for cmd: %s\n returncode=%s\n stdout=%s\n stderr=%s", " ".join(cmd_list), result.returncode, result.stdout.rstrip(), result.stderr.rstrip()) return results diff --git a/MAC/Services/TBB/TBBServer/lib/tbbservice.py b/MAC/Services/TBB/TBBServer/lib/tbbservice.py index 096f4a3095a..cfeb22faaf4 100644 --- a/MAC/Services/TBB/TBBServer/lib/tbbservice.py +++ b/MAC/Services/TBB/TBBServer/lib/tbbservice.py @@ -47,7 +47,7 @@ from lofar.mac.tbb.tbb_set_storage import set_tbb_storage, create_mapping from lofar.mac.tbb.tbb_start_recording import start_tbb from lofar.mac.tbb.tbb_upload_to_cep import upload_tbb_data from lofar.parameterset import parameterset -from lofar.mac.tbb.tbb_util import parse_parset_from_voevent +from lofar.mac.tbb.tbb_util import parse_parset_from_voevent, get_cpu_nodes_available_for_tbb_datawriters_sorted_by_load from lofar.common.lcu_utils import stationname2hostname from lofar.mac.tbb.tbb_caltables import add_station_calibration_tables_h5_files_in_directory from lofar.mac.tbb.tbb_cable_delays import add_dipole_cable_delays_h5_files_in_directory @@ -222,
+222,16 @@ class TBBControlService: 'Observation.TBB.TBBsetting.observatoryCoordinates': 0, 'Observation.TBB.TBBsetting.observatoryCoordinatesCoordinateSystem': 0, 'Observation.TBB.TBBsetting.triggerId': 0, - 'Observation.TBB.TBBsetting.additionalInfo': voevent, + #'Observation.TBB.TBBsetting.additionalInfo': voevent, # older keys we probably still want to fill here: 'Observation.TBB.TBBsetting.triggerType': 'Unknown', 'Observation.TBB.TBBsetting.triggerVersion': 0 } - self._update_parset(parset, defaults_parset) + parset = self._update_parset(parset, defaults_parset) # update parset with values from received voevent voevent_parset = parse_parset_from_voevent(voevent) - self._update_parset(parset, voevent_parset) + parset = self._update_parset(parset, voevent_parset) return parset_path, parset @@ -250,11 +250,28 @@ class TBBControlService: # TODO: make dynamic, depending on which stations are actually used ports = ','.join(str(x) for x in range(31664, 31670)) + # TODO: do not start more dw's than there are stations. available_nodes = get_cpu_nodes_available_for_tbb_datawriters_sorted_by_load(min_nr_of_free_nodes=1) start = datetime.utcnow() timeout = timedelta(days=1) #TODO: make timeout configurable (or an argument) + #create output dir + cmd = ['mkdir', '-p', output_path] + cmd = wrap_command_in_cep4_head_node_ssh_call(cmd) + logger.info('creating output dir on cep4: %s, executing: %s', output_path, ' '.join(cmd)) + proc = Popen(cmd) + proc.wait() + + #write parset + parset_dict = parset.dict() + parset_str = '\n'.join('%s = %s' % (k,parset_dict[k]) for k in sorted(parset_dict.keys())).replace('"', '').replace('ObsSW.', '') + cmd = ['echo', '\"%s\"' % (parset_str,), '>', parset_path] + cmd = wrap_command_in_cep4_head_node_ssh_call(cmd) + logger.info('writing tbb datawriter parset, executing: %s', ' '.join(cmd)) + proc = Popen(cmd) + proc.wait() + # TODO: what should be do if datawriters are already running? kill/wait for them first? 
self.procs = {} @@ -264,10 +281,13 @@ class TBBControlService: '-p', ports, '-k', '0', '-t', '60', + '-d', '3840', #TODO: compute instead of hardcoded '-o', output_path] # wrap the command in a cep4 docker ssh call - cmd = wrap_command_for_docker(cmd, DATAWRITER_DOCKER_IMAGE_NAME, DATAWRITER_DOCKER_IMAGE_TAG, added_privileges=True) + cmd = wrap_command_for_docker(cmd, DATAWRITER_DOCKER_IMAGE_NAME, DATAWRITER_DOCKER_IMAGE_TAG, + mount_dirs=['/data/projects', '/data/scratch', '/data/share', '/data/parsets'], + added_privileges=True) cmd = wrap_command_in_cep4_cpu_node_ssh_call(cmd, node, via_head=True) logger.info('starting datawriter on node %s, executing: %s', node, ' '.join(cmd)) @@ -283,7 +303,7 @@ class TBBControlService: Monitor started procs until they are all done or timeout seconds have passed. :param timeout: Timeout in seconds until data writers will be - forcefully killed. The default is 24 hours. + forcefully killed. The default is 24 hours. ''' start = datetime.utcnow() while self.procs: @@ -496,10 +516,10 @@ class TBBControlService: datawriter_nodes = self.start_datawriters(output_path=output_path, voevent=voevent_xml) if nodes is None: logger.info('Nodes to use are not configured, using all with datawriter') - nodes = datawriter_nodes + nodes = ['cpu%02d' % (node,) for node in datawriter_nodes] else: logger.info('Filtering node list for those who actually have a running datawriter') - nodes = [node for node in nodes if node in datawriter_nodes] + nodes = ['cpu%02d' % (node,) for node in nodes if node in datawriter_nodes] # create mapping for storage nodes try: diff --git a/MAC/TBB/lib/tbb_upload_to_cep.py b/MAC/TBB/lib/tbb_upload_to_cep.py index 1a782ae8212..2a457c0c4c8 100755 --- a/MAC/TBB/lib/tbb_upload_to_cep.py +++ b/MAC/TBB/lib/tbb_upload_to_cep.py @@ -12,7 +12,7 @@ import subprocess import logging from lofar.mac.tbb.tbb_config import * from lofar.mac.tbb.tbb_util import split_stations_by_boardnumber, expand_list, calculate_adjusted_start_time, 
wrap_remote_composite_command - +from lofar.common.lcu_utils import execute_in_parallel_over_stations def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, boards): """ @@ -40,33 +40,47 @@ def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, bo # batch handle all stations with same number of boards through lcurun for num_boards in stationlists.keys(): logging.debug("Creating TBB commands for stations with %s boards..." % num_boards) - relay = lcurun_command + [",".join(stationlists[num_boards])] + #relay = lcurun_command + [",".join(stationlists[num_boards])] + stations_with_num_boards = stationlists[num_boards] # iterate over tbb boards - board_list = "" + board_list = [] for board in boards: if int(board) <= num_boards: - board_list += " %s" % (board) + board_list.append(board) else: logging.error("Stations \"%s\" do not have a board #%s! The stations have only %s boards. Continuing without this board." % (stationlists[num_boards], board, num_boards)) continue - # Now build the two loops that execute tbbctl for every channel and - # for every board. - # Why first loop over channels and then over boards? That allows - # to add only one sleep after a channel is uploaded for all boards. - # Otherwise each board would have the sleep and we would sleep - # # of boards times for every channel instead. 
- cmd = "for((channel = 0; channel <= 15; ++channel)); do for board in" - cmd += board_list - cmd += "; do " - cmd += tbb_cmd % (sub_band, adjusted_start_time, slice_nr, duration) - cmd += "; done; sleep " - cmd += str(wait_time) - cmd += "; done" - cmd = wrap_remote_composite_command(cmd) - command = relay + [cmd] - logging.info('Executing %s', ' '.join(command)) - subprocess.check_call(command) + + cmd_list = [] + for channel in range(15): + for board in board_list: + tbb_cmd = "tbbctl --readband=%s,%s,%s,%s,%s,%s" % (board, channel, sub_band, adjusted_start_time, slice_nr, duration) + cmd_list.append(tbb_cmd) + cmd_list.append("sleep %s" % (wait_time,)) + + full_cmd = '\"%s\"' % (' ; '.join(cmd_list),) + + execute_in_parallel_over_stations(full_cmd, stations_with_num_boards) + + + # # Now build the two loops that execute tbbctl for every channel and + # # for every board. + # # Why first loop over channels and then over boards? That allows + # # to add only one sleep after a channel is uploaded for all boards. + # # Otherwise each board would have the sleep and we would sleep + # # # of boards times for every channel instead. + # cmd = "for((channel = 0; channel <= 15; ++channel)); do for board in" + # cmd += board_list + # cmd += "; do " + # cmd += tbb_cmd % (sub_band, adjusted_start_time, slice_nr, duration) + # cmd += "; done; sleep " + # cmd += str(wait_time) + # cmd += "; done" + # cmd = wrap_remote_composite_command(cmd) + # command = relay + [cmd] + # logging.info('Executing %s', ' '.join(command)) + # subprocess.check_call(command) time.sleep(wait_time) logging.info("Uploading TBB data done.") @@ -94,7 +108,7 @@ def parse_args(): if args.wait_time_between_sub_bands is not None and args.wait_time_between_sub_bands > 0.0: args.wait_time = float(args.wait_time_between_sub_bands) else: - args.wait_time = float(args.duration) * 0.00012 + args.wait_time = float(args.duration/1000.0) * 0.00012 return args -- GitLab