diff --git a/.gitattributes b/.gitattributes index e042858d9b151aa02124bb07baea3e91f9bbb7d3..320a36841a69367b64f7290405ccd3306880e0e3 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1672,6 +1672,7 @@ CEP/Pipeline/test/regression_tests/target_pipeline.py -text CEP/Pipeline/test/regression_tests/trunk_imaging_regression.parset -text CEP/Pipeline/test/support/__init__.py eol=lf CEP/Pipeline/test/support/loggingdecorators_test.py -text +CEP/Pipeline/test/support/output_stderr_stdout.sh eol=lf CEP/Pipeline/test/support/subprocessgroup_test.py eol=lf CEP/Pipeline/test/support/xmllogging_test.py -text CEP/Pipeline/test/test_framework/CMakeLists.txt eol=lf diff --git a/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py b/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py index 286bb823e3ba5be15fbcf02a85597be554ba176b..7dc11f4b406d49e10a212c23a9cb91fffe0db9eb 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py +++ b/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py @@ -1,5 +1,7 @@ import subprocess import select +import fcntl +import os import time from lofarpipe.support.lofarexceptions import PipelineException @@ -11,11 +13,18 @@ class SubProcess(object): """ Start a subprocess for `cmd' in working directory `cwd'. - Output is sent to `logger'. + Output is sent to `logger', or stdout if logger is None. """ + def print_logger(line): + print line + self.cmd = cmd self.completed = False + self.logger = logger.info if logger else print_logger + + # report we are starting + self.logger("Subprocess starting: %s" % self.cmd) # start process self.process = subprocess.Popen( @@ -45,15 +54,15 @@ class SubProcess(object): self.STDERR: "" } # output sink - def print_logger(line): - print line - self.output_loggers = { self.STDOUT: logger.debug if logger else print_logger, self.STDERR: logger.warn if logger else print_logger } - # report we started - self.logger = logger.info if logger else print_logger - self.logger("Subprocess started: %s" % self.cmd) + + # Set fds to non-blocking to allow <4k reads. This is needed if the process + # alternates between stdout and stderr output. + for f in self.output_streams.values(): + flag = fcntl.fcntl(f, fcntl.F_GETFL) + fcntl.fcntl(f, fcntl.F_SETFL, flag | os.O_NONBLOCK) def done(self): if self.completed: @@ -96,7 +105,7 @@ class SubProcess(object): if flush: self.output_loggers[stdtype](remainder) remainder = "" - + self.output_buffers[stdtype] = remainder # 0-byte read means they closed the fd diff --git a/CEP/Pipeline/test/support/output_stderr_stdout.sh b/CEP/Pipeline/test/support/output_stderr_stdout.sh new file mode 100755 index 0000000000000000000000000000000000000000..31b86045230847fe7844cf4a41da656a6e49e622 --- /dev/null +++ b/CEP/Pipeline/test/support/output_stderr_stdout.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +# Spam stderr, but not enough to fill a 4k buffer. +# Reading from stderr should not block. +for i in `seq 1 10`; do + echo e${i}e 1>&2 +done + +# Now spam stdout, filling any buffer. +# If reading from stderr blocks in the previuos loop, +# we can't finish this loop. +for i in `seq 1 4096`; do + echo o${i}o o${i}o o${i}o o${i}o o${i}o o${i}o +done diff --git a/CEP/Pipeline/test/support/subprocessgroup_test.py b/CEP/Pipeline/test/support/subprocessgroup_test.py index b7b0a6589f5157dc34e99dbbf7a7ffbee2accd39..816d7fe4628b11e602979c075a34bef054d51c7b 100644 --- a/CEP/Pipeline/test/support/subprocessgroup_test.py +++ b/CEP/Pipeline/test/support/subprocessgroup_test.py @@ -25,11 +25,11 @@ class SubProcessGroupTest(unittest.TestCase): """ - def test_output_bigger_than_pipe(self): + def test_alternating_output(self): process_group = SubProcessGroup(polling_interval=1) # print a lot of numbers - cmd = 'seq -s, 1 4096' + cmd = '%s/output_stderr_stdout.sh' % (os.path.dirname(__file__) or ".",) start_time = time.time() # Start it multiple times