Skip to content
Snippets Groups Projects
Commit 43c2f7cc authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-828: replaced use of flag-argument by a generic get_cep4_slurm_nodes...

SW-828: replaced use of the flag-argument by a generic get_cep4_slurm_nodes method + specific methods get_cep4_available_cpu_nodes and get_cep4_up_and_running_cpu_nodes
parent 5f2de96d
No related branches found
No related tags found
2 merge requests!74Lofar release 4 0,!73SW-824: Resolve SW-824
...@@ -53,7 +53,9 @@ def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_h ...@@ -53,7 +53,9 @@ def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_h
:param bool via_head: when True, route the cmd first via the cep4 head node :param bool via_head: when True, route the cmd first via the cep4 head node
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
''' '''
lowest_load_node_nr = get_cep4_available_cpu_node_with_lowest_load() lowest_load_node_nr = get_cep4_cpu_node_with_lowest_load()
if lowest_load_node_nr is None:
raise RuntimeError("No cpu node available to run the cmd on. cmd=%s" %(cmd,))
return wrap_command_in_cep4_cpu_node_ssh_call(cmd, lowest_load_node_nr, via_head=via_head) return wrap_command_in_cep4_cpu_node_ssh_call(cmd, lowest_load_node_nr, via_head=via_head)
def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True): def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True):
...@@ -99,29 +101,50 @@ def wrap_command_for_docker(cmd, image_name, image_label='', mount_dirs=['/data' ...@@ -99,29 +101,50 @@ def wrap_command_for_docker(cmd, image_name, image_label='', mount_dirs=['/data'
dockerized_cmd += cmd dockerized_cmd += cmd
return dockerized_cmd return dockerized_cmd
# a selection of slurm states relevant for lofar usage
# note that 'mix' (partially used node) is both an available and an in-use state
SLURM_AVAILABLE_STATES = {'idle', 'mix'}
SLURM_IN_USES_STATES = {'alloc', 'mix'}
SLURM_UNAVAILABLE_STATES = {'down', 'drain', 'drng', 'resv', 'maint'}
# union of all of the above states
SLURM_STATES = SLURM_AVAILABLE_STATES | SLURM_IN_USES_STATES | SLURM_UNAVAILABLE_STATES

# lofar cep4 default slurm partitions
SLURM_CPU_PARTITION = 'cpu'
SLURM_GPU_PARTITION = 'gpu'
SLURM_PARTITIONS = {SLURM_CPU_PARTITION, SLURM_GPU_PARTITION}
def get_cep4_slurm_nodes(states, partition: str) -> list:
    '''
    get a list of cep4 nodes from the given partition which have (one of) the given states according to slurm
    :param states: set/list of slurm states, see predefined sets: SLURM_AVAILABLE_STATES, SLURM_IN_USES_STATES, SLURM_UNAVAILABLE_STATES
                   states which are not in SLURM_STATES are ignored.
    :param str partition: one of the known SLURM_PARTITIONS
    :return: a sorted list of unique node numbers (ints) of the nodes in the partition.
             Errors (no valid state given, unknown partition, failing sinfo call) are logged and not raised,
             in which case the nodes gathered so far (usually none) are returned.
    '''
    slurm_nodes = []
    # bind _states before entering the try-block, so the final debug log cannot hit an unbound name
    # when the filtering of the given states itself raises (for example for states=None)
    _states = []

    try:
        # filter out unknown states. sorted, so the sinfo command and the logging are reproducible for set-input
        _states = sorted({s for s in states if s in SLURM_STATES})
        if not _states:
            raise ValueError("the given states:%s are not valid slurm states:%s" % (states, SLURM_STATES))

        if partition not in SLURM_PARTITIONS:
            raise ValueError("the given partition:%s is not one of the valid cep4 slurm partition:%s" % (partition, SLURM_PARTITIONS))

        logger.debug('determining available cep4 nodes states:%s, partition:%s', _states, partition)

        # find out which nodes are available
        cmd = ['sinfo -p %s -t %s' % (partition, ','.join(_states))]
        cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)

        logger.debug('executing command: %s', ' '.join(cmd))
        out = check_output_returning_strings(cmd)
        lines = out.split('\n')
        for state in _states:
            # only the first output line mentioning the state is used
            line = next((l for l in lines if state in l), None)
            if line is None:
                continue  # no line with state in line

            # get nodes string part of line:
            nodes_part = line.strip().split(' ')[-1]
            slurm_nodes += convert_slurm_nodes_string_to_node_number_list(nodes_part)
    except Exception as e:
        # errors are logged and not propagated: callers receive whatever was gathered so far
        logger.exception(e)

    slurm_nodes = sorted(set(slurm_nodes))
    logger.debug('cep4 nodes with states:%s in partition:%s are: %s', _states, partition, slurm_nodes)
    return slurm_nodes
def get_cep4_available_cpu_nodes():
    '''
    query slurm for the cep4 cpu nodes which are in one of the SLURM_AVAILABLE_STATES (totally free, or partially free).
    A warning is logged when no such node is found.
    :return: a list of cpu node numbers (ints)
    '''
    node_nrs = get_cep4_slurm_nodes(partition=SLURM_CPU_PARTITION, states=SLURM_AVAILABLE_STATES)

    if node_nrs:
        return node_nrs

    logger.warning('no cep4 cpu nodes available')
    return node_nrs
def get_cep4_up_and_running_cpu_nodes():
    '''
    query slurm for the cep4 cpu nodes which are in one of the available or in-use states
    (totally free, partially free, or totally used).
    A warning is logged when no such node is found.
    :return: a list of cpu node numbers (ints)
    '''
    up_and_running_states = SLURM_AVAILABLE_STATES | SLURM_IN_USES_STATES
    node_nrs = get_cep4_slurm_nodes(partition=SLURM_CPU_PARTITION, states=up_and_running_states)

    if node_nrs:
        return node_nrs

    logger.warning('no cep4 cpu nodes up and running')
    return node_nrs
def convert_slurm_nodes_string_to_node_number_list(slurm_string): def convert_slurm_nodes_string_to_node_number_list(slurm_string):
''' converts strings like: cpu[01-03,11-12]' to [1,2,3,11,12] ''' converts strings like: cpu[01-03,11-12]' to [1,2,3,11,12]
or 'cpu01' to [1] or 'cpu01' to [1]
...@@ -168,15 +212,15 @@ def convert_slurm_nodes_string_to_node_number_list(slurm_string): ...@@ -168,15 +212,15 @@ def convert_slurm_nodes_string_to_node_number_list(slurm_string):
result.append(node) result.append(node)
return result return result
def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False, include_allocated_nodes=False): def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False):
''' '''
get the 5min load for each given cep4 cpu node nr get the 5min load for each given cep4 cpu node nr
:param node_nrs: optional list of node numbers to get the load for. If None, then all available nodes are queried. :param node_nrs: optional list of node numbers to get the load for. If None, then all up-and-running nodes are queried.
:param bool normalized: when True, then normalize the loads with the number of cores. :param bool normalized: when True, then normalize the loads with the number of cores.
:return: dict with node_nr -> load mapping :return: dict with node_nr -> load mapping
''' '''
if node_nrs == None: if node_nrs == None:
node_nrs = get_cep4_available_cpu_nodes(include_allocated_nodes=include_allocated_nodes) node_nrs = get_cep4_up_and_running_cpu_nodes()
procs = {} procs = {}
loads = {} loads = {}
...@@ -224,17 +268,17 @@ def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False, include_allocated_ ...@@ -224,17 +268,17 @@ def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False, include_allocated_
', '.join('cpu%02d:%.3f' % (nr, loads[nr]) for nr in sorted(loads.keys()))) ', '.join('cpu%02d:%.3f' % (nr, loads[nr]) for nr in sorted(loads.keys())))
return loads return loads
def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0, node_nrs=None, include_allocated_nodes=False): def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0, node_nrs=None):
''' '''
get the cep4 available cpu node numbers sorted ascending by load (5min). get the cep4 available cpu node numbers sorted ascending by load (5min).
:param float max_normalized_load: filter available nodes which are at most max_normalized_load :param float max_normalized_load: filter available nodes which are at most max_normalized_load
:param int min_nr_of_nodes: do return this minimum number of nodes, even if their load is higher than max_normalized_load :param int min_nr_of_nodes: do return this minimum number of nodes, even if their load is higher than max_normalized_load
If not enough nodes are up, then of course it cannot be guaranteed that we return this amount. If not enough nodes are up, then of course it cannot be guaranteed that we return this amount.
:param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all available nodes are queried. :param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all up-and-running nodes are queried.
:return: sorted list of node numbers. :return: sorted list of node numbers.
''' '''
if not node_nrs: if not node_nrs:
node_nrs = get_cep4_available_cpu_nodes(include_allocated_nodes=include_allocated_nodes) node_nrs = get_cep4_up_and_running_cpu_nodes()
loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True) loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True)
load_tuples_list = [(cpu_nr,load) for cpu_nr,load in list(loads.items())] load_tuples_list = [(cpu_nr,load) for cpu_nr,load in list(loads.items())]
sorted_load_tuples_list = sorted(load_tuples_list, key=lambda x: x[1]) sorted_load_tuples_list = sorted(load_tuples_list, key=lambda x: x[1])
...@@ -249,15 +293,21 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0. ...@@ -249,15 +293,21 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.
max_normalized_load, min_nr_of_nodes, sorted_node_nrs) max_normalized_load, min_nr_of_nodes, sorted_node_nrs)
return sorted_node_nrs return sorted_node_nrs
def get_cep4_available_cpu_node_with_lowest_load(max_normalized_load=0.33, include_allocated_nodes=False): def get_cep4_cpu_node_with_lowest_load(max_normalized_load=0.33):
''' '''
get the cep4 cpu node which is available and has the lowest (5min) load of them all. get the cep4 cpu node which has the lowest (5min) load of them all. Preferably a node which is not fully used yet.
:param float max_normalized_load: filter available nodes which a at most max_normalized_load :param float max_normalized_load: filter nodes which a at most max_normalized_load
:return: the node number (int) with the lowest load. :return: the node number (int) with the lowest load.
''' '''
# first see if there are any not-fully-used nodes available
nodes = get_cep4_available_cpu_nodes()
if not nodes: # if not, then just query all up and running nodes
nodes = get_cep4_up_and_running_cpu_nodes()
node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=max_normalized_load, node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=max_normalized_load,
min_nr_of_nodes=1, min_nr_of_nodes=1,
include_allocated_nodes=include_allocated_nodes) node_nrs=nodes)
if node_nrs: if node_nrs:
logger.debug('cep4 cpu node with lowest load: %s', node_nrs[0]) logger.debug('cep4 cpu node with lowest load: %s', node_nrs[0])
return node_nrs[0] return node_nrs[0]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment