Skip to content
Snippets Groups Projects
Commit 572c5cec authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-488: commissioning fixes from test with Joern, Sander, Thomas and me

parent 383eabec
No related branches found
No related tags found
No related merge requests found
...@@ -71,7 +71,7 @@ def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32): ...@@ -71,7 +71,7 @@ def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32):
# log results of commands # log results of commands
for cmd_list, result in zip(cmd_lists, results): for cmd_list, result in zip(cmd_lists, results):
logger.debug("Results for cmd: %s\n returncode=%s\n stdout=%s\n stderr=%s", logger.info("Results for cmd: %s\n returncode=%s\n stdout=%s\n stderr=%s",
" ".join(cmd_list), " ".join(cmd_list),
result.returncode, result.stdout.rstrip(), result.stderr.rstrip()) result.returncode, result.stdout.rstrip(), result.stderr.rstrip())
return results return results
......
...@@ -47,7 +47,7 @@ from lofar.mac.tbb.tbb_set_storage import set_tbb_storage, create_mapping ...@@ -47,7 +47,7 @@ from lofar.mac.tbb.tbb_set_storage import set_tbb_storage, create_mapping
from lofar.mac.tbb.tbb_start_recording import start_tbb from lofar.mac.tbb.tbb_start_recording import start_tbb
from lofar.mac.tbb.tbb_upload_to_cep import upload_tbb_data from lofar.mac.tbb.tbb_upload_to_cep import upload_tbb_data
from lofar.parameterset import parameterset from lofar.parameterset import parameterset
from lofar.mac.tbb.tbb_util import parse_parset_from_voevent from lofar.mac.tbb.tbb_util import parse_parset_from_voevent, get_cpu_nodes_available_for_tbb_datawriters_sorted_by_load
from lofar.common.lcu_utils import stationname2hostname from lofar.common.lcu_utils import stationname2hostname
from lofar.mac.tbb.tbb_caltables import add_station_calibration_tables_h5_files_in_directory from lofar.mac.tbb.tbb_caltables import add_station_calibration_tables_h5_files_in_directory
from lofar.mac.tbb.tbb_cable_delays import add_dipole_cable_delays_h5_files_in_directory from lofar.mac.tbb.tbb_cable_delays import add_dipole_cable_delays_h5_files_in_directory
...@@ -222,16 +222,16 @@ class TBBControlService: ...@@ -222,16 +222,16 @@ class TBBControlService:
'Observation.TBB.TBBsetting.observatoryCoordinates': 0, 'Observation.TBB.TBBsetting.observatoryCoordinates': 0,
'Observation.TBB.TBBsetting.observatoryCoordinatesCoordinateSystem': 0, 'Observation.TBB.TBBsetting.observatoryCoordinatesCoordinateSystem': 0,
'Observation.TBB.TBBsetting.triggerId': 0, 'Observation.TBB.TBBsetting.triggerId': 0,
'Observation.TBB.TBBsetting.additionalInfo': voevent, #'Observation.TBB.TBBsetting.additionalInfo': voevent,
# older keys we probably still want to fill here: # older keys we probably still want to fill here:
'Observation.TBB.TBBsetting.triggerType': 'Unknown', 'Observation.TBB.TBBsetting.triggerType': 'Unknown',
'Observation.TBB.TBBsetting.triggerVersion': 0 'Observation.TBB.TBBsetting.triggerVersion': 0
} }
self._update_parset(parset, defaults_parset) parset = self._update_parset(parset, defaults_parset)
# update parset with values from received voevent # update parset with values from received voevent
voevent_parset = parse_parset_from_voevent(voevent) voevent_parset = parse_parset_from_voevent(voevent)
self._update_parset(parset, voevent_parset) parset = self._update_parset(parset, voevent_parset)
return parset_path, parset return parset_path, parset
...@@ -250,11 +250,28 @@ class TBBControlService: ...@@ -250,11 +250,28 @@ class TBBControlService:
# TODO: make dynamic, depending on which stations are actually used # TODO: make dynamic, depending on which stations are actually used
ports = ','.join(str(x) for x in range(31664, 31670)) ports = ','.join(str(x) for x in range(31664, 31670))
# TODO: do not start more dw's than there are stations.
available_nodes = get_cpu_nodes_available_for_tbb_datawriters_sorted_by_load(min_nr_of_free_nodes=1) available_nodes = get_cpu_nodes_available_for_tbb_datawriters_sorted_by_load(min_nr_of_free_nodes=1)
start = datetime.utcnow() start = datetime.utcnow()
timeout = timedelta(days=1) #TODO: make timeout configurable (or an argument) timeout = timedelta(days=1) #TODO: make timeout configurable (or an argument)
#create output dir
cmd = ['mkdir', '-p', output_path]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.info('creating output dir on cep4: %s, executing: %s', output_path, ' '.join(cmd))
proc = Popen(cmd)
proc.wait()
#write parset
parset_dict = parset.dict()
parset_str = '\n'.join('%s = %s' % (k,parset_dict[k]) for k in sorted(parset_dict.keys())).replace('"', '').replace('ObsSW.', '')
cmd = ['echo', '\"%s\"' % (parset_str,), '>', parset_path]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.info('writing tbb datawriter parset, executing: %s', ' '.join(cmd))
proc = Popen(cmd)
proc.wait()
# TODO: what should we do if datawriters are already running? kill/wait for them first? # TODO: what should we do if datawriters are already running? kill/wait for them first?
self.procs = {} self.procs = {}
...@@ -264,10 +281,13 @@ class TBBControlService: ...@@ -264,10 +281,13 @@ class TBBControlService:
'-p', ports, '-p', ports,
'-k', '0', '-k', '0',
'-t', '60', '-t', '60',
'-d', '3840', #TODO: compute instead of hardcoded
'-o', output_path] '-o', output_path]
# wrap the command in a cep4 docker ssh call # wrap the command in a cep4 docker ssh call
cmd = wrap_command_for_docker(cmd, DATAWRITER_DOCKER_IMAGE_NAME, DATAWRITER_DOCKER_IMAGE_TAG, added_privileges=True) cmd = wrap_command_for_docker(cmd, DATAWRITER_DOCKER_IMAGE_NAME, DATAWRITER_DOCKER_IMAGE_TAG,
mount_dirs=['/data/projects', '/data/scratch', '/data/share', '/data/parsets'],
added_privileges=True)
cmd = wrap_command_in_cep4_cpu_node_ssh_call(cmd, node, via_head=True) cmd = wrap_command_in_cep4_cpu_node_ssh_call(cmd, node, via_head=True)
logger.info('starting datawriter on node %s, executing: %s', node, ' '.join(cmd)) logger.info('starting datawriter on node %s, executing: %s', node, ' '.join(cmd))
...@@ -283,7 +303,7 @@ class TBBControlService: ...@@ -283,7 +303,7 @@ class TBBControlService:
Monitor started procs until they are all done or timeout seconds have Monitor started procs until they are all done or timeout seconds have
passed. passed.
:param timeout: Timeout in seconds until data writers will be :param timeout: Timeout in seconds until data writers will be
forcefully killed. The default is 24 hours. forcefully killed. The default is 24 hours.
''' '''
start = datetime.utcnow() start = datetime.utcnow()
while self.procs: while self.procs:
...@@ -496,10 +516,10 @@ class TBBControlService: ...@@ -496,10 +516,10 @@ class TBBControlService:
datawriter_nodes = self.start_datawriters(output_path=output_path, voevent=voevent_xml) datawriter_nodes = self.start_datawriters(output_path=output_path, voevent=voevent_xml)
if nodes is None: if nodes is None:
logger.info('Nodes to use are not configured, using all with datawriter') logger.info('Nodes to use are not configured, using all with datawriter')
nodes = datawriter_nodes nodes = ['cpu%02d' % (node,) for node in datawriter_nodes]
else: else:
logger.info('Filtering node list for those who actually have a running datawriter') logger.info('Filtering node list for those who actually have a running datawriter')
nodes = [node for node in nodes if node in datawriter_nodes] nodes = ['cpu%02d' % (node,) for node in nodes if node in datawriter_nodes]
# create mapping for storage nodes # create mapping for storage nodes
try: try:
......
...@@ -12,7 +12,7 @@ import subprocess ...@@ -12,7 +12,7 @@ import subprocess
import logging import logging
from lofar.mac.tbb.tbb_config import * from lofar.mac.tbb.tbb_config import *
from lofar.mac.tbb.tbb_util import split_stations_by_boardnumber, expand_list, calculate_adjusted_start_time, wrap_remote_composite_command from lofar.mac.tbb.tbb_util import split_stations_by_boardnumber, expand_list, calculate_adjusted_start_time, wrap_remote_composite_command
from lofar.common.lcu_utils import execute_in_parallel_over_stations
def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, boards): def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, boards):
""" """
...@@ -40,33 +40,47 @@ def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, bo ...@@ -40,33 +40,47 @@ def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, bo
# batch handle all stations with same number of boards through lcurun # batch handle all stations with same number of boards through lcurun
for num_boards in stationlists.keys(): for num_boards in stationlists.keys():
logging.debug("Creating TBB commands for stations with %s boards..." % num_boards) logging.debug("Creating TBB commands for stations with %s boards..." % num_boards)
relay = lcurun_command + [",".join(stationlists[num_boards])] #relay = lcurun_command + [",".join(stationlists[num_boards])]
stations_with_num_boards = stationlists[num_boards]
# iterate over tbb boards # iterate over tbb boards
board_list = "" board_list = []
for board in boards: for board in boards:
if int(board) <= num_boards: if int(board) <= num_boards:
board_list += " %s" % (board) board_list.append(board)
else: else:
logging.error("Stations \"%s\" do not have a board #%s! The stations have only %s boards. Continuing without this board." % (stationlists[num_boards], board, num_boards)) logging.error("Stations \"%s\" do not have a board #%s! The stations have only %s boards. Continuing without this board." % (stationlists[num_boards], board, num_boards))
continue continue
# Now build the two loops that execute tbbctl for every channel and
# for every board. cmd_list = []
# Why first loop over channels and then over boards? That allows for channel in range(15):
# to add only one sleep after a channel is uploaded for all boards. for board in board_list:
# Otherwise each board would have the sleep and we would sleep tbb_cmd = "tbbctl --readband=%s,%s,%s,%s,%s,%s" % (board, channel, sub_band, adjusted_start_time, slice_nr, duration)
# # of boards times for every channel instead. cmd_list.append(tbb_cmd)
cmd = "for((channel = 0; channel <= 15; ++channel)); do for board in" cmd_list.append("sleep %s" % (wait_time,))
cmd += board_list
cmd += "; do " full_cmd = '\"%s\"' % (' ; '.join(cmd_list),)
cmd += tbb_cmd % (sub_band, adjusted_start_time, slice_nr, duration)
cmd += "; done; sleep " execute_in_parallel_over_stations(full_cmd, stations_with_num_boards)
cmd += str(wait_time)
cmd += "; done"
cmd = wrap_remote_composite_command(cmd) # # Now build the two loops that execute tbbctl for every channel and
command = relay + [cmd] # # for every board.
logging.info('Executing %s', ' '.join(command)) # # Why first loop over channels and then over boards? That allows
subprocess.check_call(command) # # to add only one sleep after a channel is uploaded for all boards.
# # Otherwise each board would have the sleep and we would sleep
# # # of boards times for every channel instead.
# cmd = "for((channel = 0; channel <= 15; ++channel)); do for board in"
# cmd += board_list
# cmd += "; do "
# cmd += tbb_cmd % (sub_band, adjusted_start_time, slice_nr, duration)
# cmd += "; done; sleep "
# cmd += str(wait_time)
# cmd += "; done"
# cmd = wrap_remote_composite_command(cmd)
# command = relay + [cmd]
# logging.info('Executing %s', ' '.join(command))
# subprocess.check_call(command)
time.sleep(wait_time) time.sleep(wait_time)
logging.info("Uploading TBB data done.") logging.info("Uploading TBB data done.")
...@@ -94,7 +108,7 @@ def parse_args(): ...@@ -94,7 +108,7 @@ def parse_args():
if args.wait_time_between_sub_bands is not None and args.wait_time_between_sub_bands > 0.0: if args.wait_time_between_sub_bands is not None and args.wait_time_between_sub_bands > 0.0:
args.wait_time = float(args.wait_time_between_sub_bands) args.wait_time = float(args.wait_time_between_sub_bands)
else: else:
args.wait_time = float(args.duration) * 0.00012 args.wait_time = float(args.duration/1000.0) * 0.00012
return args return args
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment