diff --git a/LCS/ApplCommon/src/PosixTime.cc b/LCS/ApplCommon/src/PosixTime.cc index ef9a540310bf1773014ab53e7c91c93c40daee0a..c4913ba1f4978c496665259d403b219b9719e478 100644 --- a/LCS/ApplCommon/src/PosixTime.cc +++ b/LCS/ApplCommon/src/PosixTime.cc @@ -40,7 +40,7 @@ namespace LOFAR time_t sec(static_cast<time_t>(secsEpoch1970)); long usec(static_cast<long>(1000000 * (secsEpoch1970 - sec))); return posix_time::from_time_t(sec) + posix_time::microseconds(usec); - } + } } diff --git a/LCS/Stream/include/Stream/FileStream.h b/LCS/Stream/include/Stream/FileStream.h index db905d9e8ace2f58b9dc021e1cd1822b70f43433..5186659d04d02589edcf16384c25225df944ed02 100644 --- a/LCS/Stream/include/Stream/FileStream.h +++ b/LCS/Stream/include/Stream/FileStream.h @@ -40,7 +40,7 @@ class FileStream : public FileDescriptorBasedStream virtual void skip( size_t bytes ); // seek ahead - size_t size(); // return file size + virtual size_t size(); // return file size }; } // namespace LOFAR diff --git a/LCS/Stream/src/StringStream.cc b/LCS/Stream/src/StringStream.cc index b213a18a5a5cac1754a6e059071b3291671ff043..5900c9d7fdbf1cce8eaa40d08b151ec9de31510d 100644 --- a/LCS/Stream/src/StringStream.cc +++ b/LCS/Stream/src/StringStream.cc @@ -45,6 +45,7 @@ size_t StringStream::tryRead(void *ptr, size_t size) // still wrong for !USE_THREADS wrt stringstream exception vs EndOfStreamException, but !USE_THREADS is obsolete anyway #ifdef USE_THREADS if (!dataWritten.down(size)) { + // stream closed and avail < size size_t avail = dataWritten.getValue(); if (avail == 0) // size > 0 THROW(EndOfStreamException, "Stream has been closed"); @@ -93,6 +94,7 @@ size_t StringStream::tryReadv(const struct iovec *iov, int iovcnt) // still wrong for !USE_THREADS wrt stringstream exception vs EndOfStreamException, but !USE_THREADS is obsolete anyway #ifdef USE_THREADS if (!dataWritten.down(size)) { + // stream closed and avail < size size_t avail = dataWritten.getValue(); if (avail == 0) // size > 0 THROW(EndOfStreamException, "Stream has been closed"); diff --git a/RTCP/Cobalt/BrokenAntennaInfo/src/FinalMetaDataGatherer.cc b/RTCP/Cobalt/BrokenAntennaInfo/src/FinalMetaDataGatherer.cc index 12b142e3914d95c9ba0587d72620ca99a558910e..29a03e36e980dda6fbd26ca0f938df3fe8bb4d9f 100644 --- a/RTCP/Cobalt/BrokenAntennaInfo/src/FinalMetaDataGatherer.cc +++ b/RTCP/Cobalt/BrokenAntennaInfo/src/FinalMetaDataGatherer.cc @@ -193,7 +193,7 @@ namespace LOFAR { string host = parset.getString("Cobalt.FinalMetaDataGatherer.database.host", ""); if (host.empty()) - host = "sasdb"; + host = "sasdb.control.lofar"; string db = parset.getString("Cobalt.FinalMetaDataGatherer.database.name", ""); if (db.empty()) diff --git a/RTCP/Cobalt/CoInterface/src/Parset.cc b/RTCP/Cobalt/CoInterface/src/Parset.cc index f83a97e3ce3b0ac5e27cf82a688862f2ac9901ba..9ad9d831eb946479d6e956618d918ff1ecb680f8 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.cc +++ b/RTCP/Cobalt/CoInterface/src/Parset.cc @@ -39,6 +39,7 @@ #include <Common/LofarLogger.h> #include <Common/DataConvert.h> #include <Common/LofarBitModeInfo.h> +#include <Common/StringUtil.h> #include <ApplCommon/PosixTime.h> #include <CoInterface/PrintVector.h> #include <CoInterface/OutputTypes.h> @@ -636,7 +637,8 @@ namespace LOFAR settings.beamFormer.nrDelayCompensationChannels = nrDelayCompCh; // Derive antennaFields to use for beam forming - settings.beamFormer.antennaFieldNames = getOutputTypeAntennaFieldNames("Cobalt.BeamFormer.stationList", stations); + settings.beamFormer.antennaFieldNames = getOutputTypeAntennaFieldNames("Cobalt.BeamFormer.stationList", + stations, settings.antennaSet); LOG_DEBUG_STR("Beamforming " << settings.beamFormer.antennaFieldNames.size() << " fields: " << settings.beamFormer.antennaFieldNames); ObservationSettings::BeamFormer::StokesSettings @@ -894,7 +896,8 @@ namespace LOFAR } // Read antenna field names (via stationList key) to use for RSP raw output - settings.rspRaw.antennaFieldNames = getOutputTypeAntennaFieldNames("Cobalt.RSPRaw.stationList", stations); + settings.rspRaw.antennaFieldNames = getOutputTypeAntennaFieldNames("Cobalt.RSPRaw.stationList", + stations, settings.antennaSet); if (settings.rspRaw.antennaFieldNames.empty()) { settings.rspRaw.antennaFieldNames = settings.antennaFieldNames; // default } @@ -929,7 +932,7 @@ namespace LOFAR } // Assign output file locations - unsigned maxNrAntFieldStreams = 0; // normally we have 4 sending RSP boards per antenna field + vector<unsigned> maxNrBeamletsPerBoard; // normally we end up with 4 sending RSP boards per ant field unsigned locationIdx = 0; for (size_t i = 0; i < settings.rspRaw.antennaFieldNames.size(); ++i) { const ObservationSettings::AntennaFieldName& afName = settings.rspRaw.antennaFieldNames[i]; @@ -940,8 +943,8 @@ namespace LOFAR vector<ObservationSettings::AntennaField>::iterator afIt = settings.antennaFields.begin() + std::distance(settings.antennaFieldNames.begin(), nameIt); size_t nrStreams = afIt->inputStreams.size(); - if (maxNrAntFieldStreams < nrStreams) { - maxNrAntFieldStreams = nrStreams; + if (maxNrBeamletsPerBoard.size() < nrStreams) { + maxNrBeamletsPerBoard.resize(nrStreams); } for (unsigned s = 0; s < nrStreams; ++s) { @@ -964,20 +967,22 @@ namespace LOFAR outputProcHosts.insert(file.location.host); locationIdx += 1; + + maxNrBeamletsPerBoard[s] = std::max(maxNrBeamletsPerBoard[s], + (unsigned)std::count(afIt->rspBoardMap.begin(), + afIt->rspBoardMap.end(), s)); } } - // Read nrBeamletsPerBoardList and apply sane defaults + // Read nrBeamletsPerBoardList and apply sane defaults wrt max nr beamlets per board in the obs settings.rspRaw.nrBeamletsPerBoardList = getUint32Vector("Cobalt.RSPRaw.nrBeamletsPerBoardList", emptyVectorUnsigned, true); - if (settings.rspRaw.nrBeamletsPerBoardList.size() < maxNrAntFieldStreams) { - settings.rspRaw.nrBeamletsPerBoardList.resize(maxNrAntFieldStreams); + if (settings.rspRaw.nrBeamletsPerBoardList.size() < maxNrBeamletsPerBoard.size()) { + settings.rspRaw.nrBeamletsPerBoardList.resize(maxNrBeamletsPerBoard.size(), UINT_MAX); } - const unsigned maxNrPayloadBeamlets = 61 * 16 / settings.nrBitsPerSample; // see InputProc/src/Station/RSP.h, which we don't want to depend on here (better move hardware specs to CoInterface or LCS) for (size_t i = 0; i < settings.rspRaw.nrBeamletsPerBoardList.size(); ++i) { - if (settings.rspRaw.nrBeamletsPerBoardList[i] > maxNrPayloadBeamlets) { - settings.rspRaw.nrBeamletsPerBoardList[i] = maxNrPayloadBeamlets; - } + settings.rspRaw.nrBeamletsPerBoardList[i] = std::min(settings.rspRaw.nrBeamletsPerBoardList[i], + maxNrBeamletsPerBoard[i]); } } @@ -985,9 +990,9 @@ namespace LOFAR // set output hosts settings.outputProcHosts.clear(); for (set<string>::const_iterator i = outputProcHosts.begin(); i != outputProcHosts.end(); ++i) { - // skip empty host names - if (*i == "") + if (i->empty()) { continue; + } settings.outputProcHosts.push_back(*i); } @@ -997,7 +1002,8 @@ namespace LOFAR vector<struct ObservationSettings::AntennaFieldName> Parset::getOutputTypeAntennaFieldNames(const string& stationListKey, - const vector<string>& stations) const + const vector<string>& stations, + const string& antennaSet) const { vector<string> newStations = getStringVector(stationListKey, vector<string>(), true); if (newStations.empty()) { @@ -1027,7 +1033,7 @@ namespace LOFAR // Sort stations (CS, RS, int'l), to get a consistent and predictable order. std::sort(newStations.begin(), newStations.end(), compareStationNames); - return ObservationSettings::expandAntennaFieldNames(newStations, settings.antennaSet); + return ObservationSettings::expandAntennaFieldNames(newStations, antennaSet); } bool Parset::nodeReadsAntennaFieldData(const struct ObservationSettings& settings, @@ -1042,7 +1048,7 @@ namespace LOFAR // pos and ref must each have at least size 3. double Parset::distanceVec3(const vector<double>& pos, - const vector<double>& ref) const { + const vector<double>& ref) { double dx = pos.at(0) - ref.at(0); double dy = pos.at(1) - ref.at(1); double dz = pos.at(2) - ref.at(2); @@ -1050,7 +1056,7 @@ namespace LOFAR } // max delay distance in meters; static per obs, i.e. unprojected (some upper bound) - double Parset::maxDelayDistance(const struct ObservationSettings& settings) const { + double Parset::maxDelayDistance(const struct ObservationSettings& settings) { // Available in each parset through included StationCalibration.parset. const vector<double> refPhaseCenter = settings.delayCompensation.referencePhaseCenter; @@ -1069,7 +1075,7 @@ namespace LOFAR // Top frequency of highest subband observed in Hz. double Parset::maxObservationFrequency(const struct ObservationSettings& settings, - double subbandWidth) const { + double subbandWidth) { double maxCentralFrequency = 0.0; for (unsigned sb = 0; sb < settings.subbands.size(); sb++) { @@ -1083,7 +1089,7 @@ namespace LOFAR // Determine the nr of channels per subband for delay compensation. // We aim for the visibility samples to be good to about 1 part in 1000. // See the Cobalt beamformer design doc for more info on how and why. - unsigned Parset::calcNrDelayCompensationChannels(const struct ObservationSettings& settings) const { + unsigned Parset::calcNrDelayCompensationChannels(const struct ObservationSettings& settings) { double d = maxDelayDistance(settings); // in meters if (d < 400.0) d = 400.0; // for e.g. CS002LBA only; CS001LBA-CS002LBA is ~441 m @@ -1129,6 +1135,9 @@ namespace LOFAR return static_cast<size_t>(floor((stopTime - startTime) * subbandWidth() / blockSize)); } + size_t ObservationSettings::nrRspRawBlocks() const { + return static_cast<size_t>(floor((rspRaw.stopTime - rspRaw.startTime) * subbandWidth() / blockSize)); + } double ObservationSettings::subbandWidth() const { return 1.0 * clockHz() / 1024; diff --git a/RTCP/Cobalt/CoInterface/src/Parset.h b/RTCP/Cobalt/CoInterface/src/Parset.h index 7c9066f02c974b6ef9d9f54c74e946c2492e17bd..1d15f71ca164527957a996218775b35744e15bd5 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.h +++ b/RTCP/Cobalt/CoInterface/src/Parset.h @@ -112,6 +112,8 @@ namespace LOFAR size_t nrBlocks() const; + size_t nrRspRawBlocks() const; + // The number of seconds represented by each block. double blockDuration() const; @@ -821,16 +823,17 @@ namespace LOFAR void checkInputConsistency() const; - void addPosition(string stName); time_t getTime(const std::string &name, const std::string &defaultValue) const; std::vector<double> centroidPos(const string &stations) const; std::vector<struct ObservationSettings::AntennaFieldName> getOutputTypeAntennaFieldNames(const std::string& stationListKey, - const std::vector<std::string>& stations) const; + const std::vector<std::string>& stations, + const std::string& antennaSet) const; - std::vector<struct ObservationSettings::FileLocation> getFileLocations(const std::string outputType) const; + std::vector<struct ObservationSettings::FileLocation> + getFileLocations(const std::string outputType) const; // Returns whether nodeName has to participate in the observation // given antenna fields, antenna mode, and configured antenna field streams. @@ -838,12 +841,12 @@ namespace LOFAR bool nodeReadsAntennaFieldData(const struct ObservationSettings& settings, const std::string& nodeName) const; - double distanceVec3(const std::vector<double>& pos, - const std::vector<double>& ref) const; - double maxDelayDistance(const struct ObservationSettings& settings) const; - double maxObservationFrequency(const struct ObservationSettings& settings, - double subbandWidth) const; - unsigned calcNrDelayCompensationChannels(const struct ObservationSettings& settings) const; + static double distanceVec3(const std::vector<double>& pos, + const std::vector<double>& ref); + static double maxDelayDistance(const struct ObservationSettings& settings); + static double maxObservationFrequency(const struct ObservationSettings& settings, + double subbandWidth); + static unsigned calcNrDelayCompensationChannels(const struct ObservationSettings& settings); }; std::ostream &operator<<(std::ostream &os, const ObservationSettings::AntennaFieldName &fieldName); diff --git a/RTCP/Cobalt/CoInterface/src/RSP.h b/RTCP/Cobalt/CoInterface/src/RSP.h index 1f3ca85708f75792037704f0a143a2db6dcfc2fa..c52433a058c515a46f3a9cd3a5e8a7dafa308cc5 100644 --- a/RTCP/Cobalt/CoInterface/src/RSP.h +++ b/RTCP/Cobalt/CoInterface/src/RSP.h @@ -77,7 +77,7 @@ namespace LOFAR // number of beamlets, typically at maximum: // 16-bit: 61 // 8-bit: 122 - // 4-bit: 244 + // 4-bit: 244, but 10 fewer beamlets in total (e.g. 234 for last stream) uint8 nrBeamlets; // number of Xr+Xi+Yr+Yi samples per beamlet, typically 16 @@ -98,8 +98,8 @@ namespace LOFAR } header; // Payload, allocated for maximum size. - // Actual size depends on the header (nrBeamlets, nrBlocks). It changed in - // the past (61 vs 60) and may be less for tests, old pre-recorded data, and + // Actual size depends on the header (nrBeamlets, nrBlocks). It changed in the past (61 vs 60) and + // may be less for tests, old pre-recorded data, in 4 bit mode for 1 of the boards, and // RSP raw output for offline reprocessing or piggy-backing (selectable beamlet subset). union Payload { char data[8130]; diff --git a/RTCP/Cobalt/CoInterface/src/RSPRawTransfer.cc b/RTCP/Cobalt/CoInterface/src/RSPRawTransfer.cc index 7d6e35e37f81386676b65deff892ce33e3cb6082..11b97e9a6fa6492844c9e0b2b490424470c01caa 100644 --- a/RTCP/Cobalt/CoInterface/src/RSPRawTransfer.cc +++ b/RTCP/Cobalt/CoInterface/src/RSPRawTransfer.cc @@ -25,6 +25,7 @@ #include <cstring> #include <fcntl.h> #include <sys/socket.h> +#include <utility> // std::swap() #include <Common/LofarLogger.h> #include <Common/SystemCallException.h> #include <Stream/StreamFactory.h> @@ -39,19 +40,19 @@ namespace LOFAR { RSPRawSender::RSPRawSender() : itsStream(NULL), itsSentMsgSizes(0), - itsNrBeamletsToSend(0), + itsMaxNrBeamletsToSend(0), itsNrDroppedPackets(0) { } - RSPRawSender::RSPRawSender(unsigned maxNrPacketsPerSend, unsigned nrBeamletsToSend, + RSPRawSender::RSPRawSender(unsigned maxNrPacketsPerTrySend, unsigned maxNrBeamletsToSend, const string& streamDesc, time_t deadline) : itsStream(createStream(streamDesc, false, deadline)), - itsSentMsgSizes(maxNrPacketsPerSend), - itsNrBeamletsToSend(nrBeamletsToSend), + itsSentMsgSizes(maxNrPacketsPerTrySend), // trySend() in hot code: pre-allocate is easy and acceptable + itsMaxNrBeamletsToSend(maxNrBeamletsToSend), itsNrDroppedPackets(0) { - ASSERTSTR(maxNrPacketsPerSend > 0, "maxNrPacketsPerSend must be > 0"); + ASSERTSTR(maxNrPacketsPerTrySend > 0, "maxNrPacketsPerTrySend must be > 0"); // In RT mode InputProc threads may not block. Prefer not sent (i.e. dropped) packets. if (deadline != 0) { @@ -86,6 +87,14 @@ namespace LOFAR { return itsStream != NULL; } + void RSPRawSender::swap(RSPRawSender& other) /*noexcept*/ + { + std::swap(itsStream, other.itsStream); + std::swap(itsSentMsgSizes, other.itsSentMsgSizes); + std::swap(itsMaxNrBeamletsToSend, other.itsMaxNrBeamletsToSend); + std::swap(itsNrDroppedPackets, other.itsNrDroppedPackets); + } + unsigned RSPRawSender::getNrDroppedPackets() const { return itsNrDroppedPackets; @@ -102,54 +111,19 @@ namespace LOFAR { * If not in size, we may append bogus (or uninit data), but struct RSP is large enough. */ const uint8 packetNrBeamlets = packets[0].header.nrBeamlets; // save to restore - if (itsNrBeamletsToSend < packetNrBeamlets) { + if (itsMaxNrBeamletsToSend < packetNrBeamlets) { for (unsigned i = 0; i < nrPackets; i++) { - packets[i].header.nrBeamlets = itsNrBeamletsToSend; + packets[i].header.nrBeamlets = itsMaxNrBeamletsToSend; } } - size_t packetSize = packets[0].packetSize(); + unsigned packetSize = packets[0].packetSize(); SocketStream *sockStream = dynamic_cast<SocketStream *>(itsStream.get()); try { if (sockStream != NULL && sockStream->protocol == SocketStream::UDP) { - /* - * MSG_CONFIRM: Inform link-layer to just send the data without periodic ARP probing. - * We haven't seen any replies (network peer doesn't send any in this case, with its downsides), - * but we cannot afford stalls. - */ - unsigned nrSent = sockStream->sendmmsg(packets, packetSize, itsSentMsgSizes, MSG_CONFIRM); - if (nrSent < nrPackets) { // don't check itsSentMsgSizes: resending remainders won't help with UDP (message oriented) - itsNrDroppedPackets += nrPackets - nrSent; - LOG_WARN("RSPRawSender::trySend(): fewer sent to avoid blocking"); // not retried, not even in non-RT... - } + trySendUdp(sockStream, packets, packetSize, nrPackets); } else { // no SocketStream or SocketStream::TCP - // With TCP we must avoid partial RSP frame transfer. Try sending any remaining data of 1 RSP packet first. - trySendPending(); // may throw - - // Prepare writev(2) - vector<struct iovec> iov(nrPackets); - for (unsigned i = 0; i < nrPackets; i++) { - iov[i].iov_base = (char *)packets + i * packetSize; - iov[i].iov_len = packetSize; - } - - size_t bytesSent = itsStream->tryWritev(&iov[0], nrPackets); // may throw - if (bytesSent < nrPackets * packetSize) { - // Drop, except for unsent data of partially sent packet (if so). Stash that to retry later. - unsigned nrSent = bytesSent / packetSize; - unsigned partialPacketSent = bytesSent % packetSize; - if (partialPacketSent != 0) { - LOG_WARN("RSPRawSender::trySend(): partial packet sent: will retry remainder later"); - const char *data = (char *)iov[nrSent].iov_base + partialPacketSent; - size_t size = packetSize - partialPacketSent; - itsPendingData.resize(size); - std::memcpy(&itsPendingData[0], data, size); - nrSent += 1; // partially sent and stashed - } - - itsNrDroppedPackets += nrPackets - nrSent; - LOG_WARN("RSPRawSender::trySend(): fewer sent to avoid blocking"); // not retried, not even in non-RT... - } + trySendByteStream(packets, packetSize, nrPackets); } } catch (SystemCallException& exc) { itsNrDroppedPackets += nrPackets; @@ -164,13 +138,59 @@ namespace LOFAR { } } - if (itsNrBeamletsToSend < packetNrBeamlets) { + if (itsMaxNrBeamletsToSend < packetNrBeamlets) { for (unsigned i = 0; i < nrPackets; i++) { packets[i].header.nrBeamlets = packetNrBeamlets; // restore } } } + void RSPRawSender::trySendUdp(SocketStream *sockStream, struct RSP *packets, + unsigned packetSize, unsigned nrPackets) + { + /* + * MSG_CONFIRM: Inform link-layer to just send the data without periodic ARP probing. + * We haven't seen any replies (network peer doesn't send any in this case, with its downsides), + * but we cannot afford stalls. + */ + unsigned nrSent = sockStream->sendmmsg(packets, packetSize, itsSentMsgSizes, MSG_CONFIRM); + if (nrSent < nrPackets) { // don't check itsSentMsgSizes: resending remainders won't help with UDP (message oriented) + itsNrDroppedPackets += nrPackets - nrSent; + LOG_WARN("RSPRawSender::trySend(): fewer sent to avoid blocking"); // not retried, not even in non-RT... + } + } + + void RSPRawSender::trySendByteStream(struct RSP *packets, unsigned packetSize, unsigned nrPackets) + { + // With TCP we must avoid partial RSP frame transfer. Try sending any remaining data of 1 RSP packet first. + trySendPending(); // may throw + + // Prepare writev(2) + vector<struct iovec> iov(nrPackets); + for (unsigned i = 0; i < nrPackets; i++) { + iov[i].iov_base = (char *)packets + i * packetSize; + iov[i].iov_len = packetSize; + } + + size_t bytesSent = itsStream->tryWritev(&iov[0], nrPackets); // may throw + if (bytesSent < nrPackets * packetSize) { + // Drop, except for unsent data of partially sent packet (if so). Stash that to retry later. + unsigned nrSent = bytesSent / packetSize; + unsigned partialPacketSent = bytesSent % packetSize; + if (partialPacketSent != 0) { + LOG_WARN("RSPRawSender::trySend(): partial packet sent: will retry remainder later"); + const char *data = (char *)iov[nrSent].iov_base + partialPacketSent; + size_t size = packetSize - partialPacketSent; + itsPendingData.resize(size); + std::memcpy(&itsPendingData[0], data, size); + nrSent += 1; // partially sent and stashed + } + + itsNrDroppedPackets += nrPackets - nrSent; + LOG_WARN("RSPRawSender::trySend(): fewer sent to avoid blocking"); // not retried, not even in non-RT... + } + } + void RSPRawSender::trySendPending() { if (!itsPendingData.empty()) { diff --git a/RTCP/Cobalt/CoInterface/src/RSPRawTransfer.h b/RTCP/Cobalt/CoInterface/src/RSPRawTransfer.h index 34c117cefbdacf75dc7b5fa6bb7c1f77826511b9..7b25fedafd1e95f31d7c5ca3bb134bc4053939a9 100644 --- a/RTCP/Cobalt/CoInterface/src/RSPRawTransfer.h +++ b/RTCP/Cobalt/CoInterface/src/RSPRawTransfer.h @@ -40,13 +40,15 @@ public: RSPRawSender(); // deadline is an absolute timestamp or 0 for no connection timeout (blocking). - RSPRawSender(unsigned maxNrPacketsPerSend, unsigned nrBeamletsToSend, + RSPRawSender(unsigned maxNrPacketsPerTrySend, unsigned maxNrBeamletsToSend, const std::string& streamDesc, time_t deadline = 0); ~RSPRawSender(); bool initialized() const; + void swap(RSPRawSender& other) /*noexcept*/; + // Ensure nrPackets <= maxNrPacketsPerSend (passed upon object construction). // If deadline (passed upon object construction) was non-zero, trySend() may drop packets to avoid blocking. void trySend(struct RSP *packets, unsigned nrPackets); @@ -54,13 +56,16 @@ public: unsigned getNrDroppedPackets() const; private: + void trySendUdp(SocketStream *sockStream, struct RSP *packets, + unsigned packetSize, unsigned nrPackets); + void trySendByteStream(struct RSP *packets, unsigned packetSize, unsigned nrPackets); void trySendPending(); SmartPtr<Stream> itsStream; std::vector<unsigned> itsSentMsgSizes; - unsigned itsNrBeamletsToSend; + unsigned itsMaxNrBeamletsToSend; unsigned itsNrDroppedPackets; - std::vector<unsigned char> itsPendingData; // w/ TCP to retry sending the remainder of a partially sent RSP packet + std::vector<unsigned char> itsPendingData; // w/ non-UDP to retry sending the remainder of a partially sent RSP packet }; class RSPRawReceiver { diff --git a/RTCP/Cobalt/CoInterface/src/SmartPtr.h b/RTCP/Cobalt/CoInterface/src/SmartPtr.h index 06dc8484e9558be8a490c111d07fa5c5e1d673a1..d88f8f93e1ef4244c412cee985dc45851971d126 100644 --- a/RTCP/Cobalt/CoInterface/src/SmartPtr.h +++ b/RTCP/Cobalt/CoInterface/src/SmartPtr.h @@ -1,5 +1,6 @@ -//# SmartPtr.h -//# Copyright (C) 2011-2013 ASTRON (Netherlands Institute for Radio Astronomy) +//# SmartPtr.h: unsafe pre-C++11 version of the C++11 unique_ptr +//# Copyright (C) 2011-2013, 2017 +//# ASTRON (Netherlands Institute for Radio Astronomy) //# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands //# //# This file is part of the LOFAR software suite. @@ -24,6 +25,7 @@ //# Never #include <config.h> or #include <lofar_config.h> in a header file! #include <cstdlib> +#include <utility> // std::swap() namespace LOFAR { @@ -56,6 +58,8 @@ namespace LOFAR T *get(); T *release(); + void swap(SmartPtr<T,D>& other) /*noexcept*/; + private: T *ptr; }; @@ -187,6 +191,11 @@ namespace LOFAR } + template <typename T, class D> + inline void SmartPtr<T,D>::swap(SmartPtr<T,D>& other) /*noexcept*/ + { + std::swap(ptr, other.ptr); + } } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/CoInterface/src/StreamableData.h b/RTCP/Cobalt/CoInterface/src/StreamableData.h index 0237ecd0b08771201aa44ae82a0786c06f2fc5f9..567b7d0621ea7d5f4f450e2a34e25c462a04918a 100644 --- a/RTCP/Cobalt/CoInterface/src/StreamableData.h +++ b/RTCP/Cobalt/CoInterface/src/StreamableData.h @@ -173,6 +173,9 @@ namespace LOFAR template <typename T, unsigned DIM, unsigned FLAGS_DIM> inline SampleData<T,DIM,FLAGS_DIM>::SampleData(const ExtentList &extents, const FlagsExtentList &flagsExtents, Allocator &allocator) : + // This clarifies seq nr handling for beamforming, however, beamformed output is *not* + // written via StreamableData->write(), but in a custom way in MSWriterDAL::write(). + StreamableData(true, false), samples(extents, alignment, allocator), flags(flagsExtents) // e.g., for FilteredData [nrChannels][nrStations], sparse dimension [nrSamplesPerIntegration] { diff --git a/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/cluster.parset b/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/cluster.parset index 62cdd6be7fcd742e58ad1e13857912178d29fbc4..b8cc94121c6ab02278b4be5507cf2f895d5dbd44 100644 --- a/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/cluster.parset +++ b/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/cluster.parset @@ -19,10 +19,10 @@ Cobalt.FinalMetaDataGatherer.sshPrivateKey = Cobalt.FinalMetaDataGatherer.executable = source /data/home/lofarsys/production/lofar_cobalt/lofarinit.sh; FinalMetaDataGatherer # The database hostname to connect to: -# Production: sasdb -# Development: sasdbtest (do not use here; prefer hostname redirection) -# If empty (or missing), the default is used: sasdb -Cobalt.FinalMetaDataGatherer.database.host = sasdb +# Production: sasdb.control.lofar +# Development: sasdbtest.control.lofar (do not use here; prefer hostname redirection) +# If empty (or missing), the default is used: sasdb.control.lofar +Cobalt.FinalMetaDataGatherer.database.host = sasdb.control.lofar # The database port number to connect to. # If empty (or missing), the default used: 5432 diff --git a/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/correlated.parset b/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/correlated.parset index cc1ca287b68f022b6efc8babdbdf08db632a7e3f..096dee07523092196eafae26ac3fadf766085bc4 100644 --- a/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/correlated.parset +++ b/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/correlated.parset @@ -19,10 +19,10 @@ Cobalt.FinalMetaDataGatherer.sshPrivateKey = Cobalt.FinalMetaDataGatherer.executable = source /data/home/lofarsys/production/lofar_cobalt/lofarinit.sh; FinalMetaDataGatherer # The database hostname to connect to: -# Production: sasdb -# Development: sasdbtest (do not use here; prefer hostname redirection) -# If empty (or missing), the default is used: sasdb -Cobalt.FinalMetaDataGatherer.database.host = sasdb +# Production: sasdb.control.lofar +# Development: sasdbtest.control.lofar (do not use here; prefer hostname redirection) +# If empty (or missing), the default is used: sasdb.control.lofar +Cobalt.FinalMetaDataGatherer.database.host = sasdb.control.lofar # The database port number to connect to. # If empty (or missing), the default used: 5432 diff --git a/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/cs+is.parset b/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/cs+is.parset index 280855e0ba2fab034525118ea9771e7059d95690..4b84d95498d3dec9b9c561a0bf6addd3653c8c61 100644 --- a/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/cs+is.parset +++ b/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/cs+is.parset @@ -19,10 +19,10 @@ Cobalt.FinalMetaDataGatherer.sshPrivateKey = Cobalt.FinalMetaDataGatherer.executable = source /data/home/lofarsys/production/lofar_cobalt/lofarinit.sh; FinalMetaDataGatherer # The database hostname to connect to: -# Production: sasdb -# Development: sasdbtest (do not use here; prefer hostname redirection) -# If empty (or missing), the default is used: sasdb -Cobalt.FinalMetaDataGatherer.database.host = sasdb +# Production: sasdb.control.lofar +# Development: sasdbtest.control.lofar (do not use here; prefer hostname redirection) +# If empty (or missing), the default is used: sasdb.control.lofar +Cobalt.FinalMetaDataGatherer.database.host = sasdb.control.lofar # The database port number to connect to. # If empty (or missing), the default used: 5432 diff --git a/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/flyseye.parset b/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/flyseye.parset index 9e988174db36ae4e17e23927357ebfbfc8e3eac8..648f88c18e51112d9085932e6f9aa79c90d6aea3 100644 --- a/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/flyseye.parset +++ b/RTCP/Cobalt/CoInterface/test/tLTAFeedback.in_parsets/flyseye.parset @@ -19,10 +19,10 @@ Cobalt.FinalMetaDataGatherer.sshPrivateKey = Cobalt.FinalMetaDataGatherer.executable = source /data/home/lofarsys/production/lofar_cobalt/lofarinit.sh; FinalMetaDataGatherer # The database hostname to connect to: -# Production: sasdb -# Development: sasdbtest (do not use here; prefer hostname redirection) -# If empty (or missing), the default is used: sasdb -Cobalt.FinalMetaDataGatherer.database.host = sasdb +# Production: sasdb.control.lofar +# Development: sasdbtest.control.lofar (do not use here; prefer hostname redirection) +# If empty (or missing), the default is used: sasdb.control.lofar +Cobalt.FinalMetaDataGatherer.database.host = sasdb.control.lofar # The database port number to connect to. # If empty (or missing), the default used: 5432 diff --git a/RTCP/Cobalt/CoInterface/test/tParset.parset_obs228591 b/RTCP/Cobalt/CoInterface/test/tParset.parset_obs228591 index ee24fd500ee7520af1861b71735c53b2c56f5330..61a5948e52ee2a506149348afa11a4f5d63a962a 100644 --- a/RTCP/Cobalt/CoInterface/test/tParset.parset_obs228591 +++ b/RTCP/Cobalt/CoInterface/test/tParset.parset_obs228591 @@ -19,10 +19,10 @@ Cobalt.FinalMetaDataGatherer.sshPrivateKey = Cobalt.FinalMetaDataGatherer.executable = source /data/home/lofarsys/production/lofar_cobalt/lofarinit.sh; FinalMetaDataGatherer # The database hostname to connect to: -# Production: sasdb -# Development: sasdbtest (do not use here; prefer hostname redirection) -# If empty (or missing), the default is used: sasdb -Cobalt.FinalMetaDataGatherer.database.host = sasdb +# Production: sasdb.control.lofar +# Development: sasdbtest.control.lofar (do not use here; prefer hostname redirection) +# If empty (or missing), the default is used: sasdb.control.lofar +Cobalt.FinalMetaDataGatherer.database.host = sasdb.control.lofar # The database port number to connect to. # If empty (or missing), the default used: 5432 diff --git a/RTCP/Cobalt/CoInterface/test/tParsetDefault.cc b/RTCP/Cobalt/CoInterface/test/tParsetDefault.cc index 7f08111eacff2defab610f00620a686a840b2289..fc0360d9c59be1449bf4874a3c356052aa9eac73 100644 --- a/RTCP/Cobalt/CoInterface/test/tParsetDefault.cc +++ b/RTCP/Cobalt/CoInterface/test/tParsetDefault.cc @@ -26,8 +26,7 @@ #include <lofar_config.h> -#include <string> -#include <CoInterface/Parset.h> +#include "tParsetDefault.h" using std::string; using LOFAR::Cobalt::Parset; diff --git a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/FinalMetaDataGatherer.parset.in b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/FinalMetaDataGatherer.parset.in index 59542dda9c6ff4242a92c5ea167ad08f479f08e8..da4b3fd7ee1159649304c4e1d7f81f7089c9b8e1 100644 --- a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/FinalMetaDataGatherer.parset.in +++ b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/default/FinalMetaDataGatherer.parset.in @@ -5,10 +5,10 @@ Cobalt.FinalMetaDataGatherer.enabled = true # The database hostname to connect to: -# Production: sasdb -# Development: sasdbtest (do not use here; prefer hostname redirection) -# If empty (or missing), the default is used: sasdb -Cobalt.FinalMetaDataGatherer.database.host = sasdb +# Production: sasdb.control.lofar +# Development: sasdbtest.control.lofar (do not use here; prefer hostname redirection) +# If empty (or missing), the default is used: sasdb.control.lofar +Cobalt.FinalMetaDataGatherer.database.host = sasdb.control.lofar # The database port number to connect to. # If empty (or missing), the default used: 5432 diff --git a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/rspraw-enable.parset.OBSID b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/rspraw-enable.parset.OBSID index 70b247ec7377039d826925eb1068cd480fabf270..7a8d9f646b1f21eaec35560a3b937c00951a2aa3 100644 --- a/RTCP/Cobalt/GPUProc/etc/parset-additions.d/rspraw-enable.parset.OBSID +++ b/RTCP/Cobalt/GPUProc/etc/parset-additions.d/rspraw-enable.parset.OBSID @@ -1,14 +1,14 @@ # rspraw-enable.parset.OBSID: override COBALT settings to enable RSP raw antenna field output # +# $Id$ + # Basic Usage: After preparing an observation with desired station and freq settings, but before the MACScheduler starts it up, -# run as lofarsys on the COBALT head node: +# run as lofarbuild on the COBALT head node: # cd $LOFARROOT/etc/parset-additions.d/override && cp ../rspraw-enable.parset.OBSID rspraw-enable.parset.123456 # where 123456 is the observation ID. -# + # NOTE: This is a COBALT-only expert mode setting adjustment. While you really have to outstrip yourself to # screw up the system beyond this observation, little customization effort is needed to screw up this observation. -# -# $Id$ # By default, disable correlated and beamformed data. # You can comment out or enable these to *also* get correlated and/or beamformed output; @@ -36,11 +36,11 @@ Observation.DataProducts.Output_RSPRaw.enabled=true # Override beamlets: Observation uses beam and frequency settings as specified, # but RSP raw data is only written for the first N beamlets (whatever beams/subbands that corresponds to...). -# The value is a list: one value per sending RSP board (stream) for all stations(!), thus typically a list of (at least) length 4. +# The value is a list: one value per sending RSP board (stream) applied to all antenna fields, thus typically a list of length 4. # Each value must be within the valid range for the bit mode: # 16 bit: [ 61, 61, 61, 61] (thus max 244) # 8 bit: [122, 122, 122, 122] (thus max 488) -# 4 bit: [244, 244, 244, 244] (but the stations can handle a max total of 966 (not 976); unclear how it's spread over the RSP boards) +# 4 bit: [244, 244, 244, 234] (thus max 966 (not 976: RSP firmware resource limitation)) # Default: all observation beamlets #Cobalt.RSPRaw.nrBeamletsPerBoardList=[122, 122, 122, 122] diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc index 438b1254048fd84731c28d1c96c3afa9ee6ab2f6..d54a85108d8e343bbf7408da2f8dd9c8e58a2ba1 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc @@ -151,7 +151,7 @@ namespace LOFAR { ASSERT(!mpiData); // Swap delay sets to accomplish delaysAtBegin = delaysAfterEnd - swap(delaysAtBegin, delaysAfterEnd); + std::swap(delaysAtBegin, delaysAfterEnd); } // Signal EOD @@ -191,31 +191,7 @@ namespace LOFAR { if (ps.settings.rspRaw.enabled && std::find(ps.settings.rspRaw.antennaFieldNames.begin(), ps.settings.rspRaw.antennaFieldNames.end(), stationID.name()) != ps.settings.rspRaw.antennaFieldNames.end()) { - unsigned maxNrPacketsToSend = NONRT_PACKET_BATCH_SIZE; - time_t connTimeout = 0; - if (ps.settings.realTime) { - maxNrPacketsToSend = RT_PACKET_BATCH_SIZE; - connTimeout = std::time(NULL) + 3; // a few secs from now - } - - rspRawSenders.resize(nrBoards); - #pragma omp parallel for num_threads(nrBoards) // easy way to apply a single connTimeout to all - for (unsigned i = 0; i < nrBoards; ++i) { - unsigned fileIdx = ps.getRSPRawOutputStreamIdx(stationID.name(), i); -// TODO: produce UDP stream descriptor for piggy backing instead of TCP for RSP raw storage - string desc = getStreamDescriptorBetweenIONandStorage(ps, RSP_RAW_DATA, fileIdx, - hostID < ps.settings.nodes.size() ? ps.settings.nodes[hostID].out_nic : ""); - LOG_INFO_STR(logPrefix << "Opening RSP raw output stream for data from antenna field " << - stationID.name() << " board " << i << " with stream descriptor " << desc); - - try { - new (&rspRawSenders[i]) RSPRawSender(maxNrPacketsToSend, ps.settings.rspRaw.nrBeamletsPerBoardList.at(i), - desc, connTimeout); // pre-C++11 has no emplace_back(), so use placement new - } catch (Exception &exc) { // TimeOutException, SystemCallException - LOG_ERROR_STR(logPrefix << "RSPRawSender creation failure for stream " << desc << ": " << exc); - new (&rspRawSenders[i]) RSPRawSender; - } - } + initRspRaw(hostID); } } @@ -301,6 +277,40 @@ namespace LOFAR { } + void StationInput::initRspRaw(unsigned hostID) + { + unsigned maxNrPacketsToSend = NONRT_PACKET_BATCH_SIZE; + time_t connTimeout = 0; + if (ps.settings.realTime) { + maxNrPacketsToSend = RT_PACKET_BATCH_SIZE; + connTimeout = std::time(NULL) + 3; // a few secs from now + } + + rspRawSenders.resize(nrBoards); + #pragma omp parallel for num_threads(nrBoards) // easy way to apply a single connTimeout to all + for (unsigned i = 0; i < nrBoards; ++i) { + if (ps.settings.rspRaw.nrBeamletsPerBoardList.at(i) == 0) { + continue; + } + + unsigned fileIdx = ps.getRSPRawOutputStreamIdx(stationID.name(), i); +// TODO: produce UDP stream descriptor for piggy backing instead of TCP for RSP raw storage + string desc = getStreamDescriptorBetweenIONandStorage(ps, RSP_RAW_DATA, fileIdx, + hostID < ps.settings.nodes.size() ? ps.settings.nodes[hostID].out_nic : ""); + LOG_INFO_STR(logPrefix << "Opening RSP raw output stream for data from antenna field " << + stationID.name() << " board " << i << " with stream descriptor " << desc); + + try { + RSPRawSender rspRawSender(maxNrPacketsToSend, ps.settings.rspRaw.nrBeamletsPerBoardList[i], + desc, connTimeout); + rspRawSenders[i].swap(rspRawSender); // swap into place (no need for vector of smart ptrs pre-C++11) + } catch (Exception &exc) { // TimeOutException, SystemCallException + LOG_ERROR_STR(logPrefix << "RSPRawSender creation failure for stream " << desc << ": " << exc); + } + } + } + + void StationInput::readRSPRealTime( unsigned board, MACIO::RTmetadata &mdLogger, const string &mdKeyPrefix ) { diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.h b/RTCP/Cobalt/GPUProc/src/Station/StationInput.h index 45a2b88ef09738bcd3ebf047f92517cea508f700..511f355e704d050dfba539c83b80d3dfef82d221 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.h +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.h @@ -130,6 +130,8 @@ namespace LOFAR { SmartPtr<Stream> inputStream(unsigned board) const; + void initRspRaw(unsigned hostID); + /* * Reads data from all the station input streams, and puts their packets in rspDataPool. * diff --git a/RTCP/Cobalt/GPUProc/src/scripts/cobaltswitch b/RTCP/Cobalt/GPUProc/src/scripts/cobaltswitch index c2f5fdc8c2dd4a013aaddc6db32df8b50efbbb42..fef07c95efb305ac6ec17c3adbd7c794c6048cfd 100755 --- a/RTCP/Cobalt/GPUProc/src/scripts/cobaltswitch +++ b/RTCP/Cobalt/GPUProc/src/scripts/cobaltswitch @@ -59,9 +59,9 @@ if [ "$NEW_MODE" != "" ]; then echo "# on `date +"%F %T"`" echo "" echo "# Route all meta data to the test systems" - echo "Cobalt.PVSSGateway.host = ccu099" - echo "Cobalt.Feedback.host = ccu099" - echo "Cobalt.FinalMetaDataGatherer.database.host = sasdbtest" + echo "Cobalt.PVSSGateway.host = ccu099.control.lofar" + echo "Cobalt.Feedback.host = ccu099.control.lofar" + echo "Cobalt.FinalMetaDataGatherer.database.host = sasdbtest.control.lofar" echo "" echo "# Use only cbt009 to observe" echo "Cobalt.Nodes = [cbt009_0, cbt009_1]" diff --git a/RTCP/Cobalt/OutputProc/src/MeasurementSetFormat.cc b/RTCP/Cobalt/OutputProc/src/MeasurementSetFormat.cc index 6439e976674fbfe93239a2be55214fd2c687f159..307df94e145e3b12541b31def242f357c8671969 100644 --- a/RTCP/Cobalt/OutputProc/src/MeasurementSetFormat.cc +++ b/RTCP/Cobalt/OutputProc/src/MeasurementSetFormat.cc @@ -698,5 +698,5 @@ namespace LOFAR } // namespace Cobalt -} // namepsace LOFAR +} // namespace LOFAR diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc index 0b96d5a7e29eec6c185ffaf02d0b9355f4586e5c..46be2e737ac8bf661a51e574cdf3cba39541d3c5 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc @@ -25,6 +25,8 @@ #include "OutputThread.h" #include <unistd.h> +#include <set> +#include <sstream> #include <iomanip> #include <boost/lexical_cast.hpp> #include <boost/format.hpp> @@ -32,9 +34,9 @@ #include <Common/SystemCallException.h> #include <Common/Thread/Mutex.h> #include <Common/Thread/Cancellation.h> +#include <Common/StreamUtil.h> // LOFAR::print() #include <ApplCommon/PVSSDatapointDefs.h> -#include <CoInterface/Parset.h> #include <CoInterface/OutputTypes.h> #include <CoInterface/Exceptions.h> #include <CoInterface/LTAFeedback.h> @@ -408,17 +410,14 @@ namespace LOFAR itsTargetDirectory == "" ? itsParset.getDirectoryName(RSP_RAW_DATA, itsStreamNr) : itsTargetDirectory; - const std::string fileName = itsParset.getFileName(CORRELATED_DATA, itsStreamNr); + const std::string fileName = itsParset.getFileName(RSP_RAW_DATA, itsStreamNr); const std::string path = directoryName + "/" + fileName; LOG_INFO_STR(itsLogPrefix << "Writing RSP raw data to " << path); - // Write parset as observation metadata. We end up with many duplicates, but at least we know all storage nodes used have it. - // Also patch antenna field stream locations (originals in StationStreams.parset) for easy offline reprocessing. - Parset rspRawParset = itsParset; -//TODO: patch locations, like -//PIC.Core.CS006HBA.RSP.ports = [udp:cbt004-10GB01:10060, udp:cbt004-10GB01:10061, udp:cbt004-10GB01:10062, udp:cbt004-10GB01:10063] -//PIC.Core.CS006HBA.RSP.receiver = cbt004_0 + // Write parset as observation metadata. We end up with many duplicate files, + // but at least we get the parset, even if storage node(s) fail. + Parset rspRawParset = makeRspRawParset(); if (itsParset.settings.realTime) { try { @@ -439,9 +438,77 @@ namespace LOFAR //logInitialStreamMetadataEvents("RSPRaw", fileName, directoryName); } -//TODO: see if we can get the data dropping stats in a useful state for RSP raw - itsNrExpectedBlocks = itsParset.settings.nrBlocks() * itsParset.settings.blockSize; + itsNrExpectedBlocks = itsParset.settings.nrRspRawBlocks() * itsParset.settings.blockSize; + } + + Parset RSPRawOutputThread::makeRspRawParset() + { + LOG_INFO("makeRspRawParset() begin"); + + Parset rspRawParset(itsParset); + + // Patch several parset key values for easy setup of (single node) offline reprocessing. + rspRawParset.replace("Observation.startTime", + LOFAR::timeString(rspRawParset.settings.rspRaw.startTime, true, "%F %T")); + rspRawParset.replace("Observation.stopTime", + LOFAR::timeString(rspRawParset.settings.rspRaw.stopTime, true, "%F %T")); + rspRawParset.replace("Cobalt.realTime", "false"); + rspRawParset.replace("Observation.DataProducts.Output_RSPRaw.enabled", "false"); + + const unsigned nrBoards = rspRawParset.settings.rspRaw.nrBeamletsPerBoardList.size(); + set<string> stationNameSet; + for (unsigned af = 0; af < rspRawParset.settings.rspRaw.antennaFieldNames.size(); ++af) + { + const string antFieldName = rspRawParset.settings.rspRaw.antennaFieldNames[af].fullName(); + + string rspPortsValue(1, '['); + string dataslotListValue(1, '['); + string rspBoardListValue(1, '['); + for (unsigned b = 0; b < nrBoards; ++b) + { + if (b > 0) { + rspPortsValue += ", "; + dataslotListValue += ", "; + rspBoardListValue += ", "; + } + + rspPortsValue += "file:" + rspRawParset.getFileName(RSP_RAW_DATA, af * nrBoards + b); + + unsigned nrBeamlets = rspRawParset.settings.rspRaw.nrBeamletsPerBoardList[b]; + if (nrBeamlets == 0) { + // It is valid to completely filter a stream, but unless it's the last stream(s) (not checked), + // we cannot generate a valid dataslot list w/ nrBeamlets-1. Users have to check output parset anyway. + LOG_WARN_STR("makeRspRawParset(): empty nr beamlets per board found for antenna field " << + antFieldName << ": setting nr beamlets to 1 to generate valid RSP *output* parset"); + nrBeamlets = 1; + } + dataslotListValue += str(boost::format("0..%u") % (nrBeamlets - 1)); + rspBoardListValue += str(boost::format("%u*%u") % nrBeamlets % b); + + stationNameSet.insert(rspRawParset.settings.rspRaw.antennaFieldNames[af].station); + } + + rspPortsValue.push_back(']'); + rspRawParset.replace("PIC.Core." + antFieldName + ".RSP.ports", rspPortsValue); + rspRawParset.replace("PIC.Core." + antFieldName + ".RSP.receiver", "localhost"); + + dataslotListValue.push_back(']'); + rspRawParset.replace("Observation.Dataslots." + antFieldName + ".DataslotList", dataslotListValue); + rspBoardListValue.push_back(']'); + rspRawParset.replace("Observation.Dataslots." + antFieldName + ".RSPBoardList", rspBoardListValue); + } + + ostringstream stationListStr; + LOFAR::print(stationListStr, stationNameSet.begin(), stationNameSet.end(), ",", "[", "]"); + rspRawParset.replace("Observation.VirtualInstrument.stationList", stationListStr.str()); + + rspRawParset.updateSettings(); // not needed and may WARN, but does some checks and to return valid obj + + LOG_INFO("makeRspRawParset() end"); + + return rspRawParset; } + } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.h b/RTCP/Cobalt/OutputProc/src/OutputThread.h index 93e8395960b87dc2b67c27bf514305ccaf15fbbf..f44f308e82709461ef224f626a36c03a517efaae 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.h +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.h @@ -34,6 +34,7 @@ #include <CoInterface/TABTranspose.h> #include <CoInterface/FinalMetaData.h> #include <CoInterface/Pool.h> +#include <CoInterface/Parset.h> #include "MSWriter.h" @@ -155,6 +156,9 @@ namespace LOFAR const std::string &targetDirectory = ""); virtual void createMS(); + + private: + Parset makeRspRawParset(); }; } // namespace Cobalt diff --git a/RTCP/Cobalt/OutputProc/src/createHeaders.cc b/RTCP/Cobalt/OutputProc/src/createHeaders.cc index 67e677b14df516e5549f74777ab4d80c8d0654aa..4659eacc066cfec22fef5919d9f4c9f08d96e10a 100644 --- a/RTCP/Cobalt/OutputProc/src/createHeaders.cc +++ b/RTCP/Cobalt/OutputProc/src/createHeaders.cc @@ -1,5 +1,6 @@ -//# createHeaders.cc: Generates all .h5/.MS files given a (OLAP) parset -//# Copyright (C) 2012-2013 ASTRON (Netherlands Institute for Radio Astronomy) +//# createHeaders.cc: Generates all .h5/.MS/_rsp.raw files given a COBALT parset +//# Copyright (C) 2012-2013, 2017 +//# ASTRON (Netherlands Institute for Radio Astronomy) //# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands //# //# This file is part of the LOFAR software suite. @@ -83,6 +84,19 @@ int main(int argc, char *argv[]) } } + // Process RSP raw data + if (parset.settings.rspRaw.enabled) { + for (size_t fileIdx = 0; fileIdx < parset.settings.rspRaw.files.size(); ++fileIdx) + { + string logPrefix = str(format("[RSPraw stream %3u] ") % fileIdx); + + Pool<StreamableData> outputPool(logPrefix, true); + + RSPRawOutputThread writer(parset, fileIdx, outputPool, rtmd, "rtmd key prefix", logPrefix, "."); + writer.createMS(); + } + } + return 0; } diff --git a/SubSystems/Online_Cobalt/test/CMakeLists.txt b/SubSystems/Online_Cobalt/test/CMakeLists.txt index 46d04d507ade38d8548070e5b0b51867f0e467ac..96f6569ebcfd759fba36b220f287e9d01042996f 100644 --- a/SubSystems/Online_Cobalt/test/CMakeLists.txt +++ b/SubSystems/Online_Cobalt/test/CMakeLists.txt @@ -30,3 +30,4 @@ endif(BUILD_TESTING) add_subdirectory(Correlator) add_subdirectory(Beamformer) add_subdirectory(Commensal) +#add_subdirectory(RSPRaw)