diff --git a/.gitattributes b/.gitattributes index 1e93ea47952b26cd6b855ed75a40c8e13b04f31d..fa6e7d8dcc8a095a52591698bcac67e41bad8187 100644 --- a/.gitattributes +++ b/.gitattributes @@ -3974,6 +3974,7 @@ RTCP/InputProc/src/OMPThread.h -text RTCP/InputProc/src/PacketReader.cc -text RTCP/InputProc/src/PacketReader.h -text RTCP/InputProc/src/PacketWriter.h -text +RTCP/InputProc/src/PacketsToBuffer.cc -text RTCP/InputProc/src/PacketsToBuffer.h -text RTCP/InputProc/src/RSP.h -text RTCP/InputProc/src/RSPBoards.cc -text diff --git a/RTCP/InputProc/src/CMakeLists.txt b/RTCP/InputProc/src/CMakeLists.txt index 82199571180f30b77f0a3fe170e1f31b15431813..a4655caea717ec7ac97ef8a88d27a5ba0e04fc37 100644 --- a/RTCP/InputProc/src/CMakeLists.txt +++ b/RTCP/InputProc/src/CMakeLists.txt @@ -5,6 +5,7 @@ lofar_add_library(inputproc BufferSettings.cc Generator.cc PacketReader.cc + PacketsToBuffer.cc Ranges.cc SharedMemory.cc StationID.cc diff --git a/RTCP/InputProc/src/PacketsToBuffer.cc b/RTCP/InputProc/src/PacketsToBuffer.cc new file mode 100644 index 0000000000000000000000000000000000000000..ec5924f91abdbdf85aa8fd014b8de1ccf3592dbf --- /dev/null +++ b/RTCP/InputProc/src/PacketsToBuffer.cc @@ -0,0 +1,124 @@ +#include <lofar_config.h> +#include "PacketsToBuffer.h" +#include <Common/LofarLogger.h> +#include <Interface/SmartPtr.h> +#include <Interface/Stream.h> +#include <Stream/Stream.h> +#include "RSP.h" +#include "SampleBuffer.h" +#include "BufferSettings.h" +#include "PacketReader.h" +#include "PacketWriter.h" +#include <boost/format.hpp> +#include <string> +#include <ios> + +namespace LOFAR { +namespace RTCP { + + +PacketsToBuffer::PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr ) +: + logPrefix(str(boost::format("[station %s board %u] ") % settings.station % boardNr)), + inputStream(inputStream), + settings(settings), + boardNr(boardNr) +{ + LOG_INFO_STR( logPrefix << "Initialised" ); +} + + + +void PacketsToBuffer::process() +{ + // Holder for packet + struct RSP packet; + + // Whether packet has been read already + bool packetValid = false; + + // Keep reading if mode changes + for(;;) { + try { + // Process packets based on (expected) bit mode + switch(settings.station.bitMode) { + case 16: + process<i16complex>(packet, packetValid); + break; + + case 8: + process<i8complex>(packet, packetValid); + break; + + case 4: + process<i4complex>(packet, packetValid); + break; + } + + // process<>() exited gracefully, so we're done + break; + } catch (PacketReader::BadModeException &ex) { + // Mode switch detected + unsigned bitMode = packet.bitMode(); + unsigned clockMHz = packet.clockMHz(); + + LOG_INFO_STR( logPrefix << "Mode switch detected to " << clockMHz << " MHz, " << bitMode << " bit"); + + // update settings + settings.station.bitMode = bitMode; + settings.station.clockMHz = clockMHz; + + // Process packet again + packetValid = true; + } + } +} + + +template<typename T> void PacketsToBuffer::process( struct RSP &packet, bool writeGivenPacket ) throw(PacketReader::BadModeException) +{ + // Create input structures + PacketReader reader(logPrefix, inputStream); + + // Create output structures + SampleBuffer<T> buffer(settings, true); + PacketWriter<T> writer(logPrefix, buffer, buffer.flags[boardNr], settings.nrBeamlets / settings.nrBoards * boardNr, settings); + + try { + // Process lingering packet from previous run, if any + if (writeGivenPacket) + writer.writePacket(packet); + + // Transport packets from reader to writer + for(;;) + if (reader.readPacket(packet, settings)) + writer.writePacket(packet); + + } catch (PacketReader::BadModeException &ex) { + // Packet has different clock or bitmode + throw; + } catch (Stream::EndOfStreamException &ex) { + // Ran out of data + LOG_INFO_STR( logPrefix << "End of stream"); + + } catch (SystemCallException &ex) { + if (ex.error == EINTR) + LOG_INFO_STR( logPrefix << "Aborted: " << ex.what()); + else + LOG_ERROR_STR( logPrefix << "Caught Exception: " << ex); + + } catch (Exception &ex) { + LOG_ERROR_STR( logPrefix << "Caught Exception: " << ex); + } + + LOG_INFO_STR( logPrefix << "End"); +} + + +template void PacketsToBuffer::process<i16complex>( struct RSP &packet, bool writeGivenPacket ) throw(PacketReader::BadModeException); +template void PacketsToBuffer::process<i8complex>( struct RSP &packet, bool writeGivenPacket ) throw(PacketReader::BadModeException); +template void PacketsToBuffer::process<i4complex>( struct RSP &packet, bool writeGivenPacket ) throw(PacketReader::BadModeException); + + +} +} diff --git a/RTCP/InputProc/src/PacketsToBuffer.h b/RTCP/InputProc/src/PacketsToBuffer.h index 90e5c70e954994c99cc26d5e4785243041429a72..02e2eb09d4428717122be6dbe3b90bd489e43817 100644 --- a/RTCP/InputProc/src/PacketsToBuffer.h +++ b/RTCP/InputProc/src/PacketsToBuffer.h @@ -1,14 +1,14 @@ #ifndef __PACKETSTOBUFFER__ #define __PACKETSTOBUFFER__ -#include <Common/LofarLogger.h> +#include <Interface/SmartPtr.h> +#include <Interface/Stream.h> #include <Stream/Stream.h> #include "RSP.h" +#include "RSPBoards.h" #include "SampleBuffer.h" #include "BufferSettings.h" #include "PacketReader.h" -#include "PacketWriter.h" -#include <boost/format.hpp> #include <string> #include <ios> @@ -73,103 +73,6 @@ private: const std::vector<std::string> streamDescriptors; }; -PacketsToBuffer::PacketsToBuffer( Stream &inputStream, const BufferSettings &settings, unsigned boardNr ) -: - logPrefix(str(boost::format("[station %s board %u] ") % settings.station % boardNr)), - inputStream(inputStream), - settings(settings), - boardNr(boardNr) -{ - LOG_INFO_STR( logPrefix << "Initialised" ); -} - - -void PacketsToBuffer::process() -{ - // Holder for packet - struct RSP packet; - - // Whether packet has been read already - bool packetValid = false; - - // Default mode - - for(;;) { - try { - // Process packets based on (expected) bit mode - switch(settings.station.bitMode) { - case 16: - process<i16complex>(packet, packetValid); - break; - - case 8: - process<i8complex>(packet, packetValid); - break; - - case 4: - process<i4complex>(packet, packetValid); - break; - } - - // process<>() exited gracefully, so we're done - break; - } catch (PacketReader::BadModeException &ex) { - // Mode switch detected - unsigned bitMode = packet.bitMode(); - unsigned clockMHz = packet.clockMHz(); - - LOG_INFO_STR( logPrefix << "Mode switch detected to " << clockMHz << " MHz, " << bitMode << " bit"); - - // update settings - settings.station.bitMode = bitMode; - settings.station.clockMHz = clockMHz; - - // Process packet again - packetValid = true; - } - } -} - - -template<typename T> void PacketsToBuffer::process( struct RSP &packet, bool writeGivenPacket ) throw(PacketReader::BadModeException) -{ - // Create input structures - PacketReader reader(logPrefix, inputStream); - - // Create output structures - SampleBuffer<T> buffer(settings, true); - PacketWriter<T> writer(logPrefix, buffer, buffer.flags[boardNr], settings.nrBeamlets / settings.nrBoards * boardNr, settings); - - try { - // Process lingering packet from previous run, if any - if (writeGivenPacket) - writer.writePacket(packet); - - // Transport packets from reader to writer - for(;;) - if (reader.readPacket(packet, settings)) - writer.writePacket(packet); - - } catch (PacketReader::BadModeException &ex) { - // Packet has different clock or bitmode - throw; - } catch (Stream::EndOfStreamException &ex) { - // Ran out of data - LOG_INFO_STR( logPrefix << "End of stream"); - - } catch (SystemCallException &ex) { - if (ex.error == EINTR) - LOG_INFO_STR( logPrefix << "Aborted: " << ex.what()); - else - LOG_ERROR_STR( logPrefix << "Caught Exception: " << ex); - - } catch (Exception &ex) { - LOG_ERROR_STR( logPrefix << "Caught Exception: " << ex); - } - - LOG_INFO_STR( logPrefix << "End"); -} - } }