Skip to content
Snippets Groups Projects
Select Git revision
  • 41a3918847edbb6efa2c732d96e1c754cffc5d32
  • master default protected
  • esap-gui-auth
  • esap-gui-dev
4 results

Main.js

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    InputThread.cc 6.89 KiB
    //#  InputThread.cc: the thread that reads from a TH and places data into the buffer of the input section
    //#
    //#  Copyright (C) 2006
    //#  ASTRON (Netherlands Foundation for Research in Astronomy)
    //#  P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
    //#
    //#  This program is free software; you can redistribute it and/or modify
    //#  it under the terms of the GNU General Public License as published by
    //#  the Free Software Foundation; either version 2 of the License, or
    //#  (at your option) any later version.
    //#
    //#  This program is distributed in the hope that it will be useful,
    //#  but WITHOUT ANY WARRANTY; without even the implied warranty of
    //#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    //#  GNU General Public License for more details.
    //#
    //#  You should have received a copy of the GNU General Public License
    //#  along with this program; if not, write to the Free Software
    //#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
    //#
    //#  $Id$
    
    //# Always #include <lofar_config.h> first!
    #include <lofar_config.h>
    
    //# Includes
    #include <Common/LofarLogger.h>
    #include <CS1_InputSection/InputThread.h>
    #include <Common/Timer.h>
    #include <Transport/TransportHolder.h>
    #include <CS1_InputSection/BeamletBuffer.h>
    
    namespace LOFAR {
      namespace CS1 {
    
        bool InputThread::theirShouldStop = false;
    
        InputThread::InputThread(ThreadArgs args) : itsArgs(args)
        {}
    
        InputThread::~InputThread()
        {}
    
        void InputThread::operator()()
        {
          LOG_TRACE_FLOW_STR("WH_RSPInput WriterThread");   
          int seqid = 0;
          int blockid = 0;
          TimeStamp actualstamp;
    
    #define PACKET_STATISTICS_NOT
    #ifdef PACKET_STATISTICS
          TimeStamp expectedstamp;
          vector<PacketStats> missedStamps;
          vector<PacketStats> oldStamps;
          vector<PacketStats> invalidStamps;
          missedStamps.reserve(500);
          oldStamps.reserve(500);
          invalidStamps.reserve(500);
    #endif
    
          bool firstloop = true;
        
          // buffer for incoming rsp data
          int frameSize = itsArgs.frameSize;
          // reserve space in case there is an ip header in front of the packet
          char totRecvframe[frameSize + itsArgs.ipHeaderSize];
          memset(totRecvframe, 0, sizeof(totRecvframe));
          char* recvframe = totRecvframe;
          if (itsArgs.th->getType() == "TH_Ethernet") {
    	// only with TH_Ethernet there is an IPHeader
            // but also when we have recorded it from the rsp boards!
    	recvframe += itsArgs.ipHeaderSize;
    	frameSize += itsArgs.ipHeaderSize;
          };
        
          vector<NSTimer*> itsTimers;
          NSTimer threadTimer("threadTimer");
          NSTimer receiveTimer("receiveTimer");
          NSTimer writeTimer("writeTimer");
          itsTimers.push_back(&threadTimer);
          itsTimers.push_back(&receiveTimer);
          itsTimers.push_back(&writeTimer);
    
          // init Transportholder
          ASSERTSTR(itsArgs.th->init(), "Could not init TransportHolder");
    
          // how far is one beamlet of a subband away from the next beamlet of the same subband
          int strideSize = itsArgs.nSubbandsPerFrame;
    
          bool dataContainsValidStamp = (itsArgs.th->getType() != "TH_Null");
    
          while(!theirShouldStop) {
    	threadTimer.start();
    
    	bool validTimeStampReceived = false;
    	while (!validTimeStampReceived) {
    	  try {
    	    receiveTimer.start();
    	    //cerr<<"InputThread reading "<<frameSize<<" bytes from TH ("<<(void*)itsArgs.th<<" into "<<(void*)totRecvframe<<endl;
    	    itsArgs.th->recvBlocking( (void*)totRecvframe, frameSize, 0);
    	    receiveTimer.stop();
    	  } catch (Exception& e) {
    	    LOG_TRACE_FLOW_STR("WriteToBufferThread couldn't read from TransportHolder("<<e.what()<<", stopping thread");
    	    break;
    	  }	
    
    	  // get the actual timestamp of first EPApacket in frame
    	  if (!dataContainsValidStamp) {
    	    if (!firstloop) {
    	      actualstamp += itsArgs.nTimesPerFrame; 
    	    } else {
    	      actualstamp = TimeStamp(0, 0);
    #ifdef PACKET_STATISTICS
    	      expectedstamp = actualstamp;
    #endif
    	      firstloop = false;
    	    }	  
    	    validTimeStampReceived = true;
    	  } else {
    	    seqid   = ((int*)&recvframe[8])[0];
    	    blockid = ((int*)&recvframe[12])[0];
    	    validTimeStampReceived = (seqid != 0xffff); //if the second counter has 0xffff, the data cannot be trusted.
    	    //cerr<<"InputThread received valid? :"<<validTimeStampReceived<<endl;
    #ifdef PACKET_STATISTICS
    	    if (!validTimeStampReceived) invalidStamps.push_back({actualstamp, expectedstamp} );
    	    // hack for error in rsp boards where timestamps are not equal on both boards
    #endif
    	    actualstamp.setStamp(seqid ,blockid);
    
    	    //cerr<<endl<<"Reading stamp: " << actualstamp<<endl;
    #ifdef PACKET_STATISTICS
    	    if (firstloop) {
    	      // firstloop
    	      expectedstamp.setStamp(seqid, blockid); // init expectedstamp
    	      firstloop = false;
    	    }
    #endif
    	  }
    	}
          
    	// check and process the incoming data
    #ifdef PACKET_STATISTICS
    	if (actualstamp < expectedstamp) {
    	  PacketStats rewritten = {actualstamp, expectedstamp};
    	  oldStamps.push_back(rewritten);
    	} else if (actualstamp > expectedstamp) {
    	  do {
    	    PacketStats missed = {actualstamp, expectedstamp};
    	    missedStamps.push_back(missed);
    	    // increase the expectedstamp
    	    expectedstamp += itsArgs.nTimesPerFrame;
    	  } while (actualstamp > expectedstamp);
    	}
    	// increase the expectedstamp
    	expectedstamp += itsArgs.nTimesPerFrame; 
    #endif
    	writeTimer.start();
    	// expected packet received so write data into corresponding buffer
    	//cerr<<"InputThread: "<<actualstamp<<endl;
    
    	itsArgs.BBuffer->writeElements((Beamlet*)&recvframe[itsArgs.frameHeaderSize], actualstamp, itsArgs.nTimesPerFrame, strideSize);
    
    	writeTimer.stop();
    	threadTimer.stop();
          }
    
          printTimers(itsTimers);
    #ifdef PACKET_STATISTICS
          LOG_TRACE_INFO("Timestamps of missed packets:");
          vector<PacketStats>::iterator it = missedStamps.begin();
          for (; it != missedStamps.end(); it++) {
    	LOG_TRACE_INFO_STR("MIS " << it->expectedStamp << " missed at time " << it->receivedStamp);
          }
          LOG_TRACE_INFO_STR("Rewritten packets:");
          vector<PacketStats>::iterator rit = oldStamps.begin();
          for (; rit != oldStamps.end(); rit++) {
    	LOG_TRACE_INFO_STR("REW " << rit->receivedStamp<<" received at time "<< rit->expectedStamp);
          }
          LOG_TRACE_INFO_STR("Invalid timestamps:");
          vector<PacketStats>::iterator iit = invalidStamps.begin();
          for (; iit != invalidStamps.end(); iit++) {
    	LOG_TRACE_INFO_STR("INV received at time "<< iit->expectedStamp);
          }
    #endif
        }
    
        void InputThread::printTimers(vector<NSTimer*>& timers)
        {
          vector<NSTimer*>::iterator it = timers.begin();
          for (; it != timers.end(); it++) {
    	(*it)->print(cout);
          }
        }
    
        InputThread::InputThread (const InputThread& that)
          : itsArgs(that.itsArgs)
        {}
        ////InputThread& InputThread::operator= (const InputThread& that)
        ////{
        ////  if (this != &that) {
        ////    ... copy members ...
        ////  }
        ////  return *this;
        ////}
    
      } // namespace CS1
    } // namespace LOFAR