Skip to content
Snippets Groups Projects

SW-824: Resolve SW-824

Merged Jorrit Schaap requested to merge SW-824 into LOFAR-Release-4_0
1 unresolved thread
Files
2
+ 71
21
@@ -53,7 +53,9 @@ def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_h
:param bool via_head: when True, route the cmd first via the cep4 head node
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
'''
lowest_load_node_nr = get_cep4_available_cpu_node_with_lowest_load()
lowest_load_node_nr = get_cep4_cpu_node_with_lowest_load()
if lowest_load_node_nr is None:
raise RuntimeError("No cpu node available to run the cmd on. cmd=%s" %(cmd,))
return wrap_command_in_cep4_cpu_node_ssh_call(cmd, lowest_load_node_nr, via_head=via_head)
def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True):
@@ -99,29 +101,50 @@ def wrap_command_for_docker(cmd, image_name, image_label='', mount_dirs=['/data'
dockerized_cmd += cmd
return dockerized_cmd
def get_cep4_available_cpu_nodes(include_allocated_nodes=False):
# Slurm node states that matter for lofar usage, grouped by meaning.
# Note that 'mix' is a member of both the available and the in-use group.
SLURM_AVAILABLE_STATES = {'idle', 'mix'}
SLURM_IN_USES_STATES = {'alloc', 'mix'}
SLURM_UNAVAILABLE_STATES = {'down', 'drain', 'drng', 'resv', 'maint'}
# every slurm state known to this module: the union of the three groups above
SLURM_STATES = set().union(SLURM_AVAILABLE_STATES,
                           SLURM_IN_USES_STATES,
                           SLURM_UNAVAILABLE_STATES)

# names of the default slurm partitions on lofar cep4
SLURM_CPU_PARTITION = 'cpu'
SLURM_GPU_PARTITION = 'gpu'
SLURM_PARTITIONS = {SLURM_CPU_PARTITION, SLURM_GPU_PARTITION}
def get_cep4_slurm_nodes(states, partition: str) -> []:
'''
get a list of cep4 cpu nodes which are currently up and running according to slurm
:return: a list of cpu node numbers (ints) for the up and running cpu nodes
get a list of cep4 nodes from the given partition which have (one of) the given states according to slurm
states: set/list of slurm states, see predefined sets: SLURM_AVAILABLE_STATES, SLURM_IN_USES_STATES, SLURM_UNAVAILABLE_STATES
partition: one of the known SLURM_PARTITIONS
:return: a list of node numbers (ints) of the nodes in the partition
'''
available_cep4_nodes = []
slurm_nodes = []
try:
logger.debug('determining available cep4 cpu nodes')
# filter out unknown states
_states = [s for s in states if s in SLURM_STATES]
if not _states:
raise ValueError("the given states:%s are not valid slurm states:%s" % (states, SLURM_STATES))
if partition not in SLURM_PARTITIONS:
raise ValueError("the given partition:%s is not one of the valid cep4 slurm partition:%s" % (partition, SLURM_PARTITIONS))
logger.debug('determining available cep4 nodes states:%s, partition:%s', _states, partition)
# find out which nodes are available
cmd = ['sinfo -p cpu -t idle,mix%s' % (',alloc' if include_allocated_nodes else '')]
cmd = ['sinfo -p %s -t %s' % (partition, ','.join(_states))]
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
logger.debug('executing command: %s', ' '.join(cmd))
out = check_output_returning_strings(cmd)
lines = out.split('\n')
for state in ['idle', 'mix', 'alloc']:
for state in _states:
try:
line = next(l for l in lines if state in l).strip()
# get nodes string part of line:
nodes_part = line.split(' ')[-1]
available_cep4_nodes += convert_slurm_nodes_string_to_node_number_list(nodes_part)
slurm_nodes += convert_slurm_nodes_string_to_node_number_list(nodes_part)
except StopIteration:
pass # no line with state in line
@@ -129,13 +152,34 @@ def get_cep4_available_cpu_nodes(include_allocated_nodes=False):
except Exception as e:
logger.exception(e)
available_cep4_nodes = sorted(list(set(available_cep4_nodes)))
logger.debug('available cep4 cpu nodes: %s', ','.join(str(x) for x in available_cep4_nodes))
slurm_nodes = sorted(list(set(slurm_nodes)))
logger.debug('cep4 nodes with states:%s in partition:%s are: %s', _states, partition, slurm_nodes)
return slurm_nodes
def get_cep4_available_cpu_nodes():
    '''
    ask slurm which cep4 cpu nodes are in one of the SLURM_AVAILABLE_STATES (totally free, or partially free).
    A warning is logged when no such node is reported.
    :return: the list of cpu node numbers as returned by get_cep4_slurm_nodes (possibly empty)
    '''
    node_nrs = get_cep4_slurm_nodes(states=SLURM_AVAILABLE_STATES, partition=SLURM_CPU_PARTITION)
    if node_nrs:
        return node_nrs

    # nothing available: warn, but still hand the (empty) result back to the caller
    logger.warning('no cep4 cpu nodes available')
    return node_nrs
def get_cep4_up_and_running_cpu_nodes():
    '''
    ask slurm which cep4 cpu nodes are up and running, regardless of how busy they are:
    totally free, partially free, or totally used.
    A warning is logged when no such node is reported.
    :return: the list of cpu node numbers as returned by get_cep4_slurm_nodes (possibly empty)
    '''
    # up-and-running means: any available state or any in-use state
    up_and_running_states = SLURM_AVAILABLE_STATES.union(SLURM_IN_USES_STATES)
    node_nrs = get_cep4_slurm_nodes(up_and_running_states, SLURM_CPU_PARTITION)
    if node_nrs:
        return node_nrs

    logger.warning('no cep4 cpu nodes up and running')
    return node_nrs
def convert_slurm_nodes_string_to_node_number_list(slurm_string):
''' converts strings like: 'cpu[01-03,11-12]' to [1,2,3,11,12]
or 'cpu01' to [1]
@@ -168,15 +212,15 @@ def convert_slurm_nodes_string_to_node_number_list(slurm_string):
result.append(node)
return result
def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False, include_allocated_nodes=False):
def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False):
'''
get the 5min load for each given cep4 cpu node nr
:param node_nrs: optional list of node numbers to get the load for. If None, then all available nodes are queried.
:param node_nrs: optional list of node numbers to get the load for. If None, then all up-and-running nodes are queried.
:param bool normalized: when True, then normalize the loads with the number of cores.
:return: dict with node_nr -> load mapping
'''
if node_nrs == None:
node_nrs = get_cep4_available_cpu_nodes(include_allocated_nodes=include_allocated_nodes)
node_nrs = get_cep4_up_and_running_cpu_nodes()
procs = {}
loads = {}
@@ -224,17 +268,17 @@ def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False, include_allocated_
', '.join('cpu%02d:%.3f' % (nr, loads[nr]) for nr in sorted(loads.keys())))
return loads
def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0, node_nrs=None, include_allocated_nodes=False):
def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0, node_nrs=None):
'''
get the cep4 available cpu node numbers sorted ascending by load (5min).
:param float max_normalized_load: filter available nodes which are at most max_normalized_load
:param int min_nr_of_nodes: do return this minimum number of nodes, even if their load is higher than max_normalized_load
If not enough nodes are up, then of course it cannot be guaranteed that we return this amount.
:param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all available nodes are queried.
:param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all up-and-running nodes are queried.
:return: sorted list of node numbers.
'''
if not node_nrs:
node_nrs = get_cep4_available_cpu_nodes(include_allocated_nodes=include_allocated_nodes)
node_nrs = get_cep4_up_and_running_cpu_nodes()
loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True)
load_tuples_list = [(cpu_nr,load) for cpu_nr,load in list(loads.items())]
sorted_load_tuples_list = sorted(load_tuples_list, key=lambda x: x[1])
@@ -249,15 +293,21 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.
max_normalized_load, min_nr_of_nodes, sorted_node_nrs)
return sorted_node_nrs
def get_cep4_available_cpu_node_with_lowest_load(max_normalized_load=0.33, include_allocated_nodes=False):
def get_cep4_cpu_node_with_lowest_load(max_normalized_load=0.33):
'''
get the cep4 cpu node which is available and has the lowest (5min) load of them all.
:param float max_normalized_load: filter available nodes which a at most max_normalized_load
get the cep4 cpu node which has the lowest (5min) load of them all. Preferably a node which is not fully used yet.
:param float max_normalized_load: filter nodes which are at most max_normalized_load
:return: the node number (int) with the lowest load.
'''
# first see if there are any not-fully-used nodes available
nodes = get_cep4_available_cpu_nodes()
if not nodes: # if not, then just query all up and running nodes
nodes = get_cep4_up_and_running_cpu_nodes()
node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=max_normalized_load,
min_nr_of_nodes=1,
include_allocated_nodes=include_allocated_nodes)
node_nrs=nodes)
if node_nrs:
logger.debug('cep4 cpu node with lowest load: %s', node_nrs[0])
return node_nrs[0]
Loading