diff --git a/LCS/Stream/include/Stream/CMakeLists.txt b/LCS/Stream/include/Stream/CMakeLists.txt index f24ab1a792e08a8753d94b46d21a32e1464a0837..854586b75227a02447db51483cb509deb47dc961 100644 --- a/LCS/Stream/include/Stream/CMakeLists.txt +++ b/LCS/Stream/include/Stream/CMakeLists.txt @@ -7,7 +7,8 @@ set (inst_HEADERS PortBroker.h SharedMemoryStream.h SocketStream.h - Stream.h) + Stream.h + StreamFactory.h) # Create symbolic link to include directory. execute_process(COMMAND ${CMAKE_COMMAND} -E create_symlink diff --git a/LCS/Stream/include/Stream/SocketStream.h b/LCS/Stream/include/Stream/SocketStream.h index 04770aa4b9e7d2d9007e2827705cf9bc66e86c5c..0207af0d3c29bfe32467375acd55efc246256471 100644 --- a/LCS/Stream/include/Stream/SocketStream.h +++ b/LCS/Stream/include/Stream/SocketStream.h @@ -37,8 +37,6 @@ namespace LOFAR { class SocketStream : public FileDescriptorBasedStream { public: - EXCEPTION_CLASS(TimeOutException, LOFAR::Exception); - enum Protocol { TCP, UDP }; @@ -78,6 +76,8 @@ class SocketStream : public FileDescriptorBasedStream static void deletekey(const std::string &nfskey); }; +EXCEPTION_CLASS(TimeOutException, LOFAR::Exception); + } // namespace LOFAR #include "SocketStream.tcc" diff --git a/LCS/Stream/include/Stream/Stream.h b/LCS/Stream/include/Stream/Stream.h index 915ffc4bf6c59d528186893f416a927dc7e778f7..49cbf2465d2ff69dc349c22d3825ab104b620bc7 100644 --- a/LCS/Stream/include/Stream/Stream.h +++ b/LCS/Stream/include/Stream/Stream.h @@ -34,8 +34,6 @@ namespace LOFAR { class Stream { public: - EXCEPTION_CLASS(EndOfStreamException, LOFAR::Exception); - virtual ~Stream(); virtual size_t tryRead(void *ptr, size_t size) = 0; @@ -49,6 +47,8 @@ class Stream virtual void sync(); }; +EXCEPTION_CLASS(EndOfStreamException, LOFAR::Exception); + } // namespace LOFAR #endif diff --git a/LCS/Stream/include/Stream/StreamFactory.h b/LCS/Stream/include/Stream/StreamFactory.h new file mode 100644 index 0000000000000000000000000000000000000000..c0202747364d10eb99596fa6011c5c485928a8b4 --- /dev/null +++ b/LCS/Stream/include/Stream/StreamFactory.h @@ -0,0 +1,38 @@ +//# StreamFactory.h: class/functions to construct streams +//# Copyright (C) 2008-2013 ASTRON (Netherlands Institute for Radio Astronomy) +//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +//# +//# This file is part of the LOFAR software suite. +//# The LOFAR software suite is free software: you can redistribute it and/or +//# modify it under the terms of the GNU General Public License as published +//# by the Free Software Foundation, either version 3 of the License, or +//# (at your option) any later version. +//# +//# The LOFAR software suite is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License along +//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +//# +//# $Id$ + +#ifndef LOFAR_STREAM_STREAMFACTORY_H +#define LOFAR_STREAM_STREAMFACTORY_H + +#include <ctime> +#include <string> +#include <Stream/Stream.h> + +namespace LOFAR +{ + + // Create a stream from a descriptor. + // Caller should wrap the returned pointer in some smart ptr type. + Stream *createStream(const std::string &descriptor, bool asReader, time_t deadline = 0); + +} // namespace LOFAR + +#endif + diff --git a/LCS/Stream/src/CMakeLists.txt b/LCS/Stream/src/CMakeLists.txt index a84cd04edb8f1edf808e3da5bc94913acce46e6b..355f76184272d710dd565918352b39aa1ec89b4c 100644 --- a/LCS/Stream/src/CMakeLists.txt +++ b/LCS/Stream/src/CMakeLists.txt @@ -13,6 +13,7 @@ lofar_add_library(stream SharedMemoryStream.cc SocketStream.cc Stream.cc - StringStream.cc) + StringStream.cc + StreamFactory.cc) lofar_add_bin_program(versionstream versionstream.cc) diff --git a/LCS/Stream/src/FileDescriptorBasedStream.cc b/LCS/Stream/src/FileDescriptorBasedStream.cc index bc2be0eab03807fe17e8572af14b2ee7aca6a2c7..ebd7a2c9074b8a4affbab2981763ddb8b1708db8 100644 --- a/LCS/Stream/src/FileDescriptorBasedStream.cc +++ b/LCS/Stream/src/FileDescriptorBasedStream.cc @@ -22,32 +22,36 @@ #include <lofar_config.h> -#include <Common/LofarLogger.h> -#include <Common/SystemCallException.h> #include <Stream/FileDescriptorBasedStream.h> -#include <Common/Thread/Cancellation.h> #include <unistd.h> -#include <stdexcept> - +#include <Common/SystemCallException.h> +#include <Common/Thread/Cancellation.h> +#include <Common/LofarLogger.h> namespace LOFAR { + FileDescriptorBasedStream::~FileDescriptorBasedStream() { - ScopedDelayCancellation dc; // close() can throw as it is a cancellation point - - if (fd >= 0 && close(fd) < 0) { - // try/throw/catch to match patterns elsewhere. - // - // This ensures a proper string for errno, a - // backtrace if available, and the proper representation - // of exceptions in general. - try { - THROW_SYSCALL("close"); - } catch (Exception &ex) { - LOG_ERROR_STR("Exception in destructor: " << ex); + if (fd >= 0) { + int rv; + + { + // Avoid close() throwing in the destructor, + // as it is a cancellation point (see pthreads(7)). + ScopedDelayCancellation dc; + + rv = ::close(fd); + } + if (rv < 0) { + // Print error message similar to other failed system calls. + try { + THROW_SYSCALL("close"); + } catch (Exception &exc) { + LOG_ERROR_STR(exc); + } } } } @@ -80,7 +84,7 @@ size_t FileDescriptorBasedStream::tryWrite(const void *ptr, size_t size) void FileDescriptorBasedStream::sync() { - if (fsync(fd) < 0) + if (::fsync(fd) < 0) THROW_SYSCALL("fsync"); } diff --git a/LCS/Stream/src/SocketStream.cc b/LCS/Stream/src/SocketStream.cc index 06143ce018df258f08063dbb72f5565a319ad1fd..bffa81946f08f4ca0d53e25822f9f8304715fee2 100644 --- a/LCS/Stream/src/SocketStream.cc +++ b/LCS/Stream/src/SocketStream.cc @@ -1,6 +1,6 @@ //# SocketStream.cc: //# -//# Copyright (C) 2008 +//# Copyright (C) 2008, 2015 //# ASTRON (Netherlands Institute for Radio Astronomy) //# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands //# @@ -22,27 +22,27 @@ #include <lofar_config.h> -#include <Common/LofarLogger.h> -#include <Common/Thread/Cancellation.h> -#include <Common/Thread/Mutex.h> #include <Stream/SocketStream.h> +#include <cstdlib> #include <cstring> #include <cstdio> - -#include <dirent.h> -#include <errno.h> -#include <netdb.h> -#include <sys/select.h> -#include <sys/socket.h> +#include <cerrno> #include <sys/types.h> -#include <time.h> #include <unistd.h> -#include <cstdlib> +#include <time.h> +#include <sys/select.h> +#include <sys/socket.h> +#include <netdb.h> +#include <dirent.h> #include <boost/lexical_cast.hpp> #include <boost/format.hpp> +#include <Common/Thread/Mutex.h> +#include <Common/Thread/Cancellation.h> +#include <Common/LofarLogger.h> + //# AI_NUMERICSERV is not defined on OS-X #ifndef AI_NUMERICSERV # define AI_NUMERICSERV 0 @@ -59,7 +59,7 @@ const int MAXPORT = 30000; static struct RandomState { - RandomState() { + RandomState() { xsubi[0] = getpid(); xsubi[1] = time(0); xsubi[2] = time(0) >> 16; @@ -87,7 +87,7 @@ SocketStream::SocketStream(const std::string &hostname, uint16 _port, Protocol p % _port); struct addrinfo hints; - bool autoPort = (port == 0); + const bool autoPort = (port == 0); // use getaddrinfo, because gethostbyname is not thread safe memset(&hints, 0, sizeof(struct addrinfo)); @@ -105,6 +105,11 @@ SocketStream::SocketStream(const std::string &hostname, uint16 _port, Protocol p for(;;) { try { try { + // Not all potentially blocking calls below have a timeout arg. + // So check early every round. It may abort hanging tests early (=good). + if (deadline > 0 && deadline <= time(0)) + THROW(TimeOutException, "SocketStream"); + char portStr[16]; int retval; struct addrinfo *result; @@ -149,7 +154,7 @@ SocketStream::SocketStream(const std::string &hostname, uint16 _port, Protocol p if (deadline > 0 && time(0) >= deadline) throw TimeOutException("client socket", THROW_ARGS); - if (usleep(999999) < 0) { + if (usleep(999999) < 0) { // near 1 sec; max portably safe arg val // interrupted by a signal handler -- abort to allow this thread to // be forced to continue after receiving a SIGINT, as with any other // system call in this constructor @@ -158,7 +163,7 @@ SocketStream::SocketStream(const std::string &hostname, uint16 _port, Protocol p } else THROW_SYSCALL(str(boost::format("connect [%s]") % description)); } else { - int on = 1; + const int on = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof on) < 0) THROW_SYSCALL("setsockopt(SO_REUSEADDR)"); @@ -170,7 +175,7 @@ SocketStream::SocketStream(const std::string &hostname, uint16 _port, Protocol p listen_sk = fd; fd = -1; - int listenBacklog = 15; + const int listenBacklog = 15; if (listen(listen_sk, listenBacklog) < 0) THROW_SYSCALL(str(boost::format("listen [%s]") % description)); @@ -309,8 +314,10 @@ void SocketStream::syncNFS() if (!dir) THROW_SYSCALL("opendir"); - if (!readdir(dir)) - THROW_SYSCALL("readdir"); + struct dirent entry; + struct dirent *result; + if (readdir_r(dir, &entry, &result) != 0 || !result) + THROW_SYSCALL("readdir_r"); if (closedir(dir) != 0) THROW_SYSCALL("closedir"); @@ -335,11 +342,11 @@ std::string SocketStream::readkey(const std::string &nfskey, time_t deadline) if (deadline > 0 && deadline <= time(0)) THROW(TimeOutException, "client socket"); - if (usleep(999999) > 0) { + if (usleep(999999) < 0) { // near 1 sec; max portably safe arg val // interrupted by a signal handler -- abort to allow this thread to // be forced to continue after receiving a SIGINT, as with any other // system call - THROW_SYSCALL("sleep"); + THROW_SYSCALL("usleep"); } } } diff --git a/LCS/Stream/src/StreamFactory.cc b/LCS/Stream/src/StreamFactory.cc new file mode 100644 index 0000000000000000000000000000000000000000..e5a3b71765e40bdd2932dc6deb1a92c513299029 --- /dev/null +++ b/LCS/Stream/src/StreamFactory.cc @@ -0,0 +1,73 @@ +//# StreamFactory.cc: class/functions to construct streams +//# Copyright (C) 2008-2013 ASTRON (Netherlands Institute for Radio Astronomy) +//# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +//# +//# This file is part of the LOFAR software suite. +//# The LOFAR software suite is free software: you can redistribute it and/or +//# modify it under the terms of the GNU General Public License as published +//# by the Free Software Foundation, either version 3 of the License, or +//# (at your option) any later version. +//# +//# The LOFAR software suite is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License along +//# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +//# +//# $Id$ + +//# Always #include <lofar_config.h> first! +#include <lofar_config.h> + +//# Includes +#include <Stream/StreamFactory.h> + +#include <vector> +#include <boost/lexical_cast.hpp> + +#include <Common/StringUtil.h> +#include <Common/Exceptions.h> +#include <Stream/FileStream.h> +#include <Stream/NullStream.h> +#include <Stream/SocketStream.h> +#include <Stream/PortBroker.h> +#include <Stream/NamedPipeStream.h> + +namespace LOFAR +{ + + // Caller should wrap the returned pointer in some smart ptr type. + Stream *createStream(const std::string &descriptor, bool asServer, time_t deadline) + { + std::vector<std::string> split = StringUtil::split(descriptor, ':'); + + if (descriptor == "null:") + return new NullStream; + else if (split.size() == 3 && split[0] == "udp") + return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline); + else if (split.size() == 3 && split[0] == "tcp") + return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline); + else if (split.size() == 3 && split[0] == "udpkey") + return new SocketStream(split[1].c_str(), 0, SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, split[2].c_str()); + else if (split.size() == 4 && split[0] == "tcpbroker") + return asServer ? static_cast<Stream*>(new PortBroker::ServerStream(split[3])) : static_cast<Stream*>(new PortBroker::ClientStream(split[1], boost::lexical_cast<unsigned short>(split[2]), split[3])); + else if (split.size() == 3 && split[0] == "tcpkey") + return new SocketStream(split[1].c_str(), 0, SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline, split[2].c_str()); + else if (split.size() > 1 && split[0] == "file") { + // don't use split[1] to allow : in filenames + const std::string filename = descriptor.substr(5); + return asServer ? new FileStream(filename.c_str()) : new FileStream(filename.c_str(), 0666); + } else if (split.size() == 2 && split[0] == "pipe") + return new NamedPipeStream(split[1].c_str(), asServer); + else if (split.size() == 2) + return new SocketStream(split[0].c_str(), boost::lexical_cast<unsigned short>(split[1]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline); + else if (split.size() == 1) + return asServer ? new FileStream(split[0].c_str()) : new FileStream(split[0].c_str(), 0666); + + THROW(NotImplemented, std::string("createStream(): unrecognized descriptor: " + descriptor)); + } + +} // namespace LOFAR + diff --git a/LCS/Stream/test/tFixedBufferStream.cc b/LCS/Stream/test/tFixedBufferStream.cc index d6a06699d673f8f8b10740668d6e1f01802d84e1..0186beae9a40865007226c50094c376621535d64 100644 --- a/LCS/Stream/test/tFixedBufferStream.cc +++ b/LCS/Stream/test/tFixedBufferStream.cc @@ -109,7 +109,7 @@ int main(int /*argc*/, const char* argv[]) try { testWrite(ws, 500); - } catch(Stream::EndOfStreamException &e) { + } catch(EndOfStreamException &e) { EOB = true; } @@ -120,7 +120,7 @@ int main(int /*argc*/, const char* argv[]) try { testRead(rs, 500); - } catch(Stream::EndOfStreamException &e) { + } catch(EndOfStreamException &e) { EOB = true; } diff --git a/RTCP/Cobalt/CoInterface/src/Stream.cc b/RTCP/Cobalt/CoInterface/src/Stream.cc index cc8644bc56710bdb650eabfabb946e7dd5387976..f86a9c37fa04c1b3fc627e174f5e7e8a3f1f6237 100644 --- a/RTCP/Cobalt/CoInterface/src/Stream.cc +++ b/RTCP/Cobalt/CoInterface/src/Stream.cc @@ -48,77 +48,6 @@ namespace LOFAR namespace Cobalt { - Stream *createStream(const string &descriptor, bool asServer, time_t deadline) - { - if (deadline > 0 && deadline <= std::time(0)) // TODO: doesn't belong here - THROW(SocketStream::TimeOutException, "createStream deadline passed"); - - vector<string> split = StringUtil::split(descriptor, ':'); - - if (descriptor == "null:") - return new NullStream; - else if (split.size() == 3 && split[0] == "udp") - return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline); - else if (split.size() == 3 && split[0] == "tcp") - return new SocketStream(split[1].c_str(), boost::lexical_cast<unsigned short>(split[2]), SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline); - else if (split.size() == 3 && split[0] == "udpkey") - return new SocketStream(split[1].c_str(), 0, SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline, split[2].c_str()); - else if (split.size() == 4 && split[0] == "tcpbroker") - return asServer ? static_cast<Stream*>(new PortBroker::ServerStream(split[3])) : static_cast<Stream*>(new PortBroker::ClientStream(split[1], boost::lexical_cast<unsigned short>(split[2]), split[3])); - else if (split.size() == 3 && split[0] == "tcpkey") - return new SocketStream(split[1].c_str(), 0, SocketStream::TCP, asServer ? SocketStream::Server : SocketStream::Client, deadline, split[2].c_str()); - else if (split.size() > 1 && split[0] == "file") { - // don't use split[1] to allow : in filenames - const string filename = descriptor.substr(5); - return asServer ? new FileStream(filename.c_str()) : new FileStream(filename.c_str(), 0666); - } else if (split.size() == 2 && split[0] == "pipe") - return new NamedPipeStream(split[1].c_str(), asServer); - else if (split.size() == 2) - return new SocketStream(split[0].c_str(), boost::lexical_cast<unsigned short>(split[1]), SocketStream::UDP, asServer ? SocketStream::Server : SocketStream::Client, deadline); - else if (split.size() == 1) - return asServer ? new FileStream(split[0].c_str()) : new FileStream(split[0].c_str(), 0666); - else - THROW(CoInterfaceException, string("createStream unrecognized descriptor format: " + descriptor)); - } - - // TODO: this function is an unfinished start to refactor the above. - enum StreamSpecification::Protocol readProtocol(char **str) - { - // The first part can be done with iterators, - // but the if-nest below becomes tedious. Use C. - char *base = *str; - unsigned i = 0; - while (isalnum(base[i])) { - i += 1; - } - - enum StreamSpecification::Protocol rv; - if (strncmp(base, "udp", i) == 0) { - rv = StreamSpecification::UDP_STREAM; - } else if (strncmp(base, "tcp", i) == 0) { - rv = StreamSpecification::TCP_STREAM; - } else if (strncmp(base, "file", i) == 0) { - rv = StreamSpecification::FILE_STREAM; - } else if (strncmp(base, "null", i) == 0) { - rv = StreamSpecification::NULL_STREAM; - } else if (strncmp(base, "pipe", i) == 0) { - rv = StreamSpecification::PIPE_STREAM; - } else if (strncmp(base, "udpkey", i) == 0) { - rv = StreamSpecification::UDPKEY_STREAM; - } else if (strncmp(base, "tcpkey", i) == 0) { - rv = StreamSpecification::TCPKEY_STREAM; - } else if (strncmp(base, "tcpbroker", i) == 0) { - rv = StreamSpecification::TCPBROKER_STREAM; - } else if (strncmp(base, "factory", i) == 0) { - rv = StreamSpecification::FACTORY_STREAM; - } else { - THROW(CoInterfaceException, string("createStream failed to recognize protocol: ") + *str); - } - - *str += i; - return rv; - } - uint16 storageBrokerPort(int observationID) { return 7000 + observationID % 1000; @@ -131,6 +60,7 @@ namespace LOFAR } + // The returned descriptor can be supplied to LCS/Stream StreamFactory.h string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr) { string host = parset.getHostName(outputType, streamNr); diff --git a/RTCP/Cobalt/CoInterface/src/Stream.h b/RTCP/Cobalt/CoInterface/src/Stream.h index d248559710cbf010e37ca003c978005bb4e31e48..8f4797dc77ff00d327d6ac4b335dbaa126a89e89 100644 --- a/RTCP/Cobalt/CoInterface/src/Stream.h +++ b/RTCP/Cobalt/CoInterface/src/Stream.h @@ -27,33 +27,15 @@ #include <CoInterface/Parset.h> #include <Stream/Stream.h> - namespace LOFAR { namespace Cobalt { - // Create a stream from a descriptor - Stream *createStream(const std::string &descriptor, bool asReader, time_t deadline = 0); - - struct StreamSpecification { - enum Protocol { - UNKNOWN_STREAM = 0, UDP_STREAM, TCP_STREAM, FILE_STREAM, NULL_STREAM, - PIPE_STREAM, UDPKEY_STREAM, TCPKEY_STREAM, TCPBROKER_STREAM, FACTORY_STREAM - }; - - Protocol protocol; - std::string hostname; - std::string port; - }; - - // Returns a supported protocol constant if recognized and updates *str. - // Throws without changing *str iff no protocol was recognized. - enum StreamSpecification::Protocol readProtocol(char **str); - uint16 storageBrokerPort(int observationID); std::string getStorageControlDescription(int observationID, int rank); + // The returned descriptor can be supplied to LCS/Stream StreamFactory.h std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, OutputType outputType, unsigned streamNr); } // namespace Cobalt diff --git a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc index a115b2ff1f38b36b18e6a2aa6b08f4b06b284c2e..3cfc8a2cc5cdbd572e21cb0abee18ade279abb57 100644 --- a/RTCP/Cobalt/CoInterface/src/TABTranspose.cc +++ b/RTCP/Cobalt/CoInterface/src/TABTranspose.cc @@ -490,7 +490,7 @@ void Receiver::receiveLoop() collectors.at(fileIdx)->addSubband(subband); } - } catch (Stream::EndOfStreamException &) { + } catch (EndOfStreamException &) { } } @@ -552,7 +552,7 @@ void MultiReceiver::listenLoop() try { stream = new PortBroker::ServerStream(servicePrefix, true); - } catch(SocketStream::TimeOutException &) { + } catch(TimeOutException &) { // fail silently if no client connected LOG_DEBUG_STR("TABTranspose::MultiReceiver: Timed out"); break; diff --git a/RTCP/Cobalt/CoInterface/test/tTABTranspose.cc b/RTCP/Cobalt/CoInterface/test/tTABTranspose.cc index dd112c9286450412689d94b3e120be5fd2407545..d343f370bcbbf5d714f36b2119edff1973ddca19 100644 --- a/RTCP/Cobalt/CoInterface/test/tTABTranspose.cc +++ b/RTCP/Cobalt/CoInterface/test/tTABTranspose.cc @@ -20,6 +20,8 @@ #include <lofar_config.h> +#include <ctime> + #include <Common/LofarLogger.h> #include <Common/Timer.h> #include <Stream/StringStream.h> @@ -476,8 +478,8 @@ SUITE(MultiReceiver) { // Connect with multiple clients { - PortBroker::ClientStream cs1("localhost", PortBroker::DEFAULT_PORT, "foo-1", 1); - PortBroker::ClientStream cs2("localhost", PortBroker::DEFAULT_PORT, "foo-2", 1); + PortBroker::ClientStream cs1("localhost", PortBroker::DEFAULT_PORT, "foo-1", time(0) + 1); + PortBroker::ClientStream cs2("localhost", PortBroker::DEFAULT_PORT, "foo-2", time(0) + 1); // Disconnect them too! (~cs) } @@ -495,7 +497,7 @@ SUITE(MultiReceiver) { // Connect { - PortBroker::ClientStream cs("localhost", PortBroker::DEFAULT_PORT, "foo-1", 1); + PortBroker::ClientStream cs("localhost", PortBroker::DEFAULT_PORT, "foo-1", time(0) + 1); // Send one block { diff --git a/RTCP/Cobalt/GPUProc/src/CommandThread.cc b/RTCP/Cobalt/GPUProc/src/CommandThread.cc index 810709a56edeaf12159d09ac741891f7c1684830..be0349f946c982999e5f1adbe83c414c71a7eb07 100644 --- a/RTCP/Cobalt/GPUProc/src/CommandThread.cc +++ b/RTCP/Cobalt/GPUProc/src/CommandThread.cc @@ -23,8 +23,8 @@ #include "CommandThread.h" #include <Common/LofarLogger.h> +#include <Stream/StreamFactory.h> #include <CoInterface/MultiDimArray.h> -#include <CoInterface/Stream.h> #include <InputProc/Transpose/MPIProtocol.h> #include <InputProc/Transpose/MPIUtil.h> @@ -59,7 +59,7 @@ namespace LOFAR { LOG_INFO_STR("[CommandThread] Received command: '" << command << "'"); return command; - } catch(Stream::EndOfStreamException &) { + } catch(EndOfStreamException &) { LOG_INFO("[CommandThread] Connection reset by peer"); } catch(OMPThreadSet::CannotStartException &) { /* stop() was called */ diff --git a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc index a459da0af621d9f4de2c6d9352b6d4373d86b4af..d762ff061d658c292954c1aad8b99aac70711ac2 100644 --- a/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc +++ b/RTCP/Cobalt/GPUProc/src/Station/StationInput.cc @@ -39,6 +39,7 @@ #include <Common/LofarLogger.h> #include <Common/Timer.h> #include <Stream/FileStream.h> +#include <Stream/StreamFactory.h> #include <CoInterface/Parset.h> #include <CoInterface/OMPThread.h> #include <CoInterface/TimeFuncs.h> @@ -289,7 +290,7 @@ namespace LOFAR { outputQueue.append(rspData); } - } catch (Stream::EndOfStreamException &ex) { + } catch (EndOfStreamException &ex) { // Ran out of data LOG_INFO_STR( logPrefix << "End of stream"); @@ -404,7 +405,7 @@ namespace LOFAR { // Retry until we have a valid packet while (!readers[board]->readPacket(last_packets[board])) ; - } catch (Stream::EndOfStreamException &ex) { + } catch (EndOfStreamException &ex) { // Ran out of data LOG_INFO_STR( logPrefix << "End of stream"); diff --git a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc index 942726e88376f83d45e3265646612f5080d5bfb4..9ff49da785934ee2f3e09f69e84c45e7dac719bd 100644 --- a/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/cuda/Pipelines/Pipeline.cc @@ -33,6 +33,7 @@ #include <Stream/Stream.h> #include <Stream/FileStream.h> #include <Stream/NullStream.h> +#include <Stream/StreamFactory.h> #include <CoInterface/Align.h> #include <CoInterface/BudgetTimer.h> diff --git a/RTCP/Cobalt/GPUProc/src/opencl/Pipelines/CorrelatorPipeline.cc b/RTCP/Cobalt/GPUProc/src/opencl/Pipelines/CorrelatorPipeline.cc index 0b27e32f09a838e2e5a20a071f073ec9260a4241..dca433a4d19e0c130bd0a505e174a6a82c5473c9 100644 --- a/RTCP/Cobalt/GPUProc/src/opencl/Pipelines/CorrelatorPipeline.cc +++ b/RTCP/Cobalt/GPUProc/src/opencl/Pipelines/CorrelatorPipeline.cc @@ -29,6 +29,7 @@ #include <Stream/Stream.h> #include <Stream/FileStream.h> #include <Stream/NullStream.h> +#include <Stream/StreamDescriptor.h> #include <CoInterface/CorrelatedData.h> #include <CoInterface/Stream.h> diff --git a/RTCP/Cobalt/InputProc/src/Station/Generator.cc b/RTCP/Cobalt/InputProc/src/Station/Generator.cc index 31e55581597542359ba2843e8319d44a951b6cfc..dcc2c6355218b4aab9dffb533124ea6e923ca2a4 100644 --- a/RTCP/Cobalt/InputProc/src/Station/Generator.cc +++ b/RTCP/Cobalt/InputProc/src/Station/Generator.cc @@ -85,7 +85,7 @@ namespace LOFAR current += packet.header.nrBlocks; } - } catch (Stream::EndOfStreamException &ex) { + } catch (EndOfStreamException &ex) { LOG_INFO_STR( logPrefix << "End of stream"); } catch (SystemCallException &ex) { if (ex.error == EINTR) diff --git a/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc b/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc index e979ffc278ccd09fe0d80654350c83080e6bbfd4..c3972602f95e75e4cdb0a47f2bcbe0b723c8af0f 100644 --- a/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc +++ b/RTCP/Cobalt/InputProc/src/Station/filterRSP.cc @@ -30,8 +30,8 @@ #include <Common/Thread/Queue.h> #include <Common/Thread/Thread.h> #include <Common/LofarLogger.h> +#include <Stream/StreamFactory.h> #include <ApplCommon/PosixTime.h> -#include <CoInterface/Stream.h> #include <CoInterface/SmartPtr.h> #include "RSP.h" #include "PacketReader.h" @@ -171,7 +171,7 @@ int main(int argc, char **argv) reader.readPackets(p->packets); writeQueue.append(p); } - } catch(Stream::EndOfStreamException&) { + } catch(EndOfStreamException&) { } writeQueue.append(NULL); diff --git a/RTCP/Cobalt/InputProc/src/Station/generate.cc b/RTCP/Cobalt/InputProc/src/Station/generate.cc index 6687344c1d76baf605309884a1e7b0bfa6bb3c00..a4b8b47d7e917874ecb84a1bf3665bd26393058c 100644 --- a/RTCP/Cobalt/InputProc/src/Station/generate.cc +++ b/RTCP/Cobalt/InputProc/src/Station/generate.cc @@ -25,7 +25,7 @@ #include <omp.h> #include <Common/LofarLogger.h> -#include <CoInterface/Stream.h> +#include <Stream/StreamFactory.h> #include <InputProc/Station/PacketFactory.h> #include <InputProc/Station/Generator.h> diff --git a/RTCP/Cobalt/InputProc/src/Station/generateRSP.cc b/RTCP/Cobalt/InputProc/src/Station/generateRSP.cc index b11e6f65da29872f58c2cace93e08696214bea5c..7d03614d4580ffdc2815feb6be083f16dd090ad6 100644 --- a/RTCP/Cobalt/InputProc/src/Station/generateRSP.cc +++ b/RTCP/Cobalt/InputProc/src/Station/generateRSP.cc @@ -31,8 +31,8 @@ #include <Common/LofarLogger.h> #include <ApplCommon/PosixTime.h> +#include <Stream/StreamFactory.h> #include <CoInterface/SmartPtr.h> -#include <CoInterface/Stream.h> #include <InputProc/Buffer/BoardMode.h> #include <InputProc/RSPTimeStamp.h> #include <InputProc/Station/RSP.h> diff --git a/RTCP/Cobalt/InputProc/src/Station/printRSP.cc b/RTCP/Cobalt/InputProc/src/Station/printRSP.cc index 7adc2a17838414a901b11cd5fde3d652f416e39d..7b22f4e70f142b23acbc39617cab0bc606ccb7ea 100644 --- a/RTCP/Cobalt/InputProc/src/Station/printRSP.cc +++ b/RTCP/Cobalt/InputProc/src/Station/printRSP.cc @@ -68,7 +68,7 @@ void report( const string &filename ) // read payload f.read( &packet.payload, packet.packetSize() - sizeof (RSP::Header) ); } - } catch(Stream::EndOfStreamException &) { + } catch(EndOfStreamException &) { } } diff --git a/RTCP/Cobalt/InputProc/src/Station/repairRSP.cc b/RTCP/Cobalt/InputProc/src/Station/repairRSP.cc index ec50ac68fba14fa8699b75f05391bb6fb8f626b2..650de89400f52831fb74bdcfa0d2ef61df82e174 100644 --- a/RTCP/Cobalt/InputProc/src/Station/repairRSP.cc +++ b/RTCP/Cobalt/InputProc/src/Station/repairRSP.cc @@ -31,7 +31,7 @@ #include <Common/Thread/Thread.h> #include <Common/LofarLogger.h> #include <ApplCommon/PosixTime.h> -#include <CoInterface/Stream.h> +#include <Stream/StreamFactory.h> #include <CoInterface/SmartPtr.h> #include "RSP.h" #include "PacketReader.h" @@ -144,7 +144,7 @@ int main(int argc, char **argv) // Write packet outputStream->write(&packet, packet.packetSize()); } - } catch(Stream::EndOfStreamException&) { + } catch(EndOfStreamException&) { } } diff --git a/RTCP/Cobalt/InputProc/test/tGenerator.cc b/RTCP/Cobalt/InputProc/test/tGenerator.cc index ebd38d779781fcc44a57af9c8b699ddebb9daf84..8ffc79708f0423a8c131c2563e9f412f2dec95b6 100644 --- a/RTCP/Cobalt/InputProc/test/tGenerator.cc +++ b/RTCP/Cobalt/InputProc/test/tGenerator.cc @@ -26,7 +26,7 @@ #include <omp.h> #include <Common/LofarLogger.h> -#include <CoInterface/Stream.h> +#include <Stream/StreamFactory.h> #include <CoInterface/OMPThread.h> #include <InputProc/Station/PacketFactory.h> diff --git a/RTCP/Cobalt/InputProc/test/tPacketReader.cc b/RTCP/Cobalt/InputProc/test/tPacketReader.cc index ae0333a6b051c44018d55eb158e1d24d929e9040..a689800de0a9123c892c5068d12d9f77ea6ca34b 100644 --- a/RTCP/Cobalt/InputProc/test/tPacketReader.cc +++ b/RTCP/Cobalt/InputProc/test/tPacketReader.cc @@ -51,7 +51,7 @@ void test(const std::string &filename, unsigned bitmode, unsigned nrPackets) for( size_t i = 0; i < 3; ++i) { try { ASSERT( !reader.readPacket(packet) ); - } catch (Stream::EndOfStreamException &ex) { + } catch (EndOfStreamException &ex) { // expected } } diff --git a/RTCP/Cobalt/InputProc/test/t_generateRSP.cc b/RTCP/Cobalt/InputProc/test/t_generateRSP.cc index 0bbe4525cb616a39803e1525e65be61ad225a855..119467fbf92c22698e61ef04a96a6737c10e1e30 100644 --- a/RTCP/Cobalt/InputProc/test/t_generateRSP.cc +++ b/RTCP/Cobalt/InputProc/test/t_generateRSP.cc @@ -85,7 +85,7 @@ void read_rsp(Stream& is, ostream& os, unsigned bitMode, unsigned nrSubbands) } os << endl; } - } catch (Stream::EndOfStreamException&) { } + } catch (EndOfStreamException&) { } } int main() diff --git a/RTCP/Cobalt/OutputProc/src/InputThread.cc b/RTCP/Cobalt/OutputProc/src/InputThread.cc index e636374c33e074c9af41d87b7aae29887b670db5..f520d26ddbeed9c844aa5f8d1354aacb7687bbbb 100644 --- a/RTCP/Cobalt/OutputProc/src/InputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/InputThread.cc @@ -26,6 +26,7 @@ #include <Common/Timer.h> #include <Stream/NullStream.h> #include <Stream/SocketStream.h> +#include <Stream/StreamFactory.h> #include <CoInterface/Stream.h> @@ -57,9 +58,9 @@ namespace LOFAR LOG_DEBUG_STR(itsLogPrefix << "Read block with seqno = " << data->sequenceNumber()); } - } catch (SocketStream::TimeOutException &) { + } catch (TimeOutException &) { LOG_WARN_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " timed out"); - } catch (Stream::EndOfStreamException &) { + } catch (EndOfStreamException &) { LOG_INFO_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " closed"); } catch (SystemCallException &ex) { LOG_WARN_STR(itsLogPrefix << "Connection from " << itsInputDescriptor << " failed: " << ex.text()); diff --git a/RTCP/Cobalt/OutputProc/src/TBB_Writer.cc b/RTCP/Cobalt/OutputProc/src/TBB_Writer.cc index 15fc6a768ec068d68d8acefcefe3e5dcdeeba58a..6ff57faea260c9462cc668b131df931cb53c6421 100644 --- a/RTCP/Cobalt/OutputProc/src/TBB_Writer.cc +++ b/RTCP/Cobalt/OutputProc/src/TBB_Writer.cc @@ -52,9 +52,8 @@ #include <Common/StringUtil.h> #include <Common/StreamUtil.h> #include <ApplCommon/AntField.h> -#include <Stream/SocketStream.h> +#include <Stream/StreamFactory.h> #include <CoInterface/Exceptions.h> -#include <CoInterface/Stream.h> #include <dal/lofar/StationNames.h> @@ -1061,7 +1060,7 @@ namespace LOFAR } catch (exception& exc) { LOG_WARN_STR(itsLogPrefix << "may have lost a frame buffer (1): " << exc.what()); } - } catch (Stream::EndOfStreamException& ) { // after end of stream, for input from file or pipe + } catch (EndOfStreamException& ) { // after end of stream, for input from file or pipe break; } catch (exception& exc) { LOG_FATAL_STR(itsLogPrefix << exc.what()); diff --git a/RTCP/Cobalt/OutputProc/src/plotMS.cc b/RTCP/Cobalt/OutputProc/src/plotMS.cc index 226ee955ba47d42f607eafc7121e49c642f4a625..504da3040c1f1e5eeb48c95d38a785b281f40f68 100644 --- a/RTCP/Cobalt/OutputProc/src/plotMS.cc +++ b/RTCP/Cobalt/OutputProc/src/plotMS.cc @@ -195,7 +195,7 @@ int main(int argc, char *argv[]) for(;; ) { try { data->read(&datafile, true, 512); - } catch (Stream::EndOfStreamException &) { + } catch (EndOfStreamException &) { break; } //data->peerMagicNumber = 0xda7a0000; // fake wrong endianness to circumvent bug diff --git a/RTCP/Cobalt/OutputProc/test/tSubbandWriter.cc b/RTCP/Cobalt/OutputProc/test/tSubbandWriter.cc index 02d5cd431e7757627926383bc6386980a210c2c8..f68e6fea6d00e8075ba67ee05737e836b2fddcc0 100644 --- a/RTCP/Cobalt/OutputProc/test/tSubbandWriter.cc +++ b/RTCP/Cobalt/OutputProc/test/tSubbandWriter.cc @@ -22,14 +22,17 @@ #include <string> #include <cstdlib> +#include <string> #include <omp.h> -#include <UnitTest++.h> + #include <boost/format.hpp> +#include <boost/lexical_cast.hpp> +#include <Stream/StreamFactory.h> +#include <Stream/PortBroker.h> #include <CoInterface/CorrelatedData.h> #include <CoInterface/Stream.h> #include <OutputProc/SubbandWriter.h> -#include <Stream/PortBroker.h> #include <MSLofar/FailedTileInfo.h> #include <tables/Tables/Table.h> @@ -37,8 +40,7 @@ #include <tables/Tables/ArrayColumn.h> #include <casa/Quanta/MVTime.h> -#include <boost/lexical_cast.hpp> -#include <string> +#include <UnitTest++.h> using namespace std; using namespace LOFAR;