diff --git a/LCS/PyCommon/cep4_utils.py b/LCS/PyCommon/cep4_utils.py index 4793ea63676b8e105092c0b3a5ca7a8ea63d4bc2..0dec675b9872b023680057f4b310aa187a236c7f 100755 --- a/LCS/PyCommon/cep4_utils.py +++ b/LCS/PyCommon/cep4_utils.py @@ -53,7 +53,9 @@ def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_h :param bool via_head: when True, route the cmd first via the cep4 head node :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls ''' - lowest_load_node_nr = get_cep4_available_cpu_node_with_lowest_load() + lowest_load_node_nr = get_cep4_cpu_node_with_lowest_load() + if lowest_load_node_nr is None: + raise RuntimeError("No cpu node available to run the cmd on. cmd=%s" %(cmd,)) return wrap_command_in_cep4_cpu_node_ssh_call(cmd, lowest_load_node_nr, via_head=via_head) def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True): @@ -99,29 +101,50 @@ def wrap_command_for_docker(cmd, image_name, image_label='', mount_dirs=['/data' dockerized_cmd += cmd return dockerized_cmd -def get_cep4_available_cpu_nodes(include_allocated_nodes=False): +# a selection of slurm states relevant for lofar usage +SLURM_AVAILABLE_STATES = {'idle','mix'} +SLURM_IN_USES_STATES = {'alloc','mix'} +SLURM_UNAVAILABLE_STATES = {'down','drain','drng','resv','maint'} +SLURM_STATES = SLURM_AVAILABLE_STATES | SLURM_IN_USES_STATES | SLURM_UNAVAILABLE_STATES + +# lofar cep4 default slurm partitions +SLURM_CPU_PARTITION = 'cpu' +SLURM_GPU_PARTITION = 'gpu' +SLURM_PARTITIONS = {SLURM_CPU_PARTITION, SLURM_GPU_PARTITION} + +def get_cep4_slurm_nodes(states, partition: str) -> []: ''' - get a list of cep4 cpu nodes which are currently up and running according to slurm - :return: a list of cpu node numbers (ints) for the up and running cpu nodes + get a list of cep4 nodes from the given partition which have (one of) the given states according to slurm + states: set/list of slurm states, see predefined sets: SLURM_AVAILABLE_STATES, SLURM_IN_USES_STATES, SLURM_UNAVAILABLE_STATES + partition: one of the known SLURM_PARTITIONS + :return: a list of node numbers (ints) of the nodes in the partition ''' - available_cep4_nodes = [] + slurm_nodes = [] try: - logger.debug('determining available cep4 cpu nodes') + # filter out unknown states + _states = [s for s in states if s in SLURM_STATES] + if not _states: + raise ValueError("the given states:%s are not valid slurm states:%s" % (states, SLURM_STATES)) + + if partition not in SLURM_PARTITIONS: + raise ValueError("the given partition:%s is not one of the valid cep4 slurm partition:%s" % (partition, SLURM_PARTITIONS)) + + logger.debug('determining available cep4 nodes states:%s, partition:%s', _states, partition) # find out which nodes are available - cmd = ['sinfo -p cpu -t idle,mix%s' % (',alloc' if include_allocated_nodes else '')] + cmd = ['sinfo -p %s -t %s' % (partition, ','.join(_states))] cmd = wrap_command_in_cep4_head_node_ssh_call(cmd) logger.debug('executing command: %s', ' '.join(cmd)) out = check_output_returning_strings(cmd) lines = out.split('\n') - for state in ['idle', 'mix', 'alloc']: + for state in _states: try: line = next(l for l in lines if state in l).strip() # get nodes string part of line: nodes_part = line.split(' ')[-1] - available_cep4_nodes += convert_slurm_nodes_string_to_node_number_list(nodes_part) + slurm_nodes += convert_slurm_nodes_string_to_node_number_list(nodes_part) except StopIteration: pass # no line with state in line @@ -129,13 +152,34 @@ def get_cep4_available_cpu_nodes(include_allocated_nodes=False): except Exception as e: logger.exception(e) - available_cep4_nodes = sorted(list(set(available_cep4_nodes))) - logger.debug('available cep4 cpu nodes: %s', ','.join(str(x) for x in available_cep4_nodes)) + slurm_nodes = sorted(list(set(slurm_nodes))) + logger.debug('cep4 nodes with states:%s in partition:%s are: %s', _states, partition, slurm_nodes) + return slurm_nodes + +def get_cep4_available_cpu_nodes(): + ''' + get a list of cep4 cpu nodes which are currently up and running (either totally free, or partially free) according to slurm + :return: a list of cpu node numbers (ints) + ''' + available_cep4_nodes = get_cep4_slurm_nodes(SLURM_AVAILABLE_STATES, SLURM_CPU_PARTITION) + if not available_cep4_nodes: logger.warning('no cep4 cpu nodes available') return available_cep4_nodes +def get_cep4_up_and_running_cpu_nodes(): + ''' + get a list of cep4 cpu nodes which are currently up and running (either totally free, partially free, or totally used) according to slurm + :return: a list of cpu node numbers (ints) + ''' + cep4_nodes = get_cep4_slurm_nodes(SLURM_AVAILABLE_STATES | SLURM_IN_USES_STATES, SLURM_CPU_PARTITION) + + if not cep4_nodes: + logger.warning('no cep4 cpu nodes up and running') + + return cep4_nodes + def convert_slurm_nodes_string_to_node_number_list(slurm_string): ''' converts strings like: cpu[01-03,11-12]' to [1,2,3,11,12] or 'cpu01' to [1] @@ -168,15 +212,15 @@ def convert_slurm_nodes_string_to_node_number_list(slurm_string): result.append(node) return result -def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False, include_allocated_nodes=False): +def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False): ''' get the 5min load for each given cep4 cpu node nr - :param node_nrs: optional list of node numbers to get the load for. If None, then all available nodes are queried. + :param node_nrs: optional list of node numbers to get the load for. If None, then all up-and-running nodes are queried. :param bool normalized: when True, then normalize the loads with the number of cores. :return: dict with node_nr -> load mapping ''' if node_nrs == None: - node_nrs = get_cep4_available_cpu_nodes(include_allocated_nodes=include_allocated_nodes) + node_nrs = get_cep4_up_and_running_cpu_nodes() procs = {} loads = {} @@ -224,17 +268,17 @@ def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False, include_allocated_ ', '.join('cpu%02d:%.3f' % (nr, loads[nr]) for nr in sorted(loads.keys()))) return loads -def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0, node_nrs=None, include_allocated_nodes=False): +def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0, node_nrs=None): ''' get the cep4 available cpu node numbers sorted ascending by load (5min). :param float max_normalized_load: filter available nodes which are at most max_normalized_load :param int min_nr_of_nodes: do return this minimum number of nodes, even if their load is higher than max_normalized_load If not enough nodes are up, then of course it cannot be guaranteed that we return this amount. - :param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all available nodes are queried. + :param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all up-and-running nodes are queried. :return: sorted list of node numbers. ''' if not node_nrs: - node_nrs = get_cep4_available_cpu_nodes(include_allocated_nodes=include_allocated_nodes) + node_nrs = get_cep4_up_and_running_cpu_nodes() loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True) load_tuples_list = [(cpu_nr,load) for cpu_nr,load in list(loads.items())] sorted_load_tuples_list = sorted(load_tuples_list, key=lambda x: x[1]) @@ -249,15 +293,21 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0. max_normalized_load, min_nr_of_nodes, sorted_node_nrs) return sorted_node_nrs -def get_cep4_available_cpu_node_with_lowest_load(max_normalized_load=0.33, include_allocated_nodes=False): +def get_cep4_cpu_node_with_lowest_load(max_normalized_load=0.33): ''' - get the cep4 cpu node which is available and has the lowest (5min) load of them all. - :param float max_normalized_load: filter available nodes which a at most max_normalized_load + get the cep4 cpu node which has the lowest (5min) load of them all. Preferably a node which is not fully used yet. + :param float max_normalized_load: filter nodes which a at most max_normalized_load :return: the node number (int) with the lowest load. ''' + # first see if there are any not-fully-used nodes available + nodes = get_cep4_available_cpu_nodes() + + if not nodes: # if not, then just query all up and running nodes + nodes = get_cep4_up_and_running_cpu_nodes() + node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=max_normalized_load, min_nr_of_nodes=1, - include_allocated_nodes=include_allocated_nodes) + node_nrs=nodes) if node_nrs: logger.debug('cep4 cpu node with lowest load: %s', node_nrs[0]) return node_nrs[0]