From d7fa9f664e0622870765847c2863a8bcf51b216a Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Sun, 24 Mar 2013 07:28:39 +0000 Subject: [PATCH] Task #4315: Major rewrite of MPI transpose --- .gitattributes | 6 +- RTCP/Cobalt/CoInterface/src/SparseSet.h | 43 ++ .../InputProc/src/Buffer/SampleBuffer.h | 2 + .../InputProc/src/Buffer/SampleBufferReader.h | 53 +- .../src/Buffer/SampleBufferReader.tcc | 110 ++-- RTCP/Cobalt/InputProc/src/CMakeLists.txt | 2 +- RTCP/Cobalt/InputProc/src/RSPBoards.cc | 28 +- .../Cobalt/InputProc/src/Station/Generator.cc | 46 +- RTCP/Cobalt/InputProc/src/Station/Generator.h | 6 +- .../InputProc/src/Station/PacketFactory.cc | 100 +++ .../InputProc/src/Station/PacketFactory.h | 65 ++ .../InputProc/src/Station/PacketWriter.tcc | 4 +- .../src/Transpose/MPITransferStations.h | 38 +- .../src/Transpose/MPITransferStations.tcc | 609 ++++++++++++++---- RTCP/Cobalt/InputProc/src/newInputSection.cc | 186 ------ RTCP/Cobalt/InputProc/test/CMakeLists.txt | 2 + RTCP/Cobalt/InputProc/test/tMPITransfer.cc | 220 +++++++ RTCP/Cobalt/InputProc/test/tMPITransfer.run | 2 + RTCP/Cobalt/InputProc/test/tMPITransfer.sh | 2 + 19 files changed, 1048 insertions(+), 476 deletions(-) create mode 100644 RTCP/Cobalt/InputProc/src/Station/PacketFactory.cc create mode 100644 RTCP/Cobalt/InputProc/src/Station/PacketFactory.h delete mode 100644 RTCP/Cobalt/InputProc/src/newInputSection.cc create mode 100644 RTCP/Cobalt/InputProc/test/tMPITransfer.cc create mode 100755 RTCP/Cobalt/InputProc/test/tMPITransfer.run create mode 100755 RTCP/Cobalt/InputProc/test/tMPITransfer.sh diff --git a/.gitattributes b/.gitattributes index b9542693c0c..b2c4936a502 100644 --- a/.gitattributes +++ b/.gitattributes @@ -3938,6 +3938,8 @@ RTCP/Cobalt/InputProc/src/RSPBoards.h -text RTCP/Cobalt/InputProc/src/SampleType.h -text RTCP/Cobalt/InputProc/src/Station/Generator.cc -text RTCP/Cobalt/InputProc/src/Station/Generator.h -text +RTCP/Cobalt/InputProc/src/Station/PacketFactory.cc -text +RTCP/Cobalt/InputProc/src/Station/PacketFactory.h -text RTCP/Cobalt/InputProc/src/Station/PacketReader.cc -text RTCP/Cobalt/InputProc/src/Station/PacketReader.h -text RTCP/Cobalt/InputProc/src/Station/PacketWriter.h -text @@ -3950,12 +3952,14 @@ RTCP/Cobalt/InputProc/src/Station/printRSP.cc -text RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.h -text RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.tcc -text RTCP/Cobalt/InputProc/src/WallClockTime.h -text -RTCP/Cobalt/InputProc/src/newInputSection.cc -text RTCP/Cobalt/InputProc/src/obsolete/MPI_RMA.h -text RTCP/Cobalt/InputProc/src/obsolete/Poll.h -text RTCP/Cobalt/InputProc/src/obsolete/TimeSync.h -text RTCP/Cobalt/InputProc/test/CMakeLists.txt -text RTCP/Cobalt/InputProc/test/tGenerator.cc -text +RTCP/Cobalt/InputProc/test/tMPITransfer.cc -text +RTCP/Cobalt/InputProc/test/tMPITransfer.run -text +RTCP/Cobalt/InputProc/test/tMPITransfer.sh -text RTCP/Cobalt/InputProc/test/tPacketFactory.cc -text RTCP/Cobalt/InputProc/test/tPacketReader.cc -text RTCP/Cobalt/InputProc/test/tPacketReader.in_16bit -text diff --git a/RTCP/Cobalt/CoInterface/src/SparseSet.h b/RTCP/Cobalt/CoInterface/src/SparseSet.h index af56a93822e..a4195e96304 100644 --- a/RTCP/Cobalt/CoInterface/src/SparseSet.h +++ b/RTCP/Cobalt/CoInterface/src/SparseSet.h @@ -82,6 +82,10 @@ namespace LOFAR SparseSet<T> operator | (const SparseSet<T> &) const; SparseSet<T> &operator |= (const SparseSet<T> &); + // Return the intersection of two sets. + SparseSet<T> operator & (const SparseSet<T> &) const; + SparseSet<T> &operator &= (const SparseSet<T> &); + // Increase all indices in the set by `count'. SparseSet<T> &operator += (size_t count); @@ -163,6 +167,14 @@ namespace LOFAR } + template <typename T> + inline SparseSet<T> &SparseSet<T>::operator &= (const SparseSet<T> &other) + { + ranges = (*this & other).ranges; + return *this; + } + + template <typename T> inline SparseSet<T> SparseSet<T>::invert(T first, T last) const { @@ -311,6 +323,37 @@ namespace LOFAR } + template <typename T> + SparseSet<T> SparseSet<T>::operator & (const SparseSet<T> &other) const + { + SparseSet<T> intersection_set; + const_iterator it1 = ranges.begin(), it2 = other.ranges.begin(); + + while (it1 != ranges.end() && it2 != other.ranges.end()) { + if (it1->end < it2->begin) { + // no overlap; *it1 is the smallest + ++it1; + } else if (it2->end < it1->begin) { + // no overlap; *it2 is the smallest + ++it2; + } else { // there is overlap, or it1 and it2 are contiguous + intersection_set.ranges.push_back(range(std::max(it1->begin, it2->begin), std::min(it1->end, it2->end))); + + // continue with earliest end, as multiple ranges may overlap + // with the latest end. + if (it1->end < it2->end) + ++it1; + else + ++it2; + } + } + + // ignore the remainder of the set that we have not finished yet + + return intersection_set; + } + + template <typename T> SparseSet<T> &SparseSet<T>::operator += (size_t count) { diff --git a/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h b/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h index d0da3deb1c9..dc1deaedbd2 100644 --- a/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h +++ b/RTCP/Cobalt/InputProc/src/Buffer/SampleBuffer.h @@ -90,6 +90,8 @@ namespace LOFAR MultiDimArray<T,2> beamlets; // [subband][sample] + size_t offset( const TimeStamp ×tamp ) const { return (int64)timestamp % nrSamples; } + class Board { public: Board( SampleBuffer<T> &buffer ); diff --git a/RTCP/Cobalt/InputProc/src/Buffer/SampleBufferReader.h b/RTCP/Cobalt/InputProc/src/Buffer/SampleBufferReader.h index 6c515f2fd78..a206ddb9b65 100644 --- a/RTCP/Cobalt/InputProc/src/Buffer/SampleBufferReader.h +++ b/RTCP/Cobalt/InputProc/src/Buffer/SampleBufferReader.h @@ -62,25 +62,33 @@ namespace LOFAR const size_t nrHistorySamples; struct CopyInstructions { - // Beamlet index - unsigned beamlet; - // Relevant time range TimeStamp from; TimeStamp to; - // Copy as one or two ranges of [from, to). - struct Range { - const T* from; - const T* to; - } ranges[2]; + struct Beamlet { + // Copy as one or two ranges of [from, to). + struct Range { + const T* from; + const T* to; + } ranges[2]; + + unsigned nrRanges; - unsigned nrRanges; + // The offset at which the data is accessed. + ssize_t offset; + }; - // The flags for this range - SparseSet<int64> flags; + std::vector<struct Beamlet> beamlets; }; + /* + * Read the flags for a specific beamlet. Readers should read the flags + * before and after reading the data. The valid data is then indicated by + * the intersection of both sets (flagsBefore & flagsAfter). + */ + SparseSet<int64> flags( const struct CopyInstructions &, unsigned beamlet ); + /* * Provide the offset in samples for a certain beamlet, based on the * geometric delays for the respective subband. @@ -93,36 +101,17 @@ namespace LOFAR return 0; } - /* - * Setup the copying of one block. - */ - virtual void copyStart( const TimeStamp &from, const TimeStamp &to, const std::vector<size_t> &wrapOffsets ) - { - (void)from; - (void)to; - (void)wrapOffsets; - } - /* * Copy one block. */ - virtual void copy( const struct CopyInstructions & ) + virtual void sendBlock( const struct CopyInstructions & ) { } - /* - * Tear down the copying of one block. - */ - virtual void copyEnd( const TimeStamp &from, const TimeStamp &to ) - { - (void)from; - (void)to; - } - private: WallClockTime waiter; - void copy( const TimeStamp &from, const TimeStamp &to ); + void sendBlock( const TimeStamp &from, const TimeStamp &to ); }; } diff --git a/RTCP/Cobalt/InputProc/src/Buffer/SampleBufferReader.tcc b/RTCP/Cobalt/InputProc/src/Buffer/SampleBufferReader.tcc index bf9bad3505e..131f690bc52 100644 --- a/RTCP/Cobalt/InputProc/src/Buffer/SampleBufferReader.tcc +++ b/RTCP/Cobalt/InputProc/src/Buffer/SampleBufferReader.tcc @@ -45,6 +45,7 @@ template<typename T> SampleBufferReader<T>::SampleBufferReader( const BufferSett ASSERT( blockSize > 0 ); ASSERT( blockSize < settings.nrSamples ); + ASSERT( blockSize > nrHistorySamples ); ASSERT( from < to ); } @@ -56,19 +57,17 @@ template<typename T> void SampleBufferReader<T>::process( double maxDelay ) const TimeStamp current(from); const size_t increment = blockSize - nrHistorySamples; - /* - for (TimeStamp current = from; current < to; current += increment) { +#if 1 + for (TimeStamp current = from; current + blockSize < to; current += increment) { // wait LOG_INFO_STR("Waiting until " << (current + maxDelay_ts) << " for " << current); waiter.waitUntil( current + maxDelay_ts ); // read LOG_INFO_STR("Reading from " << current << " to " << (current + blockSize)); - copy(current - nrHistorySamples, current - nrHistorySamples + blockSize); + sendBlock(current - nrHistorySamples, current - nrHistorySamples + blockSize); } - - LOG_INFO("Done reading data");*/ - +#else double totalwait = 0.0; unsigned totalnr = 0; @@ -81,7 +80,7 @@ template<typename T> void SampleBufferReader<T>::process( double maxDelay ) // read double bs = MPI_Wtime(); - copy(current - nrHistorySamples, current - nrHistorySamples + blockSize); + sendBlock(current - nrHistorySamples, current - nrHistorySamples + blockSize); totalwait += MPI_Wtime() - bs; totalnr++; @@ -95,6 +94,7 @@ template<typename T> void SampleBufferReader<T>::process( double maxDelay ) LOG_INFO_STR("Reading speed: " << mbps << " Mbit/s"); } } +#endif for( typename std::vector< typename SampleBuffer<T>::Board >::iterator board = buffer.boards.begin(); board != buffer.boards.end(); ++board ) { (*board).noMoreReading(); @@ -104,87 +104,69 @@ template<typename T> void SampleBufferReader<T>::process( double maxDelay ) } -template<typename T> void SampleBufferReader<T>::copy( const TimeStamp &from, const TimeStamp &to ) +template<typename T> void SampleBufferReader<T>::sendBlock( const TimeStamp &from, const TimeStamp &to ) { ASSERT( from < to ); ASSERT( to - from < (int64)buffer.nrSamples ); + // Create instructions for copying this block struct CopyInstructions info; info.from = from; info.to = to; - - // Determine the buffer offsets for all beamlets. Because beamlets can belong - // to different station beams, the offsets can differ per beamlet. - std::vector<ssize_t> beam_offsets(beamlets.size()); - std::vector<size_t> from_offsets(beamlets.size()); - std::vector<size_t> to_offsets(beamlets.size()); - std::vector<size_t> wrap_offsets(beamlets.size()); + info.beamlets.resize(beamlets.size()); for (size_t i = 0; i < beamlets.size(); ++i) { + unsigned b = beamlets[i]; + struct CopyInstructions::Beamlet &ib = info.beamlets[i]; + + // Determine the offset with which this beamlet is read (likely based on + // the relevant station beam). ssize_t offset = beamletOffset(i, from, to); - beam_offsets[i] = offset; + ib.offset = offset; // Determine the relevant offsets in the buffer - from_offsets[i] = (info.from + offset) % buffer.nrSamples; - to_offsets[i] = (info.to + offset) % buffer.nrSamples; + size_t from_offset = buffer.offset(info.from + offset); + size_t to_offset = buffer.offset(info.to + offset); - if (to_offsets[i] == 0) - to_offsets[i] = buffer.nrSamples; + if (to_offset == 0) + to_offset = buffer.nrSamples; // Determine whether we need to wrap around the end of the buffer - wrap_offsets[i] = from_offsets[i] < to_offsets[i] ? 0 : buffer.nrSamples - from_offsets[i]; - - } - - // Signal read intent on all buffers - for( typename std::vector< typename SampleBuffer<T>::Board >::iterator board = buffer.boards.begin(); board != buffer.boards.end(); ++board ) { - (*board).startRead(from, to); - } - - // Signal start of block - copyStart(from, to, wrap_offsets); - - // Copy all specified beamlets - for (size_t i = 0; i < beamlets.size(); ++i) { - unsigned nr = beamlets[i]; - const T* origin = &buffer.beamlets[nr][0]; - - ssize_t beam_offset = beam_offsets[i]; - size_t from_offset = from_offsets[i]; - size_t wrap_offset = wrap_offsets[i]; - size_t to_offset = to_offsets[i]; + size_t wrap_offset = from_offset < to_offset ? 0 : buffer.nrSamples - from_offset; - info.beamlet = i; + const T* origin = &buffer.beamlets[b][0]; if (wrap_offset > 0) { // Copy as two parts - info.nrRanges = 2; + ib.nrRanges = 2; - info.ranges[0].from = origin + from_offset; - info.ranges[0].to = origin + wrap_offset; + ib.ranges[0].from = origin + from_offset; + ib.ranges[0].to = origin + buffer.nrSamples; - info.ranges[1].from = origin; - info.ranges[1].to = origin + to_offset; + ib.ranges[1].from = origin; + ib.ranges[1].to = origin + to_offset; } else { // Copy as one part - info.nrRanges = 1; + ib.nrRanges = 1; - info.ranges[0].from = origin + from_offset; - info.ranges[0].to = origin + to_offset; + ib.ranges[0].from = origin + from_offset; + ib.ranges[0].to = origin + to_offset; } + } - // Add the flags (translate available packets to missing packets) - size_t boardIdx = settings.boardIndex(i); + // Signal read intent on all buffers + for( typename std::vector< typename SampleBuffer<T>::Board >::iterator board = buffer.boards.begin(); board != buffer.boards.end(); ++board ) { + (*board).startRead(from, to); + } - info.flags = buffer.boards[boardIdx].flags.sparseSet(from + beam_offset, to + beam_offset).invert(from + beam_offset, to + beam_offset); + // TODO: + // Add the flags (translate available packets to missing packets) + //size_t boardIdx = settings.boardIndex(i); - // Copy the beamlet - copy(info); - } + //info.flags = buffer.boards[boardIdx].flags.sparseSet(from + beam_offset, to + beam_offset).invert(from + beam_offset, to + beam_offset); - // Signal end of block - copyEnd(from, to); + sendBlock(info); // Signal end of read intent on all buffers, reserving nrHistorySamples for the // next read. @@ -193,6 +175,18 @@ template<typename T> void SampleBufferReader<T>::copy( const TimeStamp &from, co } } + +template<typename T> SparseSet<int64> SampleBufferReader<T>::flags( const struct CopyInstructions &info, unsigned beamlet ) +{ + // Determine corresponding RSP board + size_t boardIdx = settings.boardIndex(beamlet); + + ssize_t beam_offset = info.beamlets[beamlet].offset; + + // Translate available packets to missing packets. + return buffer.boards[boardIdx].flags.sparseSet(info.from + beam_offset, info.to + beam_offset).invert(info.from + beam_offset, info.to + beam_offset); +} + } } diff --git a/RTCP/Cobalt/InputProc/src/CMakeLists.txt b/RTCP/Cobalt/InputProc/src/CMakeLists.txt index 713a9d194d9..fd7c686630c 100644 --- a/RTCP/Cobalt/InputProc/src/CMakeLists.txt +++ b/RTCP/Cobalt/InputProc/src/CMakeLists.txt @@ -12,10 +12,10 @@ lofar_add_library(inputproc Buffer/SharedMemory.cc Buffer/StationID.cc Station/Generator.cc + Station/PacketFactory.cc Station/PacketReader.cc Station/PacketsToBuffer.cc ) -lofar_add_bin_program(newInputSection newInputSection.cc) lofar_add_bin_program(filterRSP Station/filterRSP.cc) lofar_add_bin_program(printRSP Station/printRSP.cc) diff --git a/RTCP/Cobalt/InputProc/src/RSPBoards.cc b/RTCP/Cobalt/InputProc/src/RSPBoards.cc index 4eeaf443423..25bc92d10bb 100644 --- a/RTCP/Cobalt/InputProc/src/RSPBoards.cc +++ b/RTCP/Cobalt/InputProc/src/RSPBoards.cc @@ -46,7 +46,7 @@ namespace LOFAR void RSPBoards::process() { // References to all threads that will need aborting - std::vector<OMPThread> threads(nrBoards * 2); + std::vector<OMPThread> threads(nrBoards + 1); ASSERT(nrBoards > 0); @@ -77,23 +77,21 @@ namespace LOFAR // Log threads # pragma omp section { - // start all log statistics - LOG_INFO_STR( logPrefix << "Starting all log statistics" ); -# pragma omp parallel for num_threads(nrBoards) - for (size_t i = 0; i < nrBoards; ++i) { - OMPThread::ScopedRun sr(threads[i + nrBoards]); + // start log statistics + LOG_INFO_STR( logPrefix << "Starting log statistics" ); - try { - for(;; ) { - if (usleep(999999) == -1 && errno == EINTR) - // got killed - break; + OMPThread::ScopedRun sr(threads[0 + nrBoards]); - logStatistics(); - } - } catch(Exception &ex) { - LOG_ERROR_STR("Caught exception: " << ex); + try { + for(;; ) { + if (usleep(999999) == -1 && errno == EINTR) + // got killed + break; + + logStatistics(); } + } catch(Exception &ex) { + LOG_ERROR_STR("Caught exception: " << ex); } } diff --git a/RTCP/Cobalt/InputProc/src/Station/Generator.cc b/RTCP/Cobalt/InputProc/src/Station/Generator.cc index afa258b6629..05b4c3a5f49 100644 --- a/RTCP/Cobalt/InputProc/src/Station/Generator.cc +++ b/RTCP/Cobalt/InputProc/src/Station/Generator.cc @@ -36,55 +36,17 @@ namespace LOFAR namespace Cobalt { - Generator::Generator( const BufferSettings &settings, const std::vector<std::string> &streamDescriptors ) + Generator::Generator( const BufferSettings &settings, const std::vector<std::string> &streamDescriptors, PacketFactory &packetFactory ) : RSPBoards(str(boost::format("[station %s %s] [Generator] ") % settings.station.stationName % settings.station.antennaField), streamDescriptors.size()), settings(settings), streamDescriptors(streamDescriptors), + packetFactory(packetFactory), nrSent(nrBoards, 0) { LOG_INFO_STR( logPrefix << "Initialised" ); } - void Generator::makePacket( size_t boardNr, struct RSP &packet, const TimeStamp ×tamp ) - { - // configure the packet header - packet.header.version = 3; // we emulate BDI 6.0 - - packet.header.sourceInfo1 = - (boardNr & 0x1F) | (settings.station.clockMHz == 200 ? 1 << 7 : 0); - - switch (settings.station.bitMode) { - case 16: - packet.header.sourceInfo2 = 0; - break; - - case 8: - packet.header.sourceInfo2 = 1; - break; - - case 4: - packet.header.sourceInfo2 = 2; - break; - } - - packet.header.nrBeamlets = settings.nrBeamletsPerBoard; - packet.header.nrBlocks = 16; - - packet.header.timestamp = timestamp.getSeqId(); - packet.header.blockSequenceNumber = timestamp.getBlockId(); - - // insert data that is different for each packet - int64 data = timestamp; - - memset(packet.payload.data, data & 0xFF, sizeof packet.payload.data); - - // verify whether the packet really reflects what we intended - ASSERT(packet.rspBoard() == boardNr); - ASSERT(packet.payloadError() == false); - ASSERT(packet.bitMode() == settings.station.bitMode); - ASSERT(packet.clockMHz() == settings.station.clockMHz); - } void Generator::processBoard( size_t nr ) { @@ -101,9 +63,7 @@ namespace LOFAR struct RSP packet; // generate packet - makePacket( nr, packet, current ); - - ASSERT(packet.packetSize() <= sizeof packet); + packetFactory.makePacket( packet, current, nr ); // wait until it is due if (!waiter.waitUntil(current)) diff --git a/RTCP/Cobalt/InputProc/src/Station/Generator.h b/RTCP/Cobalt/InputProc/src/Station/Generator.h index 55812737403..51a5df97551 100644 --- a/RTCP/Cobalt/InputProc/src/Station/Generator.h +++ b/RTCP/Cobalt/InputProc/src/Station/Generator.h @@ -30,6 +30,7 @@ #include <RSPBoards.h> #include <Buffer/BufferSettings.h> +#include "PacketFactory.h" #include "RSP.h" namespace LOFAR @@ -42,18 +43,17 @@ namespace LOFAR class Generator : public RSPBoards { public: - Generator( const BufferSettings &settings, const std::vector<std::string> &streamDescriptors ); + Generator( const BufferSettings &settings, const std::vector<std::string> &streamDescriptors, PacketFactory &packetFactory ); protected: const BufferSettings settings; const std::vector<std::string> streamDescriptors; + PacketFactory &packetFactory; std::vector<size_t> nrSent; virtual void processBoard( size_t nr ); virtual void logStatistics(); - - virtual void makePacket( size_t boardNr, struct RSP &packet, const TimeStamp ×tamp ); }; } diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketFactory.cc b/RTCP/Cobalt/InputProc/src/Station/PacketFactory.cc new file mode 100644 index 00000000000..5f8579b963b --- /dev/null +++ b/RTCP/Cobalt/InputProc/src/Station/PacketFactory.cc @@ -0,0 +1,100 @@ +/* PacketFactory.cc + * Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) + * P.O. Box 2, 7990 AA Dwingeloo, The Netherlands + * + * This file is part of the LOFAR software suite. + * The LOFAR software suite is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The LOFAR software suite is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + * + * $Id: $ + */ + +#include <lofar_config.h> + +#include "PacketFactory.h" + +#include <string.h> + +namespace LOFAR +{ + namespace Cobalt + { + + PacketFactory::PacketFactory( const BufferSettings &settings ) + : + settings(settings) + { + } + + PacketFactory::~PacketFactory() + { + } + + + void PacketFactory::makeHeader( struct RSP &packet, const TimeStamp ×tamp, size_t boardNr ) + { + // configure the packet header + packet.header.version = 3; // we emulate BDI 6.0 + + packet.header.sourceInfo1 = + (boardNr & 0x1F) | (settings.station.clockMHz == 200 ? 1 << 7 : 0); + + switch (settings.station.bitMode) { + case 16: + packet.header.sourceInfo2 = 0; + break; + + case 8: + packet.header.sourceInfo2 = 1; + break; + + case 4: + packet.header.sourceInfo2 = 2; + break; + } + + packet.header.nrBeamlets = settings.nrBeamletsPerBoard; + packet.header.nrBlocks = 16; + + packet.header.timestamp = timestamp.getSeqId(); + packet.header.blockSequenceNumber = timestamp.getBlockId(); + + // verify whether the packet really reflects what we intended + ASSERT(packet.rspBoard() == boardNr); + ASSERT(packet.payloadError() == false); + ASSERT(packet.bitMode() == settings.station.bitMode); + ASSERT(packet.clockMHz() == settings.station.clockMHz); + + // verify that the packet has a valid size + ASSERT(packet.packetSize() <= sizeof packet); + } + + + + void PacketFactory::makePayload( struct RSP &packet ) + { + // insert data that is different for each packet + int64 data = packet.timeStamp(); + + memset(packet.payload.data, data & 0xFF, sizeof packet.payload.data); + } + + + void PacketFactory::makePacket( struct RSP &packet, const TimeStamp ×tamp, size_t boardNr ) + { + makeHeader(packet, timestamp, boardNr); + makePayload(packet); + } + } +} + diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketFactory.h b/RTCP/Cobalt/InputProc/src/Station/PacketFactory.h new file mode 100644 index 00000000000..e08e5e96258 --- /dev/null +++ b/RTCP/Cobalt/InputProc/src/Station/PacketFactory.h @@ -0,0 +1,65 @@ +/* PacketFactory.h + * Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) + * P.O. Box 2, 7990 AA Dwingeloo, The Netherlands + * + * This file is part of the LOFAR software suite. + * The LOFAR software suite is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The LOFAR software suite is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + * + * $Id: $ + */ + +#ifndef LOFAR_INPUT_PROC_PACKETFACTORY_H +#define LOFAR_INPUT_PROC_PACKETFACTORY_H + +#include <Buffer/BufferSettings.h> +#include <CoInterface/RSPTimeStamp.h> + +#include "RSP.h" + +namespace LOFAR +{ + namespace Cobalt + { + /* Generate RSP packets */ + + class PacketFactory + { + public: + PacketFactory( const BufferSettings &settings ); + virtual ~PacketFactory(); + + /* + * Fill an RSP packet for a certain RSP board and time stamp. + */ + virtual void makePacket( struct RSP &packet, const TimeStamp ×tamp, size_t boardNr); + + protected: + const BufferSettings &settings; + + /* + * Fill packet.header. + */ + virtual void makeHeader( struct RSP &packet, const TimeStamp ×tamp, size_t boardNr); + + /* + * Fill packet.payload. Called after makeHeader(). + */ + virtual void makePayload( struct RSP &packet ); + }; + + } +} + +#endif + diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketWriter.tcc b/RTCP/Cobalt/InputProc/src/Station/PacketWriter.tcc index 471f938db97..60e2dfb132e 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketWriter.tcc +++ b/RTCP/Cobalt/InputProc/src/Station/PacketWriter.tcc @@ -68,8 +68,8 @@ template<typename T> void PacketWriter<T>::writePacket( const struct RSP &packet const TimeStamp end = begin + nrTimeslots; // determine the time span when cast on the buffer - const size_t from_offset = (int64)begin % settings.nrSamples; - size_t to_offset = ((int64)end) % settings.nrSamples; + const size_t from_offset = buffer.offset(begin); + size_t to_offset = buffer.offset(end); if (to_offset == 0) to_offset = settings.nrSamples; diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.h b/RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.h index a382e0e7ea4..53e767fad03 100644 --- a/RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.h +++ b/RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.h @@ -33,6 +33,10 @@ #include <Buffer/SampleBufferReader.h> #include <Buffer/BufferSettings.h> +#include <map> +#include <set> +#include <vector> + namespace LOFAR { namespace Cobalt @@ -45,7 +49,7 @@ namespace LOFAR class MPISendStation : public SampleBufferReader<T> { public: - MPISendStation( const struct BufferSettings &settings, const TimeStamp &from, const TimeStamp &to, size_t blockSize, size_t nrHistorySamples, const std::vector<size_t> &beamlets, unsigned destRank ); + MPISendStation( const struct BufferSettings &settings, const TimeStamp &from, const TimeStamp &to, size_t blockSize, size_t nrHistorySamples, const std::map<size_t, int> &beamletDistribution ); struct Header { StationID station; @@ -73,21 +77,22 @@ namespace LOFAR enum tag_types { CONTROL = 0, BEAMLET = 1, FLAGS = 2 }; protected: - const unsigned destRank; - - std::vector<MPI_Request> requests; - size_t nrRequests; - - Matrix<char> metaData; // [beamlet][data] - - virtual void copyStart( const TimeStamp &from, const TimeStamp &to, const std::vector<size_t> &wrapOffsets ); - virtual void copy( const struct SampleBufferReader<T>::CopyInstructions &info ); - virtual void copyEnd( const TimeStamp &from, const TimeStamp &to ); + virtual void sendBlock( const struct SampleBufferReader<T>::CopyInstructions &info ); size_t metaDataSize() const { return sizeof(uint32_t) + this->settings.nrFlagRanges * sizeof(int64) * 2; } + + private: + const std::string logPrefix; + const std::map<size_t, int> beamletDistribution; + const std::set<int> targetRanks; + const std::map<int, std::vector<size_t> > beamletsOfTarget; + + MPI_Request sendHeader( int rank, Header &header, const struct SampleBufferReader<T>::CopyInstructions &info ); + void sendData( int rank, unsigned beamlet, const struct SampleBufferReader<T>::CopyInstructions::Beamlet &ib ); + void sendFlags( int rank, unsigned beamlet, const SparseSet<int64> &flags ); }; @@ -103,7 +108,7 @@ namespace LOFAR class MPIReceiveStations { public: - MPIReceiveStations( const struct BufferSettings &settings, const std::vector<int> stationRanks, const std::vector<size_t> &beamlets, size_t blockSize ); + MPIReceiveStations( const std::vector<int> stationRanks, const std::vector<size_t> &beamlets, size_t blockSize ); struct Block { MultiDimArray<T, 2> samples; // [beamlet][sample] @@ -116,12 +121,19 @@ namespace LOFAR void receiveBlock(); private: - const struct BufferSettings settings; + const std::string logPrefix; const std::vector<int> stationRanks; public: const std::vector<size_t> beamlets; const size_t blockSize; + + MPI_Request receiveHeader( int rank, struct MPISendStation<T>::Header &header ); + MPI_Request receiveBeamlet( int rank, size_t beamlet, int transfer, T *from, size_t nrSamples ); + MPI_Request receiveFlags( int rank, size_t beamlet, std::vector<char> &buffer ); + + int waitAny( std::vector<MPI_Request> &requests ); + void waitAll( std::vector<MPI_Request> &requests ); }; diff --git a/RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.tcc b/RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.tcc index 30d5cdd741c..9d4f7a55ab0 100644 --- a/RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.tcc +++ b/RTCP/Cobalt/InputProc/src/Transpose/MPITransferStations.tcc @@ -22,128 +22,434 @@ #include <pthread.h> #include <Common/LofarLogger.h> +#include <Common/Singleton.h> #include <Common/Thread/Mutex.h> +#include <Common/Thread/Condition.h> +#include <Common/Thread/Semaphore.h> + +#include <boost/format.hpp> + +// Send headers asynchroneously (disable for debugging) +#define SEND_HEADERS_ASYNC + +using namespace std; namespace LOFAR { namespace Cobalt { Mutex MPIMutex; +/* + * + * The MPISendStation object sends all beamlets from + * one station to all receiver nodes. + * + * The following data flow is observed. Blocks are + * sent sequentially. + * + * |-- BEAMLET 0 -- FLAGS 0 --| + * | + * HEADER --|-- BEAMLET 1 -- FLAGS 1 --| + * | + * |-- BEAMLET 2 -- FLAGS 2 --| + * + * One header is sent per receiver node per block. + */ + +namespace ThreadSafeMPI { +/* + * MPIRequestManager keeps track of a set of outstanding + * MPI requests across threads. Any thread can call + * wait(request) + * which will return once `request' has completed. + * + * The process() function must be running for the request + * manager to work, and can be stopped through the stop() + * function. + */ -template<typename T> MPISendStation<T>::MPISendStation( const struct BufferSettings &settings, const TimeStamp &from, const TimeStamp &to, size_t blockSize, size_t nrHistorySamples, const std::vector<size_t> &beamlets, unsigned destRank ) -: - SampleBufferReader<T>(settings, beamlets, from, to, blockSize, nrHistorySamples), - destRank(destRank), - requests(1 + beamlets.size() * 3, 0), // apart from the header, at most three transfers per beamlet: one or two for the samples, plus one for the flags - nrRequests(0), - metaData(this->buffer.nrBoards, metaDataSize()) +class MPIRequestManager { +public: + MPIRequestManager(): done(false) {} + ~MPIRequestManager() { stop(); } + + void stop() { + done = true; + + // unlock waits for request + requestAdded.signal(); + } + + bool wait( const MPI_Request &request ) { + Semaphore doneWaiting; + + addRequest(request, &doneWaiting); + + // Wait for request to finish + return doneWaiting.down(); + } + + // Note: if a request is put in the background, its + // data STILL needs to stay in scope until the transfer + // finishes. + void waitInBackground( const MPI_Request &request ) { + addRequest(request, NULL); + } + + void process() { + while(!done) { + // don't occupy CPU indefinitely + pthread_yield(); + + // wait for at least one pending request + { + ScopedLock sl(requestsMutex); + + while(!done && empty()) { + requestAdded.wait(requestsMutex); + } + } + + // handle any finished request + if (!empty()) { + handleAny(); + } + } + } + +private: + volatile bool done; + + Mutex requestsMutex; + Condition requestAdded; + + map<MPI_Request, Semaphore*> requests; + + bool empty() const { + return requests.empty(); + } + + void addRequest( const MPI_Request &request, Semaphore *semaphore ) { + ScopedLock sl(requestsMutex); + + // MPI_REQUEST_NULL is used to signal a completed request. + ASSERT(request != MPI_REQUEST_NULL); + + // Request may not already be present. + ASSERT(requests.find(request) == requests.end()); + + requests[request] = semaphore; + + // Signal that a request has been added + requestAdded.signal(); + } + + void handleAny() { + ScopedLock sl(MPIMutex); + + // Convert the requests map to a vector of request identifiers + std::vector<MPI_Request> ids; + + { + ScopedLock sl(requestsMutex); + + ids.reserve(requests.size()); + for(map<MPI_Request, Semaphore*>::const_iterator i = requests.begin(); i != requests.end(); ++i) { + ids.push_back(i->first); + } + } + + // MPI_Testany wants something to test + ASSERT(ids.size() > 0); + + // Test if any request has finished + std::vector<int> readyIds(ids.size()); + int readyCount; + + // NOTE: MPI_Testsome sets a completed request to MPI_REQUEST_NULL in the + // ids array! So we need to create a copy in order to lookup the original + // MPI_Request. + std::vector<MPI_Request> ids_copy(ids); + int error = ::MPI_Testsome(ids_copy.size(), &ids_copy[0], &readyCount, &readyIds[0], MPI_STATUS_IGNORE); + ASSERT(error == MPI_SUCCESS); + + ASSERT(readyCount != MPI_UNDEFINED); + + for (int i = 0; i < readyCount; ++i) { + int readyIdx = readyIds[i]; + + ASSERT(readyIdx != MPI_UNDEFINED); + + // A request is finished. Remove it from the map and raise the + // associated Semaphore. + ScopedLock sl(requestsMutex); + + Semaphore *result = requests.at(ids[readyIdx]); + requests.erase(ids[readyIdx]); + + if (result) { + // Wake up client + result->up(); + } + } + } +}; + +/* + * Send [ptr, ptr + numBytes) to destRank. + */ +MPI_Request MPI_Isend(void *ptr, size_t numBytes, int destRank, int tag) { + ASSERT(tag >= 0); // Silly MPI requirement + + MPI_Request request; + + { + ScopedLock sl(MPIMutex); + + int error; + + error = ::MPI_Isend(ptr, numBytes, MPI_CHAR, destRank, tag, MPI_COMM_WORLD, &request); + ASSERT(error == MPI_SUCCESS); + } + + return request; +} + +/* + * Receive [ptr, ptr + numBytes) from destRank. + */ +MPI_Request MPI_Irecv(void *ptr, size_t numBytes, int destRank, int tag) { + ASSERT(tag >= 0); // Silly MPI requirement + + MPI_Request request; + + { + ScopedLock sl(MPIMutex); + + int error; + + error = ::MPI_Irecv(ptr, numBytes, MPI_CHAR, destRank, tag, MPI_COMM_WORLD, &request); + ASSERT(error == MPI_SUCCESS); + } + + return request; +} + +void MPI_Wait(const MPI_Request &request) { + Singleton<MPIRequestManager>::instance().wait(request); +} + +void MPI_Send(void *ptr, size_t numBytes, int destRank, int tag) { + MPI_Request request = MPI_Isend(ptr, numBytes, destRank, tag); + + MPI_Wait(request); +} + +void MPI_Recv(void *ptr, size_t numBytes, int destRank, int tag) { + MPI_Request request = MPI_Irecv(ptr, numBytes, destRank, tag); + + MPI_Wait(request); +} + +}; + + +// Returns the keys of an std::map. +template<typename K, typename V> std::vector<K> keys( const std::map<K, V> &m ) { + std::vector<K> keys; + + keys.reserve(m.size()); + for (typename std::map<K,V>::const_iterator i = m.begin(); i != m.end(); ++i) { + keys.push_back(i->first); + } + + return keys; } -template<typename T> void MPISendStation<T>::copyStart( const TimeStamp &from, const TimeStamp &to, const std::vector<size_t> &wrapOffsets ) +// Returns the set of unique values of an std::map. +template<typename K, typename V> std::set<V> values( const std::map<K, V> &m ) { - Header header; + std::set<V> values; + + for (typename std::map<K,V>::const_iterator i = m.begin(); i != m.end(); ++i) { + values.insert(i->second); + } + + return values; +} + + +// Returns the inverse of an std::map. +template<typename K, typename V> std::map<V, std::vector<K> > inverse( const std::map<K, V> &m ) +{ + std::map<V, std::vector<K> > inverse; + + for (typename std::map<K,V>::const_iterator i = m.begin(); i != m.end(); ++i) { + inverse[i->second].push_back(i->first); + } + + return inverse; +} + + +template<typename T> MPISendStation<T>::MPISendStation( const struct BufferSettings &settings, const TimeStamp &from, const TimeStamp &to, size_t blockSize, size_t nrHistorySamples, const std::map<size_t, int> &beamletDistribution ) +: + SampleBufferReader<T>(settings, keys(beamletDistribution), from, to, blockSize, nrHistorySamples), + logPrefix(str(boost::format("[station %s] [MPISendStation] ") % settings.station.stationName)), + beamletDistribution(beamletDistribution), + targetRanks(values(beamletDistribution)), + beamletsOfTarget(inverse(beamletDistribution)) +{ + LOG_INFO_STR(logPrefix << "Initialised"); +} + +template<typename T> MPI_Request MPISendStation<T>::sendHeader( int rank, Header &header, const struct SampleBufferReader<T>::CopyInstructions &info ) +{ + LOG_DEBUG_STR(logPrefix << "Sending header to rank " << rank); + + const std::vector<size_t> &beamlets = beamletsOfTarget.at(rank); // Copy static information header.station = this->settings.station; - header.from = from; - header.to = to; - header.nrBeamlets = this->beamlets.size(); + header.from = info.from; + header.to = info.to; + header.nrBeamlets = beamlets.size(); header.metaDataSize = this->metaDataSize(); // Copy the wrapOffsets - ASSERT(wrapOffsets.size() * sizeof wrapOffsets[0] <= sizeof header.wrapOffsets); - memcpy(&header.wrapOffsets[0], &wrapOffsets[0], wrapOffsets.size() * sizeof wrapOffsets[0]); + ASSERT(beamlets.size() <= sizeof header.wrapOffsets / sizeof header.wrapOffsets[0]); - { - ScopedLock sl(MPIMutex); - - int error = MPI_Isend(&header, sizeof header, MPI_CHAR, destRank, 0, MPI_COMM_WORLD, &requests[nrRequests++]); + for(unsigned beamlet = 0; beamlet < beamlets.size(); ++beamlet) { + const struct SampleBufferReader<T>::CopyInstructions::Beamlet &ib = info.beamlets[beamlets[beamlet]]; - ASSERT(error == MPI_SUCCESS); - } + header.wrapOffsets[beamlet] = ib.nrRanges == 1 ? 0 : ib.ranges[0].to - ib.ranges[0].from; + } - //LOG_INFO( "Header sent" ); + // Send the actual header + union tag_t tag; + tag.bits.type = CONTROL; + +#ifdef SEND_HEADERS_ASYNC + return ThreadSafeMPI::MPI_Isend(&header, sizeof header, rank, tag.value); +#else + ThreadSafeMPI::MPI_Send(&header, sizeof header, rank, tag.value); + return 0; +#endif } - -template<typename T> void MPISendStation<T>::copy( const struct SampleBufferReader<T>::CopyInstructions &info ) +template<typename T> void MPISendStation<T>::sendData( int rank, unsigned beamlet, const struct SampleBufferReader<T>::CopyInstructions::Beamlet &ib ) { - ScopedLock sl(MPIMutex); + LOG_DEBUG_STR(logPrefix << "Sending beamlet " << beamlet << " to rank " << rank << " using " << ib.nrRanges << " transfers"); - // Send beamlet - for(unsigned transfer = 0; transfer < info.nrRanges; ++transfer) { + // Send beamlet using 1 or 2 transfers +# pragma omp parallel for num_threads(ib.nrRanges) + for(unsigned transfer = 0; transfer < ib.nrRanges; ++transfer) { union tag_t tag; tag.bits.type = BEAMLET; - tag.bits.beamlet = info.beamlet; + tag.bits.beamlet = beamlet; tag.bits.transfer = transfer; - ASSERT(tag.value >= 0); // Silly MPI requirement - const T *from = info.ranges[transfer].from; - const T *to = info.ranges[transfer].to; + const T *from = ib.ranges[transfer].from; + const T *to = ib.ranges[transfer].to; - int error = MPI_Isend( - (void*)from, (to - from) * sizeof(T), MPI_CHAR, - destRank, tag.value, - MPI_COMM_WORLD, &requests[nrRequests++]); + ASSERT( from < to ); // There must be data to send, or MPI will error - ASSERT(error == MPI_SUCCESS); + ThreadSafeMPI::MPI_Send((void*)from, (to - from) * sizeof(T), rank, tag.value); } +} + +template<typename T> void MPISendStation<T>::sendFlags( int rank, unsigned beamlet, const SparseSet<int64> &flags ) +{ + //LOG_DEBUG_STR("Sending flags to rank " << rank); - // Send flags - ssize_t numBytes = info.flags.marshall(&metaData[info.beamlet][0], metaDataSize()); + std::vector<char> metaData(metaDataSize()); + ssize_t numBytes = flags.marshall(&metaData[0], metaData.size()); ASSERT(numBytes >= 0); union tag_t tag; - tag.bits.type = FLAGS; - tag.bits.beamlet = info.beamlet; - ASSERT(tag.value >= 0); // Silly MPI requirement - - int error = MPI_Isend( - (void*)&metaData[info.beamlet][0], metaDataSize(), MPI_CHAR, - destRank, tag.value, - MPI_COMM_WORLD, &requests[nrRequests++]); + tag.bits.beamlet = beamlet; - ASSERT(error == MPI_SUCCESS); + ThreadSafeMPI::MPI_Send(&metaData[0], metaData.size(), rank, tag.value); } -template<typename T> void MPISendStation<T>::copyEnd( const TimeStamp &from, const TimeStamp &to ) +template<typename T> void MPISendStation<T>::sendBlock( const struct SampleBufferReader<T>::CopyInstructions &info ) { - (void)from; (void)to; + /* + * SEND HEADER (ASYNC) + */ - int alldone = false; - std::vector<MPI_Status> statusses(nrRequests); + std::map<int, Header> headers; + std::map<int, MPI_Request> headerRequests; - // Poll until all transfers are finished. Note that we can't hold the - // MPIMutex lock, because multiple MPISendStation objects might be active. - while (!alldone) { - { - ScopedLock sl(MPIMutex); + // No need for parallellisation since we only post the sends + for(std::set<int>::const_iterator i = targetRanks.begin(); i != targetRanks.end(); ++i) { + int rank = *i; + + headerRequests[rank] = sendHeader(rank, headers[rank], info); + } + + /* + * SEND PAYLOAD + */ + + // Send beamlets to all nodes in parallel, so for each beamlet the flags + // can be send immediately after. +# pragma omp parallel for num_threads(info.beamlets.size()) + for(size_t beamlet = 0; beamlet < info.beamlets.size(); ++beamlet) { + const struct SampleBufferReader<T>::CopyInstructions::Beamlet &ib = info.beamlets[beamlet]; + const int rank = beamletDistribution.at(beamlet); - int error = MPI_Testall(nrRequests, &requests[0], &alldone, &statusses[0]); + /* + * OBTAIN FLAGS BEFORE DATA IS SENT + */ - ASSERT(error == MPI_SUCCESS); - } + SparseSet<int64> flagsBefore = flags(info, beamlet); + + /* + * SEND BEAMLETS + */ - // can't hold lock indefinitely - pthread_yield(); + sendData(rank, beamlet, ib); + + /* + * OBTAIN FLAGS AFTER DATA IS SENT + */ + + SparseSet<int64> flagsAfter = flags(info, beamlet); + + /* + * SEND FLAGS + */ + + // The only valid samples are those that existed both + // before and after the transfer. + sendFlags(rank, beamlet, flagsBefore & flagsAfter); } - //LOG_INFO( "Copy done"); + /* + * WRAP UP ASYNC HEADER SEND + */ - nrRequests = 0; + // No need for parallellisation, because all sends should be done by now. We do + // need to make sure though, because the Headers will soon go out of scope. +#ifdef SEND_HEADERS_ASYNC + for(std::map<int, MPI_Request>::const_iterator i = headerRequests.begin(); i != headerRequests.end(); ++i) { + ThreadSafeMPI::MPI_Wait(i->second); + } +#endif } -template<typename T> MPIReceiveStations<T>::MPIReceiveStations( const struct BufferSettings &settings, const std::vector<int> stationRanks, const std::vector<size_t> &beamlets, size_t blockSize ) +template<typename T> MPIReceiveStations<T>::MPIReceiveStations( const std::vector<int> stationRanks, const std::vector<size_t> &beamlets, size_t blockSize ) : lastBlock(stationRanks.size()), - settings(settings), + logPrefix(str(boost::format("[beamlets %u..%u (%u)] [MPIReceiveStations] ") % beamlets[0] % beamlets[beamlets.size()-1] % beamlets.size())), stationRanks(stationRanks), beamlets(beamlets), blockSize(blockSize) @@ -155,108 +461,167 @@ template<typename T> MPIReceiveStations<T>::MPIReceiveStations( const struct Buf } -template<typename T> void MPIReceiveStations<T>::receiveBlock() +template<typename T> MPI_Request MPIReceiveStations<T>::receiveHeader( int rank, struct MPISendStation<T>::Header &header ) { - int error; + MPI_Request request; + + typename MPISendStation<T>::tag_t tag; + + // receive the header + tag.bits.type = MPISendStation<T>::CONTROL; + ASSERT(tag.value >= 0); // Silly MPI requirement + +#ifdef SEND_HEADERS_ASYNC + int error = MPI_Irecv(&header, sizeof header, MPI_CHAR, rank, tag.value, MPI_COMM_WORLD, &request); + ASSERT(error == MPI_SUCCESS); + + return request; +#else + (void)request; + int error = MPI_Recv(&header, sizeof header, MPI_CHAR, rank, tag.value, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + ASSERT(error == MPI_SUCCESS); + + return 0; +#endif +} + +template<typename T> MPI_Request MPIReceiveStations<T>::receiveBeamlet( int rank, size_t beamlet, int transfer, T *from, size_t nrSamples ) +{ + MPI_Request request; + + typename MPISendStation<T>::tag_t tag; + tag.bits.type = MPISendStation<T>::BEAMLET; + tag.bits.beamlet = beamlet; + tag.bits.transfer = transfer; + ASSERT(tag.value >= 0); // Silly MPI requirement + + int error = MPI_Irecv( + from, nrSamples * sizeof(T), MPI_CHAR, + rank, tag.value, + MPI_COMM_WORLD, &request); + + ASSERT(error == MPI_SUCCESS); + + return request; +} + + +template<typename T> MPI_Request MPIReceiveStations<T>::receiveFlags( int rank, size_t beamlet, std::vector<char> &buffer ) +{ + MPI_Request request; + + typename MPISendStation<T>::tag_t tag; + tag.bits.type = MPISendStation<T>::FLAGS; + tag.bits.beamlet = beamlet; + ASSERT(tag.value >= 0); // Silly MPI requirement + + int error = MPI_Irecv( + &buffer[0], buffer.size(), MPI_CHAR, + rank, tag.value, + MPI_COMM_WORLD, &request); + + ASSERT(error == MPI_SUCCESS); + + return request; +} + + +template<typename T> int MPIReceiveStations<T>::waitAny( std::vector<MPI_Request> &requests ) +{ + int idx; + + // Wait for any header request to finish + // NOTE: MPI_Waitany will overwrite completed entries with + // MPI_REQUEST_NULL. + int error = MPI_Waitany(requests.size(), &requests[0], &idx, MPI_STATUS_IGNORE); + ASSERT(error == MPI_SUCCESS); + + ASSERT(idx != MPI_UNDEFINED); + ASSERT(requests[idx] == MPI_REQUEST_NULL); + + return idx; +} + + +template<typename T> void MPIReceiveStations<T>::waitAll( std::vector<MPI_Request> &requests ) +{ + if (requests.size() > 0) { + // NOTE: MPI_Waitall will write MPI_REQUEST_NULL into requests array. + int error = MPI_Waitall(requests.size(), &requests[0], MPI_STATUS_IGNORE); + ASSERT(error == MPI_SUCCESS); + } +} + + +template<typename T> void MPIReceiveStations<T>::receiveBlock() +{ // All requests except the headers - std::vector<MPI_Request> requests(beamlets.size() * 3 * stationRanks.size()); + std::vector<MPI_Request> requests(beamlets.size() * 3 * stationRanks.size(), MPI_REQUEST_NULL); size_t nrRequests = 0; // Post receives for all headers - std::vector<MPI_Request> header_requests(stationRanks.size()); + std::vector<MPI_Request> header_requests(stationRanks.size(), MPI_REQUEST_NULL); std::vector<struct MPISendStation<T>::Header> headers(stationRanks.size()); for (size_t stat = 0; stat < stationRanks.size(); ++stat) { - typename MPISendStation<T>::tag_t tag; + LOG_DEBUG_STR(logPrefix << "Posting receive for header from rank " << stationRanks[stat]); // receive the header - tag.bits.type = MPISendStation<T>::CONTROL; - ASSERT(tag.value >= 0); // Silly MPI requirement - - error = MPI_Irecv(&headers[stat], sizeof headers[stat], MPI_CHAR, stationRanks[stat], tag.value, MPI_COMM_WORLD, &header_requests[stat]); - ASSERT(error == MPI_SUCCESS); + header_requests[stat] = receiveHeader(stationRanks[stat], headers[stat]); } // Process stations in the order in which we receive the headers Matrix< std::vector<char> > metaData(stationRanks.size(), beamlets.size()); // [station][beamlet][data] for (size_t i = 0; i < stationRanks.size(); ++i) { - int stat; - /* * For each station, receive its header, and post the relevant sample and * flag Irecvs. */ - // Wait for any header request to finish - error = MPI_Waitany(header_requests.size(), &header_requests[0], &stat, MPI_STATUS_IGNORE); - ASSERT(error == MPI_SUCCESS); + LOG_DEBUG_STR(logPrefix << "Waiting for headers"); + // Wait for any header request to finish +#ifdef SEND_HEADERS_ASYNC + int stat = waitAny(header_requests); +#else + int stat = i; +#endif int rank = stationRanks[stat]; // Check the header const struct MPISendStation<T>::Header &header = headers[stat]; + LOG_DEBUG_STR(logPrefix << "Received header from rank " << rank); + ASSERT(header.to - header.from == (int64)blockSize); - ASSERT(header.nrBeamlets == beamlets.size()); + ASSERTSTR(header.nrBeamlets == beamlets.size(), "Got " << header.nrBeamlets << " beamlets, but expected " << beamlets.size()); // Post receives for the samples - for (size_t beamlet = 0; beamlet < header.nrBeamlets; ++beamlet) { - const size_t wrapOffset = header.wrapOffsets[beamlet]; + for (size_t beamletIdx = 0; beamletIdx < header.nrBeamlets; ++beamletIdx) { + const size_t beamlet = beamlets[beamletIdx]; + const size_t wrapOffset = header.wrapOffsets[beamletIdx]; - typename MPISendStation<T>::tag_t tag; - tag.bits.type = MPISendStation<T>::BEAMLET; - tag.bits.beamlet = beamlet; + LOG_DEBUG_STR(logPrefix << "Receiving beamlet " << beamlet << " from rank " << rank << " using " << (wrapOffset > 0 ? 2 : 1) << " transfers"); // First sample transfer - tag.bits.transfer = 0; - ASSERT(tag.value >= 0); // Silly MPI requirement - - error = MPI_Irecv( - &lastBlock[stat].samples[beamlet][0], sizeof(T) * (wrapOffset ? wrapOffset : blockSize), MPI_CHAR, - rank, tag.value, - MPI_COMM_WORLD, &requests[nrRequests++]); - - ASSERT(error == MPI_SUCCESS); + requests[nrRequests++] = receiveBeamlet(rank, beamlet, 0, &lastBlock[stat].samples[beamletIdx][0], wrapOffset ? wrapOffset : blockSize); // Second sample transfer if (wrapOffset > 0) { - tag.bits.transfer = 1; - ASSERT(tag.value >= 0); // Silly MPI requirement - - error = MPI_Irecv( - &lastBlock[stat].samples[beamlet][wrapOffset], sizeof(T) * (blockSize - wrapOffset), MPI_CHAR, - rank, tag.value, - MPI_COMM_WORLD, &requests[nrRequests++]); - - ASSERT(error == MPI_SUCCESS); + requests[nrRequests++] = receiveBeamlet(rank, beamlet, 1, &lastBlock[stat].samples[beamletIdx][wrapOffset], blockSize - wrapOffset); } // Flags transfer - tag.value = 0; // reset - tag.bits.type = MPISendStation<T>::FLAGS; - tag.bits.beamlet = beamlet; - ASSERT(tag.value >= 0); // Silly MPI requirement - - metaData[stat][beamlet].resize(header.metaDataSize); - - error = MPI_Irecv( - &metaData[stat][0][0], header.metaDataSize, MPI_CHAR, - rank, tag.value, - MPI_COMM_WORLD, &requests[nrRequests++]); - - ASSERT(error == MPI_SUCCESS); + metaData[stat][beamletIdx].resize(header.metaDataSize); + requests[nrRequests++] = receiveFlags(rank, beamlet, metaData[stat][beamletIdx]); } } // Wait for all transfers to finish - if (nrRequests > 0) { - std::vector<MPI_Status> statusses(nrRequests); - - error = MPI_Waitall(nrRequests, &requests[0], &statusses[0]); - ASSERT(error == MPI_SUCCESS); - } + requests.resize(nrRequests); + waitAll(requests); // Convert raw metaData to flags array for (size_t stat = 0; stat < stationRanks.size(); ++stat) diff --git a/RTCP/Cobalt/InputProc/src/newInputSection.cc b/RTCP/Cobalt/InputProc/src/newInputSection.cc deleted file mode 100644 index 8b8101fd11a..00000000000 --- a/RTCP/Cobalt/InputProc/src/newInputSection.cc +++ /dev/null @@ -1,186 +0,0 @@ -/* newInputSection.cc - * Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) - * P.O. Box 2, 7990 AA Dwingeloo, The Netherlands - * - * This file is part of the LOFAR software suite. - * The LOFAR software suite is free software: you can redistribute it and/or - * modify it under the terms of the GNU General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * The LOFAR software suite is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. - * - * $Id: $ - */ - -#include <lofar_config.h> - -#include <string> -#include <vector> -#include <map> -#include <omp.h> -#if defined HAVE_MPI -#include <mpi.h> -#else -#error Cannot build input section without HAVE_MPI -#endif -#include <boost/format.hpp> - -#include <Common/lofar_complex.h> -#include <Common/LofarLogger.h> -#include <Common/Thread/Mutex.h> -#include <CoInterface/MultiDimArray.h> -#include <CoInterface/Stream.h> -#include <CoInterface/RSPTimeStamp.h> -#include <Stream/Stream.h> -#include <Stream/SocketStream.h> - -#include "OMPThread.h" -#include "SampleType.h" -#include "WallClockTime.h" -#include "Buffer/StationID.h" -#include "Buffer/BufferSettings.h" -#include "Station/Generator.h" -#include "Station/PacketsToBuffer.h" -#include "Transpose/MPITransferStations.h" - - -#define DURATION 60 -#define BLOCKSIZE 0.005 -#define NRSTATIONS 3 -#define NR_TAPS 16 - -using namespace LOFAR; -using namespace Cobalt; - - -int main( int argc, char **argv ) -{ - size_t clock = 200 * 1000 * 1000; - - typedef SampleType<i16complex> SampleT; - const TimeStamp from(time(0L) + 1, 0, clock); - const TimeStamp to(time(0L) + 1 + DURATION, 0, clock); - const size_t blockSize = BLOCKSIZE * clock / 1024 + NR_TAPS; - std::map<unsigned, std::vector<size_t> > beamlets; - - struct StationID stationID("RS106", "LBA", clock, 16); - struct BufferSettings settings(stationID, false); - - settings.setBufferSize(5.0); - - INIT_LOGGER(argv[0]); - - if (MPI_Init(&argc, &argv) != MPI_SUCCESS) { - LOG_ERROR_STR("MPI_Init failed"); - return 1; - } - - int nrHosts, rank; - - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &nrHosts); - - int nrStations = NRSTATIONS; - - for (unsigned i = 0; i < 244; ++i) - beamlets[nrStations + i % (nrHosts - nrStations)].push_back(i); - - if (rank > nrStations - 1) { - // receiver - LOG_INFO_STR("Receiver " << rank << " starts, handling " << beamlets[rank].size() << " subbands from " << nrStations << " stations." ); - - std::vector<int> stationRanks(nrStations); - - for (size_t i = 0; i < stationRanks.size(); i++) - stationRanks[i] = i; - - { - MPIReceiveStations<SampleT> receiver(settings, stationRanks, beamlets[rank], blockSize); - - for(size_t block = 0; block < (to - from) / blockSize + 1; ++block) { - receiver.receiveBlock(); - - // data is now in receiver.lastBlock - - //LOG_INFO_STR("Receiver " << rank << " received block " << block); - } - } - - LOG_INFO_STR("Receiver " << rank << " done"); - - MPI_Finalize(); - return 0; - } - - omp_set_nested(true); - omp_set_num_threads(32); - OMPThread::init(); - - std::vector<std::string> inputStreams(4); - inputStreams[0] = "udp:127.0.0.1:4346"; - inputStreams[1] = "udp:127.0.0.1:4347"; - inputStreams[2] = "udp:127.0.0.1:4348"; - inputStreams[3] = "udp:127.0.0.1:4349"; - - if(rank == 0) { - MultiPacketsToBuffer station( settings, inputStreams ); - Generator generator( settings, inputStreams ); - - #pragma omp parallel sections num_threads(4) - { - #pragma omp section - { station.process(); - } - - #pragma omp section - { generator.process(); - } - - #pragma omp section - { sleep(DURATION + 1); - station.stop(); - sleep(1); - generator.stop(); - } - - #pragma omp section - { - struct StationID lookup("RS106", "HBA0"); - struct BufferSettings s(stationID, true); - - LOG_INFO_STR("Detected " << s); - #pragma omp parallel for num_threads(nrHosts - nrStations) - for (int i = nrStations; i < nrHosts; ++i) { - LOG_INFO_STR("Connecting to receiver " << i ); - MPISendStation< SampleT > streamer(s, from, to, blockSize, NR_TAPS, beamlets[i], i ); - - LOG_INFO_STR("Sending to receiver " << i ); - streamer.process( 0.0 ); - } - } - } - } else { - struct StationID lookup("RS106", "HBA0"); - struct BufferSettings s(stationID, true); - - LOG_INFO_STR("Detected " << s); - #pragma omp parallel for num_threads(nrHosts - nrStations) - for (int i = nrStations; i < nrHosts; ++i) { - LOG_INFO_STR("Connecting to receiver " << i ); - MPISendStation< SampleT > streamer(s, from, to, blockSize, NR_TAPS, beamlets[i], i ); - - LOG_INFO_STR("Sending to receiver " << i ); - streamer.process( 0.0 ); - } - } - - MPI_Finalize(); -} - diff --git a/RTCP/Cobalt/InputProc/test/CMakeLists.txt b/RTCP/Cobalt/InputProc/test/CMakeLists.txt index d008fd7725f..df8d8adb714 100644 --- a/RTCP/Cobalt/InputProc/test/CMakeLists.txt +++ b/RTCP/Cobalt/InputProc/test/CMakeLists.txt @@ -20,3 +20,5 @@ lofar_add_test(tGenerator tGenerator.cc) lofar_add_test(tPacketWriter tPacketWriter.cc) lofar_add_test(tPacketsToBuffer tPacketsToBuffer.cc) + +lofar_add_test(tMPITransfer tMPITransfer.cc) diff --git a/RTCP/Cobalt/InputProc/test/tMPITransfer.cc b/RTCP/Cobalt/InputProc/test/tMPITransfer.cc new file mode 100644 index 00000000000..bd53af64025 --- /dev/null +++ b/RTCP/Cobalt/InputProc/test/tMPITransfer.cc @@ -0,0 +1,220 @@ +/* tMPITransfer.cc + * Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) + * P.O. Box 2, 7990 AA Dwingeloo, The Netherlands + * + * This file is part of the LOFAR software suite. + * The LOFAR software suite is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The LOFAR software suite is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + * + * $Id: $ + */ + +#include <lofar_config.h> + +#include <string> +#include <vector> +#include <map> +#include <omp.h> +#include <mpi.h> + +#include <boost/format.hpp> + +#include <Common/lofar_complex.h> +#include <Common/LofarLogger.h> +#include <Common/Thread/Mutex.h> +#include <CoInterface/MultiDimArray.h> +#include <CoInterface/Stream.h> +#include <CoInterface/RSPTimeStamp.h> +#include <Stream/Stream.h> +#include <Stream/SocketStream.h> + +#include "OMPThread.h" +#include "SampleType.h" +#include "Buffer/StationID.h" +#include "Buffer/BufferSettings.h" +#include "Station/PacketFactory.h" +#include "Station/Generator.h" +#include "Station/PacketsToBuffer.h" +#include "Transpose/MPITransferStations.h" + +#include <map> +#include <vector> + +#define DURATION 3 +#define BLOCKSIZE 0.2 +#define NRSTATIONS 3 +#define NRBEAMLETS 4 +#define NR_TAPS 16 + +using namespace LOFAR; +using namespace Cobalt; +using namespace std; +using boost::format; + +const size_t clockMHz = 200; +const size_t clockHz = clockMHz * 1000 * 1000; +typedef SampleType<i16complex> SampleT; +const TimeStamp from(time(0L) + 5, 0, clockHz); +const TimeStamp to(time(0L) + 5 + DURATION, 0, clockHz); +const size_t blockSize = BLOCKSIZE * clockHz / 1024 + NR_TAPS; +map<unsigned, vector<size_t> > beamlets; +map<size_t, int> beamletDistribution; + +// Rank in MPI set of hosts +int rank; + +// Number of MPI hosts +int nrHosts; + +// Number of MPI hosts acting as stations. +int nrStations; + +// Number of MPI hosts acting as receivers. +int nrReceivers; + +void sender() +{ + struct StationID stationID(str(format("CS%03d") % rank), "LBA", clockMHz, 16); + struct BufferSettings settings(stationID, false); + + omp_set_nested(true); + //omp_set_num_threads(32); + OMPThread::init(); + + // Transfer of packets from generator to buffer + std::vector<std::string> inputStreams(4); + for (size_t i = 0; i < inputStreams.size(); ++i) { + inputStreams[i] = str(format("tcp:127.0.0.%d:%u") % (rank + 1) % (4346+i)); + } + + MultiPacketsToBuffer station( settings, inputStreams ); + PacketFactory factory( settings ); + Generator generator( settings, inputStreams, factory ); + + #pragma omp parallel sections num_threads(4) + { + // Generate the data to send + #pragma omp section + { generator.process(); + } + + // Start a circular buffer + #pragma omp section + { station.process(); + } + + // Start a circular buffer + #pragma omp section + { Singleton<ThreadSafeMPI::MPIRequestManager>::instance().process(); + } + + // Send data to receivers + #pragma omp section + { + struct BufferSettings s(stationID, true); + + LOG_INFO_STR("Detected " << s); + LOG_INFO_STR("Connecting to receivers to send " << from << " to " << to); + MPISendStation< SampleT > streamer(s, from, to, blockSize, NR_TAPS, beamletDistribution); + + LOG_INFO_STR("Sending to receivers"); + streamer.process( 0.0 ); + + generator.stop(); + station.stop(); + Singleton<ThreadSafeMPI::MPIRequestManager>::instance().stop(); + } + } +} + +void receiver() +{ + int receiverNr = rank - nrStations; + + LOG_INFO_STR("Receiver node " << rank << " starts, handling " << beamlets[receiverNr].size() << " subbands from " << nrStations << " stations." ); + LOG_INFO_STR("Connecting to senders to receive " << from << " to " << to); + + std::vector<int> stationRanks(nrStations); + + for (size_t i = 0; i < stationRanks.size(); i++) + stationRanks[i] = i; + + { + MPIReceiveStations<SampleT> receiver(stationRanks, beamlets[receiverNr], blockSize); + + size_t block = 0; + + for(TimeStamp current = from; current + blockSize < to; current += blockSize) { + receiver.receiveBlock(); + + // data is now in receiver.lastBlock + + // calculate flagging average + const size_t nrStations = stationRanks.size(); + const size_t nrBeamlets = beamlets[receiverNr].size(); + const size_t nrSamples = nrStations * nrBeamlets * blockSize; + size_t nrFlaggedSamples = 0; + + for (size_t s = 0; s < nrStations; ++s) { + for (size_t b = 0; b < nrBeamlets; ++b) { + nrFlaggedSamples = receiver.lastBlock[s].flags[b].count(); + } + } + + float flagPerc = 100.0f * nrFlaggedSamples / nrSamples; + + LOG_INFO_STR("Receiver " << rank << " received block " << block << " flags: " << flagPerc << "%" ); + ++block; + } + } + + LOG_INFO_STR("Receiver " << rank << " done"); +} + +int main( int argc, char **argv ) +{ + INIT_LOGGER( "tMPITransfer" ); + + // Prevent stalling. + alarm(30); + + if (MPI_Init(&argc, &argv) != MPI_SUCCESS) { + LOG_ERROR_STR("MPI_Init failed"); + return 1; + } + + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &nrHosts); + + // Need at least one sender and one receiver + ASSERT( nrHosts >= 2 ); + + // Use half of the nodes as stations + nrStations = nrHosts/2; + nrReceivers = nrHosts - nrStations; + + // Divide the subbands over the receivers + for (unsigned i = 0; i < NRBEAMLETS; ++i) { + beamlets[i % nrReceivers].push_back(i); + beamletDistribution[i] = nrStations + i % nrReceivers; + } + + if (rank < nrStations) { + sender(); + } else { + receiver(); + } + + MPI_Finalize(); +} + diff --git a/RTCP/Cobalt/InputProc/test/tMPITransfer.run b/RTCP/Cobalt/InputProc/test/tMPITransfer.run new file mode 100755 index 00000000000..7a95c3f717d --- /dev/null +++ b/RTCP/Cobalt/InputProc/test/tMPITransfer.run @@ -0,0 +1,2 @@ +#!/bin/sh +mpirun -host localhost -np 4 tMPITransfer diff --git a/RTCP/Cobalt/InputProc/test/tMPITransfer.sh b/RTCP/Cobalt/InputProc/test/tMPITransfer.sh new file mode 100755 index 00000000000..ac72bc755cf --- /dev/null +++ b/RTCP/Cobalt/InputProc/test/tMPITransfer.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./runctest.sh tMPITransfer > tMPITransfer.log 2>&1 -- GitLab