diff --git a/LCS/PyCommon/cep4_utils.py b/LCS/PyCommon/cep4_utils.py index f03583c8e8d647eb0de46e71d25308e2913db69f..cdef2d86d9f225d5d1c1377b1174532a99747b76 100644 --- a/LCS/PyCommon/cep4_utils.py +++ b/LCS/PyCommon/cep4_utils.py @@ -32,7 +32,8 @@ def wrap_command_in_cep4_head_node_ssh_call(cmd): cpu node. Otherwise, the command is executed on the head node. :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls ''' - return ssh_cmd_list(user='lofarsys', host='head.cep4.control.lofar') + cmd + ssh_cmd = ssh_cmd_list(user='lofarsys', host='head.cep4.control.lofar') + return ssh_cmd + ([cmd] if isinstance(cmd, basestring) else cmd) def wrap_command_in_cep4_random_cpu_node_ssh_call(cmd, via_head=True): '''wrap the command in an ssh call an available random cep4 cpu node (via head.cep4) @@ -61,7 +62,8 @@ def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True): :param bool via_head: when True, route the cmd first via the cep4 head node :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls ''' - remote_cmd = ssh_cmd_list(host='cpu%02d.cep4' % cpu_node_nr, user='lofarsys') + cmd + ssh_cmd = ssh_cmd_list(host='cpu%02d.cep4' % cpu_node_nr, user='lofarsys') + remote_cmd = ssh_cmd + ([cmd] if isinstance(cmd, basestring) else cmd) if via_head: return wrap_command_in_cep4_head_node_ssh_call(remote_cmd) else: @@ -118,21 +120,7 @@ def get_cep4_available_cpu_nodes(): line = next(l for l in lines if state in l).strip() # get nodes string part of line: nodes_part = line.split(' ')[-1] - if '[' in nodes_part: - # example: line='cpu* up infinite 42 mix cpu[01-17,23-47]' - # then: nodes='01-17,23-47' - nodes = nodes_part[4:-1] - for part in nodes.split(','): - if '-' in part: - lower, sep, upper = part.partition('-') - available_cep4_nodes += list(range(int(lower), int(upper) + 1)) - else: - available_cep4_nodes.append(int(part)) - else: - # example: line='cpu* up infinite 42 mix cpu01' - # then: nodes='01' - node = int(nodes_part[3:]) - available_cep4_nodes += [node] + available_cep4_nodes += convert_slurm_nodes_string_to_node_number_list(nodes_part) except StopIteration: pass # no line with state in line @@ -147,6 +135,35 @@ def get_cep4_available_cpu_nodes(): return available_cep4_nodes +def convert_slurm_nodes_string_to_node_number_list(slurm_string): + ''' converts strings like: cpu[01-03,11-12]' to [1,2,3,11,12] + or 'cpu01' to [1] + :param slurm_string: a string in 'slurm-like' node format, like cpu[01-03,11-12] or cpu01 + :return: a list of node numbers (ints) + ''' + result = [] + stripped_slurm_string = slurm_string.strip() + left_bracket_idx = stripped_slurm_string.find('[') + right_bracket_idx = stripped_slurm_string.find(']', left_bracket_idx) + if left_bracket_idx != -1 and right_bracket_idx != -1: + # example: cpu[01-17,23-47]' + # then: nodes='01-17,23-47' + nodes_string = stripped_slurm_string[left_bracket_idx+1:right_bracket_idx] + + for part in nodes_string.split(','): + if '-' in part: + lower, sep, upper = part.partition('-') + result += list(range(int(lower), int(upper) + 1)) + else: + result.append(int(part)) + else: + # example: 'cpu01' + # then: nodes='01' + # assume all nodes always start with 'cpu' (which is the case on cep4) + node = int(stripped_slurm_string[3:]) + result.append(node) + return result + def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False): ''' get the 5min load for each given cep4 cpu node nr @@ -203,14 +220,17 @@ def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False): ', '.join('cpu%02d:%.3f' % (nr, loads[nr]) for nr in sorted(loads.keys()))) return loads -def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0): +def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33, min_nr_of_nodes=0, node_nrs=None): ''' get the cep4 available cpu node numbers sorted ascending by load (5min). - :param float max_normalized_load: filter available nodes which a at most max_normalized_load + :param float max_normalized_load: filter available nodes which are at most max_normalized_load :param int min_nr_of_nodes: do return this minimum number of nodes, even if their load is higher than max_normalized_load + If not enough nodes are up, then of course it cannot be guaranteed that we return this amount. + :param list node_nrs: optional list of node numbers to apply the filtering on. If None, then all available nodes are queried. :return: sorted list of node numbers. ''' - node_nrs = get_cep4_available_cpu_nodes() + if not node_nrs: + node_nrs = get_cep4_available_cpu_nodes() loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True) load_tuples_list = [(cpu_nr,load) for cpu_nr,load in loads.items()] sorted_load_tuples_list = sorted(load_tuples_list, key=lambda x: x[1]) @@ -238,15 +258,20 @@ def get_cep4_available_cpu_node_with_lowest_load(max_normalized_load=0.33): return node_nrs[0] return None -def parallelize_cmd_over_cep4_cpu_nodes(cmd, parallelizable_option, parallelizable_option_values, timeout=3600): +def parallelize_cmd_over_cep4_cpu_nodes(cmd, parallelizable_option, parallelizable_option_values, + max_normalized_load=0.5, min_nr_of_nodes=1, + timeout=3600): '''run the given cmd in parallel on multiple available cpu nodes. :param list cmd: a subprocess cmd list :param string parallelizable_option: the option which is given to the parallelized cmd for a subset of the parallelizable_option_values :param list parallelizable_option_values: the list of values which is chunked for the parallelized cmd for the parallelizable_option + :param float max_normalized_load: filter available nodes which have at most max_normalized_load + :param int min_nr_of_nodes: run on this minimum number of nodes, even if their load is higher than max_normalized_load :param int timeout: timeout in seconds after which the workers are killed :return: True if all processes on all cpu nodes exited ok, else False ''' - available_cep4_nodes = get_cep4_available_cpu_nodes_sorted_ascending_by_load() + available_cep4_nodes = get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=max_normalized_load, + min_nr_of_nodes=min_nr_of_nodes) if len(available_cep4_nodes) == 0: logger.warning('No cep4 cpu nodes available..') @@ -319,5 +344,8 @@ def parallelize_cmd_over_cep4_cpu_nodes(cmd, parallelizable_option, parallelizab return success if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) - print get_cep4_available_cpu_nodes_sorted_ascending_by_load(min_nr_of_nodes=4, max_normalized_load=0.01) + logging.basicConfig(level=logging.DEBUG) + print convert_slurm_nodes_string_to_node_number_list(' \t cpu[20-39,41,45-48] ') + print convert_slurm_nodes_string_to_node_number_list(' \t cpu03 ') + print get_cep4_available_cpu_nodes() + print get_cep4_available_cpu_nodes_sorted_ascending_by_load(min_nr_of_nodes=3) \ No newline at end of file diff --git a/LCS/PyCommon/lcu_utils.py b/LCS/PyCommon/lcu_utils.py index d5aa453b5c49ddae9fa6e54e02ce87879699132e..fe6eefaa86565b82e4f33e852f51d7434d8bf03b 100644 --- a/LCS/PyCommon/lcu_utils.py +++ b/LCS/PyCommon/lcu_utils.py @@ -35,7 +35,8 @@ def wrap_command_in_lcu_head_node_ssh_call(cmd): cpu node. Otherwise, the command is executed on the head node. :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls ''' - return ssh_cmd_list('lcuhead.control.lofar', 'lofarsys') + cmd + ssh_cmd = ssh_cmd_list('lcuhead.control.lofar', 'lofarsys') + return ssh_cmd + ([cmd] if isinstance(cmd, basestring) else cmd) def wrap_command_in_lcu_station_ssh_call(cmd, station, via_head=True): '''wrap the command in an ssh call the given station lcu node (via lcuhead) @@ -44,8 +45,8 @@ def wrap_command_in_lcu_station_ssh_call(cmd, station, via_head=True): :param bool via_head: when True, route the cmd first via the lcuhead node :return: the same subprocess cmd list, but then wrapped with lcu ssh calls ''' - cmd_list = cmd if isinstance(cmd, list) else [cmd] - remote_cmd = ssh_cmd_list(stationname2hostname(station), 'lofarsys') + cmd_list + ssh_cmd = ssh_cmd_list(stationname2hostname(station), 'lofarsys') + remote_cmd = ssh_cmd + ([cmd] if isinstance(cmd, basestring) else cmd) if via_head: return wrap_command_in_lcu_head_node_ssh_call(remote_cmd) @@ -384,4 +385,4 @@ if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) import pprint pprint.pprint(get_station_cable_delays(['CS004', 'CS005'])) - #print get_station_calibration_tables(['CS001', 'RS407'], antenna_set_and_filter='LBA_INNER-10_90') #['CS001', 'DE601']) + print get_station_calibration_tables(['CS001', 'RS407'], antenna_set_and_filter='LBA_INNER-10_90') #['CS001', 'DE601'])