Skip to content
Snippets Groups Projects
Commit 52889834 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #3696: Added tGenerator, and split Station::StreamReader off into PacketReader class

parent fcaf7ed0
No related branches found
No related tags found
No related merge requests found
......@@ -3625,6 +3625,7 @@ RTCP/InputProc/src/CMakeLists.txt -text
RTCP/InputProc/src/Generator.cc -text
RTCP/InputProc/src/Generator.h -text
RTCP/InputProc/src/OMPThread.h -text
RTCP/InputProc/src/PacketReader.h -text
RTCP/InputProc/src/Poll.h -text
RTCP/InputProc/src/Ranges.cc -text
RTCP/InputProc/src/Ranges.h -text
......@@ -3642,6 +3643,7 @@ RTCP/InputProc/src/TimeSync.h -text
RTCP/InputProc/src/newInputSection.cc -text
RTCP/InputProc/src/shmtest.cc -text
RTCP/InputProc/test/CMakeLists.txt -text
RTCP/InputProc/test/tGenerator.cc -text
RTCP/InputProc/test/tRSPTimeStamp2.cc -text
RTCP/InputProc/test/tRanges.cc -text
RTCP/InputProc/test/tSharedMemory.cc -text
......
#ifndef __PACKETREADER__
#define __PACKETREADER__
#include <Common/LofarLogger.h>
#include <Stream/Stream.h>
#include <Stream/SocketStream.h>
#include <Interface/RSPTimeStamp.h>
#include <Interface/SmartPtr.h>
#include <Interface/Stream.h>
#include <IONProc/RSP.h>
#include "SampleBuffer.h"
#include "BufferSettings.h"
#include <boost/format.hpp>
#include <string>
namespace LOFAR {
namespace RTCP {
/* Receives input of one RSP board and stores it in shared memory. */
class PacketReader {
public:
PacketReader( const std::string &logPrefix, const std::string &streamDescriptor, const struct BufferSettings &settings );
// Reads a packet from the input stream. Returns true if a packet was
// succesfully read.
bool readPacket( struct RSP &packet );
void logStatistics();
private:
const std::string logPrefix;
SmartPtr<Stream> inputStream;
bool supportPartialReads;
const struct BufferSettings settings;
size_t nrReceived, nrBadSize, nrBadTime, nrBadData, nrBadMode;
bool hadSizeError, hadModeError;
};
PacketReader::PacketReader( const std::string &logPrefix, const std::string &streamDescriptor, const struct BufferSettings &settings )
:
logPrefix(str(boost::format("%s [PacketReader] ") % logPrefix)),
settings(settings),
nrReceived(0),
nrBadSize(0),
nrBadTime(0),
nrBadData(0),
nrBadMode(0),
hadSizeError(false),
hadModeError(false)
{
inputStream = createStream(streamDescriptor, true);
SocketStream *asSocket = dynamic_cast<SocketStream *>(inputStream.get());
bool isUDP = asSocket && asSocket->protocol == SocketStream::UDP;
supportPartialReads = !isUDP;
}
bool PacketReader::readPacket( struct RSP &packet )
{
if (supportPartialReads) {
// read header first
inputStream->read(&packet.header, sizeof packet.header);
// read rest of packet
inputStream->read(&packet.payload.data, packet.packetSize() - sizeof packet.header);
++nrReceived;
} else {
// read full packet at once -- numbytes will tell us how much we've actually read
size_t numbytes = inputStream->tryRead(&packet, sizeof packet);
++nrReceived;
if( numbytes < sizeof(struct RSP::Header)
|| numbytes != packet.packetSize() ) {
if (!hadSizeError) {
LOG_ERROR_STR( logPrefix << "Packet is " << numbytes << " bytes, but should be " << packet.packetSize() << " bytes" );
hadSizeError = true;
}
++nrBadSize;
return false;
}
}
// illegal timestamp means illegal packet
if (packet.header.timestamp == ~0U) {
++nrBadTime;
return false;
}
// check sanity of packet
// discard packets with errors
if (packet.payloadError()) {
++nrBadData;
return false;
}
// check whether the station configuration matches ours
if (packet.clockMHz() * 1000000 != settings.station.clock
|| packet.bitMode() != settings.station.bitmode) {
if (!hadModeError) {
LOG_ERROR_STR( logPrefix << "Packet has mode (" << packet.clockMHz() << " MHz, " << packet.bitMode() << " bit), but should be mode (" << settings.station.clock / 1000000 << " MHz, " << settings.station.bitmode << " bit)");
hadModeError = true;
}
++nrBadMode;
return false;
}
return true;
}
void PacketReader::logStatistics()
{
LOG_INFO_STR( logPrefix << "Received " << nrReceived << " packets: " << nrBadTime << " bad timestamps, " << nrBadSize << " bad sizes, " << nrBadData << " payload errors, " << nrBadMode << " clock/bitmode errors" );
nrReceived = 0;
nrBadTime = 0;
nrBadSize = 0;
nrBadData = 0;
nrBadMode = 0;
hadSizeError = false;
hadModeError = false;
}
}
}
#endif
......@@ -11,6 +11,7 @@
#include <IONProc/WallClockTime.h>
#include "SampleBuffer.h"
#include "BufferSettings.h"
#include "PacketReader.h"
#include "Ranges.h"
#include "time.h"
#include <boost/format.hpp>
......@@ -30,29 +31,6 @@ public:
protected:
SampleBuffer<T> buffer;
/* Receives input of one RSP board and stores it in shared memory. */
class StreamReader {
public:
StreamReader( const std::string &logPrefix, const std::string &streamDescriptor, const struct BufferSettings &settings );
// Reads a packet from the input stream. Returns true if a packet was
// succesfully read.
bool readPacket( struct RSP &packet );
void logStatistics();
private:
const std::string logPrefix;
SmartPtr<Stream> inputStream;
bool supportPartialReads;
const struct BufferSettings settings;
size_t nrReceived, nrBadSize, nrBadTime, nrBadData, nrBadMode;
bool hadSizeError, hadModeError;
};
class BufferWriter {
public:
BufferWriter( const std::string &logPrefix, SampleBuffer<T> &buffer, Ranges &flags, size_t firstBeamlet, const struct BufferSettings &settings );
......@@ -73,7 +51,7 @@ protected:
size_t nrWritten;
};
std::vector< SmartPtr<StreamReader> > readers;
std::vector< SmartPtr<PacketReader> > readers;
std::vector< SmartPtr<BufferWriter> > writers;
// process data for this board until interrupted or end of data
......@@ -100,7 +78,7 @@ template<typename T> void Station<T>::processBoard( size_t nr )
try {
LOG_INFO_STR( logPrefix << "Connecting to " << streamDescriptors[nr] );
readers[nr] = new StreamReader(logPrefix, streamDescriptors[nr], settings);
readers[nr] = new PacketReader(logPrefix, streamDescriptors[nr], settings);
LOG_INFO_STR( logPrefix << "Connecting to shared memory buffer 0x" << std::hex << settings.dataKey );
size_t firstBeamlet = settings.nrBeamlets / settings.nrBoards * nr;
......@@ -144,103 +122,6 @@ template<typename T> void Station<T>::logStatistics()
}
template<typename T> Station<T>::StreamReader::StreamReader( const std::string &logPrefix, const std::string &streamDescriptor, const struct BufferSettings &settings )
:
logPrefix(str(boost::format("%s [StreamReader] ") % logPrefix)),
settings(settings),
nrReceived(0),
nrBadSize(0),
nrBadTime(0),
nrBadData(0),
nrBadMode(0),
hadSizeError(false),
hadModeError(false)
{
inputStream = createStream(streamDescriptor, true);
SocketStream *asSocket = dynamic_cast<SocketStream *>(inputStream.get());
bool isUDP = asSocket && asSocket->protocol == SocketStream::UDP;
supportPartialReads = !isUDP;
}
template<typename T> bool Station<T>::StreamReader::readPacket( struct RSP &packet )
{
if (supportPartialReads) {
// read header first
inputStream->read(&packet.header, sizeof packet.header);
// read rest of packet
inputStream->read(&packet.payload.data, packet.packetSize() - sizeof packet.header);
++nrReceived;
} else {
// read full packet at once -- numbytes will tell us how much we've actually read
size_t numbytes = inputStream->tryRead(&packet, sizeof packet);
++nrReceived;
if( numbytes < sizeof(struct RSP::Header)
|| numbytes != packet.packetSize() ) {
if (!hadSizeError) {
LOG_ERROR_STR( logPrefix << "Packet is " << numbytes << " bytes, but should be " << packet.packetSize() << " bytes" );
hadSizeError = true;
}
++nrBadSize;
return false;
}
}
// illegal timestamp means illegal packet
if (packet.header.timestamp == ~0U) {
++nrBadTime;
return false;
}
// check sanity of packet
// discard packets with errors
if (packet.payloadError()) {
++nrBadData;
return false;
}
// check whether the station configuration matches ours
if (packet.clockMHz() * 1000000 != settings.station.clock
|| packet.bitMode() != settings.station.bitmode) {
if (!hadModeError) {
LOG_ERROR_STR( logPrefix << "Packet has mode (" << packet.clockMHz() << " MHz, " << packet.bitMode() << " bit), but should be mode (" << settings.station.clock / 1000000 << " MHz, " << settings.station.bitmode << " bit)");
hadModeError = true;
}
++nrBadMode;
return false;
}
return true;
}
template<typename T> void Station<T>::StreamReader::logStatistics()
{
LOG_INFO_STR( logPrefix << "Received " << nrReceived << " packets: " << nrBadTime << " bad timestamps, " << nrBadSize << " bad sizes, " << nrBadData << " payload errors, " << nrBadMode << " clock/bitmode errors" );
nrReceived = 0;
nrBadTime = 0;
nrBadSize = 0;
nrBadData = 0;
nrBadMode = 0;
hadSizeError = false;
hadModeError = false;
}
template<typename T> Station<T>::BufferWriter::BufferWriter( const std::string &logPrefix, SampleBuffer<T> &buffer, Ranges &flags, size_t firstBeamlet, const struct BufferSettings &settings )
:
logPrefix(str(boost::format("%s [BufferWriter] ") % logPrefix)),
......
......@@ -35,7 +35,11 @@ void StationStreams::process()
for (size_t i = 0; i < nrBoards; ++i) {
OMPThread::ScopedRun sr(threads[i]);
processBoard(i);
try {
processBoard(i);
} catch(Exception &ex) {
LOG_ERROR_STR("Caught exception: " << ex);
}
}
// we're done
......@@ -50,10 +54,15 @@ void StationStreams::process()
for (size_t i = 0; i < nrBoards; ++i) {
OMPThread::ScopedRun sr(threads[i + nrBoards]);
for(;;) {
sleep(1);
logStatistics();
try {
for(;;) {
if (usleep(999999) == -1 && errno == EINTR)
break;
logStatistics();
}
} catch(Exception &ex) {
LOG_ERROR_STR("Caught exception: " << ex);
}
}
}
......@@ -68,7 +77,11 @@ void StationStreams::process()
LOG_INFO_STR( logPrefix << "Stopping all boards" );
#pragma omp parallel for num_threads(threads.size())
for (size_t i = 0; i < threads.size(); ++i)
threads[i].kill();
try {
threads[i].kill();
} catch(Exception &ex) {
LOG_ERROR_STR("Caught exception: " << ex);
}
}
}
......
......@@ -5,6 +5,7 @@ include(LofarCTest)
# Add project's source directory to -I path.
include_directories(${PACKAGE_SOURCE_DIR}/src)
lofar_add_test(tGenerator tGenerator.cc)
lofar_add_test(tRSPTimeStamp2 tRSPTimeStamp2.cc)
lofar_add_test(tRanges tRanges.cc)
lofar_add_test(tSharedMemory tSharedMemory.cc)
#include <lofar_config.h>
#include "Generator.h"
#include "PacketReader.h"
#include <Common/LofarLogger.h>
#include <Common/Thread/Thread.h>
#include <Interface/SmartPtr.h>
#include <Interface/Stream.h>
#include <unistd.h>
#include <vector>
#include <string>
#include <unistd.h>
#include "omp.h"
using namespace LOFAR;
using namespace RTCP;
using namespace std;
// Duration of the test (seconds)
#define DURATION 5
void sighandler(int)
{
/* no-op */
}
int main( int, char **argv ) {
INIT_LOGGER( argv[0] );
omp_set_nested(true);
omp_set_num_threads(32);
signal(SIGHUP, sighandler);
siginterrupt(SIGHUP, 1);
vector<string> streamDescs(1, "tcp:localhost:54321");
unsigned clock = 200 * 1000 * 1000;
struct StationID stationID("RS106", "LBA", clock, 16);
struct BufferSettings settings;
settings.station = stationID;
settings.nrBeamlets = 61;
settings.nrBoards = 1;
settings.nrSamples = (2 * stationID.clock / 1024);// & ~0xFL;
settings.nrFlagRanges = 64;
settings.dataKey = stationID.hash();
Generator g(settings, streamDescs);
#pragma omp parallel sections num_threads(3)
{
#pragma omp section
{
// Generate packets
try {
g.process();
} catch(Exception &ex) {
LOG_ERROR_STR("Caught exception: " << ex);
}
}
#pragma omp section
{
// Read and verify the generated packets
try {
PacketReader reader("", streamDescs[0], settings);
for(;;) {
struct RSP packet;
if (!reader.readPacket(packet)) {
reader.logStatistics();
ASSERT(false);
}
}
} catch(Stream::EndOfStreamException &ex) {
} catch(Exception &ex) {
LOG_ERROR_STR("Caught exception: " << ex);
}
}
#pragma omp section
{
// Stop the experiment after a while
sleep(DURATION);
g.stop();
}
}
return 0;
}
#include <lofar_config.h>
#include "Ranges.h"
#include <Common/LofarLogger.h>
#include <Common/Thread/Thread.h>
#include <unistd.h>
#include <vector>
using namespace LOFAR;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment