diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc index 4e348b07ebbb23a003af40879b5f652706c8d86b..534080818337c29fb1e083c90e4aeddc36979879 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc @@ -307,7 +307,9 @@ bool StationInput::receivedHere() const } -void StationInput::readRSPRealTime( size_t board, Stream &inputStream ) +void StationInput::readRSPRealTime( size_t board, Stream &inputStream, + MACIO::RTmetadata &mdLogger, + const string &mdKeyPrefix ) { /* * In real-time mode, we can't get ahead of the `current' @@ -330,9 +332,9 @@ void StationInput::readRSPRealTime( size_t board, Stream &inputStream ) reader.readPackets(rspData->packets); - // Periodically log progress + // Periodically LOG() and log() (for monitoring (PVSS)) progress if (i % 256 == 0) // Each block is ~40ms, so log every ~10s worth of data. - reader.logStatistics(); + reader.logStatistics(board, mdLogger, mdKeyPrefix, stationID.name()); outputQueue.append(rspData); } @@ -549,7 +551,9 @@ void StationInput::writeRSPNonRealTime( MPIData<SampleT> ¤t, MPIData<Sampl template <typename SampleT> -void StationInput::processInput( Queue< SmartPtr< MPIData<SampleT> > > &inputQueue, Queue< SmartPtr< MPIData<SampleT> > > &outputQueue ) +void StationInput::processInput( Queue< SmartPtr< MPIData<SampleT> > > &inputQueue, + Queue< SmartPtr< MPIData<SampleT> > > &outputQueue, + MACIO::RTmetadata &mdLogger, const string &mdKeyPrefix ) { OMPThreadSet packetReaderThreads; @@ -591,7 +595,7 @@ void StationInput::processInput( Queue< SmartPtr< MPIData<SampleT> > > &inputQue Thread::ScopedPriority sp(SCHED_FIFO, 10); - readRSPRealTime(board, *inputStreams[board]); + readRSPRealTime(board, *inputStreams[board], mdLogger, mdKeyPrefix); } } else { readRSPNonRealTime(); @@ -740,7 +744,8 @@ void MPISender::sendBlocks( Queue< SmartPtr< MPIData<SampleT> > > &inputQueue, Q } template<typename SampleT> void sendInputToPipeline(const Parset &ps, - size_t stationIdx, const SubbandDistribution &subbandDistribution) + size_t stationIdx, const SubbandDistribution &subbandDistribution, + MACIO::RTmetadata &mdLogger, const string &mdKeyPrefix) { // sanity check: Find out if we should actual start working here. StationMetaData<SampleT> sm(ps, stationIdx, subbandDistribution); @@ -785,7 +790,8 @@ template<typename SampleT> void sendInputToPipeline(const Parset &ps, */ #pragma omp section { - si.processInput<SampleT>( sm.metaDataPool.filled, mpiQueue ); + si.processInput<SampleT>( sm.metaDataPool.filled, mpiQueue, + mdLogger, mdKeyPrefix ); LOG_INFO_STR(logPrefix << "StationInput: done"); } @@ -805,20 +811,27 @@ template<typename SampleT> void sendInputToPipeline(const Parset &ps, } void sendInputToPipeline(const Parset &ps, size_t stationIdx, - const SubbandDistribution &subbandDistribution) + const SubbandDistribution &subbandDistribution, + MACIO::RTmetadata &mdLogger, const string &mdKeyPrefix) { switch (ps.nrBitsPerSample()) { default: case 16: - sendInputToPipeline< SampleType<i16complex> >(ps, stationIdx, subbandDistribution); + sendInputToPipeline< SampleType<i16complex> >(ps, stationIdx, + subbandDistribution, + mdLogger, mdKeyPrefix); break; case 8: - sendInputToPipeline< SampleType<i8complex> >(ps, stationIdx, subbandDistribution); + sendInputToPipeline< SampleType< i8complex> >(ps, stationIdx, + subbandDistribution, + mdLogger, mdKeyPrefix); break; case 4: - sendInputToPipeline< SampleType<i4complex> >(ps, stationIdx, subbandDistribution); + sendInputToPipeline< SampleType< i4complex> >(ps, stationIdx, + subbandDistribution, + mdLogger, mdKeyPrefix); break; } } diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.h b/RTCP/Cobalt/GPUProc/src/Station/StationInput.h index 1f1537936dc306fd1f8bb8149f0031279215f071..c50b1137a3d150a4c18003faa9116beec1ed9f76 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.h +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.h @@ -27,6 +27,7 @@ #include <cstring> #include <Common/Thread/Semaphore.h> +#include <MACIO/RTmetadata.h> #include <CoInterface/Parset.h> #include <CoInterface/Pool.h> #include <CoInterface/SubbandMetaData.h> @@ -155,7 +156,8 @@ namespace LOFAR { template <typename SampleT> void processInput( Queue< SmartPtr< MPIData<SampleT> > > &inputQueue, - Queue< SmartPtr< MPIData<SampleT> > > &outputQueue ); + Queue< SmartPtr< MPIData<SampleT> > > &outputQueue, + MACIO::RTmetadata &mdLogger, const string &mdKeyPrefix ); private: // Each packet is expected to have 16 samples per subband, i.e. ~80 us worth of data @ 200 MHz. @@ -212,7 +214,9 @@ namespace LOFAR { * * Read data from one board in real-time mode. */ - void readRSPRealTime( size_t board, Stream &inputStream ); + void readRSPRealTime( size_t board, Stream &inputStream, + MACIO::RTmetadata &mdLogger, + const std::string &mdKeyPrefix ); /* * Read data from all boards in non-real-time mode. @@ -269,7 +273,10 @@ namespace LOFAR { }; #endif - void sendInputToPipeline(const Parset &ps, size_t stationIdx, const SubbandDistribution &subbandDistribution); + void sendInputToPipeline(const Parset &ps, size_t stationIdx, + const SubbandDistribution &subbandDistribution, + MACIO::RTmetadata &mdLogger, + const std::string &mdKeyPrefix); } } diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc index 9796fa4a8cb8b229ff5cbacff05bcc8cf892c36c..46114b83378c34f4e0ae5aaafb9df422d1762e5d 100644 --- a/RTCP/Cobalt/GPUProc/src/rtcp.cc +++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc @@ -219,6 +219,13 @@ int main(int argc, char **argv) const string mdHostName = ps.getString("Cobalt.PVSSGateway.host", ""); MACIO::RTmetadata mdLogger(ps.observationID(), mdRegisterName, mdHostName); + // For InputProc use boost::format to fill in one conv specifications (%xx). + // Since InputProc is inside GPUProc, don't inform the MAC Log Processor. + string fmtStrInp(createPropertySetName(PSN_COBALT_STATION_INPUT, "", ps.PVSS_TempObsName())); + prFmt.parse(fmtStrInp); + string mdKeyPrefixInputProc = str(prFmt % cbtNodeNr); + mdKeyPrefixInputProc.push_back('.'); // keys look like: "keyPrefix.subKeyName[x]" + LOG_INFO("===== INIT ====="); #ifdef HAVE_MPI @@ -540,6 +547,11 @@ int main(int argc, char **argv) waiter.waitUntil(deadline); } + // Log obsID once for InputProc monitoring (PVSS). + if (rank == 0) { + mdLogger.log(mdKeyPrefixInputProc + PN_CSI_OBSERVATION_NAME, ps.observationID()); + } + #pragma omp parallel sections num_threads(3) { #pragma omp section @@ -559,7 +571,11 @@ int main(int argc, char **argv) continue; } - sendInputToPipeline(ps, stat, subbandDistribution); + // Log _antenna_ _field_ name for monitoring (PVSS). + mdLogger.log(mdKeyPrefixInputProc + PN_CSI_STATION_NAME, stationID.name()); + + sendInputToPipeline(ps, stat, subbandDistribution, + mdLogger, mdKeyPrefixInputProc); } } diff --git a/RTCP/Cobalt/GPUProc/test/tMPIReceive.cc b/RTCP/Cobalt/GPUProc/test/tMPIReceive.cc index d07f2d5bd5ffa16de3847391c6b3c33250f5745b..d91f8231cf77edfa3fa918a8fa01c3b8475f5344 100644 --- a/RTCP/Cobalt/GPUProc/test/tMPIReceive.cc +++ b/RTCP/Cobalt/GPUProc/test/tMPIReceive.cc @@ -147,7 +147,9 @@ int main(int argc, char **argv) continue; } - sendInputToPipeline(ps, stat, subbandDistribution); + MACIO::RTmetadata rtmd(ps.observationID(), "", ""); + sendInputToPipeline(ps, stat, subbandDistribution, + rtmd, "rtmd key prefix"); cout << "First ended" << endl; } } diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc b/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc index 623d4516a5f83f33779ce011970491b1dd832f6e..01db1b2fdf5e73b89d4f1f7d4600f210194bcb85 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc +++ b/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc @@ -22,6 +22,7 @@ #include "PacketReader.h" +#include <cmath> #include <typeinfo> #include <sys/time.h> #include <boost/format.hpp> @@ -165,7 +166,10 @@ namespace LOFAR } - void PacketReader::logStatistics() + void PacketReader::logStatistics(unsigned boardNr, + MACIO::RTmetadata &mdLogger, + const string &mdKeyPrefix, + const string &antFieldName) { // Determine time since last log struct timeval tv; @@ -175,7 +179,19 @@ namespace LOFAR const double interval = now - lastLogTime; // Emit log line - LOG_INFO_STR( logPrefix << (nrReceived/interval) << " pps: received " << nrReceived << " packets: " << nrBadTime << " bad timestamps, " << nrBadMode << " bad clock/bitmode, " << nrBadData << " payload errors, " << nrBadOther << " otherwise bad packets" ); + LOG_INFO_STR( logPrefix << (nrReceived / interval) << " pps: received " << + nrReceived << " packets: " << nrBadTime << " bad timestamps, " << + nrBadMode << " bad clock/bitmode, " << nrBadData << " payload errors, " << + nrBadOther << " otherwise bad packets" ); + + // Emit data points for monitoring (PVSS) + // Reproduce PN_CSI_STREAM0_BLOCKS_IN or PN_CSI_STREAM0_REJECTED, but with the right nr. + string streamStr = str(boost::format("stream%u") % boardNr); + mdLogger.log(mdKeyPrefix + streamStr + ".blocksIn[" + antFieldName + ']', + (unsigned)round(nrReceived / interval)); + size_t nrBad = nrBadTime + nrBadMode + nrBadData + nrBadOther; + mdLogger.log(mdKeyPrefix + streamStr + ".rejected[" + antFieldName + ']', + (unsigned)round(nrBad / interval)); // Reset counters nrReceived = 0; diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketReader.h b/RTCP/Cobalt/InputProc/src/Station/PacketReader.h index eed7032ec5cef8fadd81f30ad4e535043ceb53b1..f5c5d27e17c770b2291dd87ea771e4735ce3cfe9 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketReader.h +++ b/RTCP/Cobalt/InputProc/src/Station/PacketReader.h @@ -25,6 +25,7 @@ #include <Common/Exception.h> #include <Stream/Stream.h> +#include <MACIO/RTmetadata.h> #include <InputProc/Buffer/BoardMode.h> #include "RSP.h" @@ -55,7 +56,10 @@ namespace LOFAR bool readPacket( struct RSP &packet ); // Logs (and resets) statistics about the packets read. - void logStatistics(); + void logStatistics(unsigned boardNr, + MACIO::RTmetadata &mdLogger, + const std::string &mdKeyPrefix, + const std::string &antFieldName); private: const std::string logPrefix; @@ -67,7 +71,7 @@ namespace LOFAR const BoardMode mode; // Whether inputStream is an UDP stream - // (UDP streams allow partial reads, and recvmmsg). + // UDP streams do not allow partial reads and can use recvmmsg(2) (Linux). bool inputIsUDP; // Statistics covering the packets read so far diff --git a/RTCP/Cobalt/InputProc/test/tGenerator.cc b/RTCP/Cobalt/InputProc/test/tGenerator.cc index 960d533fac4ce35aeb21482100199e595498d7df..6c273e56e7251af44c39742e09bacc1da983b903 100644 --- a/RTCP/Cobalt/InputProc/test/tGenerator.cc +++ b/RTCP/Cobalt/InputProc/test/tGenerator.cc @@ -92,6 +92,8 @@ int main( int, char **argv ) #pragma omp section { + MACIO::RTmetadata rtmd(12345, "", ""); + // Read and verify the generated packets try { @@ -101,7 +103,8 @@ int main( int, char **argv ) struct RSP packet; if (!reader.readPacket(packet)) { - reader.logStatistics(); + const unsigned boardNr = 0; // irrel, for logStats only + reader.logStatistics(boardNr, rtmd, "rtmd key prefix", stationID.name()); ASSERT(false); }