Skip to content
Snippets Groups Projects
Commit 848bb974 authored by Ruud Overeem's avatar Ruud Overeem
Browse files

Bug 1000: First working version of the CEP logProcessor prototype. It can...

Bug 1000: First working version of the CEP logProcessor prototype. It can handle multiple incoming streams and will log the messages with their SocketID on INFO level.
parent cb645371
No related branches found
No related tags found
No related merge requests found
......@@ -74,6 +74,7 @@ dnl
AC_OUTPUT(
src/Makefile
src/ApplController/Makefile
src/CEPlogProcessor/Makefile
src/OnlineControl/Makefile
dnl src/OfflineControl/Makefile
Makefile
......
......@@ -29,15 +29,13 @@
#include <Common/SystemUtil.h>
#include <ALC/ACRequest.h>
#include "CEPlogProcessor.h"
#include "lofarDirs.h"
namespace LOFAR {
namespace ACC {
namespace APL {
CEPlogProcessor::CEPlogProcessor(const string& progName) :
itsListener (0),
itsParamSet (new ParameterSet),
itsACPool (0)
itsParamSet (new ParameterSet)
{
// Read in the parameterfile with network parameters.
ConfigLocator aCL;
......@@ -57,7 +55,6 @@ CEPlogProcessor::~CEPlogProcessor()
{
if (itsListener) { delete itsListener; }
if (itsParamSet) { delete itsParamSet; }
if (itsACPool) { delete itsACPool; }
}
void CEPlogProcessor::doWork() throw (Exception)
......@@ -96,7 +93,7 @@ void CEPlogProcessor::doWork() throw (Exception)
handleConnectionRequest();
}
else {
handleDataMessage(sid);
handleDataStream(sid);
}
} // for
} // while
......@@ -111,15 +108,15 @@ void CEPlogProcessor::handleConnectionRequest()
Socket* dataSocket = itsListener->accept(-1);
ASSERTSTR(dataSocket,
"Serious problems on listener socket, exiting! : " << itsListener->errstr());
itsConnSet.add(dataSocket):
itsConnSet.add(dataSocket->getSid());
// give stream its own buffer.
streamBuffer_t stream;
stream.socket = dataSocket;
stream.buffer = alloc(itsBufferSize);
stream.readPtr = 0;
stream.writePtr = 0;
itsLogStreams[sid] = stream;
stream.buffer = (char*)malloc(itsBufferSize);
stream.inPtr = 0;
stream.outPtr = 0;
itsLogStreams[dataSocket->getSid()] = stream;
}
//
......@@ -128,15 +125,18 @@ void CEPlogProcessor::handleConnectionRequest()
void CEPlogProcessor::handleDataStream(int sid)
{
// read in the new bytes
stream& = itsLogStreams[sid];
int newBytes = socket.read(stream.buffer + stream.inPtr, itsBufferSize - stream.inPtr);
streamBuffer_t stream = itsLogStreams[sid];
int newBytes = stream.socket->read(stream.buffer + stream.inPtr, itsBufferSize - stream.inPtr);
if (newBytes < 0) {
LOG_ERROR_STR("read on socket " << sid << " returned " << newBytes << ". Closing connection");
free (stream.buffer);
stream.socket->close();
free(stream.socket);
itsLogStreams.erase(sid);
itsConnSet.remove(sid);
return;
}
LOG_DEBUG_STR("Received " << newBytes " bytes at sid " << sid);
LOG_DEBUG_STR("Received " << newBytes << " bytes at sid " << sid);
stream.inPtr += newBytes;
// process as much data as possible from the buffer.
......@@ -145,23 +145,23 @@ void CEPlogProcessor::handleDataStream(int sid)
continue;
}
stream.buffer[i] = \0';
LOG_INFO(formatString("SID %d:%s\n", sid, streamBuffer.outPtr));
outPtr = i+1;
if (outPtr >= inPtr) { // All received bytes handled?
inPtr = 0;
outPtr = 0;
stream.buffer[i] = '\0';
LOG_INFO(formatString("SID %d:>%s<", sid, &(stream.buffer[stream.outPtr])));
stream.outPtr = i+1;
if (stream.outPtr >= stream.inPtr) { // All received bytes handled?
stream.inPtr = 0;
stream.outPtr = 0;
return;
}
}
if (outPtr > (int)(0.5*itsBufferSize)) {
if (stream.outPtr > (int)(0.5*itsBufferSize)) {
// When buffer becomes full shift leftovers to the left.
memmove (stream.buffer, stream.buffer + stream.outPtr, (stream.inPtr - stream.outPtr + 1));
stream.inPtr -= stream.outPtr;
stream.outPut = 0;
stream.outPtr = 0;
}
}
} // namespace ACC
} // namespace APL
} // namespace LOFAR
......@@ -38,7 +38,7 @@
namespace LOFAR {
namespace APL {
// \addtogroup ACCbin
// \addtogroup CEPCU
// @{
......@@ -90,7 +90,7 @@ private:
};
// @} addgroup
} // namespace ACC
} // namespace APL
} // namespace LOFAR
#endif
......@@ -31,7 +31,7 @@
#include "CEPlogProcessor.h"
using namespace LOFAR;
using namespace LOFAR::ACC;
using namespace LOFAR::APL;
//
// MAIN (parameterfile)
......@@ -58,7 +58,7 @@ int main (int /*argc*/, char* argv[]) {
LOG_INFO_STR("Starting up: " << argv[0]);
try {
//#if REAL_DAEMON
#if REAL_DAEMON
pid_t pid = fork();
switch (pid) {
case -1: // error
......@@ -71,7 +71,7 @@ int main (int /*argc*/, char* argv[]) {
LOG_INFO_STR("Daemon succesfully started, pid = " << pid);
return (0);
}
//#endif
#endif
// TODO: active the next two calls.
// setsid(); // disconnect from terminalsession
// chdir("/"); // might be on a mounted file system
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment