diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/InputThread.h b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/InputThread.h index 2616044d218f7105e758a6024ada51014a674c29..dc2c80eaf1529b91e208ee2f0f89ef36faf55c56 100644 --- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/InputThread.h +++ b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/InputThread.h @@ -57,14 +57,6 @@ namespace LOFAR int ID; }; - class PacketStats { - public: - PacketStats(const TimeStamp& rStamp, const TimeStamp& eStamp): receivedStamp(rStamp), expectedStamp(eStamp) {}; - TimeStamp receivedStamp; - TimeStamp expectedStamp; - }; - - // Description of class. class InputThread { public: @@ -80,7 +72,6 @@ namespace LOFAR // Copying is not allowed InputThread& operator= (const InputThread& that); - void printTimers(vector<NSTimer*>& timers); static void *logThread(void *); //# Datamembers diff --git a/Appl/CEP/CS1/CS1_InputSection/src/InputThread.cc b/Appl/CEP/CS1/CS1_InputSection/src/InputThread.cc index 963f751f88b3550af3c362dcc6e3421cdbf612d5..1c897ea8ace42e9d7497e38ddc79eda23c55e0f7 100644 --- a/Appl/CEP/CS1/CS1_InputSection/src/InputThread.cc +++ b/Appl/CEP/CS1/CS1_InputSection/src/InputThread.cc @@ -33,211 +33,119 @@ #include <CS1_InputSection/BeamletBuffer.h> namespace LOFAR { - namespace CS1 { +namespace CS1 { - bool InputThread::theirShouldStop = false; - volatile unsigned InputThread::nrPacketsReceived, InputThread::nrPacketsRejected; +bool InputThread::theirShouldStop = false; +volatile unsigned InputThread::nrPacketsReceived, InputThread::nrPacketsRejected; - InputThread::InputThread(ThreadArgs args) : itsArgs(args) - {} +InputThread::InputThread(ThreadArgs args) : itsArgs(args) +{ +} - InputThread::~InputThread() - {} +InputThread::~InputThread() +{ +} - void *InputThread::logThread(void *) - { - while (!theirShouldStop) { +// log from separate thread, since printing from a signal handler causes deadlocks + +void *InputThread::logThread(void *) +{ + while (!theirShouldStop) { #if defined HAVE_MPI - std::clog << TH_MPI::getCurrentRank() << ": received " << nrPacketsReceived << " packets, rejected " << nrPacketsRejected << " packets" << std::endl; + std::clog << TH_MPI::getCurrentRank() << ": received " << nrPacketsReceived << " packets, rejected " << nrPacketsRejected << " packets" << std::endl; #else - std::clog << "received " << nrPacketsReceived << " packets, rejected " << nrPacketsRejected << " packets" << std::endl; + std::clog << "received " << nrPacketsReceived << " packets, rejected " << nrPacketsRejected << " packets" << std::endl; #endif - nrPacketsReceived = nrPacketsRejected = 0; // race conditions, but who cares - sleep(1); + nrPacketsReceived = nrPacketsRejected = 0; // race conditions, but who cares + sleep(1); + } + + return 0; +} + +void InputThread::operator()() +{ + LOG_TRACE_FLOW_STR("WH_RSPInput WriterThread"); + + pthread_t logThreadId; + + if (pthread_create(&logThreadId, 0, logThread, 0) != 0) { + std::cerr << "could not create log thread " << std::endl; + exit(1); + } + + TimeStamp actualstamp = -itsArgs.nTimesPerFrame; + + // buffer for incoming rsp data + int frameSize = itsArgs.frameSize; + // reserve space in case there is an ip header in front of the packet + char totRecvframe[frameSize + itsArgs.ipHeaderSize]; + char *recvframe = totRecvframe; + + if (itsArgs.th->getType() == "TH_Ethernet") { + // only with TH_Ethernet there is an IPHeader + // but also when we have recorded it from the rsp boards! + recvframe += itsArgs.ipHeaderSize; + frameSize += itsArgs.ipHeaderSize; + }; + + ASSERTSTR(itsArgs.th->init(), "Could not init TransportHolder"); + + NSTimer receiveTimer("receiveTimer", true), writeTimer("writeTimer", true); + bool dataShouldContainValidStamp = (itsArgs.th->getType() != "TH_Null"); + + while (!theirShouldStop) { +retry: // until valid packet received + + try { + receiveTimer.start(); + itsArgs.th->recvBlocking((void *) totRecvframe, frameSize, 0); + receiveTimer.stop(); + ++ nrPacketsReceived; + } catch (Exception& e) { + LOG_TRACE_FLOW_STR("WriteToBufferThread couldn't read from TransportHolder(" << e.what() << ", exiting"); + exit(1); + } + + // get the actual timestamp of first EPApacket in frame + if (dataShouldContainValidStamp) { + unsigned seqid = * ((unsigned *) &recvframe[8]); + unsigned blockid = * ((unsigned *) &recvframe[12]); + + //if the seconds counter is 0xFFFFFFFF, the data cannot be trusted. + if (seqid == ~0U) { + ++ nrPacketsRejected; + goto retry; } - return 0; + actualstamp.setStamp(seqid, blockid); + } else { + actualstamp += itsArgs.nTimesPerFrame; } - - void InputThread::operator()() - { - LOG_TRACE_FLOW_STR("WH_RSPInput WriterThread"); - - pthread_t logThreadId; - - if (pthread_create(&logThreadId, 0, logThread, 0) != 0) { - std::cerr << "could not create log thread " << std::endl; - exit(1); - } - - int seqid = 0; - int blockid = 0; - TimeStamp actualstamp; - -#define PACKET_STATISTICS_NOT -#ifdef PACKET_STATISTICS - TimeStamp expectedstamp; - vector<PacketStats> missedStamps; - vector<PacketStats> oldStamps; - vector<PacketStats> invalidStamps; - missedStamps.reserve(500); - oldStamps.reserve(500); - invalidStamps.reserve(500); -#endif - - bool firstloop = true; - - // buffer for incoming rsp data - int frameSize = itsArgs.frameSize; - // reserve space in case there is an ip header in front of the packet - char totRecvframe[frameSize + itsArgs.ipHeaderSize]; - memset(totRecvframe, 0, sizeof(totRecvframe)); - char* recvframe = totRecvframe; - if (itsArgs.th->getType() == "TH_Ethernet") { - // only with TH_Ethernet there is an IPHeader - // but also when we have recorded it from the rsp boards! - recvframe += itsArgs.ipHeaderSize; - frameSize += itsArgs.ipHeaderSize; - }; - - vector<NSTimer*> itsTimers; - NSTimer threadTimer("threadTimer"); - NSTimer receiveTimer("receiveTimer"); - NSTimer writeTimer("writeTimer"); - itsTimers.push_back(&threadTimer); - itsTimers.push_back(&receiveTimer); - itsTimers.push_back(&writeTimer); - - // init Transportholder - ASSERTSTR(itsArgs.th->init(), "Could not init TransportHolder"); - - bool dataContainsValidStamp = (itsArgs.th->getType() != "TH_Null"); - - while(!theirShouldStop) { - threadTimer.start(); - - bool validTimeStampReceived = false; - while (!validTimeStampReceived) { - try { - receiveTimer.start(); - //cerr<<"InputThread "<<itsArgs.ID << " reading "<<frameSize<<" bytes from TH ("<<(void*)itsArgs.th<<" into "<<(void*)totRecvframe<<endl; - itsArgs.th->recvBlocking( (void*)totRecvframe, frameSize, 0); - receiveTimer.stop(); - ++ nrPacketsReceived; - } catch (Exception& e) { - LOG_TRACE_FLOW_STR("WriteToBufferThread couldn't read from TransportHolder("<<e.what()<<", stopping thread"); - break; - } - - // get the actual timestamp of first EPApacket in frame - if (!dataContainsValidStamp) { - if (!firstloop) { - actualstamp += itsArgs.nTimesPerFrame; - } else { - actualstamp = TimeStamp(0, 0); -#ifdef PACKET_STATISTICS - expectedstamp = actualstamp; -#endif - firstloop = false; - } - validTimeStampReceived = true; - } else { - seqid = ((int*)&recvframe[8])[0]; - blockid = ((int*)&recvframe[12])[0]; - validTimeStampReceived = (seqid != 0xFFFFFFFF); //if the second counter has 0xffffffff, the data cannot be trusted. - //cerr<<"InputThread received valid? :"<<validTimeStampReceived<<endl; - if (!validTimeStampReceived) - ++ nrPacketsRejected; -#ifdef PACKET_STATISTICS - if (!validTimeStampReceived) invalidStamps.push_back(PacketStats(actualstamp, expectedstamp)); - // hack for error in rsp boards where timestamps are not equal on both boards -#endif - actualstamp.setStamp(seqid ,blockid); - //cerr<<"InputThread received stamp: "<<actualstamp<<" ("<<seqid<<", "<<blockid<<")"<<endl; - - //cerr<<endl<<"Reading stamp: " << actualstamp<<endl; -#ifdef PACKET_STATISTICS - if (firstloop) { - // firstloop - expectedstamp.setStamp(seqid, blockid); // init expectedstamp - firstloop = false; - } -#endif - } - } - - // check and process the incoming data -#ifdef PACKET_STATISTICS - if (actualstamp < expectedstamp) { - oldStamps.push_back(PacketStats(actualstamp, expectedstamp)); - } else if (actualstamp > expectedstamp) { - do { - missedStamps.push_back(PacketStats(actualstamp, expectedstamp)); - // increase the expectedstamp - expectedstamp += itsArgs.nTimesPerFrame; - } while (actualstamp > expectedstamp); - } - // increase the expectedstamp - expectedstamp += itsArgs.nTimesPerFrame; -#endif - writeTimer.start(); - // expected packet received so write data into corresponding buffer - //cerr<<"InputThread: "<<actualstamp<<endl; - - try { - itsArgs.BBuffer->writeElements((Beamlet*)&recvframe[itsArgs.frameHeaderSize], actualstamp, itsArgs.nTimesPerFrame, itsArgs.nSubbandsPerFrame); - } catch (Exception& e) { - LOG_TRACE_FLOW_STR("WriteToBufferThread couldn't write to BeamletBuffer("<<e.what()<<", stopping thread"); - break; - } - - writeTimer.stop(); - threadTimer.stop(); - } - - printTimers(itsTimers); -#ifdef PACKET_STATISTICS - LOG_WARN("Timestamps of missed packets:"); - vector<PacketStats>::iterator it = missedStamps.begin(); - for (; it != missedStamps.end(); it++) { - LOG_WARN_STR("MIS " << it->expectedStamp << " missed at time " << it->receivedStamp); - } - LOG_WARN_STR("Rewritten packets:"); - vector<PacketStats>::iterator rit = oldStamps.begin(); - for (; rit != oldStamps.end(); rit++) { - LOG_WARN_STR("REW " << rit->receivedStamp<<" received at time "<< rit->expectedStamp); - } - LOG_WARN_STR("Invalid timestamps:"); - vector<PacketStats>::iterator iit = invalidStamps.begin(); - for (; iit != invalidStamps.end(); iit++) { - LOG_WARN_STR("INV received at time "<< iit->expectedStamp); - } -#endif - - if (pthread_join(logThreadId, 0) != 0) { - std::cerr << "could not join log thread" << std::endl; - exit(1); - } - } - - void InputThread::printTimers(vector<NSTimer*>& timers) - { - vector<NSTimer*>::iterator it = timers.begin(); - for (; it != timers.end(); it++) { - (*it)->print(cout); - } - } - - InputThread::InputThread (const InputThread& that) - : itsArgs(that.itsArgs) - {} - ////InputThread& InputThread::operator= (const InputThread& that) - ////{ - //// if (this != &that) { - //// ... copy members ... - //// } - //// return *this; - ////} - - } // namespace CS1 + + // expected packet received so write data into corresponding buffer + writeTimer.start(); + + try { + itsArgs.BBuffer->writeElements((Beamlet *) &recvframe[itsArgs.frameHeaderSize], actualstamp, itsArgs.nTimesPerFrame, itsArgs.nSubbandsPerFrame); + } catch (Exception& e) { + LOG_TRACE_FLOW_STR("WriteToBufferThread couldn't write to BeamletBuffer(" << e.what() << ", stopping thread"); + break; + } + + writeTimer.stop(); + } + + if (pthread_join(logThreadId, 0) != 0) { + std::cerr << "could not join log thread" << std::endl; + exit(1); + } +} + +InputThread::InputThread(const InputThread &that) + : itsArgs(that.itsArgs) +{ +} + +} // namespace CS1 } // namespace LOFAR