import logging
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from subprocess import Popen, PIPE
from collections import namedtuple
try:
    from Queue import Queue, Empty  # python 2.x
except ImportError:
    from queue import Queue, Empty  # python 3.x

logger = logging.getLogger()

class SubprocessTimoutError(RuntimeError):
    '''an error class indicating that the running subprocess took longer than expected to complete'''
    pass

def wrap_composite_command(cmd):
    """
    wrap the whole commandline like so: "<cmd>"
    so, for example, multiple ';'-separated commands are interpreted as one single argument, e.g. by ssh
    :param cmd: string or list of strings with the commandline to be executed. May or may not contain ';'
    :return: the encapsulated command
    """
    return '''"%s" ''' % (cmd if isinstance(cmd, str) else ' '.join(cmd))
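
# Illustrative example (not part of the original module): wrap_composite_command quotes
# the whole commandline so that a remote shell receives it as one single argument.
#   wrap_composite_command('cd /tmp; ls')   -> '"cd /tmp; ls" '
#   wrap_composite_command(['cd', '/tmp'])  -> '"cd /tmp" '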

def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32):
    """
    Execute all commands in cmd_lists in parallel, limited to max_parallel concurrent processes.
    :param list cmd_lists: a list of subprocess-cmd-lists
    :param int timeout: time out after this many seconds
    :param int max_parallel: maximum number of concurrently executed commands.
    :raises SubprocessTimoutError: if any of the commands times out
    :return: list with the results in the same order as cmd_lists. Each result contains the returncode, stdout and stderr.
    """
    procs = []
    start = datetime.utcnow()

    for cmd_list in cmd_lists:
        # first check how many procs are already running in parallel
        # wait a little while we exceed the number of max_parallel procs
        # or else continue starting procs
        while True:
            num_running = len([proc for proc in procs if proc.poll() is None])
            if num_running < max_parallel:
                break
            sleep(0.01)

        # not limited by the number of parallel procs (anymore)
        # so, start a new proc
        logger.info('Executing %s', ' '.join(cmd_list))
        proc = Popen(cmd_list, stdout=PIPE, stderr=PIPE)
        procs.append(proc)
        sleep(0.01)

    try:
        while not all(proc.poll() is not None for proc in procs):
            if datetime.utcnow() - start >= timedelta(seconds=timeout):
                raise SubprocessTimoutError("Timeout while waiting for subprocess")
            sleep(0.01)
    except SubprocessTimoutError as e:
        logger.error(e)
        for proc in procs:
            if proc.poll() is None:
                proc.kill()
        raise
    PopenResult = namedtuple('PopenResult', ['returncode', 'stdout', 'stderr'])
    results = [PopenResult(p.returncode, p.stdout.read(), p.stderr.read()) for p in procs]

    # log results of commands
    for cmd_list, result in zip(cmd_lists, results):
        logger.info("Results for cmd: %s\n  returncode=%s\n  stdout=%s\n  stderr=%s",
                    " ".join(cmd_list),
                    result.returncode, result.stdout.rstrip(), result.stderr.rstrip())

    return results
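
# Illustrative usage sketch (not part of the original module; assumes 'echo' is available on the host):
#   results = execute_in_parallel([['echo', 'foo'], ['echo', 'bar']], timeout=60)
#   for result in results:
#       print(result.returncode, result.stdout)  # stdout/stderr hold the raw bytes read from the pipes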

class PipeReader:
    '''
    helper class to do non-blocking readline calls to a subprocess stdout or stderr pipe.
    '''
    def __init__(self, pipe):
        self.__line_buffer = ''
        self.__queue = Queue()
        self.__thread = Thread(target=PipeReader.enqueue_output, args=(pipe, self.__queue))
        self.__thread.daemon = True  # thread dies with the program
        self.__thread.start()
    @staticmethod
    def enqueue_output(pipe, queue):
        try:
            for line in iter(pipe.readline, b''):
                # decode bytes read from the pipe so they can be appended to the str line buffer
                queue.put(line.decode('utf-8') if isinstance(line, bytes) else line)
        except Exception as e:
            logger.error(e)
    def __fill_line_buffer(self, timeout=None):
        start = datetime.now()

        while timeout is None or datetime.now() - start <= timedelta(seconds=timeout):
            try:
                result = self.__queue.get(True, timeout)
                if result:
                    self.__line_buffer += result
            except Empty:
                pass
    def readline(self, timeout=None):
        self.__fill_line_buffer(timeout)

        endline_idx = self.__line_buffer.find('\n')

        if endline_idx > -1:
            line = self.__line_buffer[:endline_idx]
            self.__line_buffer = self.__line_buffer[endline_idx+1:]
            return line

        return ''
    def readlines(self, timeout=None):
        self.__fill_line_buffer(timeout)

        last_line_end_idx = self.__line_buffer.rfind('\n')
        if last_line_end_idx < 0:
            # no complete line available (yet)
            return []

        head = self.__line_buffer[:last_line_end_idx]
        self.__line_buffer = self.__line_buffer[last_line_end_idx+1:]
        return [l for l in head.split('\n') if l]