Skip to content
Snippets Groups Projects
Commit 9cdb6be4 authored by John Romein's avatar John Romein
Browse files

bug 225:

Fixed short reads for non-UDP input.
parent b861fd37
No related branches found
No related tags found
No related merge requests found
...@@ -53,11 +53,12 @@ class SocketStream : public FileDescriptorBasedStream ...@@ -53,11 +53,12 @@ class SocketStream : public FileDescriptorBasedStream
void reaccept(time_t timeout = 0); // only for TCP server socket void reaccept(time_t timeout = 0); // only for TCP server socket
void setReadBufferSize(size_t size); void setReadBufferSize(size_t size);
const Protocol protocol;
const Mode mode;
private: private:
const char *hostname; const char *hostname;
uint16 port; uint16 port;
const Protocol protocol;
const Mode mode;
const char *nfskey; const char *nfskey;
int listen_sk; int listen_sk;
......
...@@ -47,10 +47,10 @@ const int MAXPORT = 30000; ...@@ -47,10 +47,10 @@ const int MAXPORT = 30000;
SocketStream::SocketStream(const char *hostname, uint16 _port, Protocol protocol, Mode mode, time_t timeout, const char *nfskey ) SocketStream::SocketStream(const char *hostname, uint16 _port, Protocol protocol, Mode mode, time_t timeout, const char *nfskey )
: :
hostname(hostname),
port(_port),
protocol(protocol), protocol(protocol),
mode(mode), mode(mode),
hostname(hostname),
port(_port),
nfskey(nfskey), nfskey(nfskey),
listen_sk(-1) listen_sk(-1)
{ {
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <Interface/AlignedStdAllocator.h> #include <Interface/AlignedStdAllocator.h>
#include <Interface/Exceptions.h> #include <Interface/Exceptions.h>
#include <Stream/NullStream.h> #include <Stream/NullStream.h>
#include <Stream/SocketStream.h>
#include <BeamletBuffer.h> #include <BeamletBuffer.h>
#include <InputThread.h> #include <InputThread.h>
#include <RSP.h> #include <RSP.h>
...@@ -91,18 +92,26 @@ template <typename SAMPLE_TYPE> void InputThread<SAMPLE_TYPE>::mainLoop() ...@@ -91,18 +92,26 @@ template <typename SAMPLE_TYPE> void InputThread<SAMPLE_TYPE>::mainLoop()
bool previousSeqidIsAccepted = false; bool previousSeqidIsAccepted = false;
bool dataShouldContainValidStamp = dynamic_cast<NullStream *>(itsArgs.stream) == 0; bool dataShouldContainValidStamp = dynamic_cast<NullStream *>(itsArgs.stream) == 0;
bool isUDPstream = dynamic_cast<SocketStream *>(itsArgs.stream) != 0 && dynamic_cast<SocketStream *>(itsArgs.stream)->protocol == SocketStream::UDP;
WallClockTime wallClockTime; WallClockTime wallClockTime;
LOG_DEBUG_STR(itsArgs.logPrefix << " input thread " << itsArgs.threadID << " entering loop"); LOG_DEBUG_STR(itsArgs.logPrefix << " input thread " << itsArgs.threadID << " entering loop");
while (!itsShouldStop) { while (!itsShouldStop) {
size_t size;
try { try {
// interruptible read, to allow stopping this thread even if the station // interruptible read, to allow stopping this thread even if the station
// does not send data // does not send data
size = itsArgs.stream->tryRead(currentPacketPtr, packetSize); if (isUDPstream) {
if (itsArgs.stream->tryRead(currentPacketPtr, packetSize) != packetSize) {
++ itsArgs.packetCounters->received;
++ itsArgs.packetCounters->badSize;
continue;
}
} else {
itsArgs.stream->read(currentPacketPtr, packetSize);
}
} catch (Stream::EndOfStreamException &) { } catch (Stream::EndOfStreamException &) {
break; break;
} catch (SystemCallException &ex) { } catch (SystemCallException &ex) {
...@@ -114,11 +123,6 @@ template <typename SAMPLE_TYPE> void InputThread<SAMPLE_TYPE>::mainLoop() ...@@ -114,11 +123,6 @@ template <typename SAMPLE_TYPE> void InputThread<SAMPLE_TYPE>::mainLoop()
++ itsArgs.packetCounters->received; ++ itsArgs.packetCounters->received;
if (size != packetSize) {
++ itsArgs.packetCounters->badSize;
continue;
}
if (dataShouldContainValidStamp) { if (dataShouldContainValidStamp) {
#if defined __PPC__ #if defined __PPC__
unsigned seqid, blockid; unsigned seqid, blockid;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment