Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
subprocess_utils.py 4.64 KiB
import logging
from datetime import datetime, timedelta
from time import sleep
from time import monotonic
from threading import Thread
from subprocess import Popen, PIPE
from collections import namedtuple
try:
    from queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty  # python 3.x

# Module-level logger named after this module (instead of the root logger), so applications
# can filter/configure this module's output; records still propagate to the root handlers.
logger = logging.getLogger(__name__)

class SubprocessTimoutError(RuntimeError):
    """Raised when a running subprocess took longer than expected to complete."""

def wrap_composite_command(cmd):
    """
    Wrap a complete command line in double quotes followed by one space: '"<cmd>" '.

    This makes, for example, several ';'-separated commands reach ssh as one single argument.
    Double quotes inside cmd are not escaped.
    :param cmd: the command line, either as one string or as a list of strings
                (which are joined with single spaces). May or may not contain ';'.
    :return: the quoted command line (note the trailing space)
    """
    if not isinstance(cmd, str):
        cmd = ' '.join(cmd)
    return '"%s" ' % (cmd,)

# Result of one command run by execute_in_parallel. Created once here instead of on every
# call; namedtuple() no longer accepts 'verbose' (removed in Python 3.7).
PopenResult = namedtuple('PopenResult', ['returncode', 'stdout', 'stderr'])


def _kill_running(procs):
    '''Kill every process in procs that is still running.'''
    for proc in procs:
        if proc.poll() is None:
            proc.kill()


def _start_output_drainer(proc, outputs, index):
    '''Start a daemon thread that stores proc.communicate()'s (stdout, stderr) in outputs[index].'''
    def drain():
        try:
            outputs[index] = proc.communicate()
        except Exception as e:
            logger.error(e)

    thread = Thread(target=drain)
    thread.daemon = True  # never keep the program alive because of a stuck child
    thread.start()
    return thread


def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32):
    """
    Execute all commands in the cmd_lists in parallel, limited to max_parallel concurrent processes.

    The stdout/stderr of every process is read while it runs, so commands that produce more
    output than fits in an OS pipe buffer cannot stall.
    :param list cmd_lists: a list of subprocess-cmd-list's
    :param int timeout: time out after this many seconds, counted from the start of this call
                        (including time spent waiting for a free slot to start a command)
    :param int max_parallel: maximum number of concurrent executed commands.
    :raises SubprocessTimoutError: if the commands did not all complete within timeout;
                                   all still-running processes are killed before raising
    :return: list with the results in the same order as the cmd_lists. Each result is a
             PopenResult(returncode, stdout, stderr), with stdout and stderr as bytes.
    """
    start = monotonic()  # monotonic clock: unaffected by wall-clock adjustments

    def check_timeout():
        if monotonic() - start >= timeout:
            raise SubprocessTimoutError("Timeout while waiting for subprocess")

    procs = []
    outputs = []   # (stdout, stderr) per proc, filled in by its drainer thread
    drainers = []
    try:
        for cmd_list in cmd_lists:
            # wait while max_parallel procs are still running, but never beyond the timeout
            while sum(1 for proc in procs if proc.poll() is None) >= max_parallel:
                check_timeout()
                sleep(0.01)

            # not limited by the number of parallel procs (anymore), so start a new proc
            logger.info('Executing %s', ' '.join(cmd_list))
            proc = Popen(cmd_list, stdout=PIPE, stderr=PIPE)
            procs.append(proc)
            outputs.append((b'', b''))
            # read the pipes while the proc runs; a full, unread pipe would block the child forever
            drainers.append(_start_output_drainer(proc, outputs, len(outputs) - 1))
            sleep(0.01)

        while any(proc.poll() is None for proc in procs):
            check_timeout()
            sleep(0.01)
    except SubprocessTimoutError as e:
        logger.error(e)
        _kill_running(procs)
        raise
    except BaseException:
        # e.g. Popen failed for a later command, or KeyboardInterrupt: don't leave children running
        _kill_running(procs)
        raise

    # all procs have exited; wait until their remaining output has been collected
    for drainer in drainers:
        drainer.join()

    results = [PopenResult(proc.returncode, out, err)
               for proc, (out, err) in zip(procs, outputs)]

    # log results of commands
    for cmd_list, result in zip(cmd_lists, results):
        logger.info("Results for cmd: %s\n  returncode=%s\n  stdout=%s\n  stderr=%s",
                    " ".join(cmd_list),
                    result.returncode, result.stdout.rstrip(), result.stderr.rstrip())
    return results

class PipeReader:
    '''
    Helper class to do non-blocking readline calls on a subprocess stdout or stderr pipe.

    A daemon thread reads the pipe line by line and puts every line on a queue;
    readline() and readlines() take from that queue with an optional timeout, so the
    caller never blocks on the pipe itself for longer than it asked for.
    Lines are returned as str without their trailing newline; lines read from a
    binary pipe are decoded with the configured encoding.
    '''
    # Marker put on the queue by enqueue_output once the pipe is exhausted.
    _EOF = None

    def __init__(self, pipe, encoding='utf-8'):
        '''
        :param pipe: a readable file object (e.g. Popen(...).stdout), in binary or text mode
        :param encoding: used to decode lines read from a binary pipe; undecodable bytes
                         are replaced instead of raising
        '''
        self.__line_buffer = ''
        self.__encoding = encoding
        self.__eof = False
        self.__queue = Queue()
        self.__thread = Thread(target=PipeReader.enqueue_output, args=(pipe, self.__queue))
        self.__thread.daemon = True # thread dies with the program
        self.__thread.start()

    @staticmethod
    def enqueue_output(pipe, queue):
        '''
        Put every line read from pipe on queue, followed by PipeReader._EOF.

        An empty readline() result means EOF for both binary (b'') and text ('') pipes.
        '''
        try:
            while True:
                line = pipe.readline()
                if not line:
                    break
                queue.put(line)
        except Exception as e:
            logger.error(e)
        finally:
            # always signal the end, so readers waiting without a timeout wake up
            queue.put(PipeReader._EOF)

    def __append(self, chunk):
        '''Add one queued item to the line buffer, or record EOF when it is the end marker.'''
        if chunk is PipeReader._EOF:
            self.__eof = True
            return
        if isinstance(chunk, bytes):
            chunk = chunk.decode(self.__encoding, 'replace')
        self.__line_buffer += chunk

    def __fill_line_buffer(self, timeout=None, until_newline=False):
        '''
        Move pending pipe output from the queue into the line buffer.

        Returns at EOF, once timeout seconds have elapsed (never, if timeout is None),
        or, with until_newline, as soon as the buffer holds a complete line.
        Output that is already queued is always collected, even with timeout=0.
        '''
        deadline = None if timeout is None else monotonic() + timeout
        while True:
            # collect everything queued so far without blocking
            while not self.__eof:
                try:
                    self.__append(self.__queue.get_nowait())
                except Empty:
                    break

            if self.__eof or (until_newline and '\n' in self.__line_buffer):
                return

            remaining = None
            if deadline is not None:
                remaining = deadline - monotonic()
                if remaining <= 0:
                    return
            try:
                # block for the next line, but only for the time that is left
                self.__append(self.__queue.get(True, remaining))
            except Empty:
                return

    def readline(self, timeout=None):
        '''
        Return the next line, without its trailing newline.

        Waits up to timeout seconds (indefinitely if None) for a complete line and returns
        as soon as one is available. Once the pipe is exhausted, a final line without a
        trailing newline is returned as well.
        :return: the line, or '' if no complete line arrived in time or the pipe is exhausted
        '''
        self.__fill_line_buffer(timeout, until_newline=True)

        line, newline, rest = self.__line_buffer.partition('\n')
        if newline:
            self.__line_buffer = rest
            return line
        if self.__eof:
            # the pipe is closed: the remaining partial line can never be completed
            self.__line_buffer = ''
            return line
        return ''

    def readlines(self, timeout=None):
        '''
        Return all complete, non-empty lines that arrived within timeout seconds.

        Keeps collecting until timeout elapses or the pipe is exhausted (with timeout=None:
        until the pipe is exhausted). A trailing partial line stays buffered for a later
        call, unless the pipe is exhausted. Empty lines are dropped.
        '''
        self.__fill_line_buffer(timeout)

        if self.__eof:
            complete, self.__line_buffer = self.__line_buffer, ''
        else:
            last_line_end_idx = self.__line_buffer.rfind('\n')
            if last_line_end_idx == -1:
                return []
            complete = self.__line_buffer[:last_line_end_idx]
            # drop the newline itself too, so a later readline() does not see an empty line
            self.__line_buffer = self.__line_buffer[last_line_end_idx + 1:]

        return [line for line in complete.split('\n') if line]