-
Jorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
cep4_utils.py 9.11 KiB
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from subprocess import check_output, Popen, PIPE
from random import randint
import logging
# module-level logger, named after this module
logger = logging.getLogger(__name__)

# hostname of the cep4 head node
CEP4_HEAD_NODE = 'head.cep4.control.lofar'

# ssh target (user@host) for the lofarsys user on the cep4 head node
LOFARSYS_AT_CEP4_HEAD_NODE = 'lofarsys@%s' % CEP4_HEAD_NODE
def ssh_cmd_list(host):
    """
    build the leading part of a subprocess command list which does an ssh call to the given node
    the following ssh options are used:
      -T to disable pseudo-terminal allocation
      -q for ssh quiet mode (suppresses most ssh warnings/diagnostics)
      -o StrictHostKeyChecking=no to prevent prompts about host keys
    :param host: the node name or ip address (may be given as user@host)
    :return: a subprocess compliant command list, to which the remote command can be appended
    """
    # note: the -o option and its value are deliberately kept together in one list item
    ssh_options = ['-T', '-q', '-o StrictHostKeyChecking=no']
    return ['ssh'] + ssh_options + [host]
def wrap_command_in_cep4_head_node_ssh_call(cmd):
    """prefix the command with an ssh call to the cep4 head node (as user lofarsys),
    so that the command is executed on the head node.
    :param list cmd: a subprocess cmd list
    :return: a new subprocess cmd list: the ssh call to the head node, followed by the given cmd
    """
    head_node_ssh_prefix = ssh_cmd_list(LOFARSYS_AT_CEP4_HEAD_NODE)
    return head_node_ssh_prefix + cmd
def wrap_command_in_cep4_random_cpu_node_ssh_call(cmd, via_head=True):
    """wrap the command in an ssh call to a randomly picked available cep4 cpu node (via head.cep4)
    :param list cmd: a subprocess cmd list
    :param bool via_head: when True, route the cmd first via the cep4 head node
    :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
    :raises ValueError: when there are no available cep4 cpu nodes to pick from
    """
    node_nrs = get_cep4_available_cpu_nodes()

    if not node_nrs:
        # without this guard, randint(0, -1) below raises a ValueError with a cryptic
        # 'empty range' message. Raise the same exception type, but with a clear reason.
        raise ValueError('cannot wrap command in cep4 cpu node ssh call: no cep4 cpu nodes available')

    # pick a random available cpu node (randint includes both end points)
    node_nr = node_nrs[randint(0, len(node_nrs) - 1)]
    return wrap_command_in_cep4_cpu_node_ssh_call(cmd, node_nr, via_head=via_head)
def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_head=True):
    """wrap the command in an ssh call to the available cep4 cpu node with the lowest load (via head.cep4)
    The node number is obtained from get_cep4_available_cpu_node_with_lowest_load
    and passed on as-is to wrap_command_in_cep4_cpu_node_ssh_call.
    :param list cmd: a subprocess cmd list
    :param bool via_head: when True, route the cmd first via the cep4 head node
    :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
    """
    return wrap_command_in_cep4_cpu_node_ssh_call(cmd,
                                                  get_cep4_available_cpu_node_with_lowest_load(),
                                                  via_head=via_head)
def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True):
    """prefix the command with an ssh call to the given cep4 cpu node (as user lofarsys),
    optionally routed via the cep4 head node.
    :param list cmd: a subprocess cmd list
    :param int cpu_node_nr: the number of the cpu node where to execute the command
    :param bool via_head: when True, the ssh call to the cpu node is itself wrapped in an ssh call
                          to the cep4 head node. When False, the cpu node is contacted directly.
    :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
    """
    # The cpu-node hostname pattern (two-digit, zero-padded node number) is hard-coded here.
    # It might need to change for future clusters or cluster upgrades.
    cpu_node_ssh_cmd = ssh_cmd_list('lofarsys@cpu%02d.cep4' % (cpu_node_nr,)) + cmd

    if not via_head:
        return cpu_node_ssh_cmd

    return wrap_command_in_cep4_head_node_ssh_call(cpu_node_ssh_cmd)
def wrap_command_for_docker(cmd, image_name, image_label=''):
    """wrap the command to be run in a docker container for the lofarsys user and environment
    The numeric user id and group id of the lofarsys user are fetched from the cep4 head node
    (two ssh calls) and passed to docker via the -u option.
    :param list cmd: a subprocess cmd list
    :param string image_name: the name of the docker image to run
    :param string image_label: the optional label of the docker image to run
    :return: the same subprocess cmd list, but then wrapped with docker calls
    """
    def _lofarsys_id(id_option):
        # run 'id <id_option>' on the cep4 head node and return its output as stripped text.
        # universal_newlines=True makes check_output return text instead of bytes, so that on
        # python3 the result formats as '1000' and not as "b'1000'".
        id_cmd = wrap_command_in_cep4_head_node_ssh_call(['id', id_option])
        return check_output(id_cmd, universal_newlines=True).strip()

    # fetch the lofarsys user id and group id first from the cep4 head node
    id_string = '%s:%s' % (_lofarsys_id('-u'), _lofarsys_id('-g'))

    # only append the label to the image name when a label is given
    image = '%s:%s' % (image_name, image_label) if image_label else image_name

    # return the docker run command for the lofarsys user and environment.
    # The $HOME and $USER items are passed on literally in this list.
    return ['docker', 'run', '--rm', '--net=host', '-v', '/data:/data',
            '-u', id_string,
            '-v', '/etc/passwd:/etc/passwd:ro',
            '-v', '/etc/group:/etc/group:ro',
            '-v', '$HOME:$HOME',
            '-e', 'HOME=$HOME',
            '-e', 'USER=$USER',
            '-w', '$HOME',
            image] + cmd
def get_cep4_available_cpu_nodes():
    """
    get a list of cep4 cpu nodes which are currently up and running according to slurm
    Slurm's sinfo is called on the cep4 head node for the cpu partition, for node states idle and mix.
    Any error while calling sinfo or parsing its output is logged and not raised.
    In that case the nodes which were parsed up to that point (possibly none) are returned.
    :return: a sorted list of unique cpu node numbers (ints) for the up and running cpu nodes.
             The list is empty when no nodes are available, or could be determined.
    """
    available_cep4_nodes = []

    try:
        logger.debug('determining available cep4 cpu nodes')

        # find out which nodes are available
        cmd = wrap_command_in_cep4_head_node_ssh_call(['sinfo -p cpu -t idle,mix'])
        logger.debug('executing command: %s', ' '.join(cmd))

        # universal_newlines=True makes check_output return text instead of bytes.
        # Without it, the split on '\n' below raises a TypeError on python3, which would be
        # swallowed by the except clause below, resulting in an always empty node list.
        out = check_output(cmd, universal_newlines=True)
        lines = out.split('\n')

        for state in ['idle', 'mix']:
            # only the first line mentioning this state is used
            line = next((l for l in lines if state in l), None)
            if line is None:
                continue  # no line with state in line

            # get nodes string part of line, which is the last space-separated item
            nodes_part = line.strip().split(' ')[-1]

            if '[' in nodes_part:
                # example: line='cpu* up infinite 42 mix cpu[01-17,23-47]'
                # then: nodes='01-17,23-47'
                nodes = nodes_part[4:-1]
                for part in nodes.split(','):
                    if '-' in part:
                        # a range of nodes, both ends inclusive
                        lower, _sep, upper = part.partition('-')
                        available_cep4_nodes += list(range(int(lower), int(upper) + 1))
                    else:
                        available_cep4_nodes.append(int(part))
            else:
                # example: line='cpu* up infinite 42 mix cpu01'
                # then: nodes='01'
                available_cep4_nodes.append(int(nodes_part[3:]))
    except Exception as e:
        # deliberate best-effort: log the error and continue with the nodes found so far
        logger.exception(e)

    # remove duplicates and sort ascending
    available_cep4_nodes = sorted(set(available_cep4_nodes))
    logger.debug('available cep4 cpu nodes: %s', ','.join(str(x) for x in available_cep4_nodes))

    if not available_cep4_nodes:
        logger.warning('no cep4 cpu nodes available')

    return available_cep4_nodes
def get_cep4_cpu_nodes_loads(node_nrs=None):
    """
    get the 5min load for each given cep4 cpu node nr
    The load is read on each node (via the cep4 head node) from /proc/loadavg.
    All nodes are queried in parallel, one ssh subprocess per node.
    :param node_nrs: optional list of node numbers to get the load for. If None, then all available nodes are queried.
    :return: dict with node_nr -> load mapping. When the load of a node could not be determined,
             the load for that node is set to 1e10.
    """
    if node_nrs is None:
        node_nrs = get_cep4_available_cpu_nodes()

    logger.debug('getting 5min load for cep4 cpu nodes %s', ', '.join((str(x) for x in node_nrs)))

    # the same load command is used for each node: print the second item of /proc/loadavg.
    # It is not modified by the wrap call, which returns a new list.
    load_cmd = ['cat', '/proc/loadavg', '|', 'awk', "'{print $2}'"]

    # spawn load commands in parallel
    procs = {}
    for node_nr in node_nrs:
        node_load_cmd = wrap_command_in_cep4_cpu_node_ssh_call(load_cmd, node_nr, via_head=True)
        logger.debug('executing command: %s', ' '.join(node_load_cmd))
        procs[node_nr] = Popen(node_load_cmd, stdout=PIPE, stderr=PIPE)

    # wait for procs to finish, and try to parse the resulting load value
    loads = {}
    for node_nr, proc in procs.items():
        out, _err = proc.communicate()
        try:
            loads[node_nr] = float(out.strip())
        except (TypeError, ValueError):
            # no parsable load value in the output (for example when the ssh call failed).
            # Use a very high load, so this node sorts behind all nodes with a known load.
            # Only parse errors are caught here, so KeyboardInterrupt/SystemExit are not swallowed.
            logger.warning('could not determine load for cep4 cpu node %s', node_nr)
            loads[node_nr] = 1e10

    logger.debug('5min loads for cep4 cpu nodes: %s', loads)
    return loads
def get_cep4_available_cpu_nodes_sorted_ascending_by_load():
    """
    get the numbers of the available cep4 cpu nodes, ordered from lowest to highest (5min) load.
    :return: list of node numbers, sorted ascending by load.
    """
    loads = get_cep4_cpu_nodes_loads(get_cep4_available_cpu_nodes())

    # iterating the dict yields the node numbers; each is ordered by its load value
    sorted_node_nrs = sorted(loads, key=loads.get)

    logger.debug('cep4 cpu nodes sorted (asc) by load: %s', sorted_node_nrs)
    return sorted_node_nrs
def get_cep4_available_cpu_node_with_lowest_load():
    """
    get the available cep4 cpu node which has the lowest (5min) load of all available nodes.
    :return: the node number (int) with the lowest load, or None when there are no available nodes.
    """
    nodes_by_load = get_cep4_available_cpu_nodes_sorted_ascending_by_load()

    if not nodes_by_load:
        return None

    # the list is sorted ascending by load, so the first item has the lowest load
    lowest_load_node_nr = nodes_by_load[0]
    logger.debug('cep4 cpu node with lowest load: %s', lowest_load_node_nr)
    return lowest_load_node_nr