Skip to content
Snippets Groups Projects
Commit 8e4a7559 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Merge branch 'SW-824' into 'LOFAR-Release-4_0'

SW-824: Resolve SW-824

See merge request !73
parents dfe9f671 d6efc8f4
No related branches found
No related tags found
2 merge requests!74Lofar release 4 0,!73SW-824: Resolve SW-824
Showing with 130 additions and 56 deletions
...@@ -53,7 +53,9 @@ def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_h ...@@ -53,7 +53,9 @@ def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_h
:param bool via_head: when True, route the cmd first via the cep4 head node :param bool via_head: when True, route the cmd first via the cep4 head node
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
''' '''
lowest_load_node_nr = get_cep4_available_cpu_node_with_lowest_load() lowest_load_node_nr = get_cep4_cpu_node_with_lowest_load()
if lowest_load_node_nr is None:
raise RuntimeError("No cpu node available to run the cmd on. cmd=%s" %(cmd,))
return wrap_command_in_cep4_cpu_node_ssh_call(cmd, lowest_load_node_nr, via_head=via_head) return wrap_command_in_cep4_cpu_node_ssh_call(cmd, lowest_load_node_nr, via_head=via_head)
def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True): def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True):
...@@ -99,29 +101,49 @@ def wrap_command_for_docker(cmd, image_name, image_label='', mount_dirs=['/data' ...@@ -99,29 +101,49 @@ def wrap_command_for_docker(cmd, image_name, image_label='', mount_dirs=['/data'
dockerized_cmd += cmd dockerized_cmd += cmd
return dockerized_cmd return dockerized_cmd
def get_cep4_available_cpu_nodes(): # a selection of slurm states relevant for lofar usage
SLURM_AVAILABLE_STATES = {'idle','mix'}
SLURM_IN_USES_STATES = {'alloc','mix'}
SLURM_UNAVAILABLE_STATES = {'down','drain','drng','resv','maint'}
SLURM_STATES = SLURM_AVAILABLE_STATES | SLURM_IN_USES_STATES | SLURM_UNAVAILABLE_STATES
# lofar cep4 default slurm partitions
SLURM_CPU_PARTITION = 'cpu'
SLURM_GPU_PARTITION = 'gpu'
SLURM_PARTITIONS = {SLURM_CPU_PARTITION, SLURM_GPU_PARTITION}
def get_cep4_slurm_nodes(states, partition: str) -> []:
''' '''
get a list of cep4 cpu nodes which are currently up and running according to slurm get a list of cep4 nodes from the given partition which have (one of) the given states according to slurm
:return: a list of cpu node numbers (ints) for the up and running cpu nodes states: set/list of slurm states, see predefined sets: SLURM_AVAILABLE_STATES, SLURM_IN_USES_STATES, SLURM_UNAVAILABLE_STATES
partition: one of the known SLURM_PARTITIONS
:return: a list of node numbers (ints) of the nodes in the partition
''' '''
available_cep4_nodes = [] slurm_nodes = []
try: try:
logger.debug('determining available cep4 cpu nodes') # filter out unknown states
if any(s for s in states if s not in SLURM_STATES):
raise ValueError("the given states:%s are not valid slurm states:%s" % (states, SLURM_STATES))
if partition not in SLURM_PARTITIONS:
raise ValueError("the given partition:%s is not one of the valid cep4 slurm partition:%s" % (partition, SLURM_PARTITIONS))
logger.debug('determining available cep4 nodes states:%s, partition:%s', states, partition)
# find out which nodes are available # find out which nodes are available
cmd = ['sinfo -p cpu -t idle,mix'] cmd = ['sinfo -p %s -t %s' % (partition, ','.join(states))]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd) cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.debug('executing command: %s', ' '.join(cmd)) logger.debug('executing command: %s', ' '.join(cmd))
out = check_output_returning_strings(cmd) out = check_output_returning_strings(cmd)
lines = out.split('\n') lines = out.split('\n')
for state in ['idle', 'mix']: for state in states:
try: try:
line = next(l for l in lines if state in l).strip() line = next(l for l in lines if state in l).strip()
# get nodes string part of line: # get nodes string part of line:
nodes_part = line.split(' ')[-1] nodes_part = line.split(' ')[-1]
available_cep4_nodes += convert_slurm_nodes_string_to_node_number_list(nodes_part) slurm_nodes += convert_slurm_nodes_string_to_node_number_list(nodes_part)
except StopIteration: except StopIteration:
pass # no line with state in line pass # no line with state in line
...@@ -129,13 +151,34 @@ def get_cep4_available_cpu_nodes(): ...@@ -129,13 +151,34 @@ def get_cep4_available_cpu_nodes():
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
available_cep4_nodes = sorted(list(set(available_cep4_nodes))) slurm_nodes = sorted(list(set(slurm_nodes)))
logger.debug('available cep4 cpu nodes: %s', ','.join(str(x) for x in available_cep4_nodes)) logger.debug('cep4 nodes with states:%s in partition:%s are: %s', _states, partition, slurm_nodes)
return slurm_nodes
def get_cep4_available_cpu_nodes():
'''
get a list of cep4 cpu nodes which are currently up and running (either totally free, or partially free) according to slurm
:return: a list of cpu node numbers (ints)
'''
available_cep4_nodes = get_cep4_slurm_nodes(SLURM_AVAILABLE_STATES, SLURM_CPU_PARTITION)
if not available_cep4_nodes: if not available_cep4_nodes:
logger.warning('no cep4 cpu nodes available') logger.warning('no cep4 cpu nodes available')
return available_cep4_nodes return available_cep4_nodes
def get_cep4_up_and_running_cpu_nodes():
'''
get a list of cep4 cpu nodes which are currently up and running (either totally free, partially free, or totally used) according to slurm
:return: a list of cpu node numbers (ints)
'''
cep4_nodes = get_cep4_slurm_nodes(SLURM_AVAILABLE_STATES | SLURM_IN_USES_STATES, SLURM_CPU_PARTITION)
if not cep4_nodes:
logger.warning('no cep4 cpu nodes up and running')
return cep4_nodes
def convert_slurm_nodes_string_to_node_number_list(slurm_string): def convert_slurm_nodes_string_to_node_number_list(slurm_string):
''' converts strings like: cpu[01-03,11-12]' to [1,2,3,11,12] ''' converts strings like: cpu[01-03,11-12]' to [1,2,3,11,12]
or 'cpu01' to [1] or 'cpu01' to [1]
...@@ -171,12 +214,12 @@ def convert_slurm_nodes_string_to_node_number_list(slurm_string): ...@@ -171,12 +214,12 @@ def convert_slurm_nodes_string_to_node_number_list(slurm_string):
def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False): def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False):
''' '''
get the 5min load for each given cep4 cpu node nr get the 5min load for each given cep4 cpu node nr
:param node_nrs: optional list of node numbers to get the load for. If None, then all available nodes are queried. :param node_nrs: optional list of node numbers to get the load for. If None, then all up-and-running nodes are queried.
:param bool normalized: when True, then normalize the loads with the number of cores. :param bool normalized: when True, then normalize the loads with the number of cores.
:return: dict with node_nr -> load mapping :return: dict with node_nr -> load mapping
''' '''
if node_nrs == None: if node_nrs == None:
node_nrs = get_cep4_available_cpu_nodes() node_nrs = get_cep4_up_and_running_cpu_nodes()
procs = {} procs = {}
loads = {} loads = {}
...@@ -230,11 +273,11 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0. ...@@ -230,11 +273,11 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.
:param float max_normalized_load: filter available nodes which are at most max_normalized_load :param float max_normalized_load: filter available nodes which are at most max_normalized_load
:param int min_nr_of_nodes: do return this minimum number of nodes, even if their load is higher than max_normalized_load :param int min_nr_of_nodes: do return this minimum number of nodes, even if their load is higher than max_normalized_load
If not enough nodes are up, then of course it cannot be guaranteed that we return this amount. If not enough nodes are up, then of course it cannot be guaranteed that we return this amount.
:param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all available nodes are queried. :param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all up-and-running nodes are queried.
:return: sorted list of node numbers. :return: sorted list of node numbers.
''' '''
if not node_nrs: if not node_nrs:
node_nrs = get_cep4_available_cpu_nodes() node_nrs = get_cep4_up_and_running_cpu_nodes()
loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True) loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True)
load_tuples_list = [(cpu_nr,load) for cpu_nr,load in list(loads.items())] load_tuples_list = [(cpu_nr,load) for cpu_nr,load in list(loads.items())]
sorted_load_tuples_list = sorted(load_tuples_list, key=lambda x: x[1]) sorted_load_tuples_list = sorted(load_tuples_list, key=lambda x: x[1])
...@@ -249,14 +292,21 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0. ...@@ -249,14 +292,21 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.
max_normalized_load, min_nr_of_nodes, sorted_node_nrs) max_normalized_load, min_nr_of_nodes, sorted_node_nrs)
return sorted_node_nrs return sorted_node_nrs
def get_cep4_available_cpu_node_with_lowest_load(max_normalized_load=0.33): def get_cep4_cpu_node_with_lowest_load(max_normalized_load=0.33):
''' '''
get the cep4 cpu node which is available and has the lowest (5min) load of them all. get the cep4 cpu node which has the lowest (5min) load of them all. Preferably a node which is not fully used yet.
:param float max_normalized_load: filter available nodes which a at most max_normalized_load :param float max_normalized_load: filter nodes which a at most max_normalized_load
:return: the node number (int) with the lowest load. :return: the node number (int) with the lowest load.
''' '''
# first see if there are any not-fully-used nodes available
nodes = get_cep4_available_cpu_nodes()
if not nodes: # if not, then just query all up and running nodes
nodes = get_cep4_up_and_running_cpu_nodes()
node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=max_normalized_load, node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=max_normalized_load,
min_nr_of_nodes=1) min_nr_of_nodes=1,
node_nrs=nodes)
if node_nrs: if node_nrs:
logger.debug('cep4 cpu node with lowest load: %s', node_nrs[0]) logger.debug('cep4 cpu node with lowest load: %s', node_nrs[0])
return node_nrs[0] return node_nrs[0]
......
...@@ -95,21 +95,27 @@ def translate_user_station_string_into_station_list(user_station_string: str): ...@@ -95,21 +95,27 @@ def translate_user_station_string_into_station_list(user_station_string: str):
:param user_station_string: a string like 'cs001,cs001' or 'today' or ... etc :param user_station_string: a string like 'cs001,cs001' or 'today' or ... etc
:return: a list of station names :return: a list of station names
''' '''
logger.debug("translating '%s'", user_station_string)
if isinstance(user_station_string, bytes): if isinstance(user_station_string, bytes):
user_station_string = user_station_string.decode('utf-8') user_station_string = user_station_string.decode('utf-8')
if not isinstance(user_station_string, str): if not isinstance(user_station_string, str):
raise ValueError("cannot parse user_station_string") raise ValueError("cannot parse user_station_string: %s", (user_station_string,))
if ',' in user_station_string: if ',' in user_station_string:
return user_station_string.split(',') result = user_station_string.split(',')
logger.info("translate_user_station_string_into_station_list(%s) -> %s", user_station_string, result)
return result
# maybe 'stations' is a group. Do lookup. # maybe 'stations' is a group. Do lookup.
current_stations = get_current_stations(user_station_string, as_host_names=False) current_stations = get_current_stations(user_station_string, as_host_names=False)
if current_stations: if current_stations:
logger.info("translate_user_station_string_into_station_list(%s) -> %s", user_station_string, current_stations)
return current_stations return current_stations
# just treat the stations string as list of stations and hope for the best # just treat the stations string as list of stations and hope for the best
logger.info("translate_user_station_string_into_station_list(%s) -> %s", user_station_string, [user_station_string])
return [user_station_string] return [user_station_string]
def get_current_stations(station_group='today', as_host_names=True): def get_current_stations(station_group='today', as_host_names=True):
......
...@@ -9,6 +9,6 @@ if __name__ == '__main__': ...@@ -9,6 +9,6 @@ if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
args = parse_args() args = parse_args()
with TBBRPC.create() as rpc: with TBBRPC.create(timeout=5*60) as rpc:
rpc.switch_firmware(args.stations, args.mode) rpc.switch_firmware(args.stations, args.mode)
...@@ -8,6 +8,6 @@ if __name__ == '__main__': ...@@ -8,6 +8,6 @@ if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
args = parse_args() args = parse_args()
with TBBRPC.create() as rpc: with TBBRPC.create(timeout=24*60*60) as rpc: # tbb dumps can take a long time.... timeout of 24 hours is ok
rpc.upload_data(args.stations, args.dm, args.start_time, args.duration, args.sub_bands, args.wait_time, args.boards) rpc.upload_data(args.stations, args.dm, args.start_time, args.duration, args.sub_bands, args.wait_time, args.boards)
...@@ -36,6 +36,7 @@ from lofar.common.util import waitForInterrupt ...@@ -36,6 +36,7 @@ from lofar.common.util import waitForInterrupt
from lofar.mac.tbbservice.config import * from lofar.mac.tbbservice.config import *
from lofar.common.lcu_utils import * from lofar.common.lcu_utils import *
from lofar.common.cep4_utils import * from lofar.common.cep4_utils import *
from lofar.common.subprocess_utils import communicate_returning_strings
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.mac.tbb.tbb_load_firmware import load_tbb_firmware from lofar.mac.tbb.tbb_load_firmware import load_tbb_firmware
...@@ -166,8 +167,11 @@ class TBBServiceMessageHandler(ServiceMessageHandler): ...@@ -166,8 +167,11 @@ class TBBServiceMessageHandler(ServiceMessageHandler):
parset = self._update_parset(parset, defaults_parset) parset = self._update_parset(parset, defaults_parset)
# update parset with values from received voevent # update parset with values from received voevent
voevent_parset = parse_parset_from_voevent(voevent) try:
parset = self._update_parset(parset, voevent_parset) voevent_parset = parse_parset_from_voevent(voevent)
parset = self._update_parset(parset, voevent_parset)
except Exception as e:
logger.warning("prepare_alert_tbb_parset: error '%s' while parsing voevent %s", e, voevent)
return parset_path, parset return parset_path, parset
...@@ -188,7 +192,7 @@ class TBBServiceMessageHandler(ServiceMessageHandler): ...@@ -188,7 +192,7 @@ class TBBServiceMessageHandler(ServiceMessageHandler):
ports = ','.join(str(x) for x in range(31664, 31670)) ports = ','.join(str(x) for x in range(31664, 31670))
# TODO: do not start more dw's than there are stations. # TODO: do not start more dw's than there are stations.
available_nodes = get_cpu_nodes_available_for_tbb_datawriters_sorted_by_load(min_nr_of_free_nodes=1) available_nodes = get_cpu_nodes_available_for_tbb_datawriters_sorted_by_load(min_nr_of_free_nodes=5)
start = datetime.utcnow() start = datetime.utcnow()
timeout = timedelta(days=1) #TODO: make timeout configurable (or an argument) timeout = timedelta(days=1) #TODO: make timeout configurable (or an argument)
...@@ -197,8 +201,11 @@ class TBBServiceMessageHandler(ServiceMessageHandler): ...@@ -197,8 +201,11 @@ class TBBServiceMessageHandler(ServiceMessageHandler):
cmd = ['mkdir', '-p', output_path] cmd = ['mkdir', '-p', output_path]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd) cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.info('creating output dir on cep4: %s, executing: %s', output_path, ' '.join(cmd)) logger.info('creating output dir on cep4: %s, executing: %s', output_path, ' '.join(cmd))
proc = Popen(cmd) proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
proc.wait() out, err = communicate_returning_strings(proc)
if proc.returncode != 0:
raise RuntimeError("Error while creating output dir '%s': %s" % (output_path, out+err))
#write parset #write parset
parset_dict = parset.dict() parset_dict = parset.dict()
...@@ -206,8 +213,11 @@ class TBBServiceMessageHandler(ServiceMessageHandler): ...@@ -206,8 +213,11 @@ class TBBServiceMessageHandler(ServiceMessageHandler):
cmd = ['echo', '\"%s\"' % (parset_str,), '>', parset_path] cmd = ['echo', '\"%s\"' % (parset_str,), '>', parset_path]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd) cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.info('writing tbb datawriter parset, executing: %s', ' '.join(cmd)) logger.info('writing tbb datawriter parset, executing: %s', ' '.join(cmd))
proc = Popen(cmd) proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
proc.wait() out, err = communicate_returning_strings(proc)
if proc.returncode != 0:
raise RuntimeError("Error while writing parset: %s" % (out+err, ))
# TODO: what should be do if datawriters are already running? kill/wait for them first? # TODO: what should be do if datawriters are already running? kill/wait for them first?
self.procs = {} self.procs = {}
......
...@@ -75,7 +75,7 @@ def freeze_tbb(stations, dm, timesec, timensec): ...@@ -75,7 +75,7 @@ def freeze_tbb(stations, dm, timesec, timensec):
def parse_args(): def parse_args():
parser = argparse.ArgumentParser("This script will freeze TBB boards on a bunch of stations.") parser = argparse.ArgumentParser("This script will freeze TBB boards on a bunch of stations.")
parser.add_argument('-s', '--stations', dest='stations', help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)", default='today') parser.add_argument('-s', '--stations', dest='stations', help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)", default=None)
parser.add_argument('-d', '--dm', dest='dm', help="dispersion measure as float", type=float) parser.add_argument('-d', '--dm', dest='dm', help="dispersion measure as float", type=float)
parser.add_argument('-t', '--stoptime-seconds', dest='timesec', type=int, help="Freeze time since epoch in seconds") parser.add_argument('-t', '--stoptime-seconds', dest='timesec', type=int, help="Freeze time since epoch in seconds")
parser.add_argument('-n', '--stoptime-nanoseconds', dest='timensec', type=int, help="Freeze time offset in nanoseconds", default=0) parser.add_argument('-n', '--stoptime-nanoseconds', dest='timensec', type=int, help="Freeze time offset in nanoseconds", default=0)
...@@ -102,4 +102,4 @@ def main(): ...@@ -102,4 +102,4 @@ def main():
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
main() main()
\ No newline at end of file
...@@ -11,7 +11,7 @@ import time ...@@ -11,7 +11,7 @@ import time
import subprocess import subprocess
import logging import logging
from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command
from lofar.common.lcu_utils import translate_user_station_string_into_station_list from lofar.common.lcu_utils import translate_user_station_string_into_station_list,stationname2hostname
from lofar.common.subprocess_utils import check_output_returning_strings from lofar.common.subprocess_utils import check_output_returning_strings
def load_tbb_firmware(stations, mode): def load_tbb_firmware(stations, mode):
...@@ -28,10 +28,11 @@ def load_tbb_firmware(stations, mode): ...@@ -28,10 +28,11 @@ def load_tbb_firmware(stations, mode):
slot = 1 slot = 1
stations = translate_user_station_string_into_station_list(stations) stations = translate_user_station_string_into_station_list(stations)
station_hostname_csv_string = ','.join(stationname2hostname(s) for s in stations)
logging.info("It is assumed that the firmware for mode \"%s\" is in slot %d!" % (mode, slot)) logging.info("It is assumed that the firmware for mode \"%s\" is in slot %d!" % (mode, slot))
relay = lcurun_command + [stations] relay = lcurun_command + [station_hostname_csv_string]
cmd = [tbb_command, '--config=%s' % slot] cmd = [tbb_command, '--config=%s' % slot]
cmd = relay + cmd cmd = relay + cmd
logging.info('Executing %s' % ' '.join(cmd)) logging.info('Executing %s' % ' '.join(cmd))
...@@ -50,7 +51,7 @@ def load_tbb_firmware(stations, mode): ...@@ -50,7 +51,7 @@ def load_tbb_firmware(stations, mode):
cmd = [tbb_command, '--imageinfo=%s' % str(board)] cmd = [tbb_command, '--imageinfo=%s' % str(board)]
cmd = relay + cmd cmd = relay + cmd
logging.info('Executing %s' % ' '.join(cmd)) logging.info('Executing %s' % ' '.join(cmd))
logging.info(subprocess.check_output_returning_strings(cmd)) logging.info(check_output_returning_strings(cmd))
def parse_args(): def parse_args():
...@@ -58,7 +59,7 @@ def parse_args(): ...@@ -58,7 +59,7 @@ def parse_args():
"This script will load a TBB firmware on a bunch of stations and the respective boards there.") "This script will load a TBB firmware on a bunch of stations and the respective boards there.")
parser.add_argument('-s', '--stations', dest='stations', parser.add_argument('-s', '--stations', dest='stations',
help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)", help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)",
default='today') default=None)
parser.add_argument('-m', '--mode', dest='mode', help="supported tbb modes: %s" % supported_modes, parser.add_argument('-m', '--mode', dest='mode', help="supported tbb modes: %s" % supported_modes,
default='subband') default='subband')
args = parser.parse_args() args = parser.parse_args()
......
...@@ -11,13 +11,14 @@ import time ...@@ -11,13 +11,14 @@ import time
import subprocess import subprocess
import logging import logging
from lofar.mac.tbb.tbb_config import lcurun_command, tbb_command from lofar.mac.tbb.tbb_config import lcurun_command, tbb_command
from lofar.common.lcu_utils import translate_user_station_string_into_station_list from lofar.common.lcu_utils import translate_user_station_string_into_station_list, stationname2hostname
def release_tbb(stations): def release_tbb(stations):
logging.info('Releasing TBB recording') logging.info('Releasing TBB recording')
stations = translate_user_station_string_into_station_list(stations) stations = translate_user_station_string_into_station_list(stations)
relay = lcurun_command + [stations] station_hostname_csv_string = ','.join(stationname2hostname(s) for s in stations)
relay = lcurun_command + [station_hostname_csv_string]
cmd = relay + [tbb_command, '--free'] cmd = relay + [tbb_command, '--free']
logging.info('Executing %s' % ' '.join(cmd)) logging.info('Executing %s' % ' '.join(cmd))
...@@ -28,7 +29,7 @@ def parse_args(): ...@@ -28,7 +29,7 @@ def parse_args():
parser = argparse.ArgumentParser("This script will release TBB recording on a bunch of stations.") parser = argparse.ArgumentParser("This script will release TBB recording on a bunch of stations.")
parser.add_argument('-s', '--stations', dest='stations', parser.add_argument('-s', '--stations', dest='stations',
help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)", help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)",
default='today') default=None)
return parser.parse_args() return parser.parse_args()
def main(): def main():
......
...@@ -12,15 +12,16 @@ import time ...@@ -12,15 +12,16 @@ import time
import subprocess import subprocess
import logging import logging
from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command
from lofar.common.lcu_utils import translate_user_station_string_into_station_list from lofar.common.lcu_utils import translate_user_station_string_into_station_list, stationname2hostname
def restart_tbb_recording(stations): def restart_tbb_recording(stations):
logging.info("Restarting TBB recording") logging.info("Restarting TBB recording")
stations = translate_user_station_string_into_station_list(stations) stations = translate_user_station_string_into_station_list(stations)
station_hostname_csv_string = ','.join(stationname2hostname(s) for s in stations)
relay = lcurun_command + [stations] relay = lcurun_command + [station_hostname_csv_string]
cmd = relay + [tbb_command, "--record"] cmd = relay + [tbb_command, "--record"]
logging.info("Executing %s" % " ".join(cmd)) logging.info("Executing %s" % " ".join(cmd))
subprocess.check_call(cmd) subprocess.check_call(cmd)
...@@ -28,7 +29,7 @@ def restart_tbb_recording(stations): ...@@ -28,7 +29,7 @@ def restart_tbb_recording(stations):
def parse_args(): def parse_args():
parser = argparse.ArgumentParser("This script will restart TBB recording on a bunch of stations.") parser = argparse.ArgumentParser("This script will restart TBB recording on a bunch of stations.")
parser.add_argument("-s", "--stations", dest="stations", help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)", default="today") parser.add_argument("-s", "--stations", dest="stations", help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)", default=None)
return parser.parse_args() return parser.parse_args()
def main(): def main():
......
...@@ -11,15 +11,15 @@ import time ...@@ -11,15 +11,15 @@ import time
import subprocess import subprocess
import logging import logging
from lofar.mac.tbb.tbb_config import lcurun_command, tbb_command from lofar.mac.tbb.tbb_config import lcurun_command, tbb_command
from lofar.common.lcu_utils import translate_user_station_string_into_station_list from lofar.common.lcu_utils import translate_user_station_string_into_station_list, stationname2hostname
def set_tbb_storage(map): def set_tbb_storage(map):
logging.info('Setting TBB storage nodes') logging.info('Setting TBB storage nodes')
for stations, node in map.items(): for stations, node in map.items():
stations = translate_user_station_string_into_station_list(stations) stations = translate_user_station_string_into_station_list(stations)
station_hostname_csv_string = ','.join(stationname2hostname(s) for s in stations)
relay = lcurun_command + [stations] relay = lcurun_command + [station_hostname_csv_string]
cmds = [ cmds = [
[tbb_command, '--storage=%s' % node], [tbb_command, '--storage=%s' % node],
...@@ -69,7 +69,8 @@ def create_mapping(stations, nodes): ...@@ -69,7 +69,8 @@ def create_mapping(stations, nodes):
def parse_args(): def parse_args():
parser = argparse.ArgumentParser("This script will set the target node for TBB data on a bunch of stations.") parser = argparse.ArgumentParser("This script will set the target node for TBB data on a bunch of stations.")
parser.add_argument('-s', '--stations', dest='stations', parser.add_argument('-s', '--stations', dest='stations',
help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)") help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)",
default=None)
node_group = parser.add_mutually_exclusive_group(required=True) node_group = parser.add_mutually_exclusive_group(required=True)
node_group.add_argument('-n', '--nodes', dest='nodes', node_group.add_argument('-n', '--nodes', dest='nodes',
help="comma-separated list of target nodes to receive tbb data, stations will mapped automatically, use option -m instead to be specific") help="comma-separated list of target nodes to receive tbb data, stations will mapped automatically, use option -m instead to be specific")
...@@ -86,9 +87,8 @@ def parse_args(): ...@@ -86,9 +87,8 @@ def parse_args():
if args.mapping: if args.mapping:
args.map = parse_mapping(args.mapping) args.map = parse_mapping(args.mapping)
else: else:
stations = args.stations.split(',')
nodes = args.nodes.split(',') nodes = args.nodes.split(',')
args.map = create_mapping(stations, nodes) args.map = create_mapping(args.stations, nodes)
return args return args
......
...@@ -11,7 +11,7 @@ import time ...@@ -11,7 +11,7 @@ import time
import subprocess import subprocess
import logging import logging
from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command, rsp_command from lofar.mac.tbb.tbb_config import supported_modes, lcurun_command, tbb_command, rsp_command
from lofar.common.lcu_utils import translate_user_station_string_into_station_list from lofar.common.lcu_utils import translate_user_station_string_into_station_list, stationname2hostname
def start_tbb(stations, mode, subbands): def start_tbb(stations, mode, subbands):
...@@ -33,8 +33,9 @@ def start_tbb(stations, mode, subbands): ...@@ -33,8 +33,9 @@ def start_tbb(stations, mode, subbands):
] ]
stations = translate_user_station_string_into_station_list(stations) stations = translate_user_station_string_into_station_list(stations)
station_hostname_csv_string = ','.join(stationname2hostname(s) for s in stations)
relay = lcurun_command + [stations] relay = lcurun_command + [station_hostname_csv_string]
for cmd in cmds: for cmd in cmds:
cmd = relay + cmd cmd = relay + cmd
...@@ -46,7 +47,7 @@ def parse_args(): ...@@ -46,7 +47,7 @@ def parse_args():
parser = argparse.ArgumentParser("This script will start TBB recording on a bunch of stations.") parser = argparse.ArgumentParser("This script will start TBB recording on a bunch of stations.")
parser.add_argument('-s', '--stations', dest='stations', parser.add_argument('-s', '--stations', dest='stations',
help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)", help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like 'today', 'nl', ...)",
default='today') default=None)
parser.add_argument('-m', '--mode', dest='mode', help="supported tbb modes: %s" % supported_modes, parser.add_argument('-m', '--mode', dest='mode', help="supported tbb modes: %s" % supported_modes,
default='subband') default='subband')
parser.add_argument('-b', '--subbands', dest='subbands', help='Subband range, e.g. 10:496', default='10:496') parser.add_argument('-b', '--subbands', dest='subbands', help='Subband range, e.g. 10:496', default='10:496')
......
...@@ -73,7 +73,8 @@ def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, bo ...@@ -73,7 +73,8 @@ def upload_tbb_data(stations, dm, start_time, duration, sub_bands, wait_time, bo
def parse_args(): def parse_args():
parser = argparse.ArgumentParser("This script will upload TBB data from a bunch of stations to CEP.") parser = argparse.ArgumentParser("This script will upload TBB data from a bunch of stations to CEP.")
parser.add_argument("-s", "--stations", required=True, dest="stations", parser.add_argument("-s", "--stations", required=True, dest="stations",
help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like \"today\", \"nl\", ...)") help="comma-separated list of station LCUs (e.g. cs030c,cs031c; also accepts lcurun aliases like \"today\", \"nl\", ...)",
default=None)
parser.add_argument('-d', '--dm', required=True, type=float, dest='dm', help="dispersion measure as float") parser.add_argument('-d', '--dm', required=True, type=float, dest='dm', help="dispersion measure as float")
parser.add_argument("-t", "--start_time", required=True, type=float, dest="start_time", parser.add_argument("-t", "--start_time", required=True, type=float, dest="start_time",
help="The start time of the data dump in fractional seconds since 1970-01-01T00.00.00.") help="The start time of the data dump in fractional seconds since 1970-01-01T00.00.00.")
...@@ -82,7 +83,7 @@ def parse_args(): ...@@ -82,7 +83,7 @@ def parse_args():
parser.add_argument("-u", "--subbands", required=True, dest="sub_bands", parser.add_argument("-u", "--subbands", required=True, dest="sub_bands",
help="A list of sub-bands or ranges of sub-bands. A range can be specified with \"-\" between the first and the last sub-band of the range, list items are separated with \",\". Example: --subbands=17-22,38,211-319,321,323") help="A list of sub-bands or ranges of sub-bands. A range can be specified with \"-\" between the first and the last sub-band of the range, list items are separated with \",\". Example: --subbands=17-22,38,211-319,321,323")
parser.add_argument("-p", "--duration", required=True, type=float, dest="duration", parser.add_argument("-p", "--duration", required=True, type=float, dest="duration",
help="Duration of data dump (upload) in milli-seconds.") help="Duration of data dump (upload) in seconds.")
parser.add_argument("-w", "--wait_time", type=float, dest="wait_time_between_sub_bands", parser.add_argument("-w", "--wait_time", type=float, dest="wait_time_between_sub_bands",
help="Time in seconds that will be waited between uploading a sub-band. The default is: t = duration * 0.00012") help="Time in seconds that will be waited between uploading a sub-band. The default is: t = duration * 0.00012")
args = parser.parse_args() args = parser.parse_args()
...@@ -93,7 +94,7 @@ def parse_args(): ...@@ -93,7 +94,7 @@ def parse_args():
if args.wait_time_between_sub_bands is not None and args.wait_time_between_sub_bands > 0.0: if args.wait_time_between_sub_bands is not None and args.wait_time_between_sub_bands > 0.0:
args.wait_time = float(args.wait_time_between_sub_bands) args.wait_time = float(args.wait_time_between_sub_bands)
else: else:
args.wait_time = args.duration / 1000.0 * 0.00012 args.wait_time = args.duration * 0.00012
return args return args
......
...@@ -33,7 +33,7 @@ def get_cpu_nodes_running_tbb_datawriter(timeout=60): ...@@ -33,7 +33,7 @@ def get_cpu_nodes_running_tbb_datawriter(timeout=60):
:return: list cpu node numbers (ints) :return: list cpu node numbers (ints)
''' '''
result = [] result = []
available_nodes = get_cep4_available_cpu_nodes() available_nodes = get_cep4_up_and_running_cpu_nodes()
procs = {} procs = {}
start = datetime.utcnow() start = datetime.utcnow()
...@@ -64,7 +64,7 @@ def get_cpu_nodes_available_for_tbb_datawriters_sorted_by_load(min_nr_of_free_no ...@@ -64,7 +64,7 @@ def get_cpu_nodes_available_for_tbb_datawriters_sorted_by_load(min_nr_of_free_no
:param float max_normalized_load: filter free nodes which are at most max_normalized_load :param float max_normalized_load: filter free nodes which are at most max_normalized_load
:return: list of node numbers :return: list of node numbers
""" """
all_available_nodes = set(get_cep4_available_cpu_nodes()) all_available_nodes = set(get_cep4_up_and_running_cpu_nodes())
nodes_running_datawriters1 = set(get_cpu_nodes_running_tbb_datawriter_via_slurm()) nodes_running_datawriters1 = set(get_cpu_nodes_running_tbb_datawriter_via_slurm())
nodes_running_datawriters2 = set(get_cpu_nodes_running_tbb_datawriter()) nodes_running_datawriters2 = set(get_cpu_nodes_running_tbb_datawriter())
nodes_running_datawriters = nodes_running_datawriters1.union(nodes_running_datawriters2) nodes_running_datawriters = nodes_running_datawriters1.union(nodes_running_datawriters2)
......
...@@ -242,6 +242,9 @@ class ALERTHandler(VOEventListenerInterface): ...@@ -242,6 +242,9 @@ class ALERTHandler(VOEventListenerInterface):
super(ALERTHandler, self).stop_listening() super(ALERTHandler, self).stop_listening()
def handle_event(self, voevent_xml, voevent_etree): def handle_event(self, voevent_xml, voevent_etree):
if voevent_xml is None or voevent_etree is None:
logger.warning("skipping empty vo_event")
return
identifier = voevent_etree.attrib['ivorn'] identifier = voevent_etree.attrib['ivorn']
logger.info('Handling new ALERT event %s...' % identifier) logger.info('Handling new ALERT event %s...' % identifier)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment