Skip to content
Snippets Groups Projects
Commit 4d59d179 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #4471: Fixed various timing issues, and made tMPITransfer non-realtime

parent f2c567ea
No related branches found
No related tags found
No related merge requests found
Showing
with 51 additions and 42 deletions
......@@ -132,6 +132,10 @@ namespace LOFAR
std::vector<Board> boards;
};
// Removes the sample buffers that correspond to settings.dataKey,
// as well as any sample buffer that refers to the same station.
void removeSampleBuffers( const BufferSettings &settings );
}
}
......
......@@ -10,6 +10,7 @@ lofar_add_library(inputproc
RSPBoards.cc
Buffer/BufferSettings.cc
Buffer/Ranges.cc
Buffer/SampleBuffer.cc
Buffer/SharedMemory.cc
Buffer/StationID.cc
Delays/Delays.cc
......
......@@ -35,13 +35,15 @@ namespace LOFAR
namespace Cobalt
{
Generator::Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams_, PacketFactory &packetFactory )
Generator::Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams_, PacketFactory &packetFactory, const TimeStamp &from, const TimeStamp &to )
:
RSPBoards(str(boost::format("[station %s %s] [Generator] ") % settings.station.stationName % settings.station.antennaField), outputStreams_.size()),
settings(settings),
outputStreams(outputStreams_.size()),
packetFactory(packetFactory),
nrSent(nrBoards, 0)
nrSent(nrBoards, 0),
from(from),
to(to)
{
for (size_t i = 0; i < outputStreams.size(); ++i) {
outputStreams[i] = outputStreams_[i];
......@@ -60,8 +62,7 @@ namespace LOFAR
LOG_INFO_STR( logPrefix << "Start" );
TimeStamp current(time(0L) + 1, 0, settings.station.clockMHz * 1000000);
for(;; ) {
for(TimeStamp current = from; !to || current < to; /* increment in loop */ ) {
struct RSP packet;
// generate packet
......
......@@ -44,7 +44,7 @@ namespace LOFAR
class Generator : public RSPBoards
{
public:
Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams, PacketFactory &packetFactory );
Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams, PacketFactory &packetFactory, const TimeStamp &from, const TimeStamp &to );
protected:
const BufferSettings settings;
......@@ -53,6 +53,8 @@ namespace LOFAR
std::vector<size_t> nrSent;
const TimeStamp from, to;
virtual void processBoard( size_t nr );
virtual void logStatistics();
};
......
......@@ -37,7 +37,7 @@ namespace LOFAR
{
PacketsToBuffer::PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr, bool cleanup )
PacketsToBuffer::PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr )
:
logPrefix(str(boost::format("[station %s board %u] [PacketsToBuffer] ") % settings.station.stationName % boardNr)),
inputStream(inputStream),
......@@ -46,32 +46,6 @@ namespace LOFAR
boardNr(boardNr)
{
LOG_INFO_STR( logPrefix << "Initialised" );
if (cleanup) {
/*
* Make sure there are no lingering SHM buffers for this
* station from previous runs.
*/
// Remove the provided dataKey, as it could be a custom setting
SharedMemoryArena::remove(settings.dataKey);
// Remove the keys of all possible configurations
StationID station = settings.station;
const unsigned bitmodes[] = { 4, 8, 16 };
const unsigned clocks[] = { 160, 200 };
for (size_t b = 0; b < sizeof bitmodes / sizeof bitmodes[0]; ++b) {
for (size_t c = 0; c < sizeof clocks / sizeof clocks[0]; ++c) {
station.bitMode = bitmodes[b];
station.clockMHz = clocks[c];
// Remove any lingering buffer for this mode
SharedMemoryArena::remove(station.hash());
}
}
}
}
......
......@@ -54,7 +54,7 @@ namespace LOFAR
public:
// cleanup: if true, erase any existing SHM areas corresponding to our
// station.
PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr, bool cleanup = true );
PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr );
// Process data for this board until interrupted or end of data. Auto-senses
// mode (bit mode & clock).
......
......@@ -59,8 +59,11 @@ int main( int argc, char **argv )
struct StationID stationID(stationName, "LBA", 200, 16);
struct BufferSettings settings(stationID, false);
const TimeStamp from(time(0), 0, stationID.clockMHz * 1000000);
const TimeStamp to(0);
PacketFactory factory(settings);
Generator g(settings, outputStreams, factory);
Generator g(settings, outputStreams, factory, from, to);
// Generate packets
g.process();
......
/* PacketsToBuffer.cc
/* tBlockReader.cc
* Copyright (C) 2013 ASTRON (Netherlands Institute for Radio Astronomy)
* P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
*
......@@ -112,7 +112,7 @@ void test( struct BufferSettings &settings, const std::string &filename )
FileStream fs(filename);
// Set up transfer
PacketsToBuffer transfer(fs, settings, 0, false);
PacketsToBuffer transfer(fs, settings, 0);
// Do transfer
transfer.process();
......@@ -152,6 +152,7 @@ int main()
// Use a fixed key, so the test suite knows what to clean
settings.dataKey = 0x10000001;
removeSampleBuffers(settings);
// Limit the array in size to work on systems with only 32MB SHM
settings.nrBoards = 1;
......
......@@ -69,8 +69,10 @@ int main( int, char **argv )
struct StationID stationID("RS106", "LBA", 200, 16);
struct BufferSettings settings(stationID, false);
const TimeStamp from(time(0), 0, stationID.clockMHz * 1000000);
const TimeStamp to = from + NUMPACKETS * 16; /* 16 timeslots/packet */
PacketFactory factory(settings);
Generator g(settings, outputStreams, factory);
Generator g(settings, outputStreams, factory, from, to);
bool error = false;
......
......@@ -69,8 +69,8 @@ Exception::TerminateHandler t(Exception::terminate);
const size_t clockMHz = 200;
const size_t clockHz = clockMHz * 1000 * 1000;
typedef SampleType<i16complex> SampleT;
const TimeStamp from(time(0L) + 10, 0, clockHz);
const TimeStamp to(time(0L) + 10 + DURATION, 0, clockHz);
TimeStamp from;
TimeStamp to;
const size_t blockSize = BLOCKSIZE * clockHz / 1024;
map<int, std::vector<size_t> > beamletDistribution;
const size_t nrHistorySamples = 16;
......@@ -94,6 +94,15 @@ void sender()
struct StationID stationID(str(format("CS%03d") % rank), "LBA", clockMHz, 16);
struct BufferSettings settings(stationID, false);
// sync readers and writers to prevent data loss
// if the reader is delayed w.r.t. the generator
SmartPtr<SyncLock> syncLock;
syncLock = new SyncLock(settings);
settings.syncLock = syncLock;
settings.sync = true;
omp_set_nested(true);
//omp_set_num_threads(32);
OMPThread::init();
......@@ -115,9 +124,11 @@ void sender()
}
}
removeSampleBuffers(settings);
MultiPacketsToBuffer station( settings, inputStreams );
PacketFactory factory( settings );
Generator generator( settings, outputStreams, factory );
Generator generator( settings, outputStreams, factory, from - nrHistorySamples, to );
#pragma omp parallel sections
{
......@@ -192,7 +203,7 @@ void receiver()
// validate meta data
for (int s = 0; s < nrStations; ++s) {
for (size_t b = 0; b < nrBeamlets; ++b) {
ASSERT(blocks[s].beamlets[b].metaData.flags == metaData.flags);
ASSERTSTR(blocks[s].beamlets[b].metaData.flags == metaData.flags, "Got flags " << blocks[s].beamlets[b].metaData.flags << " but expected flags " << metaData.flags);
}
}
......@@ -233,6 +244,12 @@ int main( int argc, char **argv )
// Need at least one sender and one receiver
ASSERT( nrHosts >= 2 );
// agree on the start time
time_t now = time(0L);
MPI_Bcast(&now, sizeof now, MPI_CHAR, 0, MPI_COMM_WORLD);
from = TimeStamp(now + 5, 0, clockHz);
to = TimeStamp(now + 5 + DURATION, 0, clockHz);
// Use half of the nodes as stations
nrStations = nrHosts/2;
nrReceivers = nrHosts - nrStations;
......
......@@ -111,6 +111,7 @@ int main()
// Use a fixed key, so the test suite knows what to clean
settings.dataKey = 0x10000003;
removeSampleBuffers(settings);
// Limit the array in size to work on systems with only 32MB SHM
settings.nrBoards = 1;
......
......@@ -47,7 +47,7 @@ void test( struct BufferSettings &settings, const std::string &filename )
FileStream fs(filename);
// Set up transfer
PacketsToBuffer transfer(fs, settings, 0, false);
PacketsToBuffer transfer(fs, settings, 0);
// Do transfer
transfer.process();
......@@ -73,6 +73,7 @@ int main()
// Use a fixed key, so the test suite knows what to clean
settings.dataKey = 0x10000002;
removeSampleBuffers(settings);
// Limit the array in size to work on systems with only 32MB SHM
settings.nrBoards = 1;
......
......@@ -57,6 +57,7 @@ int main()
// Use a fixed key, so the test suite knows what to clean
settings.dataKey = 0x10000004;
removeSampleBuffers(settings);
// Limit the array in size to work on systems with only 32MB SHM
settings.nrBoards = 1;
......
......@@ -57,6 +57,7 @@ void initBoard()
// Use a fixed key, so the test suite knows what to clean
settings.dataKey = 0x10000005;
removeSampleBuffers(settings);
// Limit the array in size to work on systems with only 32MB SHM
settings.nrBoards = 1;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment