diff --git a/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py b/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py index d20f67aaa991a1ed4868cb778049d8d5985eb8b0..0cdf9223bd95343a2f5797ce442d0ef866630310 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py +++ b/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py @@ -96,12 +96,11 @@ class SubProcess(object): def fds(self): return self.output_streams.values() - def read(self, fds): - for fd in fds: - if fd == self.process.stdout: - self._addoutput(self.STDOUT, fd.read(4096)) - if fd == self.process.stderr: - self._addoutput(self.STDERR, fd.read(4096)) + def read(self, fileno): + if fileno == self.process.stdout.fileno(): + self._addoutput(self.STDOUT, self.process.stdout.read(4096)) + if fileno == self.process.stderr.fileno(): + self._addoutput(self.STDERR, self.process.stderr.read(4096)) def _addoutput(self, stdtype, output, flush=False): buf = self.output_buffers[stdtype] + output @@ -215,17 +214,20 @@ class SubProcessGroup(object): for process in processes: process.kill() - # collect fds we need to poll - fds = [] + # collect fds we need to poll -- we need select.poll to support fd > 1024 + poller = select.poll() + fd_lookup = {} for process in processes: - fds.extend(process.fds()) + for fd in process.fds(): + poller.register(fd, select.POLLIN) + fd_lookup[fd.fileno()] = process # poll for data - rlist, _, _ = select.select(fds, [], [], self.polling_interval) + events = poller.poll(self.polling_interval) # let processed read their data - for process in processes: - process.read(rlist) + for (fileno, _) in events: + fd_lookup[fileno].read(fileno) # check all the running processes for completion for process in self.process_group: diff --git a/CEP/Pipeline/test/support/subprocessgroup_test.py b/CEP/Pipeline/test/support/subprocessgroup_test.py index 816d7fe4628b11e602979c075a34bef054d51c7b..3a792b8fc86dc94474d3b18f54018b9124da7252 100644 --- a/CEP/Pipeline/test/support/subprocessgroup_test.py +++ b/CEP/Pipeline/test/support/subprocessgroup_test.py @@ -57,7 +57,17 @@ class SubProcessGroupTest(unittest.TestCase): process_group.wait_for_finish() end_time = time.time() self.assertTrue((end_time - start_time) > 3) - + + + def test_fd_bigger_than_1024(self): + process_group = SubProcessGroup(polling_interval=1, max_concurrent_processes=1000) + + cmd = "sleep 2" + for idx in range(513): # each process uses 2 fds, so we only need 513 processes to ensure fds > 1024 + process_group.run(cmd) + + process_group.wait_for_finish() + def test_start_without_jobs(self): process_group = SubProcessGroup(polling_interval=1)