From 6e5fc56a37ca164786e7c204fa92a467d701b268 Mon Sep 17 00:00:00 2001
From: zwart <sdos@astron.nl>
Date: Wed, 31 May 2006 11:35:47 +0000
Subject: [PATCH] BugID: 719

Added semaphore for incoming connections. This way the user can control the maximum incoming connection that are communicating simultaneously.
---
 CEP/CEPFrame/src/DataManager.cc          |  5 ++++
 CEP/CEPFrame/src/DataManager.h           |  1 +
 CEP/CEPFrame/src/SynchronisityManager.cc | 33 +++++++++++++++++++-----
 CEP/CEPFrame/src/SynchronisityManager.h  |  5 ++--
 4 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/CEP/CEPFrame/src/DataManager.cc b/CEP/CEPFrame/src/DataManager.cc
index 644a21e919d..97b0f3f1af9 100644
--- a/CEP/CEPFrame/src/DataManager.cc
+++ b/CEP/CEPFrame/src/DataManager.cc
@@ -432,6 +432,11 @@ void DataManager::setOutRoundRobinPolicy(vector<int> channels, unsigned maxConcu
   itsSynMan->setOutRoundRobinPolicy(channels, maxConcurrent);
 }
 
+void DataManager::setInRoundRobinPolicy(vector<int> channels, unsigned maxConcurrent)
+{
+  itsSynMan->setInRoundRobinPolicy(channels, maxConcurrent);
+}
+
 bool DataManager::isInSynchronous(int channel)
 {
   DBGASSERTSTR(channel >= 0, "input channel too low");
diff --git a/CEP/CEPFrame/src/DataManager.h b/CEP/CEPFrame/src/DataManager.h
index 57715335d42..748c1584d6a 100644
--- a/CEP/CEPFrame/src/DataManager.h
+++ b/CEP/CEPFrame/src/DataManager.h
@@ -109,6 +109,7 @@ public:
 
   // limit the number of concurrent writers in a group of output channels
   void setOutRoundRobinPolicy(vector<int> channels, unsigned maxConcurrent);
+  void setInRoundRobinPolicy(vector<int> channels, unsigned maxConcurrent);
 
   // Is data transport of input channel synchronous?
   bool isInSynchronous(int channel);
diff --git a/CEP/CEPFrame/src/SynchronisityManager.cc b/CEP/CEPFrame/src/SynchronisityManager.cc
index db192856a33..6a789f25901 100644
--- a/CEP/CEPFrame/src/SynchronisityManager.cc
+++ b/CEP/CEPFrame/src/SynchronisityManager.cc
@@ -65,6 +65,8 @@ SynchronisityManager::SynchronisityManager (DataManager* dm, int inputs, int out
     itsReadersData[i].manager = 0;
     itsReadersData[i].conn = 0;
     itsReadersData[i].stopThread = true;
+    itsReadersData[i].commAllowed.up();	// initial semaphore level is 1
+    itsReadersData[i].nextThread = &itsReadersData[i];	// point to self
   }
 
   for (int j = 0; j < outputs; j++)
@@ -76,8 +78,8 @@ SynchronisityManager::SynchronisityManager (DataManager* dm, int inputs, int out
     itsWritersData[j].manager = 0;
     itsWritersData[j].conn = 0;
     itsWritersData[j].stopThread = true;
-    itsWritersData[j].writeAllowed.up();	// initial semaphore level is 1
-    itsWritersData[j].nextWriter = &itsWritersData[j];	// point to self
+    itsWritersData[j].commAllowed.up();	// initial semaphore level is 1
+    itsWritersData[j].nextThread = &itsWritersData[j];	// point to self
   }
 }
 
@@ -154,11 +156,23 @@ void SynchronisityManager::setOutRoundRobinPolicy(vector<int> channels, unsigned
 {
   for (unsigned i = 0; i < channels.size(); i ++) {
     thread_data *data = &itsWritersData[channels[i]];
-    DBGASSERTSTR(data->nextWriter == data, "Round Robin policy of out channel " << channels[i] << " set multiple times");
+    DBGASSERTSTR(data->nextThread == data, "Round Robin policy of out channel " << channels[i] << " set multiple times");
     if (i >= maxConcurrent) {
-      data->writeAllowed.down(); // initial semaphore level is 0
+      data->commAllowed.down(); // initial semaphore level is 0
     }
-    data->nextWriter = &itsWritersData[channels[(i + maxConcurrent) % channels.size()]];
+    data->nextThread = &itsWritersData[channels[(i + maxConcurrent) % channels.size()]];
+  }
+}
+
+void SynchronisityManager::setInRoundRobinPolicy(vector<int> channels, unsigned maxConcurrent)
+{
+  for (unsigned i = 0; i < channels.size(); i ++) {
+    thread_data *data = &itsReadersData[channels[i]];
+    DBGASSERTSTR(data->nextThread == data, "Round Robin policy of in channel " << channels[i] << " set multiple times");
+    if (i >= maxConcurrent) {
+      data->commAllowed.down(); // initial semaphore level is 0
+    }
+    data->nextThread = &itsReadersData[channels[(i + maxConcurrent) % channels.size()]];
   }
 }
 
@@ -189,7 +203,12 @@ void* SynchronisityManager::startReaderThread(void* thread_arg)
     pConn->setDestinationDH(dh);           // Set connection to new DataHolder
     pConn->getTransportHolder()->reset();  // Reset TransportHolder
 
+    data->commAllowed.down();
+
     Connection::State result = pConn->read();
+
+    data->nextThread->commAllowed.up();
+
     ASSERTSTR(result != Connection::Error,
 	      "Reader thread encountered error in reading");
     manager->writeUnlock(id);
@@ -214,13 +233,13 @@ void* SynchronisityManager::startWriterThread(void* thread_arg)
     pConn->setSourceDH(dh);               // Set connection to new DataHolder
     pConn->getTransportHolder()->reset(); // Reset TransportHolder
     cerr << /*MPI_Wtime() <<*/ ": thread " << pthread_self() << " waits for write right\n";
-    data->writeAllowed.down();
+    data->commAllowed.down();
     cerr << /*MPI_Wtime() <<*/ ": thread " << pthread_self() << " received write right\n";
     Connection::State result = pConn->write();
     ASSERTSTR(result != Connection::Error,
 	      "Writer thread encountered error in writing");
     cerr << /*MPI_Wtime() <<*/ ": thread " << pthread_self() << " releases write right\n";
-    data->nextWriter->writeAllowed.up();
+    data->nextThread->commAllowed.up();
     manager->readUnlock(id);
   }
 
diff --git a/CEP/CEPFrame/src/SynchronisityManager.h b/CEP/CEPFrame/src/SynchronisityManager.h
index af9e98b94a7..07034573491 100644
--- a/CEP/CEPFrame/src/SynchronisityManager.h
+++ b/CEP/CEPFrame/src/SynchronisityManager.h
@@ -54,8 +54,8 @@ typedef struct thread_args{
   DHPoolManager*   manager;
   Connection*      conn;
   bool             stopThread;
-  Semaphore	   writeAllowed;
-  thread_args	   *nextWriter;
+  Semaphore	   commAllowed;
+  thread_args	   *nextThread;
 }thread_data;
 
 
@@ -90,6 +90,7 @@ public:
 
   // limit the number of concurrent writers in a group of output channels
   void setOutRoundRobinPolicy(vector<int> channels, unsigned maxConcurrent);
+  void setInRoundRobinPolicy(vector<int> channels, unsigned maxConcurrent);
 
   void preprocess();
   void postprocess();
-- 
GitLab