Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
cep4_utils.py 9.11 KiB
# Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

from subprocess import check_output, Popen, PIPE
from random import randint

import logging
logger = logging.getLogger(__name__)

# hostname of the cep4 head node, and the lofarsys login on it which is used for ssh calls to the head node
CEP4_HEAD_NODE = 'head.cep4.control.lofar'
LOFARSYS_AT_CEP4_HEAD_NODE = 'lofarsys@' + CEP4_HEAD_NODE

def ssh_cmd_list(host):
    """
    returns a subprocess compliant command list to do an ssh call to the given node
    uses ssh option -T to disable remote pseudo terminal allocation
    uses ssh option -q for ssh quiet mode (no ssh warnings/errors)
    uses ssh option -o StrictHostKeyChecking=no to prevent prompts about host keys
    :param host: the node name or ip address, optionally prefixed with a login like 'lofarsys@'
    :return: a subprocess compliant command list; append the remote command (list) to it
    """
    return ['ssh', '-T', '-q', '-o StrictHostKeyChecking=no', host]


def wrap_command_in_cep4_head_node_ssh_call(cmd):
    """wrap the command in an ssh call (as lofarsys) to head.cep4, so that it is executed on the cep4 head node
    :param list cmd: a subprocess cmd list
    :return: the same subprocess cmd list, but then prefixed with the ssh call to the cep4 head node
    """
    return ssh_cmd_list(LOFARSYS_AT_CEP4_HEAD_NODE) + cmd

def wrap_command_in_cep4_random_cpu_node_ssh_call(cmd, via_head=True):
    """wrap the command in an ssh call to a randomly chosen available cep4 cpu node (optionally via head.cep4)
    :param list cmd: a subprocess cmd list
    :param bool via_head: when True, route the cmd first via the cep4 head node
    :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
    :raises ValueError: when no cep4 cpu node is available
    """
    node_nrs = get_cep4_available_cpu_nodes()
    if not node_nrs:
        # without this check randint(0, -1) would raise an obscure 'empty range' ValueError
        raise ValueError('cannot pick a random cep4 cpu node: no cep4 cpu nodes available')

    # pick a random available cpu node
    node_nr = node_nrs[randint(0, len(node_nrs) - 1)]
    return wrap_command_in_cep4_cpu_node_ssh_call(cmd, node_nr, via_head=via_head)

def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_head=True):
    """wrap the command in an ssh call to the available cep4 cpu node with the lowest (5min) load (optionally via head.cep4)
    :param list cmd: a subprocess cmd list
    :param bool via_head: when True, route the cmd first via the cep4 head node
    :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
    :raises ValueError: when no cep4 cpu node is available
    """
    lowest_load_node_nr = get_cep4_available_cpu_node_with_lowest_load()
    if lowest_load_node_nr is None:
        # without this check the '%02d' hostname formatting would fail with an obscure TypeError on None
        raise ValueError('cannot pick the cep4 cpu node with the lowest load: no cep4 cpu nodes available')
    return wrap_command_in_cep4_cpu_node_ssh_call(cmd, lowest_load_node_nr, via_head=via_head)

def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True):
    """wrap the command in an ssh call (as lofarsys) to the given cep4 cpu node, optionally hopping via head.cep4
    :param list cmd: a subprocess cmd list
    :param int cpu_node_nr: number of the cpu node on which the command should be executed
    :param bool via_head: when True, the ssh call to the cpu node is itself executed from the cep4 head node
    :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
    """
    # NOTE: the cpu node hostname scheme is hard-coded; it might change for future clusters or cluster upgrades.
    cpu_node_login = 'lofarsys@cpu%02d.cep4' % (cpu_node_nr,)
    cpu_node_cmd = ssh_cmd_list(cpu_node_login) + cmd

    if not via_head:
        return cpu_node_cmd
    return wrap_command_in_cep4_head_node_ssh_call(cpu_node_cmd)

def wrap_command_for_docker(cmd, image_name, image_label=''):
    """wrap the command to be run in a docker container for the lofarsys user and environment
    :param list cmd: a subprocess cmd list
    :param string image_name: the name of the docker image to run
    :param string image_label: the optional label of the docker image to run
    :return: the same subprocess cmd list, but then wrapped with docker calls
    """
    # fetch the lofarsys user id and group id first from the cep4 head node.
    # universal_newlines=True makes check_output return text instead of bytes (python3);
    # with bytes the id string would become "b'1000':b'1000'".
    user_id = check_output(wrap_command_in_cep4_head_node_ssh_call(['id', '-u']), universal_newlines=True).strip()
    group_id = check_output(wrap_command_in_cep4_head_node_ssh_call(['id', '-g']), universal_newlines=True).strip()
    id_string = '%s:%s' % (user_id, group_id)

    image = '%s:%s' % (image_name, image_label) if image_label else image_name

    # NOTE(review): $HOME and $USER are not expanded by subprocess; they are passed literally and
    # presumably expanded by the remote shell when this list is wrapped in an ssh call - verify at call sites.
    return ['docker', 'run', '--rm', '--net=host', '-v', '/data:/data',
            '-u', id_string,
            '-v', '/etc/passwd:/etc/passwd:ro',
            '-v', '/etc/group:/etc/group:ro',
            '-v', '$HOME:$HOME',
            '-e', 'HOME=$HOME',
            '-e', 'USER=$USER',
            '-w', '$HOME',
            image] + cmd

def _parse_cep4_cpu_node_nrs(nodes_part):
    """parse a slurm cep4 cpu nodelist like 'cpu[01-17,23-47]' or 'cpu01' into a list of cpu node numbers (ints)"""
    if '[' in nodes_part:
        # example: nodes_part='cpu[01-17,23-47]'
        # then: nodes='01-17,23-47'
        nodes = nodes_part[4:-1]
        node_nrs = []
        for part in nodes.split(','):
            if '-' in part:
                lower, _, upper = part.partition('-')
                node_nrs.extend(range(int(lower), int(upper) + 1))
            else:
                node_nrs.append(int(part))
        return node_nrs

    # example: nodes_part='cpu01'
    # then: node nr 1
    return [int(nodes_part[3:])]


def get_cep4_available_cpu_nodes():
    """
    get a list of cep4 cpu nodes which are currently up and running according to slurm
    (nodes in state 'idle' or 'mix' in the 'cpu' partition).
    Errors while querying or parsing are logged, and whatever was collected until then is returned.
    :return: a sorted list of unique cpu node numbers (ints) for the up and running cpu nodes (possibly empty)
    """
    available_cep4_nodes = []

    try:
        logger.debug('determining available cep4 cpu nodes')

        # find out which nodes are available
        cmd = ['sinfo -p cpu -t idle,mix']
        cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)

        logger.debug('executing command: %s', ' '.join(cmd))
        # universal_newlines=True: get text (not bytes), so the output can be split into lines under python3 as well
        out = check_output(cmd, universal_newlines=True)

        for line in out.splitlines():
            # example: line='cpu*         up   infinite     42    mix cpu[01-17,23-47]'
            # handle every line for the wanted states, not only the first one per state
            if not any(state in line for state in ('idle', 'mix')):
                continue
            # the nodelist is the last whitespace separated column of the line
            nodes_part = line.split()[-1]
            available_cep4_nodes += _parse_cep4_cpu_node_nrs(nodes_part)

    except Exception as e:
        logger.exception(e)

    available_cep4_nodes = sorted(set(available_cep4_nodes))
    logger.debug('available cep4 cpu nodes: %s', ','.join(str(x) for x in available_cep4_nodes))
    if not available_cep4_nodes:
        logger.warning('no cep4 cpu nodes available')

    return available_cep4_nodes

def get_cep4_cpu_nodes_loads(node_nrs=None):
    """
    get the 5min load for each given cep4 cpu node nr
    The load queries are started in parallel, and then collected.
    Nodes for which the load could not be determined get a load of 1e10, so they sort last.
    :param node_nrs: optional list of node numbers to get the load for. If None, then all available nodes are queried.
    :return: dict with node_nr -> load mapping
    """
    if node_nrs is None:
        node_nrs = get_cep4_available_cpu_nodes()

    procs = {}
    loads = {}
    logger.debug('getting 5min load for cep4 cpu nodes %s', ', '.join((str(x) for x in node_nrs)))
    # spawn load commands in parallel
    for node_nr in node_nrs:
        # the 2nd field of /proc/loadavg is the 5min load average.
        # The '|' is not interpreted locally; ssh joins the args into one remote command line,
        # so presumably a remote shell runs the pipe.
        load_cmd = ['cat', '/proc/loadavg', '|', 'awk', "'{print $2}'"]
        node_load_cmd = wrap_command_in_cep4_cpu_node_ssh_call(load_cmd, node_nr, via_head=True)
        logger.debug('executing command: %s', ' '.join(node_load_cmd))

        procs[node_nr] = Popen(node_load_cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)

    # wait for procs to finish, and try to parse the resulting load value
    for node_nr, proc in procs.items():
        out, err = proc.communicate()
        try:
            load = float(out.strip())
        except (ValueError, TypeError):
            logger.warning('could not determine 5min load for cep4 cpu node %s (returncode=%s): %s',
                           node_nr, proc.returncode, err.strip() if err else '')
            # unknown load: use a huge value so this node ends up last when sorting by load
            load = 1e10
        loads[node_nr] = load

    logger.debug('5min loads for cep4 cpu nodes: %s', loads)
    return loads

def get_cep4_available_cpu_nodes_sorted_ascending_by_load():
    """
    get the available cep4 cpu node numbers, ordered from lowest to highest (5min) load.
    :return: list of node numbers, lowest load first.
    """
    load_per_node = get_cep4_cpu_nodes_loads(get_cep4_available_cpu_nodes())
    # stable sort of the node numbers on their load value
    ordered_nodes = sorted(load_per_node, key=load_per_node.get)
    logger.debug('cep4 cpu nodes sorted (asc) by load: %s', ordered_nodes)
    return ordered_nodes

def get_cep4_available_cpu_node_with_lowest_load():
    """
    get the available cep4 cpu node with the lowest (5min) load.
    :return: the node number (int) with the lowest load, or None when there are no (available) nodes.
    """
    ordered_nodes = get_cep4_available_cpu_nodes_sorted_ascending_by_load()
    if not ordered_nodes:
        return None

    best_node = ordered_nodes[0]
    logger.debug('cep4 cpu node with lowest load: %s', best_node)
    return best_node