Skip to content
Snippets Groups Projects
Commit f9561b19 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-488: improved determination of available cep4 nodes

parent ebe2cb07
No related branches found
No related tags found
No related merge requests found
...@@ -32,7 +32,8 @@ def wrap_command_in_cep4_head_node_ssh_call(cmd): ...@@ -32,7 +32,8 @@ def wrap_command_in_cep4_head_node_ssh_call(cmd):
cpu node. Otherwise, the command is executed on the head node. cpu node. Otherwise, the command is executed on the head node.
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
''' '''
return ssh_cmd_list(user='lofarsys', host='head.cep4.control.lofar') + cmd ssh_cmd = ssh_cmd_list(user='lofarsys', host='head.cep4.control.lofar')
return ssh_cmd + ([cmd] if isinstance(cmd, basestring) else cmd)
def wrap_command_in_cep4_random_cpu_node_ssh_call(cmd, via_head=True): def wrap_command_in_cep4_random_cpu_node_ssh_call(cmd, via_head=True):
'''wrap the command in an ssh call an available random cep4 cpu node (via head.cep4) '''wrap the command in an ssh call an available random cep4 cpu node (via head.cep4)
...@@ -61,7 +62,8 @@ def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True): ...@@ -61,7 +62,8 @@ def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True):
:param bool via_head: when True, route the cmd first via the cep4 head node :param bool via_head: when True, route the cmd first via the cep4 head node
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
''' '''
remote_cmd = ssh_cmd_list(host='cpu%02d.cep4' % cpu_node_nr, user='lofarsys') + cmd ssh_cmd = ssh_cmd_list(host='cpu%02d.cep4' % cpu_node_nr, user='lofarsys')
remote_cmd = ssh_cmd + ([cmd] if isinstance(cmd, basestring) else cmd)
if via_head: if via_head:
return wrap_command_in_cep4_head_node_ssh_call(remote_cmd) return wrap_command_in_cep4_head_node_ssh_call(remote_cmd)
else: else:
...@@ -118,21 +120,7 @@ def get_cep4_available_cpu_nodes(): ...@@ -118,21 +120,7 @@ def get_cep4_available_cpu_nodes():
line = next(l for l in lines if state in l).strip() line = next(l for l in lines if state in l).strip()
# get nodes string part of line: # get nodes string part of line:
nodes_part = line.split(' ')[-1] nodes_part = line.split(' ')[-1]
if '[' in nodes_part: available_cep4_nodes += convert_slurm_nodes_string_to_node_number_list(nodes_part)
# example: line='cpu* up infinite 42 mix cpu[01-17,23-47]'
# then: nodes='01-17,23-47'
nodes = nodes_part[4:-1]
for part in nodes.split(','):
if '-' in part:
lower, sep, upper = part.partition('-')
available_cep4_nodes += list(range(int(lower), int(upper) + 1))
else:
available_cep4_nodes.append(int(part))
else:
# example: line='cpu* up infinite 42 mix cpu01'
# then: nodes='01'
node = int(nodes_part[3:])
available_cep4_nodes += [node]
except StopIteration: except StopIteration:
pass # no line with state in line pass # no line with state in line
...@@ -147,6 +135,35 @@ def get_cep4_available_cpu_nodes(): ...@@ -147,6 +135,35 @@ def get_cep4_available_cpu_nodes():
return available_cep4_nodes return available_cep4_nodes
def convert_slurm_nodes_string_to_node_number_list(slurm_string):
''' converts strings like: cpu[01-03,11-12]' to [1,2,3,11,12]
or 'cpu01' to [1]
:param slurm_string: a string in 'slurm-like' node format, like cpu[01-03,11-12] or cpu01
:return: a list of node numbers (ints)
'''
result = []
stripped_slurm_string = slurm_string.strip()
left_bracket_idx = stripped_slurm_string.find('[')
right_bracket_idx = stripped_slurm_string.find(']', left_bracket_idx)
if left_bracket_idx != -1 and right_bracket_idx != -1:
# example: cpu[01-17,23-47]'
# then: nodes='01-17,23-47'
nodes_string = stripped_slurm_string[left_bracket_idx+1:right_bracket_idx]
for part in nodes_string.split(','):
if '-' in part:
lower, sep, upper = part.partition('-')
result += list(range(int(lower), int(upper) + 1))
else:
result.append(int(part))
else:
# example: 'cpu01'
# then: nodes='01'
# assume all nodes always start with 'cpu' (which is the case on cep4)
node = int(stripped_slurm_string[3:])
result.append(node)
return result
def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False): def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False):
''' '''
get the 5min load for each given cep4 cpu node nr get the 5min load for each given cep4 cpu node nr
...@@ -203,14 +220,17 @@ def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False): ...@@ -203,14 +220,17 @@ def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False):
', '.join('cpu%02d:%.3f' % (nr, loads[nr]) for nr in sorted(loads.keys()))) ', '.join('cpu%02d:%.3f' % (nr, loads[nr]) for nr in sorted(loads.keys())))
return loads return loads
def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0): def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0, node_nrs=None):
''' '''
get the cep4 available cpu node numbers sorted ascending by load (5min). get the cep4 available cpu node numbers sorted ascending by load (5min).
:param float max_normalized_load: filter available nodes which a at most max_normalized_load :param float max_normalized_load: filter available nodes which are at most max_normalized_load
:param int min_nr_of_nodes: do return this minimum number of nodes, even if their load is higher than max_normalized_load :param int min_nr_of_nodes: do return this minimum number of nodes, even if their load is higher than max_normalized_load
If not enough nodes are up, then of course it cannot be guaranteed that we return this amount.
:param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all available nodes are queried.
:return: sorted list of node numbers. :return: sorted list of node numbers.
''' '''
node_nrs = get_cep4_available_cpu_nodes() if not node_nrs:
node_nrs = get_cep4_available_cpu_nodes()
loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True) loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True)
load_tuples_list = [(cpu_nr,load) for cpu_nr,load in loads.items()] load_tuples_list = [(cpu_nr,load) for cpu_nr,load in loads.items()]
sorted_load_tuples_list = sorted(load_tuples_list, key=lambda x: x[1]) sorted_load_tuples_list = sorted(load_tuples_list, key=lambda x: x[1])
...@@ -238,15 +258,20 @@ def get_cep4_available_cpu_node_with_lowest_load(max_normalized_load=0.33): ...@@ -238,15 +258,20 @@ def get_cep4_available_cpu_node_with_lowest_load(max_normalized_load=0.33):
return node_nrs[0] return node_nrs[0]
return None return None
def parallelize_cmd_over_cep4_cpu_nodes(cmd, parallelizable_option, parallelizable_option_values, timeout=3600): def parallelize_cmd_over_cep4_cpu_nodes(cmd, parallelizable_option, parallelizable_option_values,
max_normalized_load=0.5, min_nr_of_nodes=1,
timeout=3600):
'''run the given cmd in parallel on multiple available cpu nodes. '''run the given cmd in parallel on multiple available cpu nodes.
:param list cmd: a subprocess cmd list :param list cmd: a subprocess cmd list
:param string parallelizable_option: the option which is given to the parallelized cmd for a subset of the parallelizable_option_values :param string parallelizable_option: the option which is given to the parallelized cmd for a subset of the parallelizable_option_values
:param list parallelizable_option_values: the list of values which is chunked for the parallelized cmd for the parallelizable_option :param list parallelizable_option_values: the list of values which is chunked for the parallelized cmd for the parallelizable_option
:param float max_normalized_load: filter available nodes which have at most max_normalized_load
:param int min_nr_of_nodes: run on this minimum number of nodes, even if their load is higher than max_normalized_load
:param int timeout: timeout in seconds after which the workers are killed :param int timeout: timeout in seconds after which the workers are killed
:return: True if all processes on all cpu nodes exited ok, else False :return: True if all processes on all cpu nodes exited ok, else False
''' '''
available_cep4_nodes = get_cep4_available_cpu_nodes_sorted_ascending_by_load() available_cep4_nodes = get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=max_normalized_load,
min_nr_of_nodes=min_nr_of_nodes)
if len(available_cep4_nodes) == 0: if len(available_cep4_nodes) == 0:
logger.warning('No cep4 cpu nodes available..') logger.warning('No cep4 cpu nodes available..')
...@@ -319,5 +344,8 @@ def parallelize_cmd_over_cep4_cpu_nodes(cmd, parallelizable_option, parallelizab ...@@ -319,5 +344,8 @@ def parallelize_cmd_over_cep4_cpu_nodes(cmd, parallelizable_option, parallelizab
return success return success
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.DEBUG)
print get_cep4_available_cpu_nodes_sorted_ascending_by_load(min_nr_of_nodes=4, max_normalized_load=0.01) print convert_slurm_nodes_string_to_node_number_list(' \t cpu[20-39,41,45-48] ')
print convert_slurm_nodes_string_to_node_number_list(' \t cpu03 ')
print get_cep4_available_cpu_nodes()
print get_cep4_available_cpu_nodes_sorted_ascending_by_load(min_nr_of_nodes=3)
\ No newline at end of file
...@@ -35,7 +35,8 @@ def wrap_command_in_lcu_head_node_ssh_call(cmd): ...@@ -35,7 +35,8 @@ def wrap_command_in_lcu_head_node_ssh_call(cmd):
cpu node. Otherwise, the command is executed on the head node. cpu node. Otherwise, the command is executed on the head node.
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
''' '''
return ssh_cmd_list('lcuhead.control.lofar', 'lofarsys') + cmd ssh_cmd = ssh_cmd_list('lcuhead.control.lofar', 'lofarsys')
return ssh_cmd + ([cmd] if isinstance(cmd, basestring) else cmd)
def wrap_command_in_lcu_station_ssh_call(cmd, station, via_head=True): def wrap_command_in_lcu_station_ssh_call(cmd, station, via_head=True):
'''wrap the command in an ssh call the given station lcu node (via lcuhead) '''wrap the command in an ssh call the given station lcu node (via lcuhead)
...@@ -44,8 +45,8 @@ def wrap_command_in_lcu_station_ssh_call(cmd, station, via_head=True): ...@@ -44,8 +45,8 @@ def wrap_command_in_lcu_station_ssh_call(cmd, station, via_head=True):
:param bool via_head: when True, route the cmd first via the lcuhead node :param bool via_head: when True, route the cmd first via the lcuhead node
:return: the same subprocess cmd list, but then wrapped with lcu ssh calls :return: the same subprocess cmd list, but then wrapped with lcu ssh calls
''' '''
cmd_list = cmd if isinstance(cmd, list) else [cmd] ssh_cmd = ssh_cmd_list(stationname2hostname(station), 'lofarsys')
remote_cmd = ssh_cmd_list(stationname2hostname(station), 'lofarsys') + cmd_list remote_cmd = ssh_cmd + ([cmd] if isinstance(cmd, basestring) else cmd)
if via_head: if via_head:
return wrap_command_in_lcu_head_node_ssh_call(remote_cmd) return wrap_command_in_lcu_head_node_ssh_call(remote_cmd)
...@@ -384,4 +385,4 @@ if __name__ == '__main__': ...@@ -384,4 +385,4 @@ if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
import pprint import pprint
pprint.pprint(get_station_cable_delays(['CS004', 'CS005'])) pprint.pprint(get_station_cable_delays(['CS004', 'CS005']))
#print get_station_calibration_tables(['CS001', 'RS407'], antenna_set_and_filter='LBA_INNER-10_90') #['CS001', 'DE601']) print get_station_calibration_tables(['CS001', 'RS407'], antenna_set_and_filter='LBA_INNER-10_90') #['CS001', 'DE601'])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment