diff --git a/LCS/Stream/include/Stream/SocketStream.h b/LCS/Stream/include/Stream/SocketStream.h index 497cd76b48ce52afc0c6f20a86ba9758d274aeb0..4bb4f5c1af84134b9dcae758c4c0c4d27f5f4437 100644 --- a/LCS/Stream/include/Stream/SocketStream.h +++ b/LCS/Stream/include/Stream/SocketStream.h @@ -53,11 +53,12 @@ class SocketStream : public FileDescriptorBasedStream void reaccept(time_t timeout = 0); // only for TCP server socket void setReadBufferSize(size_t size); + const Protocol protocol; + const Mode mode; + private: const char *hostname; uint16 port; - const Protocol protocol; - const Mode mode; const char *nfskey; int listen_sk; diff --git a/LCS/Stream/src/SocketStream.cc b/LCS/Stream/src/SocketStream.cc index a503eb06cbc676230ea9031cd9da00d6af264b7d..49347a16ca7c389b6cb76b6dcff5311e01f4de7e 100644 --- a/LCS/Stream/src/SocketStream.cc +++ b/LCS/Stream/src/SocketStream.cc @@ -47,10 +47,10 @@ const int MAXPORT = 30000; SocketStream::SocketStream(const char *hostname, uint16 _port, Protocol protocol, Mode mode, time_t timeout, const char *nfskey ) : - hostname(hostname), - port(_port), protocol(protocol), mode(mode), + hostname(hostname), + port(_port), nfskey(nfskey), listen_sk(-1) { diff --git a/RTCP/IONProc/src/InputThread.cc b/RTCP/IONProc/src/InputThread.cc index ffa68a81c1328f05cf47a601346ec4529cb5f9b7..172c08a1bc95cde96c6cb1b9a328c23a7a1a2ad1 100644 --- a/RTCP/IONProc/src/InputThread.cc +++ b/RTCP/IONProc/src/InputThread.cc @@ -32,6 +32,7 @@ #include <Interface/AlignedStdAllocator.h> #include <Interface/Exceptions.h> #include <Stream/NullStream.h> +#include <Stream/SocketStream.h> #include <BeamletBuffer.h> #include <InputThread.h> #include <RSP.h> @@ -91,18 +92,26 @@ template <typename SAMPLE_TYPE> void InputThread<SAMPLE_TYPE>::mainLoop() bool previousSeqidIsAccepted = false; bool dataShouldContainValidStamp = dynamic_cast<NullStream *>(itsArgs.stream) == 0; + bool isUDPstream = dynamic_cast<SocketStream *>(itsArgs.stream) != 0 && dynamic_cast<SocketStream *>(itsArgs.stream)->protocol == SocketStream::UDP; WallClockTime wallClockTime; LOG_DEBUG_STR(itsArgs.logPrefix << " input thread " << itsArgs.threadID << " entering loop"); while (!itsShouldStop) { - size_t size; - try { // interruptible read, to allow stopping this thread even if the station // does not send data - size = itsArgs.stream->tryRead(currentPacketPtr, packetSize); + if (isUDPstream) { + if (itsArgs.stream->tryRead(currentPacketPtr, packetSize) != packetSize) { + ++ itsArgs.packetCounters->received; + ++ itsArgs.packetCounters->badSize; + continue; + } + } else { + itsArgs.stream->read(currentPacketPtr, packetSize); + } + } catch (Stream::EndOfStreamException &) { break; } catch (SystemCallException &ex) { @@ -114,11 +123,6 @@ template <typename SAMPLE_TYPE> void InputThread<SAMPLE_TYPE>::mainLoop() ++ itsArgs.packetCounters->received; - if (size != packetSize) { - ++ itsArgs.packetCounters->badSize; - continue; - } - if (dataShouldContainValidStamp) { #if defined __PPC__ unsigned seqid, blockid;