From e146811b8abfc0360dba90f23ab48512fbfd8bd3 Mon Sep 17 00:00:00 2001
From: John Romein <romein@astron.nl>
Date: Tue, 11 Nov 2008 16:38:51 +0000
Subject: [PATCH] bug 225: * Use separate Input/OutputThread per Stream. * Use
 one stream per subband (no more multiplexing). * Buffering is done per
 stream.

---
 RTCP/IONProc/src/Makefile.am                 |   2 +
 RTCP/IONProc/src/OutputSection.cc            | 132 +++++++------------
 RTCP/IONProc/src/OutputSection.h             |  15 +--
 RTCP/IONProc/src/OutputThread.cc             |  88 +++++++++++++
 RTCP/IONProc/src/OutputThread.h              |  57 ++++++++
 RTCP/Storage/include/Storage/InputThread.h   |  57 ++++++++
 RTCP/Storage/include/Storage/Makefile.am     |   1 +
 RTCP/Storage/include/Storage/SubbandWriter.h |  14 +-
 RTCP/Storage/src/InputThread.cc              |  98 ++++++++++++++
 RTCP/Storage/src/Makefile.am                 |   1 +
 RTCP/Storage/src/SubbandWriter.cc            | 100 +++-----------
 11 files changed, 376 insertions(+), 189 deletions(-)
 create mode 100644 RTCP/IONProc/src/OutputThread.cc
 create mode 100644 RTCP/IONProc/src/OutputThread.h
 create mode 100644 RTCP/Storage/include/Storage/InputThread.h
 create mode 100644 RTCP/Storage/src/InputThread.cc

diff --git a/RTCP/IONProc/src/Makefile.am b/RTCP/IONProc/src/Makefile.am
index a8d9b358004..8f822b5de84 100644
--- a/RTCP/IONProc/src/Makefile.am
+++ b/RTCP/IONProc/src/Makefile.am
@@ -9,6 +9,7 @@ InputThreadAsm.h		\
 ION_Allocator.h			\
 LockedRanges.h			\
 LogThread.h			\
+OutputThread.h			\
 OutputSection.h			\
 ReaderWriterSynchronization.h	\
 RSP.h				\
@@ -26,6 +27,7 @@ InputThread.cc			\
 InputThreadAsm.S		\
 ION_Allocator.cc		\
 LogThread.cc			\
+OutputThread.cc			\
 OutputSection.cc		\
 ReaderWriterSynchronization.cc	\
 WH_DelayCompensation.cc		\
diff --git a/RTCP/IONProc/src/OutputSection.cc b/RTCP/IONProc/src/OutputSection.cc
index 4f88f995fec..d0a6f79af3c 100644
--- a/RTCP/IONProc/src/OutputSection.cc
+++ b/RTCP/IONProc/src/OutputSection.cc
@@ -56,67 +56,40 @@ OutputSection::OutputSection(unsigned psetNumber, const std::vector<Stream *> &s
 }
 
 
-OutputSection::~OutputSection()
-{
-}
-
-
-void *OutputSection::sendThreadStub(void *arg)
-{
-  std::clog << "sendThread started" << std::endl;
-
-  try {
-    static_cast<OutputSection *>(arg)->sendThread();
-  } catch (std::exception &ex) {
-    std::cerr << "Caught std::exception: " << ex.what() << std::endl;
-  } catch (...) {
-    std::cerr << "Caught non-std::exception" << std::endl;
-  }
-
-  std::clog << "sendThread finished" << std::endl;
-  return 0;
-}
-
-
-void OutputSection::sendThread()
-{
-  CorrelatedData *data;
-
-  while ((data = itsSendQueue.remove()) != 0) {
-    data->write(itsStreamToStorage);
-    itsFreeQueue.append(data);
-  }
-}
-
-
 void OutputSection::connectToStorage(const Parset *ps)
 {
   unsigned myPsetIndex       = ps->outputPsetIndex(itsPsetNumber);
   unsigned nrPsetsPerStorage = ps->nrPsetsPerStorage();
   unsigned storageHostIndex  = myPsetIndex / nrPsetsPerStorage;
-  unsigned storagePortIndex  = myPsetIndex % nrPsetsPerStorage;
+  //unsigned storagePortIndex  = myPsetIndex % nrPsetsPerStorage;
 
   string   prefix	     = "OLAP.OLAP_Conn.IONProc_Storage";
   string   connectionType    = ps->getString(prefix + "_Transport");
 
-  if (connectionType == "NULL") {
-    std::clog << "output section discards data to null:" << std::endl;
-    itsStreamToStorage = new NullStream;
-  } else if (connectionType == "TCP") {
-    std::string    server = ps->getStringVector(prefix + "_ServerHosts")[storageHostIndex];
-    unsigned short port   = boost::lexical_cast<unsigned short>(ps->getPortsOf(prefix)[storagePortIndex]);
-
-    std::clog << "output section connects to tcp:" << server << ':' << port << std::endl;
-    itsStreamToStorage = new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Client);
-  } else if (connectionType == "FILE") {
-    std::string filename = ps->getString(prefix + "_BaseFileName") + '.' +
-		    boost::lexical_cast<std::string>(storageHostIndex) + '.' +
-		    boost::lexical_cast<std::string>(storagePortIndex);
-
-    std::clog << "output section write to file:" << filename << std::endl;
-    itsStreamToStorage = new FileStream(filename.c_str(), 0666);
-  } else {
-    THROW(IONProcException, "unsupported ION->Storage stream type");
+  for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) {
+    unsigned subbandNumber = myPsetIndex * itsNrSubbandsPerPset + subband;
+
+    if (connectionType == "NULL") {
+      std::clog << "subband " << subbandNumber << " written to null:" << std::endl;
+      itsStreamsToStorage.push_back(new NullStream);
+    } else if (connectionType == "TCP") {
+      std::string    server = ps->getStringVector(prefix + "_ServerHosts")[storageHostIndex];
+      //unsigned short port   = boost::lexical_cast<unsigned short>(ps->getPortsOf(prefix)[storagePortIndex]);
+      unsigned short port   = boost::lexical_cast<unsigned short>(ps->getPortsOf(prefix)[subbandNumber]);
+
+      std::clog << "subband " << subbandNumber << " written to tcp:" << server << ':' << port << std::endl;
+      itsStreamsToStorage.push_back(new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Client));
+    } else if (connectionType == "FILE") {
+      std::string filename = ps->getString(prefix + "_BaseFileName") + '.' +
+		      boost::lexical_cast<std::string>(storageHostIndex) + '.' +
+		      boost::lexical_cast<std::string>(subbandNumber);
+		      //boost::lexical_cast<std::string>(storagePortIndex);
+
+      std::clog << "subband " << subbandNumber << " written to file:" << filename << std::endl;
+      itsStreamsToStorage.push_back(new FileStream(filename.c_str(), 0666));
+    } else {
+      THROW(IONProcException, "unsupported ION->Storage stream type");
+    }
   }
 }
 
@@ -139,11 +112,8 @@ void OutputSection::preprocess(const Parset *ps)
   for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++)
     itsVisibilitySums.push_back(new CorrelatedData(nrBaselines, nrChannels, hugeMemoryAllocator));
 
-  for (unsigned i = 0; i < maxSendQueueSize; i ++)
-    itsFreeQueue.append(new CorrelatedData(nrBaselines, nrChannels, hugeMemoryAllocator));
-
-  if (pthread_create(&itsSendThread, 0, sendThreadStub, this) != 0)
-    THROW(IONProcException, "could not create send thread");
+  for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++)
+    itsOutputThreads.push_back(new OutputThread(itsStreamsToStorage[subband], nrBaselines, nrChannels));
 }
 
 
@@ -155,25 +125,22 @@ void OutputSection::process()
   for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) {
     // TODO: make sure that there are more free buffers than subbandsPerPset
 
-    CorrelatedData *data   = lastTime ? itsFreeQueue.remove() : firstTime ? itsVisibilitySums[subband] : itsTmpSum;
-  
-    unsigned	   channel = CN_Mapping::mapCoreOnPset(itsCurrentComputeCore, itsPsetNumber);
+    unsigned inputChannel = CN_Mapping::mapCoreOnPset(itsCurrentComputeCore, itsPsetNumber);
 
-    data->read(itsStreamsFromCNs[channel]);
+    if (lastTime) {
+      CorrelatedData *data = itsOutputThreads[subband]->itsFreeQueue.remove();
+    
+      data->read(itsStreamsFromCNs[inputChannel]);
 
-    if (!firstTime)
-      if (lastTime)
+      if (!firstTime)
 	*data += *itsVisibilitySums[subband];
-      else
-	*itsVisibilitySums[subband] += *data;
-
-    if (lastTime) {
-#if 0
-      for (unsigned ch = 1; ch < 256; ch ++)
-	std::clog << std::setprecision(7) << ch << ' ' << abs(data->visibilities[0][ch][0][0]) << std::endl;
-#endif
 
-      itsSendQueue.append(data);
+      itsOutputThreads[subband]->itsSendQueue.append(data);
+    } else if (firstTime) {
+      itsVisibilitySums[subband]->read(itsStreamsFromCNs[inputChannel]);
+    } else {
+      itsTmpSum->read(itsStreamsFromCNs[inputChannel]);
+      *itsVisibilitySums[subband] += *itsTmpSum;
     }
 
     if (++ itsCurrentComputeCore == itsNrComputeCores)
@@ -187,24 +154,19 @@ void OutputSection::process()
 
 void OutputSection::postprocess()
 {
-  itsSendQueue.append(0); // 0 indicates that no more messages will be sent
-
-  if (pthread_join(itsSendThread, 0) != 0)
-    THROW(IONProcException, "could not join send thread");
-
-  delete itsStreamToStorage; // closes stream, stopping the Storage section
-  itsStreamToStorage = 0;
-
-  for (unsigned subband = 0; subband < itsVisibilitySums.size(); subband ++)
+  for (unsigned subband = 0; subband < itsNrSubbandsPerPset; subband ++) {
+    itsOutputThreads[subband]->itsSendQueue.append(0); // 0 indicates that no more messages will be sent
+    delete itsOutputThreads[subband];
     delete itsVisibilitySums[subband];
+    delete itsStreamsToStorage[subband];
+  }
 
-  itsVisibilitySums.resize(0);
+  itsOutputThreads.clear();
+  itsVisibilitySums.clear();
+  itsStreamsToStorage.clear();
 
   delete itsTmpSum;
   itsTmpSum = 0;
-
-  for (unsigned i = 0; i < maxSendQueueSize; i ++)
-    delete itsFreeQueue.remove();
 }
 
 }
diff --git a/RTCP/IONProc/src/OutputSection.h b/RTCP/IONProc/src/OutputSection.h
index 90d4d0d67f4..6be05515115 100644
--- a/RTCP/IONProc/src/OutputSection.h
+++ b/RTCP/IONProc/src/OutputSection.h
@@ -23,11 +23,9 @@
 
 #include <Interface/CorrelatedData.h>
 #include <Interface/Parset.h>
-#include <Interface/Queue.h>
+#include <IONProc/OutputThread.h>
 #include <Stream/Stream.h>
 
-#include <pthread.h>
-
 #include <vector>
 
 
@@ -38,31 +36,24 @@ class OutputSection
 {
   public:
     OutputSection(unsigned psetNumber, const std::vector<Stream *> &streamsFromCNs);
-   ~OutputSection();
 
     void			preprocess(const Parset *);
     void			process();
     void			postprocess();
 
   private:
-    static void			*sendThreadStub(void *);
-    void			sendThread();
     void			connectToStorage(const Parset *);
 
-    static const unsigned	maxSendQueueSize = 3;
-
     std::vector<CorrelatedData *> itsVisibilitySums;
     CorrelatedData		*itsTmpSum;
-    Queue<CorrelatedData *>	itsFreeQueue, itsSendQueue;
+    std::vector<OutputThread *> itsOutputThreads;
 
     unsigned			itsPsetNumber, itsNrComputeCores, itsCurrentComputeCore;
     unsigned			itsNrSubbandsPerPset;
     unsigned			itsNrIntegrationSteps, itsCurrentIntegrationStep;
 
     const std::vector<Stream *> &itsStreamsFromCNs;
-    Stream			*itsStreamToStorage;
-
-    pthread_t			itsSendThread;
+    std::vector<Stream *>	itsStreamsToStorage;
 };
 
 }
diff --git a/RTCP/IONProc/src/OutputThread.cc b/RTCP/IONProc/src/OutputThread.cc
new file mode 100644
index 00000000000..04734a885ef
--- /dev/null
+++ b/RTCP/IONProc/src/OutputThread.cc
@@ -0,0 +1,88 @@
+//#  OutputThread.cc:
+//#
+//#  Copyright (C) 2008
+//#  ASTRON (Netherlands Foundation for Research in Astronomy)
+//#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
+//#
+//#  This program is free software; you can redistribute it and/or modify
+//#  it under the terms of the GNU General Public License as published by
+//#  the Free Software Foundation; either version 2 of the License, or
+//#  (at your option) any later version.
+//#
+//#  This program is distributed in the hope that it will be useful,
+//#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+//#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//#  GNU General Public License for more details.
+//#
+//#  You should have received a copy of the GNU General Public License
+//#  along with this program; if not, write to the Free Software
+//#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+//#
+//#  $Id$
+
+//# Always #include <lofar_config.h> first!
+#include <lofar_config.h>
+
+#include <IONProc/OutputThread.h>
+#include <IONProc/ION_Allocator.h>
+
+
+namespace LOFAR {
+namespace RTCP {
+
+
+OutputThread::OutputThread(Stream *streamToStorage, unsigned nrBaselines, unsigned nrChannels)
+:
+  itsStreamToStorage(streamToStorage)
+{
+  for (unsigned i = 0; i < maxSendQueueSize; i ++)
+    itsFreeQueue.append(new CorrelatedData(nrBaselines, nrChannels, hugeMemoryAllocator));
+
+  if (pthread_create(&thread, 0, mainLoopStub, this) != 0) {
+    std::cerr << "could not create output thread" << std::endl;
+    exit(1);
+  }
+}
+
+
+OutputThread::~OutputThread()
+{
+  if (pthread_join(thread, 0) != 0) {
+    std::cerr << "could not join output thread" << std::endl;
+    exit(1);
+  }
+
+  for (unsigned i = 0; i < maxSendQueueSize; i ++)
+    delete itsFreeQueue.remove();
+}
+
+
+void OutputThread::mainLoop()
+{
+  CorrelatedData *data;
+
+  while ((data = itsSendQueue.remove()) != 0) {
+    data->write(itsStreamToStorage);
+    itsFreeQueue.append(data);
+  }
+}
+
+
+void *OutputThread::mainLoopStub(void *outputThread)
+{
+  try {
+    static_cast<OutputThread *>(outputThread)->mainLoop();
+  } catch (Exception &ex) {
+    std::cerr << "caught Exception: " << ex.what() << std::endl;
+  } catch (std::exception &ex) {
+    std::cerr << "caught std::exception: " << ex.what() << std::endl;
+  } catch (...) {
+    std::cerr << "caught non-std:exception" << std::endl;
+  }
+
+  //static_cast<OutputThread *>(outputThread)->stopped = true;
+  return 0;
+}
+
+} // namespace RTCP
+} // namespace LOFAR
diff --git a/RTCP/IONProc/src/OutputThread.h b/RTCP/IONProc/src/OutputThread.h
new file mode 100644
index 00000000000..8df55c24a76
--- /dev/null
+++ b/RTCP/IONProc/src/OutputThread.h
@@ -0,0 +1,57 @@
+//#  OutputThread.h
+//#
+//#  Copyright (C) 2006
+//#  ASTRON (Netherlands Foundation for Research in Astronomy)
+//#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
+//#
+//#  This program is free software; you can redistribute it and/or modify
+//#  it under the terms of the GNU General Public License as published by
+//#  the Free Software Foundation; either version 2 of the License, or
+//#  (at your option) any later version.
+//#
+//#  This program is distributed in the hope that it will be useful,
+//#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+//#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//#  GNU General Public License for more details.
+//#
+//#  You should have received a copy of the GNU General Public License
+//#  along with this program; if not, write to the Free Software
+//#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+//#
+//#  $Id$
+
+#ifndef LOFAR_IONPROC_OUTPUT_THREAD_H
+#define LOFAR_IONPROC_OUTPUT_THREAD_H
+
+//# Never #include <config.h> or #include <lofar_config.h> in a header file!
+
+#include <Interface/CorrelatedData.h>
+#include <Interface/Queue.h>
+#include <Stream/Stream.h>
+
+#include <pthread.h>
+
+namespace LOFAR {
+namespace RTCP {
+
+class OutputThread
+{
+  public:
+			    OutputThread(Stream *streamToStorage, unsigned nrBaselines, unsigned nrChannels);
+			    ~OutputThread();
+
+    static const unsigned   maxSendQueueSize = 3;
+    Queue<CorrelatedData *> itsFreeQueue, itsSendQueue;
+
+  private:
+    static void		    *mainLoopStub(void *outputThread);
+    void		    mainLoop();
+
+    Stream		    *itsStreamToStorage; 
+    pthread_t		    thread;
+};
+
+} // namespace RTCP
+} // namespace LOFAR
+
+#endif
diff --git a/RTCP/Storage/include/Storage/InputThread.h b/RTCP/Storage/include/Storage/InputThread.h
new file mode 100644
index 00000000000..b88b4f048a0
--- /dev/null
+++ b/RTCP/Storage/include/Storage/InputThread.h
@@ -0,0 +1,57 @@
+//#  InputThread.h
+//#
+//#  Copyright (C) 2008
+//#  ASTRON (Netherlands Foundation for Research in Astronomy)
+//#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
+//#
+//#  This program is free software; you can redistribute it and/or modify
+//#  it under the terms of the GNU General Public License as published by
+//#  the Free Software Foundation; either version 2 of the License, or
+//#  (at your option) any later version.
+//#
+//#  This program is distributed in the hope that it will be useful,
+//#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+//#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//#  GNU General Public License for more details.
+//#
+//#  You should have received a copy of the GNU General Public License
+//#  along with this program; if not, write to the Free Software
+//#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+//#
+//#  $Id$
+
+#ifndef LOFAR_RTCP_STORAGE_INPUT_THREAD_H
+#define LOFAR_RTCP_STORAGE_INPUT_THREAD_H
+
+//# Never #include <config.h> or #include <lofar_config.h> in a header file!
+
+#include <Interface/CorrelatedData.h>
+#include <Interface/Queue.h>
+#include <Stream/Stream.h>
+
+#include <pthread.h>
+
+namespace LOFAR {
+namespace RTCP {
+
+class InputThread
+{
+  public:
+			    InputThread(Stream *streamFromION, unsigned nrBaselines, unsigned nrChannels);
+			    ~InputThread();
+
+    static const unsigned   maxReceiveQueueSize = 3;
+    Queue<CorrelatedData *> itsFreeQueue, itsReceiveQueue;
+
+  private:
+    static void		    *mainLoopStub(void *inputThread);
+    void		    mainLoop();
+
+    Stream		    *itsStreamFromION; 
+    pthread_t		    thread;
+};
+
+} // namespace RTCP
+} // namespace LOFAR
+
+#endif
diff --git a/RTCP/Storage/include/Storage/Makefile.am b/RTCP/Storage/include/Storage/Makefile.am
index 561d92cb870..53a7e891d01 100644
--- a/RTCP/Storage/include/Storage/Makefile.am
+++ b/RTCP/Storage/include/Storage/Makefile.am
@@ -1,6 +1,7 @@
 pkginclude_HEADERS = Package__Version.h
 
 noinst_HEADERS =		\
+	InputThread.h		\
 	SubbandWriter.h		\
 	MSWriter.h		\
 	MSWriterCasa.h		\
diff --git a/RTCP/Storage/include/Storage/SubbandWriter.h b/RTCP/Storage/include/Storage/SubbandWriter.h
index d01de7c92f6..07882de1feb 100644
--- a/RTCP/Storage/include/Storage/SubbandWriter.h
+++ b/RTCP/Storage/include/Storage/SubbandWriter.h
@@ -37,10 +37,9 @@
 #include <Interface/Parset.h>
 #include <Interface/CorrelatedData.h>
 #include <Interface/Queue.h>
+#include <Storage/InputThread.h>
 #include <Stream/Stream.h>
 
-#include <pthread.h>
-
 
 namespace LOFAR {
 namespace RTCP {
@@ -63,20 +62,11 @@ class SubbandWriter
     void		    writeLogMessage();
     bool		    processSubband(unsigned sb);
 
-    void		    createInputThread();
-    void		    stopInputThread();
-    static void		    *inputThreadStub(void *);
-    void		    inputThread();
-
-  
     const Parset	    *itsPS;
     unsigned		    itsRank;
 
     std::vector<Stream *>   itsInputStreams;
-
-    static const unsigned   nrInputBuffers = 10;
-    Queue<CorrelatedData *> itsFreeQueue, itsReceiveQueue;
-    pthread_t		    itsInputThread;
+    std::vector<InputThread *> itsInputThreads;
 
     unsigned		    itsNStations;
     unsigned		    itsNBaselines;
diff --git a/RTCP/Storage/src/InputThread.cc b/RTCP/Storage/src/InputThread.cc
new file mode 100644
index 00000000000..2c04e210dad
--- /dev/null
+++ b/RTCP/Storage/src/InputThread.cc
@@ -0,0 +1,98 @@
+//#  InputThread.cc:
+//#
+//#  Copyright (C) 2008
+//#  ASTRON (Netherlands Foundation for Research in Astronomy)
+//#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
+//#
+//#  This program is free software; you can redistribute it and/or modify
+//#  it under the terms of the GNU General Public License as published by
+//#  the Free Software Foundation; either version 2 of the License, or
+//#  (at your option) any later version.
+//#
+//#  This program is distributed in the hope that it will be useful,
+//#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+//#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//#  GNU General Public License for more details.
+//#
+//#  You should have received a copy of the GNU General Public License
+//#  along with this program; if not, write to the Free Software
+//#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+//#
+//#  $Id$
+
+//# Always #include <lofar_config.h> first!
+#include <lofar_config.h>
+
+#include <Storage/InputThread.h>
+#include <Stream/NullStream.h>
+
+
+namespace LOFAR {
+namespace RTCP {
+
+
+InputThread::InputThread(Stream *streamFromION, unsigned nrBaselines, unsigned nrChannels)
+:
+  itsStreamFromION(streamFromION)
+{
+  for (unsigned i = 0; i < maxReceiveQueueSize; i ++)
+    itsFreeQueue.append(new CorrelatedData(nrBaselines, nrChannels));
+
+  if (pthread_create(&thread, 0, mainLoopStub, this) != 0) {
+    std::cerr << "could not create input thread" << std::endl;
+    exit(1);
+  }
+}
+
+
+InputThread::~InputThread()
+{
+  if (pthread_join(thread, 0) != 0) {
+    std::cerr << "could not join input thread" << std::endl;
+    exit(1);
+  }
+
+  for (unsigned i = 0; i < maxReceiveQueueSize; i ++)
+    delete itsFreeQueue.remove();
+}
+
+
+void InputThread::mainLoop()
+{
+  // limit reads from NullStream to 10 blocks; otherwise unlimited
+  bool		 nullInput = dynamic_cast<NullStream *>(itsStreamFromION) != 0;
+  unsigned	 increment = nullInput ? 1 : 0;
+  CorrelatedData *data     = 0;
+
+  try {
+    for (unsigned count = 0; count < 10; count += increment) {
+      data = itsFreeQueue.remove();
+      data->read(itsStreamFromION);
+      itsReceiveQueue.append(data);
+    }
+  } catch (Stream::EndOfStreamException &) {
+    itsFreeQueue.append(data);
+  }
+
+  itsReceiveQueue.append(0); // no more data
+}
+
+
+void *InputThread::mainLoopStub(void *inputThread)
+{
+  try {
+    static_cast<InputThread *>(inputThread)->mainLoop();
+  } catch (Exception &ex) {
+    std::cerr << "caught Exception: " << ex.what() << std::endl;
+  } catch (std::exception &ex) {
+    std::cerr << "caught std::exception: " << ex.what() << std::endl;
+  } catch (...) {
+    std::cerr << "caught non-std:exception" << std::endl;
+  }
+
+  //static_cast<InputThread *>(inputThread)->stopped = true;
+  return 0;
+}
+
+} // namespace RTCP
+} // namespace LOFAR
diff --git a/RTCP/Storage/src/Makefile.am b/RTCP/Storage/src/Makefile.am
index 5812818e3ad..6bda101cc00 100644
--- a/RTCP/Storage/src/Makefile.am
+++ b/RTCP/Storage/src/Makefile.am
@@ -1,6 +1,7 @@
 lib_LTLIBRARIES			= libstorage.la
 
 libstorage_la_SOURCES	= Package__Version.cc \
+				  InputThread.cc \
 				  SubbandWriter.cc \
 				  MSWriter.cc      \
 				  MSWriterCasa.cc  \
diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc
index f0a63de1f19..0979087f3fc 100644
--- a/RTCP/Storage/src/SubbandWriter.cc
+++ b/RTCP/Storage/src/SubbandWriter.cc
@@ -107,89 +107,32 @@ void SubbandWriter::createInputStreams()
   string   prefix            = "OLAP.OLAP_Conn.IONProc_Storage";
   string   connectionType    = itsPS->getString(prefix + "_Transport");
 
-  itsInputStreams.resize(itsPS->nrPsetsPerStorage());
+  for (unsigned subband = 0; subband < itsNrSubbandsPerStorage; subband ++) {
+    unsigned subbandNumber = itsRank * itsNrSubbandsPerStorage + subband;
 
-  for (unsigned i = 0; i < itsPS->nrPsetsPerStorage(); i ++)
     if (connectionType == "NULL") {
-      std::cout << "input " << i << ": null stream" << std::endl;
-      itsInputStreams[i] = new NullStream;
+      std::cout << "subband " << subbandNumber << " read from null stream" << std::endl;
+      itsInputStreams.push_back(new NullStream);
     } else if (connectionType == "TCP") {
       std::string    server = itsPS->getStringVector(prefix + "_ServerHosts")[itsRank];
-      unsigned short port   = boost::lexical_cast<unsigned short>(itsPS->getPortsOf(prefix)[i]);
+      unsigned short port   = boost::lexical_cast<unsigned short>(itsPS->getPortsOf(prefix)[subbandNumber]);
 
-      std::cout << "input " << i << ": tcp:" << server << ':' << port << std::endl;
-      itsInputStreams[i] = new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Server);
+      std::cout << "subband " << subbandNumber << " read from tcp:" << server << ':' << port << std::endl;
+      itsInputStreams.push_back(new SocketStream(server.c_str(), port, SocketStream::TCP, SocketStream::Server));
     } else if (connectionType == "FILE") {
       std::string filename = itsPS->getString(prefix + "_BaseFileName") + '.' +
 			      boost::lexical_cast<std::string>(itsRank) + '.' +
-			      boost::lexical_cast<std::string>(i);
+			      boost::lexical_cast<std::string>(subbandNumber);
 
-      std::cout << "input " << i << ": file:" << filename << std::endl;
-      itsInputStreams[i] = new FileStream(filename.c_str());
+      std::cout << "subband " << subbandNumber << " read from file:" << filename << std::endl;
+      itsInputStreams.push_back(new FileStream(filename.c_str()));
     } else {
       THROW(StorageException, "unsupported ION->Storage stream type");
     }
-}
-
-
-void *SubbandWriter::inputThreadStub(void *arg)
-{
-  try {
-    static_cast<SubbandWriter *>(arg)->inputThread();
-  } catch (Exception &ex) {
-    std::cerr << "caught Exception: " << ex.what() << std::endl;
-  } catch (std::exception &ex) {
-    std::cerr << "caught std::exception: " << ex.what() << std::endl;
-  } catch (...) {
-    std::cerr << "caught non-std::exception" << std::endl;
-  } 
-
-  return 0;
-}
-
-
-void SubbandWriter::inputThread()
-{
-  bool nullInput = dynamic_cast<NullStream *>(itsInputStreams[0]) != 0;
-
-  do {
-    for (unsigned sb = 0; sb < itsNrSubbandsPerStorage; ++ sb) {
-      // find out from which input channel we should read
-      unsigned	     pset  = sb / itsNrSubbandsPerPset;
-      CorrelatedData *data = itsFreeQueue.remove();
-
-      try {
-	data->read(itsInputStreams[pset]);
-      } catch (Stream::EndOfStreamException &) {
-	itsFreeQueue.append(data);
-	goto end; // nested loop; cannot use "break"
-      }
-
-      itsReceiveQueue.append(data);
-    }
-  } while (!nullInput);  // prevent infinite loop when using NullStream
-
-end:
-  itsReceiveQueue.append(0); // signal main thread that this was the last
-}
-
-
-void SubbandWriter::createInputThread()
-{
-  if (pthread_create(&itsInputThread, 0, inputThreadStub, this) != 0) {
-    std::cerr << "could not create input thread" << std::endl;
-    exit(1);
   }
 }
 
 
-void SubbandWriter::stopInputThread()
-{
-  if (pthread_join(itsInputThread, 0) != 0)
-    std::cerr << "could not join input thread";
-}
-
-
 void SubbandWriter::preprocess() 
 {
 #if defined HAVE_AIPSPP
@@ -204,9 +147,6 @@ void SubbandWriter::preprocess()
   };
 #endif
 
-  for (unsigned i = 0; i < nrInputBuffers; i ++)
-    itsFreeQueue.append(new CorrelatedData(itsNBaselines, itsNChannels));
-
   double startTime = itsPS->startTime();
   LOG_TRACE_VAR_STR("startTime = " << startTime);
   
@@ -278,7 +218,9 @@ void SubbandWriter::preprocess()
 #endif // defined HAVE_AIPSPP
 
   createInputStreams();
-  createInputThread();
+  
+  for (unsigned sb = 0; sb < itsNrSubbandsPerStorage; sb ++)
+    itsInputThreads.push_back(new InputThread(itsInputStreams[sb], itsNBaselines, itsNChannels));
 }
 
 
@@ -312,7 +254,7 @@ void SubbandWriter::writeLogMessage()
 
 bool SubbandWriter::processSubband(unsigned sb)
 {
-  CorrelatedData *data = itsReceiveQueue.remove();
+  CorrelatedData *data = itsInputThreads[sb]->itsReceiveQueue.remove();
 
   if (data == 0)
     return false;
@@ -367,7 +309,7 @@ bool SubbandWriter::processSubband(unsigned sb)
   }
 #endif
 
-  itsFreeQueue.append(data);
+  itsInputThreads[sb]->itsFreeQueue.append(data);
   return true;
 }
 
@@ -393,11 +335,12 @@ void SubbandWriter::process()
 
 void SubbandWriter::postprocess() 
 {
-  stopInputThread();
-
-  for (unsigned i = 0; i < itsInputStreams.size(); i ++)
-    delete itsInputStreams[i];
+  for (unsigned sb = 0; sb < itsNrSubbandsPerStorage; sb ++) {
+    delete itsInputThreads[sb];
+    delete itsInputStreams[sb];
+  }
 
+  itsInputThreads.clear();
   itsInputStreams.clear();
 
   delete [] itsFlagsBuffers;	itsFlagsBuffers   = 0;
@@ -410,9 +353,6 @@ void SubbandWriter::postprocess()
   itsWriters.clear();
 #endif
 
-  for (unsigned i = 0; i < nrInputBuffers; i ++)
-    delete itsFreeQueue.remove();
-
   delete itsVisibilities;	itsVisibilities   = 0;
 
   cout<<itsWriteTimer<<endl;
-- 
GitLab