Skip to content
Snippets Groups Projects
Select Git revision
  • fcb096194e695862d5c52ec6a399454bbcd31c03
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

InputThread.cc

Blame
  • John Romein's avatar
    John Romein authored
    Major cleanup
    fcb09619
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    InputThread.cc 4.64 KiB
    //#  InputThread.cc: the thread that reads from a TH and places data into the buffer of the input section
    //#
    //#  Copyright (C) 2006
    //#  ASTRON (Netherlands Foundation for Research in Astronomy)
    //#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
    //#
    //#  This program is free software; you can redistribute it and/or modify
    //#  it under the terms of the GNU General Public License as published by
    //#  the Free Software Foundation; either version 2 of the License, or
    //#  (at your option) any later version.
    //#
    //#  This program is distributed in the hope that it will be useful,
    //#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    //#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    //#  GNU General Public License for more details.
    //#
    //#  You should have received a copy of the GNU General Public License
    //#  along with this program; if not, write to the Free Software
    //#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
    //#
    //#  $Id$
    
    //# Always #include <lofar_config.h> first!
    #include <lofar_config.h>
    
    //# Includes
    #include <Common/LofarLogger.h>
    #include <Common/hexdump.h>
    #include <CS1_InputSection/InputThread.h>
    #include <Common/Timer.h>
    #include <Transport/TransportHolder.h>
    #include <Transport/TH_MPI.h>
    #include <CS1_InputSection/BeamletBuffer.h>
    
    namespace LOFAR {
    namespace CS1 {
    
    bool InputThread::theirShouldStop = false;
    volatile unsigned InputThread::nrPacketsReceived, InputThread::nrPacketsRejected;
    
    InputThread::InputThread(ThreadArgs args) : itsArgs(args)
    {
    }
    
    InputThread::~InputThread()
    {
    }
    
    // log from separate thread, since printing from a signal handler causes deadlocks
    
    void *InputThread::logThread(void *)
    {
      while (!theirShouldStop) {
    #if defined HAVE_MPI
        std::clog << TH_MPI::getCurrentRank() << ": received " << nrPacketsReceived << " packets, rejected " << nrPacketsRejected << " packets" << std::endl;
    #else
        std::clog << "received " << nrPacketsReceived << " packets, rejected " << nrPacketsRejected << " packets" << std::endl;
    #endif
        nrPacketsReceived = nrPacketsRejected = 0; // race conditions, but who cares
        sleep(1);
      }
    
      return 0;
    }
    
    void InputThread::operator()()
    {
      LOG_TRACE_FLOW_STR("WH_RSPInput WriterThread");   
    
      pthread_t logThreadId;
    
      if (pthread_create(&logThreadId, 0, logThread, 0) != 0) {
        std::cerr << "could not create log thread " << std::endl;
        exit(1);
      }
    
      TimeStamp actualstamp = -itsArgs.nTimesPerFrame;
    
      // buffer for incoming rsp data
      int frameSize = itsArgs.frameSize;
      // reserve space in case there is an ip header in front of the packet
      char totRecvframe[frameSize + itsArgs.ipHeaderSize];
      char *recvframe = totRecvframe;
    
      if (itsArgs.th->getType() == "TH_Ethernet") {
        // only with TH_Ethernet there is an IPHeader
        // but also when we have recorded it from the rsp boards!
        recvframe += itsArgs.ipHeaderSize;
        frameSize += itsArgs.ipHeaderSize;
      };
    
      ASSERTSTR(itsArgs.th->init(), "Could not init TransportHolder");
    
      NSTimer receiveTimer("receiveTimer", true), writeTimer("writeTimer", true);
      bool dataShouldContainValidStamp = (itsArgs.th->getType() != "TH_Null");
    
      while (!theirShouldStop) {
    retry: // until valid packet received
    
        try {
          receiveTimer.start();
          itsArgs.th->recvBlocking((void *) totRecvframe, frameSize, 0);
          receiveTimer.stop();
          ++ nrPacketsReceived;
        } catch (Exception& e) {
          LOG_TRACE_FLOW_STR("WriteToBufferThread couldn't read from TransportHolder(" << e.what() << ", exiting");
          exit(1);
        }	
    
        // get the actual timestamp of first EPApacket in frame
        if (dataShouldContainValidStamp) {
          unsigned seqid   = * ((unsigned *) &recvframe[8]);
          unsigned blockid = * ((unsigned *) &recvframe[12]);
    
          //if the seconds counter is 0xFFFFFFFF, the data cannot be trusted.
          if (seqid == ~0U) {
    	++ nrPacketsRejected;
    	goto retry;
          }
    
          actualstamp.setStamp(seqid, blockid);
        } else {
          actualstamp += itsArgs.nTimesPerFrame; 
        }
      
        // expected packet received so write data into corresponding buffer
        writeTimer.start();
    
        try {
          itsArgs.BBuffer->writeElements((Beamlet *) &recvframe[itsArgs.frameHeaderSize], actualstamp, itsArgs.nTimesPerFrame, itsArgs.nSubbandsPerFrame);
        } catch (Exception& e) {
          LOG_TRACE_FLOW_STR("WriteToBufferThread couldn't write to BeamletBuffer(" << e.what() << ", stopping thread");
          break;
        }	
    
        writeTimer.stop();
      }
    
      if (pthread_join(logThreadId, 0) != 0) {
        std::cerr << "could not join log thread" << std::endl;
        exit(1);
      }
    }
    
    InputThread::InputThread(const InputThread &that)
      : itsArgs(that.itsArgs)
    {
    }
    
    } // namespace CS1
    } // namespace LOFAR