diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc index df9f608f47d2cc517f7f565e84742330bb12ce07..6556089cfe445bd0344231e4cc402b40c7c9604b 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc @@ -66,8 +66,8 @@ bool MPIData<SampleT>::write(const struct RSP &packet, const ssize_t *beamletInd ASSERTSTR(packet.header.nrBlocks == 16, "Packet has " << (int)packet.header.nrBlocks << " samples/beamlet, expected 16."); const size_t nrSamples = 16; - const TimeStamp packetBegin = packet.timeStamp(); - const TimeStamp packetEnd = packetBegin + nrSamples; + const uint64_t packetBegin = packet.timeStamp(); + const uint64_t packetEnd = packetBegin + nrSamples; bool consider_next = false; @@ -82,8 +82,8 @@ bool MPIData<SampleT>::write(const struct RSP &packet, const ssize_t *beamletInd /* Reading with offset X is the same as writing that data with offset -X */ const ssize_t offset = read_offsets[absBeamlet]; - const TimeStamp beamletBegin = packetBegin - offset; - const TimeStamp beamletEnd = packetEnd - offset; + const uint64_t beamletBegin = packetBegin - offset; + const uint64_t beamletEnd = packetEnd - offset; /* XXXX = packet data * [.....] = this->data @@ -191,7 +191,7 @@ void StationMetaData<SampleT>::computeMetaData() delays.getNextDelays(*delaysAtBegin); for (ssize_t block = -1; block < (ssize_t)nrBlocks; ++block) { - LOG_INFO_STR(logPrefix << str(format("[block %d] Retrieving delays") % block)); + LOG_DEBUG_STR(logPrefix << str(format("[block %d] Retrieving delays") % block)); // Fetch end delays (start delays are set by the previous block, or // before the loop). @@ -200,7 +200,7 @@ void StationMetaData<SampleT>::computeMetaData() // INPUT SmartPtr< MPIData<SampleT> > mpiData = metaDataPool.free.remove(); - LOG_INFO_STR(logPrefix << str(format("[block %d] Applying delays") % block)); + LOG_DEBUG_STR(logPrefix << str(format("[block %d] Applying delays") % block)); // Compute the next set of metaData and read_offsets from the new // delays pair. @@ -305,19 +305,25 @@ void StationInput::readRSPRealTime( size_t board, Stream &inputStream ) PacketReader reader(str(format("%s[board %s] ") % logPrefix % board), inputStream, mode); + Queue< SmartPtr<RSPData> > &inputQueue = rspDataPool[board].free; + Queue< SmartPtr<RSPData> > &outputQueue = rspDataPool[board].filled; + try { for(size_t i = 0; true; i++) { // Fill rspDataPool elements with RSP packets - SmartPtr<RSPData> data = rspDataPool.free.remove(); + SmartPtr<RSPData> rspData = inputQueue.remove(); + + // Abort condition needed to avoid getting stuck in free.remove() + if (!rspData) + break; - reader.readPackets(data->packets, data->valid); - data->board = board; + reader.readPackets(rspData->packets); // Periodically log progress if (i % 256 == 0) // Each block is ~40ms, so log every ~10s worth of data. reader.logStatistics(); - rspDataPool.filled.append(data); + outputQueue.append(rspData); } } catch (Stream::EndOfStreamException &ex) { // Ran out of data @@ -346,45 +352,54 @@ void StationInput::writeRSPRealTime( MPIData<SampleT> ¤t, MPIData<SampleT> * 3. We can receive old data from before `current'. */ - const TimeStamp maxDelay(mode.secondsToSamples(1.0), mode.clockHz()); - - const TimeStamp deadline = current.to + maxDelay; + const TimeStamp deadline = TimeStamp(current.to + mode.secondsToSamples(0.25), mode.clockHz()); - LOG_INFO_STR(logPrefix << "[block " << current.block << "] Waiting until " << deadline << " for " << current.from << " to " << current.to); + LOG_INFO_STR(logPrefix << "[block " << current.block << "] Waiting until " << deadline); - const TimeStamp now = TimeStamp::now(deadline.getClock()); + const TimeStamp now = TimeStamp::now(mode.clockHz()); if (deadline < now) { // We're too late! Don't process data, or we'll get even further behind! LOG_ERROR_STR(logPrefix << "[block " << current.block << "] Not running at real time! Deadline was " << TimeStamp(now - deadline, deadline.getClock()).getSeconds() << " seconds ago"); + } else { - NSTimer copyRSPTimer(str(format("%s copy RSP data") % logPrefix), true, true); + // One core can't handle the load, so use multiple +# pragma omp parallel for num_threads(nrBoards) + for (size_t board = 0; board < nrBoards; board++) { + NSTimer copyRSPTimer(str(format("%s [board %i] copy RSP -> block") % logPrefix % board), true, true); - SmartPtr<RSPData> rspData; + Queue< SmartPtr<RSPData> > &inputQueue = rspDataPool[board].filled; + Queue< SmartPtr<RSPData> > &outputQueue = rspDataPool[board].free; - while((rspData = rspDataPool.filled.remove(deadline, NULL)) != NULL) { - const ssize_t *beamletIndices = &this->beamletIndices[rspData->board][0]; + const ssize_t *beamletIndices = &this->beamletIndices[board][0]; - // Write valid packets to the current and/or next packet - copyRSPTimer.start(); - for (size_t p = 0; p < rspData->valid.size(); ++p) { - if (!rspData->valid[p]) - continue; + SmartPtr<RSPData> rspData; + + while ((rspData = inputQueue.remove(deadline, NULL)) != NULL) { + // Write valid packets to the current and/or next packet + copyRSPTimer.start(); - struct RSP &packet = rspData->packets[p]; + for (size_t p = 0; p < RT_PACKET_BATCH_SIZE; ++p) { + struct RSP &packet = rspData->packets[p]; - if (current.write(packet, beamletIndices) - && next) { - // We have data (potentially) spilling into `next'. + if (packet.payloadError()) + continue; - next->write(packet, beamletIndices); + if (current.write(packet, beamletIndices) + && next) { + // We have data (potentially) spilling into `next'. + + if (next->write(packet, beamletIndices)) { + LOG_WARN_STR(logPrefix << "Received data for several blocks into the future -- discarding."); + } + } } - } - copyRSPTimer.stop(); + copyRSPTimer.stop(); - rspDataPool.free.append(rspData); - ASSERT(!rspData); + outputQueue.append(rspData); + ASSERT(!rspData); + } } } } @@ -401,6 +416,8 @@ void StationInput::readRSPNonRealTime() /* Since the boards will be read at different speeds, we need to * manually keep them in sync. We read a packet from each board, * and let the board providing the youngest packet read again. + * + * We will multiplex all packets using rspDataPool[0]. */ // Cache for last packets read from each board @@ -442,19 +459,16 @@ void StationInput::readRSPNonRealTime() break; // Emit youngest packet - SmartPtr<RSPData> data = rspDataPool.free.remove(); + SmartPtr<RSPData> data = rspDataPool[0].free.remove(); // Abort of writer does not desire any more data if (!data) return; - ASSERT(data->valid.size() == 1); - - data->valid[0] = true; data->packets[0] = last_packets[youngest]; - data->board = youngest; + data->board = youngest; - rspDataPool.filled.append(data); + rspDataPool[0].filled.append(data); // Next packet should only be read from the stream we // emitted from @@ -463,21 +477,17 @@ void StationInput::readRSPNonRealTime() } // Signal EOD by inserting a packet beyond obs end - SmartPtr<RSPData> data = rspDataPool.free.remove(); + SmartPtr<RSPData> data = rspDataPool[0].free.remove(); // Abort if writer does not desire any more data if (!data) return; - ASSERT(data->valid.size() == 1); - - const BoardMode mode(ps.settings.nrBitsPerSample, ps.settings.clockMHz); - - data->valid[0] = true; + data->packets[0].payloadError(false); data->packets[0].timeStamp(TimeStamp::universe_heat_death(mode.clockHz())); - data->board = 0; + data->board = 0; - rspDataPool.filled.append(data); + rspDataPool[0].filled.append(data); } @@ -494,23 +504,22 @@ void StationInput::writeRSPNonRealTime( MPIData<SampleT> ¤t, MPIData<Sampl for(;;) { - SmartPtr<RSPData> data = rspDataPool.filled.remove(); + SmartPtr<RSPData> data = rspDataPool[0].filled.remove(); const ssize_t *beamletIndices = &this->beamletIndices[data->board][0]; // Only packet 0 is used in non-rt mode - ASSERT(data->valid.size() == 1); - ASSERT(data->valid[0]); + ASSERT(!data->packets[0].payloadError()); if (current.write(data->packets[0], beamletIndices)) { // We have data (potentially) spilling into `next'. if (!next || next->write(data->packets[0], beamletIndices)) { // Data is even later than next? Put this data back for a future block. - rspDataPool.filled.prepend(data); + rspDataPool[0].filled.prepend(data); break; } } - rspDataPool.free.append(data); + rspDataPool[0].free.append(data); ASSERT(!data); } } @@ -521,15 +530,16 @@ void StationInput::processInput( Queue< SmartPtr< MPIData<SampleT> > > &inputQue { vector<OMPThread> packetReaderThreads(nrBoards); - /* - * Each packet is expected to have 16 samples per subband, i.e. ~80 us worth of data @ 200 MHz. - * - * 512 packets will thus represent ~40ms worth of data. - * - * In non-rt mode, we just process one packet at a time to keep the code simple. - */ - for (size_t i = 0; i < 64; ++i) - rspDataPool.free.append(new RSPData(ps.realTime() ? 512 : 1)); + if (ps.realTime()) { + // Each board has its own pool to reduce lock contention + for (size_t board = 0; board < nrBoards; ++board) + for (size_t i = 0; i < 16; ++i) + rspDataPool[board].free.append(new RSPData(RT_PACKET_BATCH_SIZE)); + } else { + // We just process one packet at a time, merging all the streams into rspDataPool[0]. + for (size_t i = 0; i < 16; ++i) + rspDataPool[0].free.append(new RSPData(1)); + } #pragma omp parallel sections num_threads(2) { @@ -550,7 +560,6 @@ void StationInput::processInput( Queue< SmartPtr< MPIData<SampleT> > > &inputQue LOG_INFO_STR(logPrefix << "Processing packets"); if (ps.realTime()) { - vector< SmartPtr<Stream> > inputStreams(allocation.inputStreams()); #pragma omp parallel for num_threads(nrBoards) @@ -571,7 +580,7 @@ void StationInput::processInput( Queue< SmartPtr< MPIData<SampleT> > > &inputQue */ #pragma omp section { - Thread::ScopedPriority sp(SCHED_FIFO, 10); + //Thread::ScopedPriority sp(SCHED_FIFO, 10); /* * We maintain a `current' and a `next' block, @@ -601,17 +610,19 @@ void StationInput::processInput( Queue< SmartPtr< MPIData<SampleT> > > &inputQue } } + // Signal EOD to output outputQueue.append(NULL); + // Signal EOD to input + for (size_t i = 0; i < nrBoards; ++i) + rspDataPool[i].free.append(NULL); + if (ps.realTime()) { // kill reader threads LOG_INFO_STR( logPrefix << "Stopping all boards" ); # pragma omp parallel for num_threads(nrBoards) for (size_t i = 0; i < nrBoards; ++i) packetReaderThreads[i].kill(); - } else { - // signal reader threads - rspDataPool.free.append(NULL); } } } diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.h b/RTCP/Cobalt/GPUProc/src/Station/StationInput.h index 70fa7bb8a69c909ea0bdc1096e4dc4fe3028df6f..18294f46a5a352147a783f8f15c74e8254246749 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.h +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.h @@ -57,8 +57,8 @@ namespace LOFAR { * is (re)used. */ ssize_t block; - TimeStamp from; - TimeStamp to; + uint64_t from; + uint64_t to; size_t nrSamples; /* @@ -152,22 +152,21 @@ namespace LOFAR { void processInput( Queue< SmartPtr< MPIData<SampleT> > > &inputQueue, Queue< SmartPtr< MPIData<SampleT> > > &outputQueue ); private: + // Each packet is expected to have 16 samples per subband, i.e. ~80 us worth of data @ 200 MHz. + // So 512 packets is ~40 ms of data. + static const size_t RT_PACKET_BATCH_SIZE = 512; + // Data received from an RSP board struct RSPData { - int board; - std::vector<struct RSP> packets; - std::vector<bool> valid; + size_t board; // annotation used in non-rt mode RSPData(size_t numPackets): - packets(numPackets), - valid(numPackets, false) + packets(numPackets) { } }; - Pool< RSPData > rspDataPool; - const Parset &ps; const size_t stationIdx; @@ -178,6 +177,7 @@ namespace LOFAR { const BoardMode mode; const size_t nrBoards; + Pool< RSPData > rspDataPool[4]; // [nrboards] const std::vector<size_t> targetSubbands; diff --git a/RTCP/Cobalt/GPUProc/test/CMakeLists.txt b/RTCP/Cobalt/GPUProc/test/CMakeLists.txt index abf71b21a2dee15a65bd20393d58d3a4ab9f530b..95526fb6a907a10ea0a6fc06ef1b15dbbf4370a7 100644 --- a/RTCP/Cobalt/GPUProc/test/CMakeLists.txt +++ b/RTCP/Cobalt/GPUProc/test/CMakeLists.txt @@ -12,6 +12,7 @@ lofar_add_test(tfpequals tfpequals.cc) lofar_add_test(tstartBGL DEPENDS rtcp) lofar_add_test(tMACfeedback DEPENDS rtcp) lofar_add_test(tProductionParsets DEPENDS rtcp) +lofar_add_test(tPerformanceTest tPerformanceTest) # cmpfloat is started by scripts for a fuzzy compare of 2 output files with raw floats lofar_add_executable(cmpfloat cmpfloat.cc) diff --git a/RTCP/Cobalt/GPUProc/test/tPerformanceTest.cc b/RTCP/Cobalt/GPUProc/test/tPerformanceTest.cc new file mode 100644 index 0000000000000000000000000000000000000000..0dbb57b6c3d492677c9206b534d2e20143bb53b4 --- /dev/null +++ b/RTCP/Cobalt/GPUProc/test/tPerformanceTest.cc @@ -0,0 +1,93 @@ +#include <lofar_config.h> + +#include <mpi.h> +#include <Common/Timer.h> +#include <GPUProc/gpu_wrapper.h> +#include <GPUProc/Kernels/CorrelatorKernel.h> +#include <InputProc/Transpose/MPIUtil.h> +#include <CoInterface/BlockID.h> +#include <CoInterface/SmartPtr.h> +#include <CoInterface/MultiDimArray.h> + +using namespace LOFAR; +using namespace LOFAR::Cobalt; +using namespace std; + +const size_t BUFSIZE = 128 * 1024 * 1024; + +int main(int argc, char **argv) { + INIT_LOGGER("tPerformanceTest"); + + LOG_INFO_STR("Initialising GPU"); + + gpu::Platform pf; + vector<gpu::Device> devices = pf.devices(); + gpu::Context context(devices[1]); + gpu::Stream stream(context); + + LOG_INFO_STR("Creating kernel"); + + Parset ps; + ps.add("Observation.DataProducts.Output_Correlated.enabled", "true"); + ps.add("Observation.DataProducts.Output_Correlated.filenames", "[SB0.MS]"); + ps.add("Observation.DataProducts.Output_Correlated.locations", "[:.]"); + ps.add("Observation.VirtualInstrument.stationList", "[CS001..CS064]"); + ps.add("Observation.Beam[0].subbandList", "[0]"); + ps.add("Observation.rspBoardList" , "[0]"); + ps.add("Observation.rspSlotList" , "[0]"); + ps.add("Cobalt.Correlator.nrChannelsPerSubband", "256"); + ps.updateSettings(); + + CorrelatorKernel::Parameters params(ps); + KernelFactory<CorrelatorKernel> factory(params); + + gpu::DeviceMemory devInput(context, factory.bufferSize(CorrelatorKernel::INPUT_DATA)); + gpu::DeviceMemory devOutput(context, factory.bufferSize(CorrelatorKernel::OUTPUT_DATA)); + + Kernel::Buffers buffers(devInput, devOutput); + + SmartPtr<CorrelatorKernel> kernel(factory.create(stream, buffers)); + + // Initialise and query MPI + int provided_mpi_thread_support; + + LOG_INFO("Initialising MPI"); + if (MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided_mpi_thread_support) != MPI_SUCCESS) { + cerr << "MPI_Init_thread failed" << endl; + exit(1); + } + + LOG_INFO_STR("Running kernel"); + + { + MultiDimArray<char,1> a(boost::extents[BUFSIZE], 1, mpiAllocator); + MultiDimArray<char,1> b(boost::extents[BUFSIZE], 1, mpiAllocator); + + NSTimer copyTimer("memcpy", true, true); + NSTimer zeroTimer("memset", true, true); + NSTimer syncTimer("gpu sync", true, true); + + for (size_t i = 0; i < 100; i++) { + const BlockID blockID; + + kernel->enqueue(blockID); + kernel->enqueue(blockID); + + copyTimer.start(); + memcpy(&b[0], &a[0], a.size()); + copyTimer.stop(); + + zeroTimer.start(); + memset(&a[0], 0, b.size()); + zeroTimer.stop(); + + syncTimer.start(); + stream.synchronize(); + syncTimer.stop(); + } + } + + LOG_INFO_STR("Done"); + + MPI_Finalize(); +} diff --git a/RTCP/Cobalt/GPUProc/test/tStationInput.cc b/RTCP/Cobalt/GPUProc/test/tStationInput.cc index 0a3c840bda85a26817c758608e858a4e435ef052..427dd676977244ceb451d9f4b011d29cb86c73cf 100644 --- a/RTCP/Cobalt/GPUProc/test/tStationInput.cc +++ b/RTCP/Cobalt/GPUProc/test/tStationInput.cc @@ -78,7 +78,7 @@ void MPIDataTester::checkAndClearPacketWritten( struct RSP &packet, const vector size_t nrWrittenSamples = 0; for(size_t timeslot = 0; timeslot < packet.header.nrBlocks; timeslot++) { - const TimeStamp ts = packet.timeStamp() + timeslot - data.read_offsets[beamletIdx]; + const uint64_t ts = packet.timeStamp() + timeslot - data.read_offsets[beamletIdx]; if (ts < data.from || ts >= data.to) continue; @@ -143,10 +143,10 @@ SUITE(MPIData) { bool spill = data.write(packet, &beamletIndices[0]); // Validate whether we spill into the next block - if (packet.timeStamp() + packet.header.nrBlocks - 1 > data.to - 1) { + if ((uint64_t)packet.timeStamp() + packet.header.nrBlocks - 1 > data.to - 1) { // last sample is beyond data.to CHECK_EQUAL(true, spill); - } else if (packet.timeStamp() + packet.header.nrBlocks - 1 == data.to - 1) { + } else if ((uint64_t)packet.timeStamp() + packet.header.nrBlocks - 1 == data.to - 1) { // last sample is last sample in block CHECK_EQUAL(true, spill); } else { diff --git a/RTCP/Cobalt/InputProc/src/OMPThread.h b/RTCP/Cobalt/InputProc/src/OMPThread.h index 21c8d6a1c0d6bafbaf0c2986cf708f2d4994371b..26f1ac999acb6039935a81ba0dd30f39607910ce 100644 --- a/RTCP/Cobalt/InputProc/src/OMPThread.h +++ b/RTCP/Cobalt/InputProc/src/OMPThread.h @@ -117,6 +117,8 @@ namespace LOFAR static void init() { + signal(SIGHUP, sighandler); +#if 0 // We avoid cancellation exception for OpenMP threads. // Allow signalling them ourselves to interrupt some blocking syscalls. struct sigaction sa; @@ -127,6 +129,7 @@ namespace LOFAR if (err != 0) { LOG_WARN("Failed to register a handler for SIGHUP: OpenMP threads may not terminate!"); } +#endif } private: diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc b/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc index 09d2b648ce79f8e36dec065cf4026b1620804c61..717e9c053d70ad752b16285a56e75a51ad967f0e 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc +++ b/RTCP/Cobalt/InputProc/src/Station/PacketReader.cc @@ -65,10 +65,8 @@ namespace LOFAR } - void PacketReader::readPackets( std::vector<struct RSP> &packets, std::vector<bool> &valid ) + void PacketReader::readPackets( std::vector<struct RSP> &packets ) { - ASSERT(valid.size() == packets.size()); - if (inputIsUDP) { SocketStream &sstream = dynamic_cast<SocketStream&>(inputStream); @@ -78,22 +76,22 @@ namespace LOFAR // validate received packets for (size_t i = 0; i < numRead; ++i) { - valid[i] = validatePacket(packets[i]); + packets[i].payloadError(!validatePacket(packets[i])); } // mark not-received packets as invalid for (size_t i = numRead; i < packets.size(); ++i) { - valid[i] = false; + packets[i].payloadError(true); } } else { // fall-back for non-UDP streams, emit packets // one at a time to avoid data loss on EndOfStream. - valid[0] = readPacket(packets[0]); + packets[0].payloadError(!readPacket(packets[0])); nrReceived++; for (size_t i = 1; i < packets.size(); ++i) { - valid[i] = false; + packets[i].payloadError(true); } } } diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketReader.h b/RTCP/Cobalt/InputProc/src/Station/PacketReader.h index d8b7fa37cc1fd088de66cc83bc06d026224549c4..eed7032ec5cef8fadd81f30ad4e535043ceb53b1 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketReader.h +++ b/RTCP/Cobalt/InputProc/src/Station/PacketReader.h @@ -46,9 +46,9 @@ namespace LOFAR PacketReader( const std::string &logPrefix, Stream &inputStream, const BoardMode &mode = MODE_ANY ); - // Reads a set of packets from the input stream. Sets valid[i] to - // true for each valid packet i. - void readPackets( std::vector<struct RSP> &packets, std::vector<bool> &valid ); + // Reads a set of packets from the input stream. Sets the payloadError + // flag for all invalid packets. + void readPackets( std::vector<struct RSP> &packets ); // Reads a packet from the input stream. Returns true if a packet was // succesfully read. diff --git a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc index 41796453aaa203cfc5aabbf4b02c9e15dbe0e252..f94e246b70a751b08ba6b507a745fb3e58e488ea 100644 --- a/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc +++ b/RTCP/Cobalt/InputProc/src/Station/PacketsToBuffer.cc @@ -121,17 +121,17 @@ namespace LOFAR for(;;) { // Write first, in case we're given packets to write for (size_t i = 0; i < packets.size(); ++i) { - if (!write[i]) + if (packets[i].payloadError()) continue; writer.writePacket(packets[i]); logStatistics(reader, packets[i]); // mark packet as written - write[i] = false; + packets[i].payloadError(true); } - reader.readPackets(packets, write); + reader.readPackets(packets); } } catch (BadModeException &ex) { diff --git a/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc b/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc index f9dfa676e43c7ea5dd40177b1e87e5e09813e9e3..e979ffc278ccd09fe0d80654350c83080e6bbfd4 100644 --- a/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc +++ b/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc @@ -62,7 +62,6 @@ void usage() struct packetSet { vector<struct RSP> packets; - vector<bool> valid; }; int main(int argc, char **argv) @@ -143,7 +142,6 @@ int main(int argc, char **argv) for (size_t i = 0; i < 256; ++i) { SmartPtr<packetSet> p = new packetSet; p->packets.resize(256); - p->valid.resize(p->packets.size()); readQueue.append(p); } @@ -170,7 +168,7 @@ int main(int argc, char **argv) while (!writerDone && (p = readQueue.remove()) != NULL) { // Read packets and queue them - reader.readPackets(p->packets, p->valid); + reader.readPackets(p->packets); writeQueue.append(p); } } catch(Stream::EndOfStreamException&) { @@ -186,11 +184,11 @@ int main(int argc, char **argv) // Keep reading until NULL while ((p = writeQueue.remove()) != NULL) { for (size_t i = 0; i < p->packets.size(); ++i) { - if (!p->valid[i]) - continue; - struct RSP &packet = p->packets[i]; + if (packet.payloadError()) + continue; + // **** Apply FROM filter **** if (from > 0 && packet.header.timestamp < from) continue;