From 9cdb6be4cf735ac2e1c9fa312da76ebf4bae4f3c Mon Sep 17 00:00:00 2001 From: John Romein <romein@astron.nl> Date: Wed, 15 Dec 2010 14:32:09 +0000 Subject: [PATCH] bug 225: Fixed short reads for non-UDP input. --- LCS/Stream/include/Stream/SocketStream.h | 5 +++-- LCS/Stream/src/SocketStream.cc | 4 ++-- RTCP/IONProc/src/InputThread.cc | 20 ++++++++++++-------- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/LCS/Stream/include/Stream/SocketStream.h b/LCS/Stream/include/Stream/SocketStream.h index 497cd76b48c..4bb4f5c1af8 100644 --- a/LCS/Stream/include/Stream/SocketStream.h +++ b/LCS/Stream/include/Stream/SocketStream.h @@ -53,11 +53,12 @@ class SocketStream : public FileDescriptorBasedStream void reaccept(time_t timeout = 0); // only for TCP server socket void setReadBufferSize(size_t size); + const Protocol protocol; + const Mode mode; + private: const char *hostname; uint16 port; - const Protocol protocol; - const Mode mode; const char *nfskey; int listen_sk; diff --git a/LCS/Stream/src/SocketStream.cc b/LCS/Stream/src/SocketStream.cc index a503eb06cbc..49347a16ca7 100644 --- a/LCS/Stream/src/SocketStream.cc +++ b/LCS/Stream/src/SocketStream.cc @@ -47,10 +47,10 @@ const int MAXPORT = 30000; SocketStream::SocketStream(const char *hostname, uint16 _port, Protocol protocol, Mode mode, time_t timeout, const char *nfskey ) : - hostname(hostname), - port(_port), protocol(protocol), mode(mode), + hostname(hostname), + port(_port), nfskey(nfskey), listen_sk(-1) { diff --git a/RTCP/IONProc/src/InputThread.cc b/RTCP/IONProc/src/InputThread.cc index ffa68a81c13..172c08a1bc9 100644 --- a/RTCP/IONProc/src/InputThread.cc +++ b/RTCP/IONProc/src/InputThread.cc @@ -32,6 +32,7 @@ #include <Interface/AlignedStdAllocator.h> #include <Interface/Exceptions.h> #include <Stream/NullStream.h> +#include <Stream/SocketStream.h> #include <BeamletBuffer.h> #include <InputThread.h> #include <RSP.h> @@ -91,18 +92,26 @@ template <typename SAMPLE_TYPE> void InputThread<SAMPLE_TYPE>::mainLoop() bool previousSeqidIsAccepted = false; bool dataShouldContainValidStamp = dynamic_cast<NullStream *>(itsArgs.stream) == 0; + bool isUDPstream = dynamic_cast<SocketStream *>(itsArgs.stream) != 0 && dynamic_cast<SocketStream *>(itsArgs.stream)->protocol == SocketStream::UDP; WallClockTime wallClockTime; LOG_DEBUG_STR(itsArgs.logPrefix << " input thread " << itsArgs.threadID << " entering loop"); while (!itsShouldStop) { - size_t size; - try { // interruptible read, to allow stopping this thread even if the station // does not send data - size = itsArgs.stream->tryRead(currentPacketPtr, packetSize); + if (isUDPstream) { + if (itsArgs.stream->tryRead(currentPacketPtr, packetSize) != packetSize) { + ++ itsArgs.packetCounters->received; + ++ itsArgs.packetCounters->badSize; + continue; + } + } else { + itsArgs.stream->read(currentPacketPtr, packetSize); + } + } catch (Stream::EndOfStreamException &) { break; } catch (SystemCallException &ex) { @@ -114,11 +123,6 @@ template <typename SAMPLE_TYPE> void InputThread<SAMPLE_TYPE>::mainLoop() ++ itsArgs.packetCounters->received; - if (size != packetSize) { - ++ itsArgs.packetCounters->badSize; - continue; - } - if (dataShouldContainValidStamp) { #if defined __PPC__ unsigned seqid, blockid; -- GitLab