From 55d7b06986b3cc0e8628e56e6edaafb195d58bb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20J=C3=BCrges?= <jurges@astron.nl> Date: Wed, 30 Jan 2019 17:49:56 +0000 Subject: [PATCH] SW-546: Book keeping is done on a sub-band base in a std::map Also refactored some functions out of the TBB_Dipole clase since they are not essential fot the object itself. --- RTCP/Cobalt/OutputProc/src/TBB_Dipole.cc | 494 ++++++++++------------- RTCP/Cobalt/OutputProc/src/TBB_Dipole.h | 58 ++- 2 files changed, 269 insertions(+), 283 deletions(-) diff --git a/RTCP/Cobalt/OutputProc/src/TBB_Dipole.cc b/RTCP/Cobalt/OutputProc/src/TBB_Dipole.cc index ea381619d6d..b1aefe0da8e 100644 --- a/RTCP/Cobalt/OutputProc/src/TBB_Dipole.cc +++ b/RTCP/Cobalt/OutputProc/src/TBB_Dipole.cc @@ -75,21 +75,84 @@ namespace LOFAR } + + static std::string getRawFilename(const std::string& h5Filename, + uint32_t rspID, uint32_t rcuID) + { + std:: string rawFilename{h5Filename}; + std::string rspRcuSbStr{boost::str( + boost::format("_RSP%03u_RCU%03u") % rspID % rcuID)}; + + const std::size_t pos{rawFilename.find('_', rawFilename.find('_') + 1)}; + + // insert _RSPxxx_RCUxxx after station name (2nd '_') + rawFilename.insert(pos, rspRcuSbStr); + rawFilename.resize(rawFilename.size() - (sizeof(".h5") - 1)); + rawFilename.append(".raw"); + + return rawFilename; + } + + uint32_t getTransientModeSampleNr(const TBB_Header& header) + { + /* + * In transient mode, at 200 MHz we get DEFAULT_TBB_TRANSIENT_NSAMPLES (1024) samples per frame: 195312.5 frames/s. + * This means that every 2 seconds, a frame overlaps a seconds boundary. But the sample values generated + * by the RSPs start at zero for each second, even if it should start at 512 for odd timestamps at 200 MHz. + * At 160 MHz sample rate, an integer number of frames fits in a second (156250), so no correction is needed. + */ + uint32_t sampleNr = header.sampleNr; + if (header.sampleFreq == 200 && (header.time & 1)) { + sampleNr += DEFAULT_TBB_TRANSIENT_NSAMPLES / 2; + } + return sampleNr; + } + + uint32_t getSpectralModeBandNr(const TBB_Header& header) + { + return (header.bandSliceNr & TBB_BAND_NR_MASK); + } + + uint32_t getSpectralModeSliceNr(const TBB_Header& header) + { + return (header.bandSliceNr >> TBB_SLICE_NR_SHIFT); + } + + bool hasAllZeroDataSamples(const TBB_Payload& payload, + std::size_t numberOfSamples) + { + /** + * Good (noisy) data may have a few consecutive zero values, so this loop + * terminates quickly, unless the antenna is broken or disabled, which + * happens sometimes. + * Another reason for all zeros is that a wave generator is used and set + * to zero amp (pointless). + * + * Unfortunately, the crc32 variant used does not reject all zeros because + * the checksum would be 0. + */ + for(std::size_t index{0U}; index < numberOfSamples; ++index) + { + if(payload.data[index] != 0) + { + return false; + } + } + + return true; + } + + TBB_Dipole::TBB_Dipole() : itsDALDipole(), - itsLastLogErrorTime(0), - subbandOffset{0}, - subbandSizeInBytes{0} + itsLastLogErrorTime(0) { } // Do not use. Only needed for vector<TBB_Dipole>(N). TBB_Dipole::TBB_Dipole(const TBB_Dipole& rhs) : itsDALDipole(), - itsLastLogErrorTime(rhs.itsLastLogErrorTime), - subbandOffset{rhs.subbandOffset}, - subbandSizeInBytes{rhs.subbandSizeInBytes}, - currentSubbandSizeInBytes{rhs.currentSubbandSizeInBytes} + itsLastLogErrorTime(rhs.itsLastLogErrorTime) { } @@ -134,25 +197,29 @@ namespace LOFAR } } + // Add a new flag range at the end or extend the last stored flag range. 'len' may not be 0. + void TBB_Dipole::appendFlags(DumpInfo& di, uint64_t offset, + std::size_t len) + { + if((di.itsFlagOffsets.empty() == true) + || (offset > di.itsFlagOffsets.back().end)) + { + di.itsFlagOffsets.push_back(dal::Range(offset, offset + len)); + } + else + { + // extend + di.itsFlagOffsets.back().end += len; + } + } + void TBB_Dipole::init(const TBB_Header& header, const Parset& parset, const StationMetaData& stationMetaData, - const vector< double >& allSubbandCentralFreqs, - const string& h5Filename, const std::size_t _subbandSize, + const std::vector< double >& allSubbandCentralFreqs, + const std::string& h5Filename, const std::size_t subBandSize, dal::TBB_Station& station, Mutex& h5Mutex) { itsH5Filename = h5Filename; - subbandSizeInSamples = _subbandSize; - /** - * We get a std::size_t but need in the checks later an int64_t. - * Obviously the std::size_t is the proper type for a size. - * - * Every value in the sub-band time series is a complex value that - * consists of two 16 bit numbers. Hence to get the size in bytes - * multiply by 4. - */ - subbandSizeInBytes = static_cast< int64_t >(_subbandSize) * - sizeof(std::complex< int16_t >); - itsAllSubbandCentralFreqs = allSubbandCentralFreqs; itsDumpInfo.itsDatasetLen = 0; @@ -164,21 +231,15 @@ namespace LOFAR else { // spectral mode - /** - * Set itsDumpInfo.itsTime0 to 0 so that it will later be lazily - * initialized (cannot init for all subbands with 1st packet). - */ - itsDumpInfo.itsTime0 = 0; - // itsDumpInfo.itsSliceNr0 will be later laziliy initialized. - subbandsToBeStored = parset.getUint32Vector( - "Observation.TBB.TBBsetting.subbandList", true); - remainingSubbandsToBeStored = subbandsToBeStored; - - currentSubbandSizeInBytes.resize(subbandsToBeStored.size(), 0U); + const std::vector< uint32_t > subBandsToBeStored{ + parset.getUint32Vector( + "Observation.TBB.TBBsetting.subbandList", true)}; std::ostringstream bands; - for(const auto& subBand: subbandsToBeStored) + for(const auto subBand: subBandsToBeStored) { + subBandBookKeeping.insert(std::make_pair(subBand, + SubBandBookKeeping(subBand, subBandSize))); bands << (boost::format("%03u, ") % subBand); } LOG_INFO_STR("TBB: Storing the following sub-bands: " @@ -205,47 +266,6 @@ namespace LOFAR return itsAllSubbandCentralFreqs.empty(); } - string TBB_Dipole::getRawFilename(const string& h5Filename, unsigned rspID, unsigned rcuID) - { - string rawFilename = h5Filename; - string rspRcuSbStr = str(boost::format("_RSP%03u_RCU%03u") % rspID % rcuID); - size_t pos = rawFilename.find('_', rawFilename.find('_') + 1); - rawFilename.insert(pos, rspRcuSbStr); // insert _RSPxxx_RCUxxx after station name (2nd '_') - rawFilename.resize(rawFilename.size() - (sizeof(".h5") - 1)); - rawFilename.append(".raw"); - return rawFilename; - } - - // Add a new flag range at the end or extend the last stored flag range. 'len' may not be 0. - void TBB_Dipole::appendFlags(DumpInfo& di, uint64_t offset, size_t len) - { - if(di.itsFlagOffsets.empty() - || (offset > di.itsFlagOffsets.back().end)) - { - di.itsFlagOffsets.push_back(dal::Range(offset, offset + len)); - } - else - { - // extend - di.itsFlagOffsets.back().end += len; - } - } - - uint32_t TBB_Dipole::getTransientModeSampleNr(const TBB_Header& header) const - { - /* - * In transient mode, at 200 MHz we get DEFAULT_TBB_TRANSIENT_NSAMPLES (1024) samples per frame: 195312.5 frames/s. - * This means that every 2 seconds, a frame overlaps a seconds boundary. But the sample values generated - * by the RSPs start at zero for each second, even if it should start at 512 for odd timestamps at 200 MHz. - * At 160 MHz sample rate, an integer number of frames fits in a second (156250), so no correction is needed. - */ - uint32_t sampleNr = header.sampleNr; - if (header.sampleFreq == 200 && (header.time & 1)) { - sampleNr += DEFAULT_TBB_TRANSIENT_NSAMPLES / 2; - } - return sampleNr; - } - void TBB_Dipole::processTransientFrameData(const TBB_Frame& frame) { /* @@ -318,39 +338,17 @@ namespace LOFAR return; } - const uint32_t bandNr{frame.header.bandSliceNr & TBB_BAND_NR_MASK}; - - /** - * Figure out the sub-band's offset in the file on disk. Note that - * this is not the offset of the current frame within the data of a - * single sub-band. - */ - const auto bandIndex(std::find(subbandsToBeStored.begin(), - subbandsToBeStored.end(), bandNr)); - if(bandIndex != subbandsToBeStored.end()) + const uint32_t bandNr{getSpectralModeBandNr(frame.header)}; + const auto incomingSubBand{subBandBookKeeping.find(bandNr)}; + struct SubBandBookKeeping currentSubBand(0, 0); + if(incomingSubBand != subBandBookKeeping.end()) { + currentSubBand = incomingSubBand->second; + /** * Do I need to store this sub-band or was it already complete? */ - const auto bandIndexStillToBeStored{ - std::find(remainingSubbandsToBeStored.begin(), - remainingSubbandsToBeStored.end(), bandNr)}; - if(bandIndexStillToBeStored != remainingSubbandsToBeStored.end()) - { - /** - * In case this is the very first frame then - * itsDumpInfo.itsDatasetLen is obviously 0. For any frame after - * the first one its value is incremented at the end of this - * function. - * When a frame for a new sub-band arrives, we can use the value - * as a total size of the data for the previous sub-band on disk - * and keep our fingers crossed that the data of the current - * sub-band has the same size. - */ - subbandOffset = subbandSizeInBytes * - (bandIndex - subbandsToBeStored.begin()); - } - else + if(currentSubBand.isComplete == true) { LOG_WARN_STR("TBB: Received a frame for sub-band #" << static_cast< uint32_t >(bandNr) @@ -369,41 +367,50 @@ namespace LOFAR } /** - * Lazily initialize last part of dump info struct from 1st frame - * received. - * itsDumpInfo.itsTime0 is set to 0 when TBB_Dipole::init gets called. + * Lazily initialize last part of the book keeping map from the 1st + * frame that is received for a sub-band. + * subBandBookKeeping[bandNr].isInitialised is set to false when + * TBB_Dipole::init gets called and sets up the book keeping map. */ - const uint32_t sliceNr{frame.header.bandSliceNr >> TBB_SLICE_NR_SHIFT}; - if(itsDumpInfo.itsTime0 == 0U) + const uint32_t sliceNr{getSpectralModeSliceNr(frame.header)}; + if(currentSubBand.isInitialised == false) { - itsDumpInfo.itsTime0 = frame.header.time; - itsDumpInfo.itsSliceNr0 = sliceNr; - LOG_INFO_STR("TBB: Initialised itsTime0 = " - << itsDumpInfo.itsTime0 - << ", itsSliceNr0 = " << itsDumpInfo.itsSliceNr0); + currentSubBand.time0 = frame.header.time; + currentSubBand.slice0 = sliceNr; + currentSubBand.isInitialised = true; + + LOG_INFO_STR("TBB: Band #" + << currentSubBand.bandNr + << ", initialised time0 = " + << currentSubBand.time0 + << ", slice0 = " << currentSubBand.slice0); } /** * Out-of-order frame arrival has not been seen for Dutch stations. * TBB from int'l stations is not (yet) dumped to CEP. - * We could support out-of-order arrival, except for packets that - * appear to be overtaken by the 1st packet received, but this makes + * + * But: + * + * We support out-of-order arrival, except for packets that appear to + * have been overtaken by the 1st packet received. But this makes * flagging lost packets more complicated than how we deal with it * using appendFlags() below. + * * Also, fake out-of-order occurs when data is dumped (incorrectly) * across the frozen TBB write pointer (LOFAR control bug). */ - if((frame.header.time < itsDumpInfo.itsTime0) - || ((frame.header.time == itsDumpInfo.itsTime0) - && (sliceNr < itsDumpInfo.itsSliceNr0))) + if((frame.header.time < currentSubBand.time0) + || ((frame.header.time == currentSubBand.time0) + && (sliceNr < currentSubBand.slice0))) { logErrorRateLimited(&itsLastLogErrorTime, "TBB: received an " "out-of-order packet! Current start time of first frame for " "this data stream: " - + boost::lexical_cast< std::string >(itsDumpInfo.itsTime0) + + boost::lexical_cast< std::string >(currentSubBand.time0) + ", current sample or slice nr of the first frame for this " "data stream: " - + boost::lexical_cast< std::string >(itsDumpInfo.itsSliceNr0) + + boost::lexical_cast< std::string >(currentSubBand.slice0) + ", received frame header: " + frame.header.to_string()); return; @@ -418,15 +425,15 @@ namespace LOFAR * can be handled. */ int64_t offset{frame.header.time}; - offset -= itsDumpInfo.itsTime0; - LOG_INFO_STR("TBB: offset = frame.header.time - itsDumpInfo.itsTime0, " + offset -= currentSubBand.time0; + LOG_INFO_STR("TBB: offset = frame.header.time - currentSubBand.time0, " << offset << ", " << frame.header.time << ", " - << itsDumpInfo.itsTime0); + << currentSubBand.time0); /** - * Convert the offset in [s] to the total number of samples (containing a - * complex number of two 16 bit integers). sampleFreq is in MHz. + * Convert the offset in [s] to the total number of samples (containing + * a complex number of two 16 bit integers). sampleFreq is in MHz. */ offset *= (frame.header.sampleFreq * 1000000); LOG_INFO_STR("TBB: offset *= (frame.header.sampleFreq * 1000000), " @@ -436,8 +443,8 @@ namespace LOFAR /** * Divide the offset, i.e. the number of samples by the size of an FFT * block. The result is the index of the raw voltage sampling window - * (size 1024 values) that got fed into the FFT. This not a sample within - * the window but just a rough indicator which window we are in! + * (size 1024 values) that got fed into the FFT. This not a sample + * within the window but just a rough indicator which window we are in! * [ 1024 ][ 1024 ][ 1024 ] * ^ * | Start recording @@ -445,29 +452,30 @@ namespace LOFAR * | Window number of the 0th TBB data frame * ^ * | Window number of frame N - * This is also then the index of the TBB frame for this frequency because - * the begin of a voltage window is also the begin of a frequency window. + * This is also then the index of the TBB frame for this frequency + * because the begin of a voltage window is also the begin of a + * frequency window. */ offset /= SPECTRAL_TRANSFORM_SIZE; LOG_INFO_STR("TBB: offset /= SPECTRAL_TRANSFORM_SIZE, " - << offset << ", " << SPECTRAL_TRANSFORM_SIZE); + << offset + << ", " + << SPECTRAL_TRANSFORM_SIZE); /** * Now try to figure out where in the raw voltage data window our * spectral frame exactly begins. - */ - - /** - * Add to the offset + * Add that then to the offset. + * * Attention! - * Both slice numbers are already divided by 1024! + * Both slice numbers were already divided by SPECTRAL_TRANSFORM_SIZE! */ offset += sliceNr; - offset -= itsDumpInfo.itsSliceNr0; - LOG_INFO_STR("TBB: offset += sliceNr - itsDumpInfo.itsSliceNr0, " + offset -= currentSubBand.slice0; + LOG_INFO_STR("TBB: offset += sliceNr - currentSubBand.slice0, " << offset << ", " << sliceNr << ", " - << itsDumpInfo.itsSliceNr0); + << currentSubBand.slice0); /** * Now check for even/odd seconds. Why? Because @@ -501,12 +509,12 @@ namespace LOFAR * - check if the second of the current frame is odd. * - Both yes: means that we have to add one sample to the offset. */ - if((itsDumpInfo.itsTime0 % 2 == 0) && (frame.header.time % 2 == 1)) + if((currentSubBand.time0 % 2 == 0) && (frame.header.time % 2 == 1)) { offset += 1; LOG_INFO_STR("TBB: Added one sample to the offset because t0 is " - "even and t_now is odd: itsDumpInfo.itsTime0 = " - << itsDumpInfo.itsTime0 + "even and t_now is odd: currentSubBand.time0 = " + << currentSubBand.time0 << ", frame.header.time = " << frame.header.time << ", offset = " @@ -517,13 +525,19 @@ namespace LOFAR * Flag lost frame(s) (assumes no out-of-order). * Assumes all frames (except maybe the last) have the same nr of samples (fine). * This cannot detect lost frames at the end of a dataset. + * + * TODO + * 2019-01-30 Thomas: + * Now that all out-of-order situations can be handled (but for the 0th + * frame not being the 0th frame), this code below is plain wrong. + * FIXME */ - const int64_t nskipped{ - offset - static_cast< int64_t >(itsDumpInfo.itsDatasetLen)}; + const int64_t nskipped{offset - static_cast< int64_t >( + currentSubBand.currentSizeInSamples)}; // should be > 0, but do signed cmp to avoid crazy flagging range if(nskipped > 0) { - appendFlags(itsDumpInfo, itsDumpInfo.itsDatasetLen, + appendFlags(itsDumpInfo, currentSubBand.currentSizeInSamples, static_cast< uint64_t >(nskipped)); } @@ -549,82 +563,61 @@ namespace LOFAR + frame.header.to_string() + " crc32: " + boost::lexical_cast<string>(crc32)); } - else if(hasAllZeroDataSamples(frame.payload, + + if(hasAllZeroDataSamples(frame.payload, 2 * frame.header.nOfSamplesPerFrame) == true) { // in (complex) values appendFlags(itsDumpInfo, offset, frame.header.nOfSamplesPerFrame); } - if(!itsLastSubbandDataset - ||(itsLastSubbandDataset - && (itsLastSubbandDataset->bandNumber().value != bandNr))) + if(!currentSubBand.dataSet + ||(currentSubBand.dataSet + && (currentSubBand.dataSet->bandNumber().value != bandNr))) { /** * a new subband arrives, so create a new TBB_SubbandDataset in the * itsDipoleGroup */ - itsLastSubbandDataset.reset(new dal::TBB_SubbandDataset( + currentSubBand.dataSet.reset(new dal::TBB_SubbandDataset( itsDipoleGroup()->subband(bandNr))); -// itsLastSubbandDataset->create1D( -// 0, -1, itsH5Filename, itsLastSubbandDataset->LITTLE); // Store the data in the hdf5 file. Do not provide a file name! - itsLastSubbandDataset->create1D(subbandSizeInSamples, - subbandSizeInSamples, "", itsLastSubbandDataset->LITTLE); - itsLastSubbandDataset->groupType().value = "SubbandDataset"; + currentSubBand.dataSet->create1D(currentSubBand.totalSizeInSamples, + currentSubBand.totalSizeInSamples, "", + currentSubBand.dataSet->LITTLE); + currentSubBand.dataSet->groupType().value = "SubbandDataset"; LOG_INFO_STR("TBB: Created HDF5 SubbandDataset " - << itsLastSubbandDataset->name() + << currentSubBand.dataSet->name() << " for dipole " << itsDipoleGroup()->name() - << " stationID=" << static_cast< uint32_t >(frame.header.stationID) - << " rsp=" << static_cast< uint32_t >(frame.header.rspID) - << " rcu=" << static_cast< uint32_t >(frame.header.rcuID)); + << ", stationID =" << static_cast< uint32_t >(frame.header.stationID) + << ", rsp =" << static_cast< uint32_t >(frame.header.rspID) + << ", rcu =" << static_cast< uint32_t >(frame.header.rcuID)); /** * TODO 20181127: * fill in these parameters from somewhere, probably from * processSpectralFrameData */ - itsLastSubbandDataset->time().value = 0; - itsLastSubbandDataset->centralFrequency().value = 0; - itsLastSubbandDataset->centralFrequencyUnit().value = "not filled in"; - itsLastSubbandDataset->bandwidth().value = 0; - itsLastSubbandDataset->bandwidthUnit().value = "not filled in"; - itsLastSubbandDataset->timeResolution().value = 0; - itsLastSubbandDataset->timeResolutionUnit().value = "not filled in"; - itsLastSubbandDataset->bandNumber().value = bandNr; - itsLastSubbandDataset->sliceNumber().value = + currentSubBand.dataSet->time().value = 0; + currentSubBand.dataSet->centralFrequency().value = 0; + currentSubBand.dataSet->centralFrequencyUnit().value = "not filled in"; + currentSubBand.dataSet->bandwidth().value = 0; + currentSubBand.dataSet->bandwidthUnit().value = "not filled in"; + currentSubBand.dataSet->timeResolution().value = 0; + currentSubBand.dataSet->timeResolutionUnit().value = "not filled in"; + currentSubBand.dataSet->bandNumber().value = bandNr; + currentSubBand.dataSet->sliceNumber().value = frame.header.bandSliceNr >> 10; - itsLastSubbandDataset->samplesPerFrame().value = 0; + currentSubBand.dataSet->samplesPerFrame().value = 0; + currentSubBand.dataSet->dataLength().value = 0; } - /** - * TODO 20181127: - * do we have enough reserved space to write all subbands in one file? - */ - /** - * Since we are writing around HDF5, there is no need to lock. Resize - * the HDF5 dataset in the destructor. - */ - - /** - * Calculate the total offset in the file: - * totalFileOffset = total size of sub-band data * sub-band number - * + offset in the sub-band - */ - const off64_t offsetInFileInBytes{subbandOffset + - static_cast< off64_t >(offset * 2 * sizeof(int16_t))}; - const size_t numberOfBytesToWrite{ - frame.header.nOfSamplesPerFrame * 2 * sizeof(int16_t)}; - - LOG_INFO_STR("TBB: offset calculated in samples for pwrite = " + LOG_INFO_STR("TBB: offset in samples calculated for the HDF5 data set " + "= " << offset - << ", sub-band offset in bytes for band " - << static_cast< uint32_t >(bandNr) - << " = " - << subbandOffset - << ", offset in bytes casted for pwrite = " - << offsetInFileInBytes); + << " for band " + << static_cast< uint32_t >(bandNr)); /** * I know, I know. These two casts are not good. I tried to use a @@ -632,81 +625,56 @@ namespace LOFAR * Next best thing is a union of int16_t[] and int16_t[][2]. And hence * the weird casting. */ - itsLastSubbandDataset->set1D(offset, + currentSubBand.dataSet->set1D(offset, reinterpret_cast< std::complex< int16_t > * >( const_cast< int16_t * >(&(frame.payload.spectralData[0][0]))), frame.header.nOfSamplesPerFrame, 0); // in (complex) values itsDumpInfo.itsDatasetLen = offset + frame.header.nOfSamplesPerFrame; + currentSubBand.currentSizeInSamples += + frame.header.nOfSamplesPerFrame; /** - * Check if the currently handled sub-band has received enough bytes - * (subbandSizeInBytes). If so, then remove the currently handled + * Check if the currently handled sub-band has received enough samples + * (subbandSizeInSamples). If so, then remove the currently handled * sub-band from remainingSubbandsToBeStored */ - const auto bandIndexToBeStored{std::find(remainingSubbandsToBeStored.begin(), - remainingSubbandsToBeStored.end(), bandNr)}; - if(bandIndexToBeStored != remainingSubbandsToBeStored.end()) + if(currentSubBand.currentSizeInSamples >= + currentSubBand.totalSizeInSamples) { - const auto bandIndexInVector{bandIndex - subbandsToBeStored.begin()}; - try + currentSubBand.isComplete = true; + currentSubBand.dataSet->dataLength().value = + currentSubBand.currentSizeInSamples; + currentSubBand.dataSet->flagOffsets().create( + itsDumpInfo.itsFlagOffsets.size()).set( + itsDumpInfo.itsFlagOffsets); + + LOG_INFO_STR("TBB: Data for sub-band #" + << bandNr + << " for dipole " + << itsDipoleGroup()->name() + << " of RCU " + << static_cast< uint32_t >(frame.header.rcuID) + << " is complete has been stored on disk."); + + bool allComplete{true}; + for(auto subBand: subBandBookKeeping) { - currentSubbandSizeInBytes.at(bandIndexInVector) - += numberOfBytesToWrite; - } - catch(std::exception& ex) - { - LOG_WARN_STR("TBB: caught an exception! " - << __PRETTY_FUNCTION__ - << " " - << ex.what() - << ", bandIndexInVector = " - << static_cast< std::size_t >(bandIndexInVector)); + allComplete &= subBand.second.isComplete; } - try - { - if(currentSubbandSizeInBytes.at(bandIndexInVector) - >= subbandSizeInBytes) - { - remainingSubbandsToBeStored.erase(bandIndexToBeStored); - itsLastSubbandDataset->dataLength().value = - currentSubbandSizeInBytes.at(bandIndexInVector); - itsLastSubbandDataset->flagOffsets().create( - itsDumpInfo.itsFlagOffsets.size()).set( - itsDumpInfo.itsFlagOffsets); - - LOG_INFO_STR("TBB: Data for sub-band #" - << bandNr - << " for dipole " - << itsDipoleGroup()->name() - << " of RCU " - << static_cast< uint32_t >(frame.header.rcuID) - << " is complete has been stored on disk."); - - if(remainingSubbandsToBeStored.empty() == true) - { - LOG_INFO_STR("TBB: Storing of data should be complete for " - "dipole " - << itsDipoleGroup()->name() - << ", station ID = " - << static_cast< uint32_t >(frame.header.stationID) - << ", RSP = " - << static_cast< uint32_t >(frame.header.rspID) - << ", RCU = " - << static_cast< uint32_t >(frame.header.rcuID)); - } - } - } - catch(std::exception& ex) + if(allComplete == true) { - LOG_WARN_STR("TBB: caught another exception! " - << __PRETTY_FUNCTION__ - << " " - << ex.what() - << ", bandIndexInVector = " - << static_cast< std::size_t >(bandIndexInVector)); + LOG_INFO_STR("TBB: Storing of data should be complete " + "for dipole " + << itsDipoleGroup()->name() + << ", station ID = " + << static_cast< uint32_t >(frame.header.stationID) + << ", RSP = " + << static_cast< uint32_t >(frame.header.rspID) + << ", RCU = " + << static_cast< uint32_t >(frame.header.rcuID)); } } else @@ -901,24 +869,6 @@ namespace LOFAR #endif return itsCrc32gen.checksum() == crc32val; } - - bool TBB_Dipole::hasAllZeroDataSamples(const TBB_Payload& payload, unsigned nTrSamples) const - { - /* - * Good (noisy) data may have a few consecutive zero values, so this loop terminates - * quickly, unless the antenna is broken or disabled, which happens sometimes. - * Another reason for all zeros is that a wave generator is used and set to zero amp (pointless). - * Unfortunately, the crc32 variant used does not reject all zeros (checksum 0). - */ - for (unsigned i = 0; i < nTrSamples; i++) { - if (payload.data[i] != 0) { - return false; - } - } - - return true; - } - } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/OutputProc/src/TBB_Dipole.h b/RTCP/Cobalt/OutputProc/src/TBB_Dipole.h index adf20112285..610e60f0784 100644 --- a/RTCP/Cobalt/OutputProc/src/TBB_Dipole.h +++ b/RTCP/Cobalt/OutputProc/src/TBB_Dipole.h @@ -22,7 +22,10 @@ #define LOFAR_COBALT_OUTPUTPROC_TBBDIPOLE_H 1 #include "TBB_Frame.h" +#include <vector> +#include <map> #include <boost/scoped_ptr.hpp> +#include <memory> #include <boost/crc.hpp> #include <Common/Thread/Mutex.h> @@ -48,7 +51,6 @@ namespace LOFAR inline dal::TBB_DipoleDataset* itsDipoleDataset() { return dynamic_cast<dal::TBB_DipoleDataset*>(itsDALDipole.get()); } inline dal::TBB_DipoleGroup* itsDipoleGroup() { return dynamic_cast<dal::TBB_DipoleGroup*>(itsDALDipole.get()); } - boost::scoped_ptr<dal::TBB_SubbandDataset> itsLastSubbandDataset; std::vector<double> itsAllSubbandCentralFreqs; // size: transient mode: 0, spectral mode: RSP_NR_SUBBANDS std::string itsH5Filename; @@ -104,8 +106,6 @@ namespace LOFAR private: bool doTransient() const; - uint32_t getTransientModeSampleNr(const TBB_Header& header) const; - static std::string getRawFilename(const std::string& h5Filename, unsigned rspID, unsigned rcuID); void appendFlags(DumpInfo& di, size_t offset, size_t len); // initTBB_DipoleGroupOrDataset() must be called with the global h5Mutex held. @@ -114,15 +114,51 @@ namespace LOFAR const std::vector<double>& allSubbandCentralFreqs, const std::string& rawFilename, dal::TBB_Station& station); - bool crc32tbb(const TBB_Payload* payload, size_t nTrSamples); - bool hasAllZeroDataSamples(const TBB_Payload& payload, unsigned nTrSamples) const; + bool crc32tbb(const TBB_Payload* payload, size_t nTrSamples); + + /** + * This structure is for book keeping of received sub-band data. + * When a new sub-band frame is received it is checked if the map + * @ref subBandBookKeeping contains an entry that has sub-band number + * as key. If not, the sub-band data is unexpected and the frame will + * be discarded. + * + * If the map contains an entry for the sub-band and the entry's + * @ref subBandBookKeeping.isInitialised value is false, then the entry + * will be set-up with proper data and + * @ref subBandBookKeeping.isInitialised will be set to true. + * + * From then on it is pretty simple. Every time a new frame comes in + * the data of the relevant map entry will be updated. At some point + * totalSizeInSamples == currentSizeInSamples will be true and the + * sub-band data set is complete. isComplete will be set true and any + * future frame that contains more data for this dub-band will be + * discarded. + */ + struct SubBandBookKeeping + { + SubBandBookKeeping(std::size_t _bandNr, std::size_t subBandSize): + totalSizeInSamples{subBandSize}, + bandNr{_bandNr}, + time0{0U}, + slice0{0U}, + currentSizeInSamples{0U}, + isComplete{false}, + isInitialised{false} + { + } + + std::size_t totalSizeInSamples; + std::size_t bandNr; + int64_t time0; + uint32_t slice0; + std::size_t currentSizeInSamples; + bool isComplete; + bool isInitialised; + std::shared_ptr< dal::TBB_SubbandDataset > dataSet; + }; - std::vector< unsigned int > subbandsToBeStored; - std::vector< unsigned int > remainingSubbandsToBeStored; - off_t subbandOffset; - int64_t subbandSizeInBytes; - int64_t subbandSizeInSamples; - std::vector< int64_t > currentSubbandSizeInBytes; + std::map< uint32_t, struct SubBandBookKeeping > subBandBookKeeping; }; } // namespace Cobalt -- GitLab