diff --git a/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc b/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc index 93c3d00888206de50d816a7305d6c443c85192c8..d4d28ed3fe508f008d2b7869b69d3098d462f582 100644 --- a/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc +++ b/RTCP/Cobalt/CoInterface/src/LTAFeedback.cc @@ -44,8 +44,9 @@ namespace LOFAR std::string LTAFeedback::feedback_version(boost::str(boost::format("%02d.%02d.%02d") % major_version % minor_version % patch_number)); - LTAFeedback::LTAFeedback(const ObservationSettings &settings): - settings(settings) + LTAFeedback::LTAFeedback(const ObservationSettings &settings, ssize_t last_block): + settings(settings), + last_block(last_block) { } @@ -246,7 +247,9 @@ namespace LOFAR // actual start/end times that we used ps.add("Observation.startTime", toUTC(settings.startTime)); - ps.add("Observation.endTime", toUTC(settings.getRealStopTime())); + + // make sure we don't report a stopTime < startTime if we stopped during block -1. + ps.add("Observation.stopTime", toUTC(std::max(settings.startTime, settings.getBlockEndTime(last_block)))); ps.add("Observation.DataProducts.nrOfOutput_Correlated_", str(format("%u") % (settings.correlator.enabled ? settings.correlator.files.size() : 0))); diff --git a/RTCP/Cobalt/CoInterface/src/LTAFeedback.h b/RTCP/Cobalt/CoInterface/src/LTAFeedback.h index 8005e83cc86d1b6385738ec0910c77fc2cd95787..0d68f085c4e3f4c6bff7eaa9cec30e14b140c94b 100644 --- a/RTCP/Cobalt/CoInterface/src/LTAFeedback.h +++ b/RTCP/Cobalt/CoInterface/src/LTAFeedback.h @@ -33,7 +33,8 @@ namespace LOFAR class LTAFeedback { public: - LTAFeedback(const ObservationSettings &settings); + // Generate feedback for these parset settings, given that "last_block" was the last block processed. + LTAFeedback(const ObservationSettings &settings, ssize_t last_block); // Subset name of each correlated and beamFormed file, respectively. static std::string correlatedPrefix(size_t fileno); @@ -66,6 +67,7 @@ namespace LOFAR private: const ObservationSettings settings; + const ssize_t last_block; }; } // namespace Cobalt } // namespace LOFAR diff --git a/RTCP/Cobalt/CoInterface/src/Parset.cc b/RTCP/Cobalt/CoInterface/src/Parset.cc index 6084f1d772423cef6e74c6838b52e3e2b5a8f652..e629028159fd90bbc23ee54c57eb97e9557f761e 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.cc +++ b/RTCP/Cobalt/CoInterface/src/Parset.cc @@ -1790,7 +1790,17 @@ namespace LOFAR // because we process in blocks. double ObservationSettings::getRealStopTime() const { - return startTime + nrBlocks() * blockDuration(); + return getBlockEndTime(nrBlocks() - 1); + } + + double ObservationSettings::getBlockStartTime(ssize_t block) const + { + return startTime + block * blockDuration(); + } + + double ObservationSettings::getBlockEndTime(ssize_t block) const + { + return getBlockStartTime(block + 1); } std::string Parset::getHostName(OutputType outputType, unsigned streamNr) const diff --git a/RTCP/Cobalt/CoInterface/src/Parset.h b/RTCP/Cobalt/CoInterface/src/Parset.h index 1af3b7af100a732d7e021ae522620a1a7bdc43f1..16589206364e4ca607892ca25b7ca6fb9d91f51d 100644 --- a/RTCP/Cobalt/CoInterface/src/Parset.h +++ b/RTCP/Cobalt/CoInterface/src/Parset.h @@ -94,6 +94,10 @@ namespace LOFAR // actual stop time, which is different, as we round to block boundaries double getRealStopTime() const; + // start and stop times for a given block + double getBlockStartTime() const; + double getBlockEndTime() const; + // The station clock, in MHz (200 or 160) // // key: Observation.sampleClock diff --git a/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc b/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc index 070ae30541c50a73185e24fb4dc272e39b9042b6..4b60789c31efa9a727baa5147065315488a6b590 100644 --- a/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc +++ b/RTCP/Cobalt/GPUProc/src/MPIReceiver.cc @@ -59,7 +59,7 @@ namespace LOFAR nrBitsPerSample(i_nrBitsPerSample) {} - template<typename SampleT> void MPIReceiver::receiveInput() + template<typename SampleT> ssize_t MPIReceiver::receiveInput() { NSTimer receiveTimer("MPI: Receive station data", true, true); @@ -83,12 +83,13 @@ namespace LOFAR } bool allDone = false; + ssize_t block; // Receive input from StationInput::sendInputToPipeline. // // Start processing from block -1, and don't process anything if the // observation is empty. - for (ssize_t block = -1; !allDone; block++) + for (block = -1; !allDone; block++) { // Receive the samples from all subbands from the ant fields for this block. LOG_INFO_STR("[block " << block << "] Collecting input buffers"); @@ -130,21 +131,21 @@ namespace LOFAR // Signal end of input mpiPool.filled.append(NULL, false); + + // This is the last block we received (or -2 if we haven't received any) + return block - 1; } - void MPIReceiver::receiveInput() + ssize_t MPIReceiver::receiveInput() { switch (nrBitsPerSample) { default: case 16: - receiveInput< SampleType<i16complex> >(); - break; + return receiveInput< SampleType<i16complex> >(); case 8: - receiveInput< SampleType<i8complex> >(); - break; + return receiveInput< SampleType<i8complex> >(); case 4: - receiveInput< SampleType<i4complex> >(); - break; + return receiveInput< SampleType<i4complex> >(); } } } diff --git a/RTCP/Cobalt/GPUProc/src/MPIReceiver.h b/RTCP/Cobalt/GPUProc/src/MPIReceiver.h index e13c1783e8f3d0ceae576fe5064ca97881d9e51f..9212a1db7cd2f1d4f247780982f2c19fcf0813c6 100644 --- a/RTCP/Cobalt/GPUProc/src/MPIReceiver.h +++ b/RTCP/Cobalt/GPUProc/src/MPIReceiver.h @@ -79,11 +79,14 @@ namespace LOFAR // raw MPIRecvData // This function is typically started in a seperate thread // Internally the type of samples depends on nrBitsPerSample - void receiveInput(); + // + // Returns the index of the last block received, or -2 if no + // block ever was received. + ssize_t receiveInput(); private: // The templeted receive function. - template<typename SampleT> void receiveInput(); + template<typename SampleT> ssize_t receiveInput(); Pool<struct MPIRecvData> &mpiPool; const std::vector<size_t> subbandIndices; diff --git a/RTCP/Cobalt/GPUProc/src/rtcp.cc b/RTCP/Cobalt/GPUProc/src/rtcp.cc index 7adc34409a1f89026a695d86737e9cfcabc5e46c..fa8c5ca5b743fa6c6c480bc489c140a530596667 100644 --- a/RTCP/Cobalt/GPUProc/src/rtcp.cc +++ b/RTCP/Cobalt/GPUProc/src/rtcp.cc @@ -495,6 +495,9 @@ int main(int argc, char **argv) OMPThread::ScopedName sn("observation"); + // last received block index + ssize_t last_block = -2; + if (ps.settings.realTime) { // Wait just before the obs starts to allocate resources, // both the UDP sockets and the GPU buffers! @@ -564,7 +567,7 @@ int main(int argc, char **argv) { OMPThread::ScopedName sn("mpi recv"); - MPI_receiver.receiveInput(); + last_block = MPI_receiver.receiveInput(); } // Retrieve items from pool and process further on @@ -619,7 +622,7 @@ int main(int argc, char **argv) // send processing feedback ToBus bus("otdb.task.feedback.processing", broker_feedback()); - LTAFeedback fb(ps.settings); + LTAFeedback fb(ps.settings, last_block); Protocols::TaskFeedbackProcessing msg( "Cobalt/GPUProc/rtcp",