Skip to content
Snippets Groups Projects
Commit 1183a660 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #9678: Use select.poll instead of select.select to support fd > 1024

parent 13107463
No related branches found
No related tags found
No related merge requests found
......@@ -96,12 +96,11 @@ class SubProcess(object):
def fds(self):
return self.output_streams.values()
def read(self, fds):
for fd in fds:
if fd == self.process.stdout:
self._addoutput(self.STDOUT, fd.read(4096))
if fd == self.process.stderr:
self._addoutput(self.STDERR, fd.read(4096))
def read(self, fileno):
if fileno == self.process.stdout.fileno():
self._addoutput(self.STDOUT, self.process.stdout.read(4096))
if fileno == self.process.stderr.fileno():
self._addoutput(self.STDERR, self.process.stderr.read(4096))
def _addoutput(self, stdtype, output, flush=False):
buf = self.output_buffers[stdtype] + output
......@@ -215,17 +214,20 @@ class SubProcessGroup(object):
for process in processes:
process.kill()
# collect fds we need to poll
fds = []
# collect fds we need to poll -- we need select.poll to support fd > 1024
poller = select.poll()
fd_lookup = {}
for process in processes:
fds.extend(process.fds())
for fd in process.fds():
poller.register(fd, select.POLLIN)
fd_lookup[fd.fileno()] = process
# poll for data
rlist, _, _ = select.select(fds, [], [], self.polling_interval)
events = poller.poll(self.polling_interval)
# let processed read their data
for process in processes:
process.read(rlist)
for (fileno, _) in events:
fd_lookup[fileno].read(fileno)
# check all the running processes for completion
for process in self.process_group:
......
......@@ -59,6 +59,16 @@ class SubProcessGroupTest(unittest.TestCase):
self.assertTrue((end_time - start_time) > 3)
def test_fd_bigger_than_1024(self):
process_group = SubProcessGroup(polling_interval=1, max_concurrent_processes=1000)
cmd = "sleep 2"
for idx in range(513): # each process uses 2 fds, so we only need 513 processes to ensure fds > 1024
process_group.run(cmd)
process_group.wait_for_finish()
def test_start_without_jobs(self):
process_group = SubProcessGroup(polling_interval=1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment