diff --git a/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h b/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h index f99b7d3548418307cc3aa249e76437a66cdc830a..ec6c0a0d3e067723a25d893ca0dd3f24389f9b0c 100644 --- a/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h +++ b/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h @@ -132,6 +132,10 @@ namespace LOFAR std::vector<Board> boards; }; + + // Removes the sample buffers that correspond to settings.dataKey, + // as well as any sample buffer that refers to the same station. + void removeSampleBuffers( const BufferSettings &settings ); } } diff --git a/RTCP/Cobalt/InputProc/src/CMakeLists.txt b/RTCP/Cobalt/InputProc/src/CMakeLists.txt index 4b34a30bd598ce6d954a8f7645b818a536b92e13..0c54f3870da1a201ed0898bbbbf7cbcfb11e87e6 100644 --- a/RTCP/Cobalt/InputProc/src/CMakeLists.txt +++ b/RTCP/Cobalt/InputProc/src/CMakeLists.txt @@ -10,6 +10,7 @@ lofar_add_library(inputproc RSPBoards.cc Buffer/BufferSettings.cc Buffer/Ranges.cc + Buffer/SampleBuffer.cc Buffer/SharedMemory.cc Buffer/StationID.cc Delays/Delays.cc diff --git a/RTCP/Cobalt/InputProc/src/Station/Generator.cc b/RTCP/Cobalt/InputProc/src/Station/Generator.cc index dfcf249f48dea8fd2effb16d611e7256b877807d..6fec9c57eac87a972b38a8d49ae7cae09cb31440 100644 --- a/RTCP/Cobalt/InputProc/src/Station/Generator.cc +++ b/RTCP/Cobalt/InputProc/src/Station/Generator.cc @@ -35,13 +35,15 @@ namespace LOFAR namespace Cobalt { - Generator::Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams_, PacketFactory &packetFactory ) + Generator::Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams_, PacketFactory &packetFactory, const TimeStamp &from, const TimeStamp &to ) : RSPBoards(str(boost::format("[station %s %s] [Generator] ") % settings.station.stationName % settings.station.antennaField), outputStreams_.size()), settings(settings), outputStreams(outputStreams_.size()), packetFactory(packetFactory), - nrSent(nrBoards, 0) + nrSent(nrBoards, 0), + from(from), + to(to) { for (size_t i = 0; i < outputStreams.size(); ++i) { outputStreams[i] = outputStreams_[i]; @@ -60,8 +62,7 @@ namespace LOFAR LOG_INFO_STR( logPrefix << "Start" ); - TimeStamp current(time(0L) + 1, 0, settings.station.clockMHz * 1000000); - for(;; ) { + for(TimeStamp current = from; !to || current < to; /* increment in loop */ ) { struct RSP packet; // generate packet diff --git a/RTCP/Cobalt/InputProc/src/Station/Generator.h b/RTCP/Cobalt/InputProc/src/Station/Generator.h index 43360a4f55d7505ba40791aa4a2160e7f70d84c7..485ed8a74eb1135156a2d05ed0774b0e235c2114 100644 --- a/RTCP/Cobalt/InputProc/src/Station/Generator.h +++ b/RTCP/Cobalt/InputProc/src/Station/Generator.h @@ -44,7 +44,7 @@ namespace LOFAR class Generator : public RSPBoards { public: - Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams, PacketFactory &packetFactory ); + Generator( const BufferSettings &settings, const std::vector< SmartPtr<Stream> > &outputStreams, PacketFactory &packetFactory, const TimeStamp &from, const TimeStamp &to ); protected: const BufferSettings settings; @@ -53,6 +53,8 @@ namespace LOFAR std::vector<size_t> nrSent; + const TimeStamp from, to; + virtual void processBoard( size_t nr ); virtual void logStatistics(); }; diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc index 27ad3ddd51732cffe0c13c3290996cd7928b3fbc..0ce0336da2498cd20950ab7121b8e56405bf51cb 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc +++ b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc @@ -37,7 +37,7 @@ namespace LOFAR { - PacketsToBuffer::PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr, bool cleanup ) + PacketsToBuffer::PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr ) : logPrefix(str(boost::format("[station %s board %u] [PacketsToBuffer] ") % settings.station.stationName % boardNr)), inputStream(inputStream), @@ -46,32 +46,6 @@ namespace LOFAR boardNr(boardNr) { LOG_INFO_STR( logPrefix << "Initialised" ); - - if (cleanup) { - /* - * Make sure there are no lingering SHM buffers for this - * station from previous runs. - */ - - // Remove the provided dataKey, as it could be a custom setting - SharedMemoryArena::remove(settings.dataKey); - - // Remove the keys of all possible configurations - StationID station = settings.station; - - const unsigned bitmodes[] = { 4, 8, 16 }; - const unsigned clocks[] = { 160, 200 }; - - for (size_t b = 0; b < sizeof bitmodes / sizeof bitmodes[0]; ++b) { - for (size_t c = 0; c < sizeof clocks / sizeof clocks[0]; ++c) { - station.bitMode = bitmodes[b]; - station.clockMHz = clocks[c]; - - // Remove any lingering buffer for this mode - SharedMemoryArena::remove(station.hash()); - } - } - } } diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.h b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.h index a0010453b3f9f91564460937678a51b419bea78b..92cabc573846b5202ecf9e0986a7a9c82a699d27 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.h +++ b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.h @@ -54,7 +54,7 @@ namespace LOFAR public: // cleanup: if true, erase any existing SHM areas corresponding to our // station. - PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr, bool cleanup = true ); + PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr ); // Process data for this board until interrupted or end of data. Auto-senses // mode (bit mode & clock). diff --git a/RTCP/Cobalt/InputProc/src/Station/generate.cc b/RTCP/Cobalt/InputProc/src/Station/generate.cc index 0e42a9e81afa7ff944f5258b24fbc6ad84a1bbc1..662ee885577d6fd96df20824cc1d230e5f374724 100644 --- a/RTCP/Cobalt/InputProc/src/Station/generate.cc +++ b/RTCP/Cobalt/InputProc/src/Station/generate.cc @@ -59,8 +59,11 @@ int main( int argc, char **argv ) struct StationID stationID(stationName, "LBA", 200, 16); struct BufferSettings settings(stationID, false); + const TimeStamp from(time(0), 0, stationID.clockMHz * 1000000); + const TimeStamp to(0); + PacketFactory factory(settings); - Generator g(settings, outputStreams, factory); + Generator g(settings, outputStreams, factory, from, to); // Generate packets g.process(); diff --git a/RTCP/Cobalt/InputProc/test/tBlockReader.cc b/RTCP/Cobalt/InputProc/test/tBlockReader.cc index 8342765d40674ef4a9e39116e7186cb0b4e47063..3c22775beb3ff9232f203f6d7b08624c132a5e52 100644 --- a/RTCP/Cobalt/InputProc/test/tBlockReader.cc +++ b/RTCP/Cobalt/InputProc/test/tBlockReader.cc @@ -1,4 +1,4 @@ -/* PacketsToBuffer.cc +/* tBlockReader.cc * Copyright (C) 2013 ASTRON (Netherlands Institute for Radio Astronomy) * P.O. Box 2, 7990 AA Dwingeloo, The Netherlands * @@ -112,7 +112,7 @@ void test( struct BufferSettings &settings, const std::string &filename ) FileStream fs(filename); // Set up transfer - PacketsToBuffer transfer(fs, settings, 0, false); + PacketsToBuffer transfer(fs, settings, 0); // Do transfer transfer.process(); @@ -152,6 +152,7 @@ int main() // Use a fixed key, so the test suite knows what to clean settings.dataKey = 0x10000001; + removeSampleBuffers(settings); // Limit the array in size to work on systems with only 32MB SHM settings.nrBoards = 1; diff --git a/RTCP/Cobalt/InputProc/test/tGenerator.cc b/RTCP/Cobalt/InputProc/test/tGenerator.cc index 55f5897822c28d7ee66dfed5311d64c92ae33478..ee45c92f66771f4144cfe18df22f82ac15b0a072 100644 --- a/RTCP/Cobalt/InputProc/test/tGenerator.cc +++ b/RTCP/Cobalt/InputProc/test/tGenerator.cc @@ -69,8 +69,10 @@ int main( int, char **argv ) struct StationID stationID("RS106", "LBA", 200, 16); struct BufferSettings settings(stationID, false); + const TimeStamp from(time(0), 0, stationID.clockMHz * 1000000); + const TimeStamp to = from + NUMPACKETS * 16; /* 16 timeslots/packet */ PacketFactory factory(settings); - Generator g(settings, outputStreams, factory); + Generator g(settings, outputStreams, factory, from, to); bool error = false; diff --git a/RTCP/Cobalt/InputProc/test/tMPITransfer.cc b/RTCP/Cobalt/InputProc/test/tMPITransfer.cc index 87d808d3e8e1ca1e19c168f4aded681939b26fec..9a9e096bf875fadd9c5fbd8813778a1e01a440b5 100644 --- a/RTCP/Cobalt/InputProc/test/tMPITransfer.cc +++ b/RTCP/Cobalt/InputProc/test/tMPITransfer.cc @@ -69,8 +69,8 @@ Exception::TerminateHandler t(Exception::terminate); const size_t clockMHz = 200; const size_t clockHz = clockMHz * 1000 * 1000; typedef SampleType<i16complex> SampleT; -const TimeStamp from(time(0L) + 10, 0, clockHz); -const TimeStamp to(time(0L) + 10 + DURATION, 0, clockHz); +TimeStamp from; +TimeStamp to; const size_t blockSize = BLOCKSIZE * clockHz / 1024; map<int, std::vector<size_t> > beamletDistribution; const size_t nrHistorySamples = 16; @@ -94,6 +94,15 @@ void sender() struct StationID stationID(str(format("CS%03d") % rank), "LBA", clockMHz, 16); struct BufferSettings settings(stationID, false); + // sync readers and writers to prevent data loss + // if the reader is delayed w.r.t. the generator + + SmartPtr<SyncLock> syncLock; + + syncLock = new SyncLock(settings); + settings.syncLock = syncLock; + settings.sync = true; + omp_set_nested(true); //omp_set_num_threads(32); OMPThread::init(); @@ -115,9 +124,11 @@ void sender() } } + removeSampleBuffers(settings); + MultiPacketsToBuffer station( settings, inputStreams ); PacketFactory factory( settings ); - Generator generator( settings, outputStreams, factory ); + Generator generator( settings, outputStreams, factory, from - nrHistorySamples, to ); #pragma omp parallel sections { @@ -192,7 +203,7 @@ void receiver() // validate meta data for (int s = 0; s < nrStations; ++s) { for (size_t b = 0; b < nrBeamlets; ++b) { - ASSERT(blocks[s].beamlets[b].metaData.flags == metaData.flags); + ASSERTSTR(blocks[s].beamlets[b].metaData.flags == metaData.flags, "Got flags " << blocks[s].beamlets[b].metaData.flags << " but expected flags " << metaData.flags); } } @@ -233,6 +244,12 @@ int main( int argc, char **argv ) // Need at least one sender and one receiver ASSERT( nrHosts >= 2 ); + // agree on the start time + time_t now = time(0L); + MPI_Bcast(&now, sizeof now, MPI_CHAR, 0, MPI_COMM_WORLD); + from = TimeStamp(now + 5, 0, clockHz); + to = TimeStamp(now + 5 + DURATION, 0, clockHz); + // Use half of the nodes as stations nrStations = nrHosts/2; nrReceivers = nrHosts - nrStations; diff --git a/RTCP/Cobalt/InputProc/test/tPacketWriter.cc b/RTCP/Cobalt/InputProc/test/tPacketWriter.cc index 4a7019e0d5db15b798499b02157c8c9379d51f83..f97c7bea15b2d1abf475d5e2c2f6756f15173609 100644 --- a/RTCP/Cobalt/InputProc/test/tPacketWriter.cc +++ b/RTCP/Cobalt/InputProc/test/tPacketWriter.cc @@ -111,6 +111,7 @@ int main() // Use a fixed key, so the test suite knows what to clean settings.dataKey = 0x10000003; + removeSampleBuffers(settings); // Limit the array in size to work on systems with only 32MB SHM settings.nrBoards = 1; diff --git a/RTCP/Cobalt/InputProc/test/tPacketsToBuffer.cc b/RTCP/Cobalt/InputProc/test/tPacketsToBuffer.cc index fe777f80f690b946b93e1bd87cd78dcab774c9d1..8edd3719f3e8fad5de4e9eb117003feb27c09d2d 100644 --- a/RTCP/Cobalt/InputProc/test/tPacketsToBuffer.cc +++ b/RTCP/Cobalt/InputProc/test/tPacketsToBuffer.cc @@ -47,7 +47,7 @@ void test( struct BufferSettings &settings, const std::string &filename ) FileStream fs(filename); // Set up transfer - PacketsToBuffer transfer(fs, settings, 0, false); + PacketsToBuffer transfer(fs, settings, 0); // Do transfer transfer.process(); @@ -73,6 +73,7 @@ int main() // Use a fixed key, so the test suite knows what to clean settings.dataKey = 0x10000002; + removeSampleBuffers(settings); // Limit the array in size to work on systems with only 32MB SHM settings.nrBoards = 1; diff --git a/RTCP/Cobalt/InputProc/test/tSampleBuffer.cc b/RTCP/Cobalt/InputProc/test/tSampleBuffer.cc index ea13871439108e4bde21b3d17b16302624d328c3..7c25d529c3d4002829cf5591e36f82581286a75e 100644 --- a/RTCP/Cobalt/InputProc/test/tSampleBuffer.cc +++ b/RTCP/Cobalt/InputProc/test/tSampleBuffer.cc @@ -57,6 +57,7 @@ int main() // Use a fixed key, so the test suite knows what to clean settings.dataKey = 0x10000004; + removeSampleBuffers(settings); // Limit the array in size to work on systems with only 32MB SHM settings.nrBoards = 1; diff --git a/RTCP/Cobalt/InputProc/test/tSampleBufferSync.cc b/RTCP/Cobalt/InputProc/test/tSampleBufferSync.cc index 7883a138447d7d35177e0f88deae4b84c07ce3d5..120f7d91e3c15969add7bd60e56737ebec4eb5b2 100644 --- a/RTCP/Cobalt/InputProc/test/tSampleBufferSync.cc +++ b/RTCP/Cobalt/InputProc/test/tSampleBufferSync.cc @@ -57,6 +57,7 @@ void initBoard() // Use a fixed key, so the test suite knows what to clean settings.dataKey = 0x10000005; + removeSampleBuffers(settings); // Limit the array in size to work on systems with only 32MB SHM settings.nrBoards = 1;