diff --git a/.gitattributes b/.gitattributes index 6bcc256f405959d13e642677e13294ac417deb18..640ea360b696164af11f0c791cc226c91c67c355 100644 --- a/.gitattributes +++ b/.gitattributes @@ -3625,6 +3625,7 @@ RTCP/InputProc/src/CMakeLists.txt -text RTCP/InputProc/src/Generator.cc -text RTCP/InputProc/src/Generator.h -text RTCP/InputProc/src/OMPThread.h -text +RTCP/InputProc/src/PacketReader.cc -text RTCP/InputProc/src/PacketReader.h -text RTCP/InputProc/src/Poll.h -text RTCP/InputProc/src/Ranges.cc -text diff --git a/RTCP/InputProc/src/CMakeLists.txt b/RTCP/InputProc/src/CMakeLists.txt index 1b89ee75d1e1f231ee8f8bd6016f7d3b847286de..048b2766c3b8fc031b98fcdc8042a584ae5d5c88 100644 --- a/RTCP/InputProc/src/CMakeLists.txt +++ b/RTCP/InputProc/src/CMakeLists.txt @@ -4,6 +4,7 @@ lofar_add_library(inputproc Package__Version.cc BufferSettings.cc Generator.cc + PacketReader.cc Ranges.cc SharedMemory.cc StationID.cc diff --git a/RTCP/InputProc/src/PacketReader.cc b/RTCP/InputProc/src/PacketReader.cc new file mode 100644 index 0000000000000000000000000000000000000000..5d5b98ed324fa34ea5dbd027f91775b72d15c8b4 --- /dev/null +++ b/RTCP/InputProc/src/PacketReader.cc @@ -0,0 +1,111 @@ +#include <lofar_config.h> +#include "PacketReader.h" +#include <Common/LofarLogger.h> +#include <Stream/SocketStream.h> +#include <Interface/RSPTimeStamp.h> +#include <Interface/Stream.h> +#include <boost/format.hpp> + +namespace LOFAR { +namespace RTCP { + + +PacketReader::PacketReader( const std::string &logPrefix, const std::string &streamDescriptor, const struct BufferSettings &settings ) +: + logPrefix(str(boost::format("%s [PacketReader] ") % logPrefix)), + settings(settings), + + nrReceived(0), + nrBadSize(0), + nrBadTime(0), + nrBadData(0), + nrBadMode(0), + hadSizeError(false), + hadModeError(false) +{ + inputStream = createStream(streamDescriptor, true); + + SocketStream *asSocket = dynamic_cast<SocketStream *>(inputStream.get()); + bool isUDP = asSocket && asSocket->protocol == SocketStream::UDP; + + supportPartialReads = !isUDP; +} + + +bool PacketReader::readPacket( struct RSP &packet ) +{ + if (supportPartialReads) { + // read header first + inputStream->read(&packet.header, sizeof packet.header); + + // read rest of packet + inputStream->read(&packet.payload.data, packet.packetSize() - sizeof packet.header); + + ++nrReceived; + } else { + // read full packet at once -- numbytes will tell us how much we've actually read + size_t numbytes = inputStream->tryRead(&packet, sizeof packet); + + ++nrReceived; + + if( numbytes < sizeof(struct RSP::Header) + || numbytes != packet.packetSize() ) { + + if (!hadSizeError) { + LOG_ERROR_STR( logPrefix << "Packet is " << numbytes << " bytes, but should be " << packet.packetSize() << " bytes" ); + hadSizeError = true; + } + + ++nrBadSize; + return false; + } + } + + // illegal timestamp means illegal packet + if (packet.header.timestamp == ~0U) { + ++nrBadTime; + return false; + } + + // check sanity of packet + + // discard packets with errors + if (packet.payloadError()) { + ++nrBadData; + return false; + } + + // check whether the station configuration matches ours + if (packet.clockMHz() * 1000000 != settings.station.clock + || packet.bitMode() != settings.station.bitmode) { + + if (!hadModeError) { + LOG_ERROR_STR( logPrefix << "Packet has mode (" << packet.clockMHz() << " MHz, " << packet.bitMode() << " bit), but should be mode (" << settings.station.clock / 1000000 << " MHz, " << settings.station.bitmode << " bit)"); + hadModeError = true; + } + + ++nrBadMode; + return false; + } + + return true; +} + + +void PacketReader::logStatistics() +{ + LOG_INFO_STR( logPrefix << "Received " << nrReceived << " packets: " << nrBadTime << " bad timestamps, " << nrBadSize << " bad sizes, " << nrBadData << " payload errors, " << nrBadMode << " clock/bitmode errors" ); + + nrReceived = 0; + nrBadTime = 0; + nrBadSize = 0; + nrBadData = 0; + nrBadMode = 0; + + hadSizeError = false; + hadModeError = false; +} + + +} +} diff --git a/RTCP/InputProc/src/PacketReader.h b/RTCP/InputProc/src/PacketReader.h index 1f50d36400818dae37a0fd469b0c261d1b7e9b59..346c0263a6cbed57b4bd8f478d803f4a33dca13f 100644 --- a/RTCP/InputProc/src/PacketReader.h +++ b/RTCP/InputProc/src/PacketReader.h @@ -1,16 +1,10 @@ #ifndef __PACKETREADER__ #define __PACKETREADER__ -#include <Common/LofarLogger.h> #include <Stream/Stream.h> -#include <Stream/SocketStream.h> -#include <Interface/RSPTimeStamp.h> #include <Interface/SmartPtr.h> -#include <Interface/Stream.h> #include <IONProc/RSP.h> -#include "SampleBuffer.h" #include "BufferSettings.h" -#include <boost/format.hpp> #include <string> namespace LOFAR { @@ -39,102 +33,6 @@ private: bool hadSizeError, hadModeError; }; -PacketReader::PacketReader( const std::string &logPrefix, const std::string &streamDescriptor, const struct BufferSettings &settings ) -: - logPrefix(str(boost::format("%s [PacketReader] ") % logPrefix)), - settings(settings), - - nrReceived(0), - nrBadSize(0), - nrBadTime(0), - nrBadData(0), - nrBadMode(0), - hadSizeError(false), - hadModeError(false) -{ - inputStream = createStream(streamDescriptor, true); - - SocketStream *asSocket = dynamic_cast<SocketStream *>(inputStream.get()); - bool isUDP = asSocket && asSocket->protocol == SocketStream::UDP; - - supportPartialReads = !isUDP; -} - - -bool PacketReader::readPacket( struct RSP &packet ) -{ - if (supportPartialReads) { - // read header first - inputStream->read(&packet.header, sizeof packet.header); - - // read rest of packet - inputStream->read(&packet.payload.data, packet.packetSize() - sizeof packet.header); - - ++nrReceived; - } else { - // read full packet at once -- numbytes will tell us how much we've actually read - size_t numbytes = inputStream->tryRead(&packet, sizeof packet); - - ++nrReceived; - - if( numbytes < sizeof(struct RSP::Header) - || numbytes != packet.packetSize() ) { - - if (!hadSizeError) { - LOG_ERROR_STR( logPrefix << "Packet is " << numbytes << " bytes, but should be " << packet.packetSize() << " bytes" ); - hadSizeError = true; - } - - ++nrBadSize; - return false; - } - } - - // illegal timestamp means illegal packet - if (packet.header.timestamp == ~0U) { - ++nrBadTime; - return false; - } - - // check sanity of packet - - // discard packets with errors - if (packet.payloadError()) { - ++nrBadData; - return false; - } - - // check whether the station configuration matches ours - if (packet.clockMHz() * 1000000 != settings.station.clock - || packet.bitMode() != settings.station.bitmode) { - - if (!hadModeError) { - LOG_ERROR_STR( logPrefix << "Packet has mode (" << packet.clockMHz() << " MHz, " << packet.bitMode() << " bit), but should be mode (" << settings.station.clock / 1000000 << " MHz, " << settings.station.bitmode << " bit)"); - hadModeError = true; - } - - ++nrBadMode; - return false; - } - - return true; -} - - -void PacketReader::logStatistics() -{ - LOG_INFO_STR( logPrefix << "Received " << nrReceived << " packets: " << nrBadTime << " bad timestamps, " << nrBadSize << " bad sizes, " << nrBadData << " payload errors, " << nrBadMode << " clock/bitmode errors" ); - - nrReceived = 0; - nrBadTime = 0; - nrBadSize = 0; - nrBadData = 0; - nrBadMode = 0; - - hadSizeError = false; - hadModeError = false; -} - } }