Skip to content
Snippets Groups Projects
Select Git revision
  • b6166080bd55d6ede2bbe8d4282dbb1fe7cb0dcd
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

PacketStream.h

  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    PacketStream.h 4.40 KiB
    /* PacketStream.h
     * Copyright (C) 2012-2013  ASTRON (Netherlands Institute for Radio Astronomy)
     * P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
     *
     * This file is part of the LOFAR software suite.
     * The LOFAR software suite is free software: you can redistribute it and/or
     * modify it under the terms of the GNU General Public License as published
     * by the Free Software Foundation, either version 3 of the License, or
     * (at your option) any later version.
     *
     * The LOFAR software suite is distributed in the hope that it will be useful,
     * but WITHOUT ANY WARRANTY; without even the implied warranty of
     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     * GNU General Public License for more details.
     *
     * You should have received a copy of the GNU General Public License along
     * with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
     *
     * $Id$
     */
    
    #ifndef LOFAR_INPUT_PROC_PACKETSTREAM_H
    #define LOFAR_INPUT_PROC_PACKETSTREAM_H
    
    #include <Stream/Stream.h>
    #include <Common/Thread/Cancellation.h>
    #include <CoInterface/RSP.h>
    #include <CoInterface/RSPTimeStamp.h>
    #include "PacketFactory.h"
    
    namespace LOFAR
    {
      namespace Cobalt
      {
        /* Generate a Stream of RSP packets. */
    
        class PacketStream: public Stream
        {
        public:
          // 'factory' will be copied.
          PacketStream( const PacketFactory &factory, const TimeStamp &from, const TimeStamp &to, size_t boardNr = 0 )
          :
            factory(factory),
            from(from),
            to(to),
            current(from),
            boardNr(boardNr),
            offset(0)
          {
          }
    
          virtual size_t tryRead(void *ptr, size_t size)
          {
            Cancellation::point();
    
            if (size == 0) {
              return 0;
            }
    
            if (current >= to) {
              THROW(EndOfStreamException, "No data beyond " << to);
            }
    
            if (offset == 0) {
              // generate new packet
              factory.makePacket(packet, current, boardNr);
              current += packet.header.nrBlocks;
            }
    
            size_t pktSize = packet.packetSize();
            size_t numBytes = std::min(pktSize - offset, size);
            memcpy(ptr, reinterpret_cast<char*>(&packet) + offset, numBytes);
    
            offset += numBytes;
            if (offset == pktSize) {
              // written full packet, so we'll need a new one on next read
              offset = 0;
            }
    
            return numBytes;
          }
    
          virtual size_t tryWrite(const void * /*ptr*/, size_t /*size*/)
          {
            THROW(NotImplemented, "Writing to PacketStream is not supported");
          }
    
          virtual size_t tryReadv(const struct iovec *iov, int iovcnt)
          {
            Cancellation::point();
    
            size_t nread = 0;
            for (int i = 0; i < iovcnt; i++) {
              if (iov[i].iov_len == 0) {
                continue;
              }
    
              if (current >= to) {
                if (nread == 0) {
                  THROW(EndOfStreamException, "No data beyond " << to);
                } else {
                  break;
                }
              }
    
              if (offset == 0) {
                // generate new packet
                factory.makePacket(packet, current, boardNr);
                current += packet.header.nrBlocks;
              }
    
              size_t pktSize = packet.packetSize();
              size_t numBytes = std::min(pktSize - offset, iov[i].iov_len);
              memcpy(iov[i].iov_base, reinterpret_cast<char*>(&packet) + offset, numBytes);
    
              offset += numBytes;
              if (offset == pktSize) {
                // written full packet, so we'll need a new one on next read
                offset = 0;
              }
    
              nread += numBytes;
    
              // Mimic tryRead() impl above: max 1 (partial) packet per buffer.
              // Then we can only use the next iov if we could exactly fill the previous, else our retval is ambiguous.
              if (numBytes < pktSize) {
                break;
              }
            }
    
            return nread;
          }
    
          virtual size_t tryWritev(const struct iovec * /*iov*/, int /*iovcnt*/)
          {
            THROW(NotImplemented, "Writing to PacketStream is not supported");
          }
    
          virtual std::string description() const {
            //TODO: fill in all the packetstream details
            return "PacketStream ";
          }
    
        private:
          PacketFactory factory;
    
          const TimeStamp from;
          const TimeStamp to;
          TimeStamp current;
          const size_t boardNr;
    
          struct RSP packet;
    
          // Write offset within packet. If 0, a new
          // packet is required.
          size_t offset;
        };
      }
    }
    
    #endif