From 1183a660b832cfe6556c67750f64df65a3897c8a Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Wed, 3 Aug 2016 08:28:05 +0000
Subject: [PATCH] Task #9678: Use select.poll instead of select.select to
 support fd > 1024

---
 .../lofarpipe/support/subprocessgroup.py      | 26 ++++++++++---------
 .../test/support/subprocessgroup_test.py      | 12 ++++++++-
 2 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py b/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py
index d20f67aaa99..0cdf9223bd9 100644
--- a/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py
+++ b/CEP/Pipeline/framework/lofarpipe/support/subprocessgroup.py
@@ -96,12 +96,11 @@ class SubProcess(object):
     def fds(self):
         return self.output_streams.values()
 
-    def read(self, fds):
-        for fd in fds:
-            if fd == self.process.stdout:
-                self._addoutput(self.STDOUT, fd.read(4096))
-            if fd == self.process.stderr:
-                self._addoutput(self.STDERR, fd.read(4096))
+    def read(self, fileno):
+        if fileno == self.process.stdout.fileno():
+            self._addoutput(self.STDOUT, self.process.stdout.read(4096))
+        if fileno == self.process.stderr.fileno():
+            self._addoutput(self.STDERR, self.process.stderr.read(4096))
 
     def _addoutput(self, stdtype, output, flush=False):
         buf = self.output_buffers[stdtype] + output
@@ -215,17 +214,20 @@ class SubProcessGroup(object):
                     for process in processes:
                         process.kill()
 
-                # collect fds we need to poll
-                fds = []
+                # collect fds we need to poll -- we need select.poll to support fd > 1024
+                poller = select.poll()
+                fd_lookup = {}
                 for process in processes:
-                    fds.extend(process.fds())
+                    for fd in process.fds():
+                        poller.register(fd, select.POLLIN)
+                        fd_lookup[fd.fileno()] = process
 
                 # poll for data
-                rlist, _, _ = select.select(fds, [], [], self.polling_interval)
+                events = poller.poll(self.polling_interval)
 
                 # let processed read their data
-                for process in processes:
-                    process.read(rlist)
+                for (fileno, _) in events:
+                    fd_lookup[fileno].read(fileno)
 
                 # check all the running processes for completion
                 for process in self.process_group:
diff --git a/CEP/Pipeline/test/support/subprocessgroup_test.py b/CEP/Pipeline/test/support/subprocessgroup_test.py
index 816d7fe4628..3a792b8fc86 100644
--- a/CEP/Pipeline/test/support/subprocessgroup_test.py
+++ b/CEP/Pipeline/test/support/subprocessgroup_test.py
@@ -57,7 +57,17 @@ class SubProcessGroupTest(unittest.TestCase):
         process_group.wait_for_finish()
         end_time = time.time()
         self.assertTrue((end_time - start_time) > 3)
-        
+
+
+    def test_fd_bigger_than_1024(self):
+        process_group = SubProcessGroup(polling_interval=1, max_concurrent_processes=1000)
+
+        cmd = "sleep 2"
+        for idx in range(513): # each process uses 2 fds, so we only need 513 processes to ensure fds > 1024
+            process_group.run(cmd)
+
+        process_group.wait_for_finish()
+
 
     def test_start_without_jobs(self):
         process_group = SubProcessGroup(polling_interval=1)
-- 
GitLab