From f8559a42c0846d4737a3ae04af1b378fa721fdf6 Mon Sep 17 00:00:00 2001
From: Jan David Mol <mol@astron.nl>
Date: Thu, 16 Dec 2010 10:31:22 +0000
Subject: [PATCH] bug 1362: added (untested) circular buffer, and some bug
 fixes

---
 .gitattributes                                |   1 +
 .../src/BGPlogProcessor/CEPlogProcessor.cc    | 125 ++++--------
 .../src/BGPlogProcessor/CEPlogProcessor.h     |  16 +-
 .../src/BGPlogProcessor/CircularBuffer.h      | 188 ++++++++++++++++++
 4 files changed, 241 insertions(+), 89 deletions(-)
 create mode 100644 MAC/APL/CEPCU/src/BGPlogProcessor/CircularBuffer.h

diff --git a/.gitattributes b/.gitattributes
index dc261312ad0..da048258f4b 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -878,6 +878,7 @@ MAC/APL/CASATools/test/tCasaConverter.cc -text
 MAC/APL/CASATools/test/tCasaConverter.log_prop -text
 MAC/APL/CEPCU/src/BGPlogProcessor/CEPDatapoints.dpl -text
 MAC/APL/CEPCU/src/BGPlogProcessor/CEPDatapointtypes.dpl -text
+MAC/APL/CEPCU/src/BGPlogProcessor/CircularBuffer.h -text
 MAC/APL/CURTDBDaemons/src/LogProcessor/LogProcessor.conf -text
 MAC/APL/CURTDBDaemons/src/LogProcessor/LogProcessor.log_prop -text
 MAC/APL/DEPENDENCIES -text svneol=native#application/octet-stream
diff --git a/MAC/APL/CEPCU/src/BGPlogProcessor/CEPlogProcessor.cc b/MAC/APL/CEPCU/src/BGPlogProcessor/CEPlogProcessor.cc
index e59bef92739..be5eec0cc6b 100644
--- a/MAC/APL/CEPCU/src/BGPlogProcessor/CEPlogProcessor.cc
+++ b/MAC/APL/CEPCU/src/BGPlogProcessor/CEPlogProcessor.cc
@@ -383,7 +383,7 @@ void CEPlogProcessor::_deleteStream(GCFPortInterface&	port)
 	LOG_DEBUG_STR("_deleteStream");
 	map<GCFPortInterface*, streamBuffer_t>::iterator	theStream = itsLogStreams.find(&port);
 	if (theStream != itsLogStreams.end()) {
-		delete [] theStream->second.buffer;
+		delete theStream->second.buffer;
 		theStream->second.buffer = 0;
 		itsLogStreams.erase(theStream);
 	}
@@ -406,9 +406,7 @@ void CEPlogProcessor::_handleConnectionRequest()
 	// give stream its own buffer.
 	streamBuffer_t		stream;
 	stream.socket	= pNewClient;
-	stream.buffer 	= new char[itsBufferSize];
-	stream.inPtr  	= 0;
-	stream.outPtr 	= 0;
+	stream.buffer 	= new CircularBuffer(itsBufferSize);
 	itsLogStreams[pNewClient] = stream;
 	LOG_INFO_STR("Added new client to my admin");
 }
@@ -419,57 +417,32 @@ void CEPlogProcessor::_handleConnectionRequest()
 void CEPlogProcessor::_handleDataStream(GCFPortInterface*	port)
 {
 	// read in the new bytes
-	streamBuffer_t	stream		= itsLogStreams[port];
-	LOG_DEBUG_STR("handleDataStream:in=" << stream.inPtr << ", out=" << stream.outPtr);
-	int	newBytes = stream.socket->recv(stream.buffer + stream.inPtr, itsBufferSize - stream.inPtr);
-	LOG_DEBUG_STR("received " << newBytes << " new bytes");
+	streamBuffer_t	&stream	= itsLogStreams[port];
+	int	newBytes = stream.socket->recv( stream.buffer->tail, stream.buffer->tailFreeSpace() );
 	if (newBytes < 0) {
-//		LOG_ERROR_STR("read on socket " << sid << " returned " << newBytes << ". Closing connection");
+	    LOG_DEBUG_STR("Closing connection.");
 		port->close();
 		_deleteStream(*port);
 		return;
 	}
-//	LOG_DEBUG_STR("Received " << newBytes << " bytes at sid " << sid);
-	stream.inPtr += newBytes;
-	
-	// process as much data as possible from the buffer.
-	for (int i = stream.outPtr; i <= stream.inPtr; i++) {
-		if (stream.buffer[i] != '\n') {
-			continue;
-		}
 
-		stream.buffer[i] = '\0';
-//		LOG_INFO(formatString("SID %d:>%s<", sid, &(stream.buffer[stream.outPtr])));
-		LOG_INFO(formatString("(%d,%d)>%s<", stream.outPtr, i, &(stream.buffer[stream.outPtr])));
-		_processLogLine(&(stream.buffer[stream.outPtr]));
-		stream.outPtr = i+1;
-		if (stream.outPtr >= stream.inPtr) {	// All received bytes handled?
-			LOG_DEBUG("Reset of read/write pointers");
-			stream.inPtr = 0;
-			stream.outPtr = 0;
-			itsLogStreams[port] = stream;	// copy changes back to admin
-			return;
-		} 
-	}
+	LOG_DEBUG_STR("Read " << newBytes << " bytes.");
+    stream.buffer->incTail( newBytes );
 
-	if (stream.outPtr > (int)(0.5*itsBufferSize)) {
-		// When buffer becomes full shift leftovers to the left.
-		LOG_DEBUG_STR("move with: " << stream.inPtr << ", " << stream.outPtr);
-		memmove (stream.buffer, stream.buffer + stream.outPtr, (stream.inPtr - stream.outPtr + 1));
-		stream.inPtr -= stream.outPtr;
-		stream.outPtr = 0;
-	}
-
-	itsLogStreams[port] = stream; // copy changes back to admin
+    char lineBuf[1024];
+    while (stream.buffer->getLine( lineBuf, sizeof lineBuf )) {
+      LOG_DEBUG_STR("Read log line " << lineBuf );
+	  _processLogLine(lineBuf);
+    }
 }
 
-time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) const
+time_t CEPlogProcessor::_parseDateTime(const char *datestr, const char *timestr) const
 {
   struct tm tm;
   time_t ts;
   bool validtime = true;
 
-  if (sscanf(date.c_str(), "%u-%u-%u", 
+  if (sscanf(datestr, "%u-%u-%u", 
     &tm.tm_year, &tm.tm_mon, &tm.tm_mday) != 3) {
     validtime = false;
    } else {
@@ -477,7 +450,7 @@ time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) c
     tm.tm_year -= 1900;
    }
 
-  if (sscanf(time.c_str(), "%u:%u:%u", 
+  if (sscanf(timestr, "%u:%u:%u",  // ignore milliseconds
     &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 3) {
     validtime = false;
   }
@@ -485,9 +458,9 @@ time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) c
   if (validtime) {
     ts = mktime(&tm);
   } else {
-    LOG_WARN_STR("Invalid timestamp: " << date << " " << time);
+    LOG_WARN_STR("Invalid timestamp: " << datestr << " " << timestr << "; using now()");
 
-    ts = ::time(0L);
+    ts = time(0L);
   }
 
   return ts;
@@ -498,28 +471,18 @@ time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) c
 // _processLogLine(char*)
 //
 //
-void CEPlogProcessor::_processLogLine(char*		cString)
+void CEPlogProcessor::_processLogLine(const char *cString)
 {
-        // example log line:
+    // example log line:
 	// Storage@00 09-12-10 11:33:13.240 DEBUG [obs 21855 output 1 subband 223] InputThread::~InputThread()
 	unsigned bufsize = strlen( cString ) + 1;
 
-	string processName;
+	vector<char> processName(bufsize), date(bufsize), time(bufsize), loglevel(bufsize), msg(bufsize);
 	int processNr;
-	string date;
-	string time;
-	string loglevel;
-	string msg;
-
-	processName.reserve( bufsize );
-	date.reserve( bufsize );
-	time.reserve( bufsize );
-	loglevel.reserve( bufsize );
-	msg.reserve( bufsize );
-
-        if (sscanf(cString, "%[^@]@%d %s %s %s %[^\n]",
+
+    if (sscanf(cString, "%[^@]@%d %s %s %s %[^\n]",
 	  &processName[0],
-          &processNr,
+      &processNr,
 	  &date[0],
 	  &time[0],
 	  &loglevel[0],
@@ -536,23 +499,23 @@ void CEPlogProcessor::_processLogLine(char*		cString)
 	  return;
 	}
 
-	LOG_DEBUG_STR("Processname = " << processName);		// eg IONProc
+	LOG_DEBUG_STR("Processname = " << &processName[0]);		// eg IONProc
 	
-        time_t ts = _parseDateTime(date, time);
-
-	if (processName == "IONProc") {
-	  _processIONProcLine(processNr, ts, loglevel, msg);
-	} else if (processName == "CNProc") {
-	  _processCNProcLine(processNr, ts, loglevel, msg);
-	} else if (processName == "Storage") {
-	  _processStorageLine(processNr, ts, loglevel, msg);
+    time_t ts = _parseDateTime(&date[0], &time[0]);
+
+	if (!strcmp(&processName[0],"IONProc")) {
+	  _processIONProcLine(processNr, ts, &loglevel[0], &msg[0]);
+	} else if (!strcmp(&processName[0],"CNProc")) {
+	  _processCNProcLine(processNr, ts, &loglevel[0], &msg[0]);
+	} else if (!strcmp(&processName[0],"Storage")) {
+	  _processStorageLine(processNr, ts, &loglevel[0], &msg[0]);
 	}
 }
 
 //
 // _processIONProcLine(cstring)
 //
-void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string &loglevel, const string &msg)
+void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char *loglevel, const char *msg)
 {
 	LOG_DEBUG_STR("_processIONProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")");
 
@@ -567,7 +530,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
 	// InputBuffer
 	//
 
-	if ((result = strstr(msg.c_str(), " late: "))) {
+	if ((result = strstr(msg, " late: "))) {
 	        float	late;
 		if (sscanf(result, " late: %f ", &late) == 1 ) {
 		        LOG_DEBUG_STR("[" << processNr << "] Late: " << late);
@@ -576,7 +539,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
 
 		// 0% flags look like : flags 0: (0%)
 		// filled% flags look like : flags 0: [nr..nr> (10.5%)
-		if ((result = strstr(msg.c_str(), "flags 0:"))) {
+		if ((result = strstr(msg, "flags 0:"))) {
 		        float	flags0(0.0), flags1(0.0), flags2(0.0), flags3(0.0);
 			if (sscanf(result, "flags 0:%*[^(](%f%%), flags 1:%*[^(](%f%%), flags 2:%*[^(](%f%%), flags 3:%*[^(](%f%%)",
 			  &flags0, &flags1, &flags2, &flags3) == 4) {
@@ -592,7 +555,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
 		return;
 	}
 
-	if ((result = strstr(msg.c_str(), "ION->CN:"))) {
+	if ((result = strstr(msg, "ION->CN:"))) {
 		float	ioTime;
 		if (sscanf(result, "ION->CN:%f", &ioTime) == 1) {
 		        LOG_DEBUG_STR("[" << processNr << "] ioTime: " << ioTime);
@@ -602,7 +565,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
 	}
 
 
-	if ((result = strstr(msg.c_str(), "received ["))) {
+	if ((result = strstr(msg, "received ["))) {
 		int	blocks0(0), blocks1(0), blocks2(0), blocks3(0);
 		if (sscanf(result, "received [%d,%d,%d,%d]", &blocks0, &blocks1, &blocks2, &blocks3) == 4) {
 		  LOG_DEBUG(formatString("[%d] blocks: %d, %d, %d, %d", processNr, blocks0, blocks1, blocks2, blocks3));
@@ -615,7 +578,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
 
 		// if rejected was found in same line this means that a certain amount of blocks was rejected, 
 		// set this into the database. If no rejected was found, it means 0 blocks were rejected, so DB can be reset to 0
-		if ((result = strstr(msg.c_str(), " rejected ["))) {
+		if ((result = strstr(msg, " rejected ["))) {
 		        int	blocks0(0), blocks1(0), blocks2(0), blocks3(0);
 			if (sscanf(result, " rejected [%d,%d,%d,%d]", &blocks0, &blocks1, &blocks2, &blocks3) == 4) {
 			        LOG_DEBUG(formatString("[%d] rejected: blocks: %d, %d, %d, %d", processNr, blocks0, blocks1, blocks2, blocks3));
@@ -642,7 +605,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
 	// Adder
 	//
 
-	if ((result = strstr(msg.c_str(), "dropping data"))) {
+	if ((result = strstr(msg, "dropping data"))) {
 		LOG_DEBUG(formatString("[%d] Dropping data started ",processNr));
 		itsAdders[processNr]->setValue(PN_ADD_DROPPING, GCFPVBool(true), ts);
 		itsAdders[processNr]->setValue(PN_ADD_LOG_LINE,GCFPVString(result),ts);
@@ -651,7 +614,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
 		return;
 	}
 
-	if ((result = strstr(msg.c_str(), "dropped "))) {
+	if ((result = strstr(msg, "dropped "))) {
 	        int	dropped(0);
 		if (sscanf(result, "dropped %d ", &dropped) == 1) {
 		        LOG_DEBUG(formatString("[%d] Dropped %d ",processNr,dropped));
@@ -670,13 +633,13 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
 	}
 }
 
-void CEPlogProcessor::_processCNProcLine(int processNr, time_t ts, const string &loglevel, const string &msg)
+void CEPlogProcessor::_processCNProcLine(int processNr, time_t ts, const char *loglevel, const char *msg)
 {
 	LOG_DEBUG_STR("_processCNProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")");
 	// TODO
 }
 
-void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const string &loglevel, const string &msg)
+void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const char *loglevel, const char *msg)
 {
 	LOG_DEBUG_STR("_processStorageLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")");
 
@@ -687,7 +650,7 @@ void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const string
 		return;
 	}
 
-	if ((result = strstr(msg.c_str(), "time ="))) {
+	if ((result = strstr(msg, "time ="))) {
 		int	rank(0), count(0);
 		char   tim[24]; 
 		
@@ -702,7 +665,7 @@ void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const string
 	}
 
 	/*
-	if ((result = strstr(msg.c_str(), "dropped "))) {
+	if ((result = strstr(msg, "dropped "))) {
 	  int blocks(0), subband(0), output(0);
 		
 		LOG_DEBUG_STR("_processStorageLine(" << processNr << "," << result << ")");
diff --git a/MAC/APL/CEPCU/src/BGPlogProcessor/CEPlogProcessor.h b/MAC/APL/CEPCU/src/BGPlogProcessor/CEPlogProcessor.h
index 2fac81839d9..a5c06715511 100644
--- a/MAC/APL/CEPCU/src/BGPlogProcessor/CEPlogProcessor.h
+++ b/MAC/APL/CEPCU/src/BGPlogProcessor/CEPlogProcessor.h
@@ -30,6 +30,8 @@
 #include <GCF/TM/GCF_Control.h>
 #include <GCF/RTDB/RTDB_PropertySet.h>
 
+#include "CircularBuffer.h"
+
 #include <time.h>
 
 namespace LOFAR {
@@ -75,12 +77,12 @@ private:
 
 	// Routines for processing the loglines.
 	void	 _handleDataStream  (GCFPortInterface*	port);
-	time_t	 _parseDateTime     (const string &date, const string &time) const;
-	void	 _processLogLine    (char*	cString);
+	time_t	 _parseDateTime     (const char *datestr, const char *timestr) const;
+	void	 _processLogLine    (const char *cString);
 
-	void _processIONProcLine(int processNr, time_t ts, const string &loglevel, const string &msg);
-	void _processCNProcLine(int processNr, time_t ts, const string &loglevel, const string &msg);
-	void _processStorageLine(int processNr, time_t ts, const string &loglevel, const string &msg);
+	void _processIONProcLine(int processNr, time_t ts, const char *loglevel, const char *msg);
+	void _processCNProcLine(int processNr, time_t ts, const char *loglevel, const char *msg);
+	void _processStorageLine(int processNr, time_t ts, const char *loglevel, const char *msg);
 
 	//# --- Datamembers --- 
 	// The listener socket to receive the requests on.
@@ -92,9 +94,7 @@ private:
 	// internal structure for admin for 1 stream
 	typedef struct {
 		GCFTCPPort*	socket;
-		char*		buffer;
-		int			inPtr;
-		int			outPtr;
+		CircularBuffer*	buffer;
 	} streamBuffer_t;
 
 	// internal structure for lse based logging
diff --git a/MAC/APL/CEPCU/src/BGPlogProcessor/CircularBuffer.h b/MAC/APL/CEPCU/src/BGPlogProcessor/CircularBuffer.h
new file mode 100644
index 00000000000..44406bf75d8
--- /dev/null
+++ b/MAC/APL/CEPCU/src/BGPlogProcessor/CircularBuffer.h
@@ -0,0 +1,188 @@
+//#  CircularBuffer.h: Moves the operator info from the logfiles to PVSS
+//#
+//#  Copyright (C) 2009
+//#  ASTRON (Netherlands Foundation for Research in Astronomy)
+//#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
+//#
+//#  This program is free software; you can redistribute it and/or modify
+//#  it under the terms of the GNU General Public License as published by
+//#  the Free Software Foundation; either version 2 of the License, or
+//#  (at your option) any later version.
+//#
+//#  This program is distributed in the hope that it will be useful,
+//#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+//#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//#  GNU General Public License for more details.
+//#
+//#  You should have received a copy of the GNU General Public License
+//#  along with this program; if not, write to the Free Software
+//#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+//#
+//#  $Id: CEPlogProcessor.h 16954 2010-12-15 10:03:09Z mol $
+#ifndef LOFAR_APL_CIRCULARBUFFER_H
+#define LOFAR_APL_CIRCULARBUFFER_H
+
+// \file
+// Daemon for launching Application Controllers
+
+//# Never #include <config.h> or #include <lofar_config.h> in a header file!
+//# Includes
+
+namespace LOFAR {
+	namespace APL {
+
+class CircularBuffer {
+public:
+  CircularBuffer( unsigned capacity ):
+    capacity(capacity),
+    full(capacity == 0)
+  {
+    buffer = new char[capacity];
+    begin  = buffer;
+    end    = buffer + capacity;
+    head   = buffer;
+    tail   = buffer;
+  }
+
+  ~CircularBuffer() {
+    delete[] buffer;
+  }
+
+  bool empty() const {
+    return head == tail && !full;
+  }
+
+  unsigned freeSpace() const {
+    return full ? 0 : (head <= tail ? end - tail + head - begin : tail - head);
+  }
+
+  unsigned tailFreeSpace() const {
+    return full ? 0 : (head <= tail ?  end - tail : head - tail);
+  }
+
+  void incTail( unsigned len ) {
+    tail += len;
+    if (tail == end) tail = begin;
+    if (tail == head) full = true;
+  }
+
+  unsigned putData( char *buf, unsigned buflen ) {
+    if (full)
+      return 0;
+
+    if (buflen == 0)
+      return 0;
+
+    unsigned first_buflen = tailFreeSpace();
+    unsigned second_buflen = freeSpace() - first_buflen;
+
+    if (buflen <= first_buflen) {
+      first_buflen = buflen;
+      second_buflen = 0;
+    } else if (buflen <= first_buflen + second_buflen) {
+      second_buflen = buflen - first_buflen;
+    } else {
+      // discard data beyond freeSpace()
+    }
+
+    memcpy( tail, buf, first_buflen );
+    incTail( first_buflen );
+
+    if (second_buflen) {
+      memcpy( tail, buf + first_buflen, second_buflen );
+      incTail( second_buflen );
+    }
+
+    return first_buflen + second_buflen;
+  }
+
+  bool getLine( char *buf, unsigned buflen ) {
+    if (empty())
+      return false;
+
+    if (buflen == 0)
+      return true;
+
+    if (buflen == 1) {
+      *buf = 0;
+      return true;
+    }
+
+    buflen--; // reserve space for the trailing 0
+
+    char *c;
+    bool twoparts = head > tail;
+
+    char *firstend = twoparts ? end : tail;
+
+    for (c = head; c == head || c != firstend; c++) {
+      if (*c != '\n')
+        continue;
+
+      // line found
+      unsigned linelen = c - head;
+      if (linelen >= buflen )
+        linelen = buflen;
+
+      memcpy( buf, head, linelen );
+      buf[linelen] = 0;
+
+      head = c+1;
+      if (head == end)
+        head = begin;
+
+      full = false;
+
+      return true;
+    }
+
+    if (twoparts) {
+      for (c = begin; c != tail; c++) {
+        if (*c != '\n')
+          continue;
+
+        // line found
+        unsigned first_linelen = end - head;
+        unsigned second_linelen = c - begin;
+
+        if (first_linelen >= buflen) {
+          first_linelen = buflen;
+          second_linelen = 0;
+        } else if (first_linelen + second_linelen >= buflen) {
+          second_linelen = buflen - first_linelen;
+        }
+
+        memcpy( buf, head, first_linelen );
+        memcpy( buf + first_linelen, begin, second_linelen );
+
+        buf[first_linelen + second_linelen] = 0;
+
+        head = c+1;
+        if (head == end)
+          head = begin;
+
+        full = false;
+
+        return true;
+      }
+    }
+
+    // no line found
+    return false;
+  }
+
+public:
+  char *buffer;
+  char *begin, *end;
+  char *head, *tail;
+  unsigned capacity;
+  bool full;
+
+};
+
+// @} addgroup
+  } // namespace APL
+} // namespace LOFAR
+
+#endif
+
-- 
GitLab