Skip to content
Snippets Groups Projects
Commit f8559a42 authored by Jan David Mol's avatar Jan David Mol
Browse files

bug 1362: added (untested) circular buffer, and some bug fixes

parent 58799766
No related branches found
No related tags found
No related merge requests found
...@@ -878,6 +878,7 @@ MAC/APL/CASATools/test/tCasaConverter.cc -text ...@@ -878,6 +878,7 @@ MAC/APL/CASATools/test/tCasaConverter.cc -text
MAC/APL/CASATools/test/tCasaConverter.log_prop -text MAC/APL/CASATools/test/tCasaConverter.log_prop -text
MAC/APL/CEPCU/src/BGPlogProcessor/CEPDatapoints.dpl -text MAC/APL/CEPCU/src/BGPlogProcessor/CEPDatapoints.dpl -text
MAC/APL/CEPCU/src/BGPlogProcessor/CEPDatapointtypes.dpl -text MAC/APL/CEPCU/src/BGPlogProcessor/CEPDatapointtypes.dpl -text
MAC/APL/CEPCU/src/BGPlogProcessor/CircularBuffer.h -text
MAC/APL/CURTDBDaemons/src/LogProcessor/LogProcessor.conf -text MAC/APL/CURTDBDaemons/src/LogProcessor/LogProcessor.conf -text
MAC/APL/CURTDBDaemons/src/LogProcessor/LogProcessor.log_prop -text MAC/APL/CURTDBDaemons/src/LogProcessor/LogProcessor.log_prop -text
MAC/APL/DEPENDENCIES -text svneol=native#application/octet-stream MAC/APL/DEPENDENCIES -text svneol=native#application/octet-stream
......
...@@ -383,7 +383,7 @@ void CEPlogProcessor::_deleteStream(GCFPortInterface& port) ...@@ -383,7 +383,7 @@ void CEPlogProcessor::_deleteStream(GCFPortInterface& port)
LOG_DEBUG_STR("_deleteStream"); LOG_DEBUG_STR("_deleteStream");
map<GCFPortInterface*, streamBuffer_t>::iterator theStream = itsLogStreams.find(&port); map<GCFPortInterface*, streamBuffer_t>::iterator theStream = itsLogStreams.find(&port);
if (theStream != itsLogStreams.end()) { if (theStream != itsLogStreams.end()) {
delete [] theStream->second.buffer; delete theStream->second.buffer;
theStream->second.buffer = 0; theStream->second.buffer = 0;
itsLogStreams.erase(theStream); itsLogStreams.erase(theStream);
} }
...@@ -406,9 +406,7 @@ void CEPlogProcessor::_handleConnectionRequest() ...@@ -406,9 +406,7 @@ void CEPlogProcessor::_handleConnectionRequest()
// give stream its own buffer. // give stream its own buffer.
streamBuffer_t stream; streamBuffer_t stream;
stream.socket = pNewClient; stream.socket = pNewClient;
stream.buffer = new char[itsBufferSize]; stream.buffer = new CircularBuffer(itsBufferSize);
stream.inPtr = 0;
stream.outPtr = 0;
itsLogStreams[pNewClient] = stream; itsLogStreams[pNewClient] = stream;
LOG_INFO_STR("Added new client to my admin"); LOG_INFO_STR("Added new client to my admin");
} }
...@@ -419,57 +417,32 @@ void CEPlogProcessor::_handleConnectionRequest() ...@@ -419,57 +417,32 @@ void CEPlogProcessor::_handleConnectionRequest()
void CEPlogProcessor::_handleDataStream(GCFPortInterface* port) void CEPlogProcessor::_handleDataStream(GCFPortInterface* port)
{ {
// read in the new bytes // read in the new bytes
streamBuffer_t stream = itsLogStreams[port]; streamBuffer_t &stream = itsLogStreams[port];
LOG_DEBUG_STR("handleDataStream:in=" << stream.inPtr << ", out=" << stream.outPtr); int newBytes = stream.socket->recv( stream.buffer->tail, stream.buffer->tailFreeSpace() );
int newBytes = stream.socket->recv(stream.buffer + stream.inPtr, itsBufferSize - stream.inPtr);
LOG_DEBUG_STR("received " << newBytes << " new bytes");
if (newBytes < 0) { if (newBytes < 0) {
// LOG_ERROR_STR("read on socket " << sid << " returned " << newBytes << ". Closing connection"); LOG_DEBUG_STR("Closing connection.");
port->close(); port->close();
_deleteStream(*port); _deleteStream(*port);
return; return;
} }
// LOG_DEBUG_STR("Received " << newBytes << " bytes at sid " << sid);
stream.inPtr += newBytes;
// process as much data as possible from the buffer.
for (int i = stream.outPtr; i <= stream.inPtr; i++) {
if (stream.buffer[i] != '\n') {
continue;
}
stream.buffer[i] = '\0'; LOG_DEBUG_STR("Read " << newBytes << " bytes.");
// LOG_INFO(formatString("SID %d:>%s<", sid, &(stream.buffer[stream.outPtr]))); stream.buffer->incTail( newBytes );
LOG_INFO(formatString("(%d,%d)>%s<", stream.outPtr, i, &(stream.buffer[stream.outPtr])));
_processLogLine(&(stream.buffer[stream.outPtr]));
stream.outPtr = i+1;
if (stream.outPtr >= stream.inPtr) { // All received bytes handled?
LOG_DEBUG("Reset of read/write pointers");
stream.inPtr = 0;
stream.outPtr = 0;
itsLogStreams[port] = stream; // copy changes back to admin
return;
}
}
if (stream.outPtr > (int)(0.5*itsBufferSize)) { char lineBuf[1024];
// When buffer becomes full shift leftovers to the left. while (stream.buffer->getLine( lineBuf, sizeof lineBuf )) {
LOG_DEBUG_STR("move with: " << stream.inPtr << ", " << stream.outPtr); LOG_DEBUG_STR("Read log line " << lineBuf );
memmove (stream.buffer, stream.buffer + stream.outPtr, (stream.inPtr - stream.outPtr + 1)); _processLogLine(lineBuf);
stream.inPtr -= stream.outPtr; }
stream.outPtr = 0;
}
itsLogStreams[port] = stream; // copy changes back to admin
} }
time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) const time_t CEPlogProcessor::_parseDateTime(const char *datestr, const char *timestr) const
{ {
struct tm tm; struct tm tm;
time_t ts; time_t ts;
bool validtime = true; bool validtime = true;
if (sscanf(date.c_str(), "%u-%u-%u", if (sscanf(datestr, "%u-%u-%u",
&tm.tm_year, &tm.tm_mon, &tm.tm_mday) != 3) { &tm.tm_year, &tm.tm_mon, &tm.tm_mday) != 3) {
validtime = false; validtime = false;
} else { } else {
...@@ -477,7 +450,7 @@ time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) c ...@@ -477,7 +450,7 @@ time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) c
tm.tm_year -= 1900; tm.tm_year -= 1900;
} }
if (sscanf(time.c_str(), "%u:%u:%u", if (sscanf(timestr, "%u:%u:%u", // ignore milliseconds
&tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 3) { &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 3) {
validtime = false; validtime = false;
} }
...@@ -485,9 +458,9 @@ time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) c ...@@ -485,9 +458,9 @@ time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) c
if (validtime) { if (validtime) {
ts = mktime(&tm); ts = mktime(&tm);
} else { } else {
LOG_WARN_STR("Invalid timestamp: " << date << " " << time); LOG_WARN_STR("Invalid timestamp: " << datestr << " " << timestr << "; using now()");
ts = ::time(0L); ts = time(0L);
} }
return ts; return ts;
...@@ -498,28 +471,18 @@ time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) c ...@@ -498,28 +471,18 @@ time_t CEPlogProcessor::_parseDateTime(const string &date, const string &time) c
// _processLogLine(char*) // _processLogLine(char*)
// //
// //
void CEPlogProcessor::_processLogLine(char* cString) void CEPlogProcessor::_processLogLine(const char *cString)
{ {
// example log line: // example log line:
// Storage@00 09-12-10 11:33:13.240 DEBUG [obs 21855 output 1 subband 223] InputThread::~InputThread() // Storage@00 09-12-10 11:33:13.240 DEBUG [obs 21855 output 1 subband 223] InputThread::~InputThread()
unsigned bufsize = strlen( cString ) + 1; unsigned bufsize = strlen( cString ) + 1;
string processName; vector<char> processName(bufsize), date(bufsize), time(bufsize), loglevel(bufsize), msg(bufsize);
int processNr; int processNr;
string date;
string time; if (sscanf(cString, "%[^@]@%d %s %s %s %[^\n]",
string loglevel;
string msg;
processName.reserve( bufsize );
date.reserve( bufsize );
time.reserve( bufsize );
loglevel.reserve( bufsize );
msg.reserve( bufsize );
if (sscanf(cString, "%[^@]@%d %s %s %s %[^\n]",
&processName[0], &processName[0],
&processNr, &processNr,
&date[0], &date[0],
&time[0], &time[0],
&loglevel[0], &loglevel[0],
...@@ -536,23 +499,23 @@ void CEPlogProcessor::_processLogLine(char* cString) ...@@ -536,23 +499,23 @@ void CEPlogProcessor::_processLogLine(char* cString)
return; return;
} }
LOG_DEBUG_STR("Processname = " << processName); // eg IONProc LOG_DEBUG_STR("Processname = " << &processName[0]); // eg IONProc
time_t ts = _parseDateTime(date, time); time_t ts = _parseDateTime(&date[0], &time[0]);
if (processName == "IONProc") { if (!strcmp(&processName[0],"IONProc")) {
_processIONProcLine(processNr, ts, loglevel, msg); _processIONProcLine(processNr, ts, &loglevel[0], &msg[0]);
} else if (processName == "CNProc") { } else if (!strcmp(&processName[0],"CNProc")) {
_processCNProcLine(processNr, ts, loglevel, msg); _processCNProcLine(processNr, ts, &loglevel[0], &msg[0]);
} else if (processName == "Storage") { } else if (!strcmp(&processName[0],"Storage")) {
_processStorageLine(processNr, ts, loglevel, msg); _processStorageLine(processNr, ts, &loglevel[0], &msg[0]);
} }
} }
// //
// _processIONProcLine(cstring) // _processIONProcLine(cstring)
// //
void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string &loglevel, const string &msg) void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const char *loglevel, const char *msg)
{ {
LOG_DEBUG_STR("_processIONProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")"); LOG_DEBUG_STR("_processIONProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")");
...@@ -567,7 +530,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string ...@@ -567,7 +530,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
// InputBuffer // InputBuffer
// //
if ((result = strstr(msg.c_str(), " late: "))) { if ((result = strstr(msg, " late: "))) {
float late; float late;
if (sscanf(result, " late: %f ", &late) == 1 ) { if (sscanf(result, " late: %f ", &late) == 1 ) {
LOG_DEBUG_STR("[" << processNr << "] Late: " << late); LOG_DEBUG_STR("[" << processNr << "] Late: " << late);
...@@ -576,7 +539,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string ...@@ -576,7 +539,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
// 0% flags look like : flags 0: (0%) // 0% flags look like : flags 0: (0%)
// filled% flags look like : flags 0: [nr..nr> (10.5%) // filled% flags look like : flags 0: [nr..nr> (10.5%)
if ((result = strstr(msg.c_str(), "flags 0:"))) { if ((result = strstr(msg, "flags 0:"))) {
float flags0(0.0), flags1(0.0), flags2(0.0), flags3(0.0); float flags0(0.0), flags1(0.0), flags2(0.0), flags3(0.0);
if (sscanf(result, "flags 0:%*[^(](%f%%), flags 1:%*[^(](%f%%), flags 2:%*[^(](%f%%), flags 3:%*[^(](%f%%)", if (sscanf(result, "flags 0:%*[^(](%f%%), flags 1:%*[^(](%f%%), flags 2:%*[^(](%f%%), flags 3:%*[^(](%f%%)",
&flags0, &flags1, &flags2, &flags3) == 4) { &flags0, &flags1, &flags2, &flags3) == 4) {
...@@ -592,7 +555,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string ...@@ -592,7 +555,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
return; return;
} }
if ((result = strstr(msg.c_str(), "ION->CN:"))) { if ((result = strstr(msg, "ION->CN:"))) {
float ioTime; float ioTime;
if (sscanf(result, "ION->CN:%f", &ioTime) == 1) { if (sscanf(result, "ION->CN:%f", &ioTime) == 1) {
LOG_DEBUG_STR("[" << processNr << "] ioTime: " << ioTime); LOG_DEBUG_STR("[" << processNr << "] ioTime: " << ioTime);
...@@ -602,7 +565,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string ...@@ -602,7 +565,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
} }
if ((result = strstr(msg.c_str(), "received ["))) { if ((result = strstr(msg, "received ["))) {
int blocks0(0), blocks1(0), blocks2(0), blocks3(0); int blocks0(0), blocks1(0), blocks2(0), blocks3(0);
if (sscanf(result, "received [%d,%d,%d,%d]", &blocks0, &blocks1, &blocks2, &blocks3) == 4) { if (sscanf(result, "received [%d,%d,%d,%d]", &blocks0, &blocks1, &blocks2, &blocks3) == 4) {
LOG_DEBUG(formatString("[%d] blocks: %d, %d, %d, %d", processNr, blocks0, blocks1, blocks2, blocks3)); LOG_DEBUG(formatString("[%d] blocks: %d, %d, %d, %d", processNr, blocks0, blocks1, blocks2, blocks3));
...@@ -615,7 +578,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string ...@@ -615,7 +578,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
// if rejected was found in same line this means that a certain amount of blocks was rejected, // if rejected was found in same line this means that a certain amount of blocks was rejected,
// set this into the database. If no rejected was found, it means 0 blocks were rejected, so DB can be reset to 0 // set this into the database. If no rejected was found, it means 0 blocks were rejected, so DB can be reset to 0
if ((result = strstr(msg.c_str(), " rejected ["))) { if ((result = strstr(msg, " rejected ["))) {
int blocks0(0), blocks1(0), blocks2(0), blocks3(0); int blocks0(0), blocks1(0), blocks2(0), blocks3(0);
if (sscanf(result, " rejected [%d,%d,%d,%d]", &blocks0, &blocks1, &blocks2, &blocks3) == 4) { if (sscanf(result, " rejected [%d,%d,%d,%d]", &blocks0, &blocks1, &blocks2, &blocks3) == 4) {
LOG_DEBUG(formatString("[%d] rejected: blocks: %d, %d, %d, %d", processNr, blocks0, blocks1, blocks2, blocks3)); LOG_DEBUG(formatString("[%d] rejected: blocks: %d, %d, %d, %d", processNr, blocks0, blocks1, blocks2, blocks3));
...@@ -642,7 +605,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string ...@@ -642,7 +605,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
// Adder // Adder
// //
if ((result = strstr(msg.c_str(), "dropping data"))) { if ((result = strstr(msg, "dropping data"))) {
LOG_DEBUG(formatString("[%d] Dropping data started ",processNr)); LOG_DEBUG(formatString("[%d] Dropping data started ",processNr));
itsAdders[processNr]->setValue(PN_ADD_DROPPING, GCFPVBool(true), ts); itsAdders[processNr]->setValue(PN_ADD_DROPPING, GCFPVBool(true), ts);
itsAdders[processNr]->setValue(PN_ADD_LOG_LINE,GCFPVString(result),ts); itsAdders[processNr]->setValue(PN_ADD_LOG_LINE,GCFPVString(result),ts);
...@@ -651,7 +614,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string ...@@ -651,7 +614,7 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
return; return;
} }
if ((result = strstr(msg.c_str(), "dropped "))) { if ((result = strstr(msg, "dropped "))) {
int dropped(0); int dropped(0);
if (sscanf(result, "dropped %d ", &dropped) == 1) { if (sscanf(result, "dropped %d ", &dropped) == 1) {
LOG_DEBUG(formatString("[%d] Dropped %d ",processNr,dropped)); LOG_DEBUG(formatString("[%d] Dropped %d ",processNr,dropped));
...@@ -670,13 +633,13 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string ...@@ -670,13 +633,13 @@ void CEPlogProcessor::_processIONProcLine(int processNr, time_t ts, const string
} }
} }
void CEPlogProcessor::_processCNProcLine(int processNr, time_t ts, const string &loglevel, const string &msg) void CEPlogProcessor::_processCNProcLine(int processNr, time_t ts, const char *loglevel, const char *msg)
{ {
LOG_DEBUG_STR("_processCNProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")"); LOG_DEBUG_STR("_processCNProcLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")");
// TODO // TODO
} }
void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const string &loglevel, const string &msg) void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const char *loglevel, const char *msg)
{ {
LOG_DEBUG_STR("_processStorageLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")"); LOG_DEBUG_STR("_processStorageLine(" << processNr << "," << ts << "," << loglevel << "," << msg << ")");
...@@ -687,7 +650,7 @@ void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const string ...@@ -687,7 +650,7 @@ void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const string
return; return;
} }
if ((result = strstr(msg.c_str(), "time ="))) { if ((result = strstr(msg, "time ="))) {
int rank(0), count(0); int rank(0), count(0);
char tim[24]; char tim[24];
...@@ -702,7 +665,7 @@ void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const string ...@@ -702,7 +665,7 @@ void CEPlogProcessor::_processStorageLine(int processNr, time_t ts, const string
} }
/* /*
if ((result = strstr(msg.c_str(), "dropped "))) { if ((result = strstr(msg, "dropped "))) {
int blocks(0), subband(0), output(0); int blocks(0), subband(0), output(0);
LOG_DEBUG_STR("_processStorageLine(" << processNr << "," << result << ")"); LOG_DEBUG_STR("_processStorageLine(" << processNr << "," << result << ")");
......
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
#include <GCF/TM/GCF_Control.h> #include <GCF/TM/GCF_Control.h>
#include <GCF/RTDB/RTDB_PropertySet.h> #include <GCF/RTDB/RTDB_PropertySet.h>
#include "CircularBuffer.h"
#include <time.h> #include <time.h>
namespace LOFAR { namespace LOFAR {
...@@ -75,12 +77,12 @@ private: ...@@ -75,12 +77,12 @@ private:
// Routines for processing the loglines. // Routines for processing the loglines.
void _handleDataStream (GCFPortInterface* port); void _handleDataStream (GCFPortInterface* port);
time_t _parseDateTime (const string &date, const string &time) const; time_t _parseDateTime (const char *datestr, const char *timestr) const;
void _processLogLine (char* cString); void _processLogLine (const char *cString);
void _processIONProcLine(int processNr, time_t ts, const string &loglevel, const string &msg); void _processIONProcLine(int processNr, time_t ts, const char *loglevel, const char *msg);
void _processCNProcLine(int processNr, time_t ts, const string &loglevel, const string &msg); void _processCNProcLine(int processNr, time_t ts, const char *loglevel, const char *msg);
void _processStorageLine(int processNr, time_t ts, const string &loglevel, const string &msg); void _processStorageLine(int processNr, time_t ts, const char *loglevel, const char *msg);
//# --- Datamembers --- //# --- Datamembers ---
// The listener socket to receive the requests on. // The listener socket to receive the requests on.
...@@ -92,9 +94,7 @@ private: ...@@ -92,9 +94,7 @@ private:
// internal structure for admin for 1 stream // internal structure for admin for 1 stream
typedef struct { typedef struct {
GCFTCPPort* socket; GCFTCPPort* socket;
char* buffer; CircularBuffer* buffer;
int inPtr;
int outPtr;
} streamBuffer_t; } streamBuffer_t;
// internal structure for lse based logging // internal structure for lse based logging
......
//# CircularBuffer.h: Moves the operator info from the logfiles to PVSS
//#
//# Copyright (C) 2009
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id: CEPlogProcessor.h 16954 2010-12-15 10:03:09Z mol $
#ifndef LOFAR_APL_CIRCULARBUFFER_H
#define LOFAR_APL_CIRCULARBUFFER_H
// \file
// Daemon for launching Application Controllers
//# Never #include <config.h> or #include <lofar_config.h> in a header file!
//# Includes
namespace LOFAR {
namespace APL {
class CircularBuffer {
public:
CircularBuffer( unsigned capacity ):
capacity(capacity),
full(capacity == 0)
{
buffer = new char[capacity];
begin = buffer;
end = buffer + capacity;
head = buffer;
tail = buffer;
}
~CircularBuffer() {
delete[] buffer;
}
bool empty() const {
return head == tail && !full;
}
unsigned freeSpace() const {
return full ? 0 : (head <= tail ? end - tail + head - begin : tail - head);
}
unsigned tailFreeSpace() const {
return full ? 0 : (head <= tail ? end - tail : head - tail);
}
void incTail( unsigned len ) {
tail += len;
if (tail == end) tail = begin;
if (tail == head) full = true;
}
unsigned putData( char *buf, unsigned buflen ) {
if (full)
return 0;
if (buflen == 0)
return 0;
unsigned first_buflen = tailFreeSpace();
unsigned second_buflen = freeSpace() - first_buflen;
if (buflen <= first_buflen) {
first_buflen = buflen;
second_buflen = 0;
} else if (buflen <= first_buflen + second_buflen) {
second_buflen = buflen - first_buflen;
} else {
// discard data beyond freeSpace()
}
memcpy( tail, buf, first_buflen );
incTail( first_buflen );
if (second_buflen) {
memcpy( tail, buf + first_buflen, second_buflen );
incTail( second_buflen );
}
return first_buflen + second_buflen;
}
bool getLine( char *buf, unsigned buflen ) {
if (empty())
return false;
if (buflen == 0)
return true;
if (buflen == 1) {
*buf = 0;
return true;
}
buflen--; // reserve space for the trailing 0
char *c;
bool twoparts = head > tail;
char *firstend = twoparts ? end : tail;
for (c = head; c == head || c != firstend; c++) {
if (*c != '\n')
continue;
// line found
unsigned linelen = c - head;
if (linelen >= buflen )
linelen = buflen;
memcpy( buf, head, linelen );
buf[linelen] = 0;
head = c+1;
if (head == end)
head = begin;
full = false;
return true;
}
if (twoparts) {
for (c = begin; c != tail; c++) {
if (*c != '\n')
continue;
// line found
unsigned first_linelen = end - head;
unsigned second_linelen = c - begin;
if (first_linelen >= buflen) {
first_linelen = buflen;
second_linelen = 0;
} else if (first_linelen + second_linelen >= buflen) {
second_linelen = buflen - first_linelen;
}
memcpy( buf, head, first_linelen );
memcpy( buf + first_linelen, begin, second_linelen );
buf[first_linelen + second_linelen] = 0;
head = c+1;
if (head == end)
head = begin;
full = false;
return true;
}
}
// no line found
return false;
}
public:
char *buffer;
char *begin, *end;
char *head, *tail;
unsigned capacity;
bool full;
};
// @} addgroup
} // namespace APL
} // namespace LOFAR
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment