diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/AH_InputSection.h b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/AH_InputSection.h index 1423cd8ce62593622c4686069dd9ab0593d94ccf..9b4e3af14474adf88ccbefa5afd0f448d54c68a0 100644 --- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/AH_InputSection.h +++ b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/AH_InputSection.h @@ -71,8 +71,11 @@ namespace LOFAR //# Datamembers vector<Step *> itsSteps; Connector itsConnector; - Stub_Delay *itsInputStub; + Stub_Delay *itsDelayStub; Stub_BGL *itsOutputStub; + + std::vector<unsigned> itsInputNodes, itsOutputNodes; + std::vector<WorkHolder *> itsWHs; }; // @} diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/BeamletBuffer.h b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/BeamletBuffer.h index 4962e9725fd18dcaf298bf31a22379ff12b931fb..717b6cbb3a46fce8e251bdfe90e4de9bde52afde 100644 --- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/BeamletBuffer.h +++ b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/BeamletBuffer.h @@ -61,13 +61,11 @@ namespace LOFAR class BeamletBuffer { public: - BeamletBuffer(int bufferSize, uint nSubbands, uint history, uint readWriteDelay); + BeamletBuffer(int bufferSize, unsigned nSubbands, unsigned history, unsigned readWriteDelay); ~BeamletBuffer(); - // write elements in the buffer, return value is number of succesfully written elements - uint writeElements(Beamlet* data, TimeStamp begin, uint nElements, uint nSubbandsPerBlock); - // get elements out of the buffer, return value is number of valid elements - uint getElements(vector<Beamlet *> buffers, SparseSet* flags, TimeStamp begin, uint nElements); + void writeElements(Beamlet* data, TimeStamp begin, unsigned nElements); + void getElements(boost::multi_array_ref<SampleType, 3> &buffers, SparseSet &flags, TimeStamp begin, unsigned nElements); TimeStamp startBufferRead(); TimeStamp startBufferRead(TimeStamp); @@ -82,7 +80,7 @@ namespace LOFAR BeamletBuffer& operator= (const BeamletBuffer& that); // Needed for mapping a timestamp to a place in the buffer - uint mapTime2Index(TimeStamp time) const { + unsigned mapTime2Index(TimeStamp time) const { // TODO: this is very slow because of the % return time % itsSize; } @@ -94,7 +92,7 @@ namespace LOFAR //vector<Beamlet *> itsSBBuffers; mutex itsFlagsMutex; SparseSet itsFlags; - uint itsNSubbands; + unsigned itsNSubbands; int itsSize; boost::multi_array<SampleType, 3> itsSBBuffers; @@ -104,9 +102,9 @@ namespace LOFAR LockedRange<TimeStamp, int> itsLockedRange; // These are for statistics - uint itsDroppedItems; - uint itsDummyItems; - uint itsSkippedItems; + unsigned itsDroppedItems; + unsigned itsDummyItems; + unsigned itsSkippedItems; NSTimer itsWriteTimer; NSTimer itsReadTimer; diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/Makefile.am b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/Makefile.am index 53b20f9f98678a9b7c0f33a47365c17a9b8fcfc0..f518179b7212f2e00214cc5bf72f4b2424ac73fb 100644 --- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/Makefile.am +++ b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/Makefile.am @@ -8,8 +8,7 @@ INSTHDRS = Connector.h \ LockedRange.tcc \ BeamletBuffer.h \ AH_InputSection.h \ - WH_SBCollect.h \ - WH_RSPInput.h + WH_InputSection.h NOINSTHDRS = diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_RSPInput.h b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_InputSection.h similarity index 57% rename from Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_RSPInput.h rename to Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_InputSection.h index fa49b342c364862486022929149dcc7e471d9e5d..df45e3d87bcb6d35d92cd82aaf6ef5b91e98b9ff 100644 --- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_RSPInput.h +++ b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_InputSection.h @@ -1,4 +1,4 @@ -//# WH_RSPInput.h: Catch RSP ethernet frames and synchronize RSP inputs +//# WH_InputSection.h: Catch RSP ethernet frames and synchronize RSP inputs //# //# Copyright (C) 2006 //# ASTRON (Netherlands Foundation for Research in Astronomy) @@ -20,8 +20,8 @@ //# //# $Id$ -#ifndef LOFAR_CS1_INPUTSECTION_WH_RSPINPUT_H -#define LOFAR_CS1_INPUTSECTION_WH_RSPINPUT_H +#ifndef LOFAR_CS1_INPUTSECTION_WH_INPUTSECTION_H +#define LOFAR_CS1_INPUTSECTION_WH_INPUTSECTION_H // \file // Catch RSP ethernet frames and synchronize RSP inputs @@ -30,11 +30,15 @@ //# Includes #include <tinyCEP/WorkHolder.h> -#include <boost/thread.hpp> #include <CS1_Interface/RSPTimeStamp.h> +#include <CS1_Interface/DH_Subband.h> #include <APS/ParameterSet.h> #include <Common/Timer.h> +#include <boost/thread.hpp> +#include <boost/multi_array.hpp> + + namespace LOFAR { class NSTimer; @@ -52,72 +56,77 @@ namespace LOFAR // This class is the workholder that receives data from the RSP boards // and distributes it per subband to several other input nodes - class WH_RSPInput: public WorkHolder { + class WH_InputSection: public WorkHolder { public: - explicit WH_RSPInput(const string& name, - ACC::APS::ParameterSet& ps, - TransportHolder& th, - uint stationNr); - virtual ~WH_RSPInput(); + typedef DH_Subband::SampleType SampleType; + + explicit WH_InputSection(const string &name, + ACC::APS::ParameterSet &ps, + TransportHolder *inputTH, + unsigned stationNr, + unsigned nrInputChannels, + unsigned nrOutputChannels, + const std::vector<unsigned> &inputNodes, + const std::vector<unsigned> &outputNodes); + virtual ~WH_InputSection(); - static WorkHolder* construct(const string& name, - ACC::APS::ParameterSet& ps, - TransportHolder& th, - uint stationNr); - - virtual WH_RSPInput* make(const string& name); + virtual WH_InputSection *make(const string &name); - virtual void startThread(); - virtual void preprocess(); - virtual void process(); - virtual void postprocess(); - // Show the work holder on stdout. - virtual void dump() const; - private: // Copying is not allowed - WH_RSPInput (const WH_RSPInput& that); - WH_RSPInput& operator= (const WH_RSPInput& that); + WH_InputSection (const WH_InputSection &that); + WH_InputSection& operator= (const WH_InputSection &that); + + void doInput(SparseSet &flags); + void doOutput(); + + void limitFlagsLength(SparseSet &flags); + + void transposeData(); + void transposeMetaData(const SparseSet &flags); //# Datamembers + bool itsDelayCompensation; + bool itsIsInput, itsIsOutput; + const std::vector<unsigned> &itsInputNodes, &itsOutputNodes; + + boost::multi_array<SampleType, 4> *itsInputData, *itsOutputData; + + struct metaData { + float fineDelayAtBegin, fineDelayAfterEnd; + char flagsBuffer[132]; // enough for 16 flag ranges + } *itsInputMetaData, *itsOutputMetaData; + // writer thread - InputThread* itsInputThreadObject; - boost::thread* itsInputThread; + InputThread *itsInputThreadObject; + boost::thread *itsInputThread; - TransportHolder& itsTH; + TransportHolder *itsInputTH; uint itsStationNr; // ACC parameters interface ACC::APS::ParameterSet &itsPS; - bool itsDelayCompensation; - - // Sync Master or slave - bool itsSyncMaster; - // synced stamp TimeStamp itsSyncedStamp; - int itsNSubbandsPerCell; - int itsNSamplesPerSec; - int itsNHistorySamples; + unsigned itsNSubbandsPerCell; + unsigned itsNSamplesPerSec; + unsigned itsNHistorySamples; BeamletBuffer *itsBBuffer; NSTimer itsPrePostTimer, itsProcessTimer, itsGetElemTimer; - + void startThread(); + //handle timer alarm static void timerSignal(int signal); - double itsFrequency; - static int theirNoRunningWHs; - static int theirNoAlarms; - static bool theirTimerSet; - + static bool signalReceived; }; // @} diff --git a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_SBCollect.h b/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_SBCollect.h deleted file mode 100644 index 22bf2520d1dee59ff15b28407973969d3b7120b4..0000000000000000000000000000000000000000 --- a/Appl/CEP/CS1/CS1_InputSection/include/CS1_InputSection/WH_SBCollect.h +++ /dev/null @@ -1,89 +0,0 @@ -//# WH_SBCollect.h: Joins all data (stations, pols) for a subband -//# -//# Copyright (C) 2006 -//# ASTRON (Netherlands Foundation for Research in Astronomy) -//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl -//# -//# This program is free software; you can redistribute it and/or modify -//# it under the terms of the GNU General Public License as published by -//# the Free Software Foundation; either version 2 of the License, or -//# (at your option) any later version. -//# -//# This program is distributed in the hope that it will be useful, -//# but WITHOUT ANY WARRANTY; without even the implied warranty of -//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//# GNU General Public License for more details. -//# -//# You should have received a copy of the GNU General Public License -//# along with this program; if not, write to the Free Software -//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -//# -//# $Id$ - -#ifndef LOFAR_CS1_INPUTSECTION_WH_SBCOLLECT_H -#define LOFAR_CS1_INPUTSECTION_WH_SBCOLLECT_H - -// \file -// Joins all data (stations, pols) for a subband - -//# Never #include <config.h> or #include <lofar_config.h> in a header file! - -//# Includes -#include <tinyCEP/WorkHolder.h> -#include <APS/ParameterSet.h> - -namespace LOFAR -{ - namespace CS1 - { - - // \addtogroup CS1_InputSection - // @{ - - //# Forward Declarations - //class forward; - - - // This class collect the data from different input nodes and joins all data of one subband for a certain amount of time - class WH_SBCollect : public WorkHolder - { - public: - explicit WH_SBCollect(const string& name, - const ACC::APS::ParameterSet pset, - const int noutputs); - virtual ~WH_SBCollect(); - - static WorkHolder* construct(const string& name, - const ACC::APS::ParameterSet pset, - const int noutputs); - virtual WH_SBCollect* make(const string& name); - - virtual void preprocess(); - virtual void process(); - virtual void postprocess(); - - private: - // Copying is not allowed - WH_SBCollect (const WH_SBCollect& that); - WH_SBCollect& operator= (const WH_SBCollect& that); - - //# Datamembers - ACC::APS::ParameterSet itsPS; - unsigned itsNStations; - unsigned itsNSubbandsPerCell; - - //handle timer alarm - static void timerSignal(int signal); - double itsFrequency; - static int theirNoRunningWHs; - static int theirNoAlarms; - static bool theirTimerSet; - - }; - - // @} - - } // namespace CS1 -} // namespace LOFAR - -#endif diff --git a/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc b/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc index bf80c392db1d993cc92d4aad3bbe6cb229d5c86f..6bdc6bc49d5e5879f4d7c3d6a23f27c349de4620 100644 --- a/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc +++ b/Appl/CEP/CS1/CS1_InputSection/src/AH_InputSection.cc @@ -26,198 +26,136 @@ //# Includes #include <Common/LofarLogger.h> #include <CS1_InputSection/AH_InputSection.h> +#include <CS1_InputSection/WH_InputSection.h> #include <CS1_Interface/RSPTimeStamp.h> //# Workholders -#include <CS1_InputSection/WH_RSPInput.h> -#include <CS1_InputSection/WH_SBCollect.h> #include <Transport/TransportHolder.h> #include <Transport/TH_MPI.h> #include <Transport/TH_Socket.h> #include <Transport/TH_File.h> +#include <algorithm> + + #define MPICH_WORKING_ON_INFINI_BAND 1 #define IS_MULTIPLE(number, bignumber) (floor(bignumber / number) == (1.0 * bignumber / number)) namespace LOFAR { - namespace CS1 { +namespace CS1 { - AH_InputSection::AH_InputSection() : - itsInputStub(0), - itsOutputStub(0) - {} +AH_InputSection::AH_InputSection() : + itsDelayStub(0), + itsOutputStub(0) +{} - AH_InputSection::~AH_InputSection() - { - undefine(); - } +AH_InputSection::~AH_InputSection() +{ + undefine(); +} - void AH_InputSection::undefine() - { - delete itsOutputStub; - delete itsInputStub; - itsInputStub = 0; - itsOutputStub = 0; - } +void AH_InputSection::undefine() +{ + delete itsOutputStub; + delete itsDelayStub; + itsDelayStub = 0; + itsOutputStub = 0; +} - void AH_InputSection::define(const LOFAR::KeyValueMap&) - { - LOG_TRACE_FLOW_STR("Start of AH_InputSection::define()"); - undefine(); +void AH_InputSection::define(const LOFAR::KeyValueMap&) +{ + LOG_TRACE_FLOW_STR("Start of AH_InputSection::define()"); -#if defined HAVE_MPI - int lowestFreeNode = 0; -#endif - - TimeStamp::setMaxBlockId(itsParamSet.getDouble("Observation.SampleRate")); + itsParamSet.getDouble("Observation.SampleRate"); + TimeStamp::setMaxBlockId(itsParamSet.getDouble("Observation.SampleRate")); - int psetsPerCell = itsParamSet.getInt32("BGLProc.PsetsPerCell"); - int nCells = itsParamSet.getInt32("Observation.NSubbands") / (itsParamSet.getInt32("General.SubbandsPerPset") * psetsPerCell); // number of SubBand filters in the application - int nNodesPerCell = itsParamSet.getInt32("BGLProc.NodesPerPset") * psetsPerCell; - - LOG_TRACE_FLOW_STR("Create the top-level composite"); - Composite comp(0, 0, "topComposite"); - setComposite(comp); // tell the ApplicationHolder this is the top-level compisite + LOG_TRACE_FLOW_STR("Create the top-level composite"); + Composite comp(0, 0, "topComposite"); + setComposite(comp); // tell the ApplicationHolder this is the top-level compisite - LOG_TRACE_FLOW_STR("Create the input side delay stub"); - // TODO create connector class + LOG_TRACE_FLOW_STR("Create the input side delay stub"); + LOG_TRACE_FLOW_STR("Create the RSP reception Steps"); - LOG_TRACE_FLOW_STR("Create the RSP reception Steps"); - + itsDelayStub = new Stub_Delay(true, itsParamSet); + itsOutputStub = new Stub_BGL(false, false, "Input_BGLProc", itsParamSet); - int nRSP = itsParamSet.getInt32("Input.NRSPBoards"); - int nStations = itsParamSet.getInt32("Observation.NStations"); - int inputCells = nRSP/nStations; - int nameBufferSize = 40; - char nameBuffer[nameBufferSize]; - - itsInputStub = new Stub_Delay(true, itsParamSet); - itsOutputStub = new Stub_BGL(false, false, "Input_BGLProc", itsParamSet); - - for (int ic = 0; ic < inputCells; ic ++) { - WorkHolder* lastWH; - vector<Step*> RSPSteps; - for (int station = 0; station < nStations; station ++) { - snprintf(nameBuffer, nameBufferSize, "Input.Transport.Station%d.Rsp%d", station, ic); - TransportHolder* lastTH = Connector::readTH(itsParamSet, nameBuffer, true); - - snprintf(nameBuffer, nameBufferSize, "RSP_Input_node_station%d_cell%d", station, ic); - lastWH = new WH_RSPInput(nameBuffer, - itsParamSet, - *lastTH, - station); - RSPSteps.push_back(new Step(lastWH, nameBuffer, false)); -#ifdef HAVE_MPI - RSPSteps.back()->runOnNode(lowestFreeNode++); -#endif - comp.addBlock(RSPSteps.back()); - - // Connect the Delay Controller - itsInputStub->connect(ic * nStations + station, (RSPSteps.back())->getInDataManager(0), 0); - } - - LOG_TRACE_FLOW_STR("Create the Subband merger workholders"); - vector<Step*> collectSteps; - for (int cell = 0; cell < nCells / inputCells; cell++) { - sprintf(nameBuffer, "Collect_node_%d_%d", cell, ic); - lastWH = new WH_SBCollect(nameBuffer, // name - itsParamSet, - nNodesPerCell); - collectSteps.push_back(new Step(lastWH, nameBuffer, false)); -#ifdef HAVE_MPI - collectSteps.back()->runOnNode(lowestFreeNode++); -#endif - comp.addBlock(collectSteps.back()); - - // Connect splitters to mergers (transpose) -#ifndef HAVE_MPI + // TODO: support multiple RSPs per station + itsInputNodes = itsParamSet.getUint32Vector("Input.InputNodes"); + itsOutputNodes = itsParamSet.getUint32Vector("Input.OutputNodes"); + unsigned nrBGLnodesPerCell = itsParamSet.getUint32("BGLProc.NodesPerPset") * itsParamSet.getInt32("BGLProc.PsetsPerCell"); - for (int station = 0; station < nStations; station++) { - itsConnector.connectSteps(RSPSteps[station], cell, collectSteps.back(), station); - } - -#else -#if MPICH_WORKING_ON_INFINI_BAND - for (int station = 0; station < nStations; station++) { - itsConnector.connectSteps(RSPSteps[station], cell, collectSteps.back(), station); - } +#if defined HAVE_MPI + unsigned nrNodes = TH_MPI::getNumberOfNodes(); #else - vector<string> transposeHosts = itsParamSet.getStringVector("TransposeHosts"); - vector<string> transposePorts = itsParamSet.getStringVector("TransposePorts"); - for (int station = 0; station < nStations; station++) { - // We need to find out if we are on the client or the server - // because TH_Socket doesn't find it out itself. - if (collectSteps.back()->getNode() == TH_MPI::getCurrentRank()) { - // create server socket - collectSteps.back()->connect(station, - RSPSteps[station], - cell, - 1, - new TH_Socket(transposePorts[station], - true, - Socket::TCP, - 5, - false), - false); - } else { - // create client socket - collectSteps.back()->connect(station, - RSPSteps[station], - cell, - 1, - new TH_Socket(transposeHosts[cell], - transposePorts[station], - true, - Socket::TCP, - false), - false); - } - } + unsigned nrNodes = 1; #endif -#endif - // connect outputs to Subband stub - vector<int> channels; - for (int core = 0; core < nNodesPerCell; core++) { - collectSteps.back()->getOutDataManager(0).setOutBuffer(core, false, 3); - itsOutputStub->connect(cell + ic * nCells / inputCells, - core, - (collectSteps.back())->getOutDataManager(0), - core); - - channels.push_back(core); - } - collectSteps.back()->getOutDataManager(0).setOutRoundRobinPolicy(channels, itsParamSet.getInt32("BGLProc.MaxConcurrentCommunications")); - } - } - - LOG_TRACE_FLOW_STR("Finished define()"); -#ifdef HAVE_MPI - ASSERTSTR (lowestFreeNode == TH_MPI::getNumberOfNodes(), "CS1_InputSection needs "<< lowestFreeNode << " nodes, "<<TH_MPI::getNumberOfNodes()<<" available"); -#endif - } + itsWHs.resize(nrNodes); - void AH_InputSection::prerun() { - getComposite().preprocess(); - } - - void AH_InputSection::run(int steps) { - LOG_TRACE_FLOW_STR("Start AH_InputSection::run() " ); - for (int i = 0; i < steps; i++) { - LOG_TRACE_LOOP_STR("processing run " << i ); - getComposite().process(); - } - LOG_TRACE_FLOW_STR("Finished AH_InputSection::run() " ); - } + for (unsigned node = 0, cell = 0, station = 0; node < nrNodes; node ++) { + bool isInput = std::find(itsInputNodes.begin(), itsInputNodes.end(), node) != itsInputNodes.end(); + bool isOutput = std::find(itsOutputNodes.begin(), itsOutputNodes.end(), node) != itsOutputNodes.end(); + TransportHolder *th = 0; + char nameBuffer[40]; - void AH_InputSection::dump() const { - LOG_TRACE_FLOW_STR("AH_InputSection::dump() not implemented" ); + if (isInput) { + snprintf(nameBuffer, sizeof nameBuffer, "Input.Transport.Station%d.Rsp%d", station, 0); // FIXME last arg is RSP number + th = Connector::readTH(itsParamSet, nameBuffer, true); } - void AH_InputSection::quit() { + itsWHs[node] = new WH_InputSection("InputSection", itsParamSet, th, isInput ? station : 0, isInput ? 1 : 0, isOutput ? nrBGLnodesPerCell : 0, itsInputNodes, itsOutputNodes); + Step *step = new Step(itsWHs[node], "Step", false); + step->runOnNode(node); + comp.addBlock(step); + + if (isInput) { + itsDelayStub->connect(station, step->getInDataManager(0), 0); + station ++; } - } // namespace CS1 + if (isOutput) { + DataManager &dm = step->getOutDataManager(0); + std::vector<int> channels(nrBGLnodesPerCell); + + for (unsigned core = 0; core < nrBGLnodesPerCell; core ++) { + dm.setOutBuffer(core, false, 3); + itsOutputStub->connect(cell, core, dm, core); + channels[core] = core; + } + + dm.setOutRoundRobinPolicy(channels, itsParamSet.getInt32("BGLProc.MaxConcurrentCommunications")); + cell ++; + } + } + + LOG_TRACE_FLOW_STR("Finished define()"); +} + +void AH_InputSection::prerun() +{ + getComposite().preprocess(); +} + +void AH_InputSection::run(int steps) +{ + LOG_TRACE_FLOW_STR("Start AH_InputSection::run() " ); + for (int i = 0; i < steps; i++) { + LOG_TRACE_LOOP_STR("processing run " << i ); + getComposite().process(); + } + LOG_TRACE_FLOW_STR("Finished AH_InputSection::run() " ); +} + +void AH_InputSection::dump() const +{ + LOG_TRACE_FLOW_STR("AH_InputSection::dump() not implemented" ); +} + +void AH_InputSection::quit() +{ +} + +} // namespace CS1 } // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_InputSection/src/BeamletBuffer.cc b/Appl/CEP/CS1/CS1_InputSection/src/BeamletBuffer.cc index d6d996d6aa0175d53375d0dbe66a8248bd2dd840..4dfb6dc55acd35b428edba434926d990b3c1bab1 100644 --- a/Appl/CEP/CS1/CS1_InputSection/src/BeamletBuffer.cc +++ b/Appl/CEP/CS1/CS1_InputSection/src/BeamletBuffer.cc @@ -31,192 +31,180 @@ #include <CS1_Interface/RSPTimeStamp.h> namespace LOFAR { - namespace CS1 { - - BeamletBuffer::BeamletBuffer(int bufferSize, uint nSubbands, uint history, uint readWriteDelay): - itsNSubbands(nSubbands), - itsSize(bufferSize), - itsSBBuffers(boost::extents[nSubbands][bufferSize][NR_POLARIZATIONS]), - itsLockedRange(bufferSize, readWriteDelay, bufferSize - history, 0), - itsDroppedItems(0), - itsDummyItems(0), - itsSkippedItems(0), - itsWriteTimer("write"), - itsReadTimer("read") +namespace CS1 { + +BeamletBuffer::BeamletBuffer(int bufferSize, unsigned nSubbands, unsigned history, unsigned readWriteDelay): + itsNSubbands(nSubbands), + itsSize(bufferSize), + itsSBBuffers(boost::extents[nSubbands][bufferSize][NR_POLARIZATIONS]), + itsLockedRange(bufferSize, readWriteDelay, bufferSize - history, 0), + itsDroppedItems(0), + itsDummyItems(0), + itsSkippedItems(0), + itsWriteTimer("write"), + itsReadTimer("read") +{ + mutex::scoped_lock sl(itsFlagsMutex); + itsFlags.include(0, bufferSize); +} + +BeamletBuffer::~BeamletBuffer() +{ + clog<<"BeamletBuffer did not receive "<<itsDummyItems<<" stamps and received "<<itsDroppedItems<<" items too late. "<<itsSkippedItems<<" items were skipped (but may be received later)."<<endl; + clog<<"BeamletBufferTimers:"<<endl; + clog<<itsReadTimer<<endl; + clog<<itsWriteTimer<<endl; +} + +void BeamletBuffer::checkForSkippedData(TimeStamp writeBegin) { + // flag the data from itsHighestWritten to end + while ((writeBegin > itsHighestWritten) and (itsHighestWritten > TimeStamp())) { + // take only the first part, so the buffer won't block + TimeStamp flagEnd = itsHighestWritten + itsSize/4; + + if (flagEnd > writeBegin) + flagEnd = writeBegin; + + TimeStamp realBegin = itsLockedRange.writeLock(itsHighestWritten, flagEnd); + + itsWriteTimer.start(); + //cerr<<"BeamletBuffer: skipping "<<itsHighestWritten<<" - "<<flagEnd<<" ("<<flagEnd-itsHighestWritten<<")"<<endl; + itsSkippedItems += flagEnd - itsHighestWritten; + unsigned startI = mapTime2Index(realBegin), endI = mapTime2Index(flagEnd); + { -#if 0 - for (uint sb = 0; sb < nSubbands; sb ++) { - itsSBBuffers.push_back(new Beamlet[bufferSize]); - } -#endif mutex::scoped_lock sl(itsFlagsMutex); - itsFlags.include(0, bufferSize); - } - - BeamletBuffer::~BeamletBuffer() - { - cout<<"BeamletBuffer did not receive "<<itsDummyItems<<" stamps and received "<<itsDroppedItems<<" items too late. "<<itsSkippedItems<<" items were skipped (but may be received later)."<<endl; - cout<<"BeamletBufferTimers:"<<endl; - cout<<itsReadTimer<<endl; - cout<<itsWriteTimer<<endl; - cout.flush(); -#if 0 - vector<Beamlet*>::iterator bit = itsSBBuffers.begin(); - for (; bit != itsSBBuffers.end(); bit++) { - delete [] *bit; + if (endI < startI) { + itsFlags.include(0, endI).include(startI, itsSize); + } else { + itsFlags.include(startI, endI); } -#endif - } + } - void BeamletBuffer::checkForSkippedData(TimeStamp writeBegin) { - // flag the data from itsHighestWritten to end - while ((writeBegin > itsHighestWritten) and (itsHighestWritten > TimeStamp())) { - // take only the first part, so the buffer won't block - TimeStamp flagEnd = itsHighestWritten + itsSize/4; - if (flagEnd > writeBegin) flagEnd = writeBegin; - TimeStamp realBegin = itsLockedRange.writeLock(itsHighestWritten, flagEnd); - - itsWriteTimer.start(); - //cerr<<"BeamletBuffer: skipping "<<itsHighestWritten<<" - "<<flagEnd<<" ("<<flagEnd-itsHighestWritten<<")"<<endl; - itsSkippedItems += flagEnd - itsHighestWritten; - uint startI = mapTime2Index(realBegin), endI = mapTime2Index(flagEnd); - - { - mutex::scoped_lock sl(itsFlagsMutex); - if (endI < startI) { - itsFlags.include(0, endI).include(startI, itsSize); - } else { - itsFlags.include(startI, endI); - } - } - - itsWriteTimer.stop(); - if (itsHighestWritten < flagEnd) itsHighestWritten = flagEnd; - itsLockedRange.writeUnlock(flagEnd); - } - } + itsWriteTimer.stop(); - uint BeamletBuffer::writeElements(Beamlet* data, TimeStamp begin, uint nElements, uint subbandsPerFrame) - { - // if this part start beyond itsHighestWritten, there is a gap in the data in the buffer - // so set that data to zero and invalidate it. - //cerr<<"BeamletBuffer checking for skipped data"<<endl; - checkForSkippedData(begin); - - // Now write the normal data - //cerr<<"BeamletBuffer writing normal data"<<endl; - TimeStamp end = begin + nElements; - TimeStamp realBegin = itsLockedRange.writeLock(begin, end); - //cerr<<"BeamletBuffer writelock received"<<endl; - - if (realBegin < end) { - - itsDroppedItems += realBegin - begin; - data += realBegin - begin; - itsWriteTimer.start(); - - uint startI = mapTime2Index(realBegin), endI = mapTime2Index(end); - - //cerr<<"BeamletBuffer: write from "<<realBegin<<" instead of "<<begin<<endl; - //cerr<<"BeamletBuffer: Writing from "<<startI<<" to "<<endI<<" timestamp "<<begin<<endl; - if (endI < startI) { - // the data wraps around the allocated memory, so do it in two parts - - uint chunk1 = itsSize - startI; - for (uint sb = 0; sb < itsNSubbands; sb++) { - memcpy(itsSBBuffers[sb][startI].origin(), &data[0] , sizeof(SampleType[chunk1][NR_POLARIZATIONS])); - memcpy(itsSBBuffers[sb][0].origin() , &data[chunk1], sizeof(SampleType[endI][NR_POLARIZATIONS])); - data += nElements; - } - - mutex::scoped_lock sl(itsFlagsMutex); - itsFlags.exclude(startI, itsSize).exclude(0, endI); - } else { - for (uint sb = 0; sb < itsNSubbands; sb++) { - memcpy(itsSBBuffers[sb][startI].origin(), data, sizeof(SampleType[endI - startI][NR_POLARIZATIONS])); - data += nElements; - } - - mutex::scoped_lock sl(itsFlagsMutex); - itsFlags.exclude(startI, endI); - } - - itsWriteTimer.stop(); - if (itsHighestWritten < end) itsHighestWritten = end; - } - itsLockedRange.writeUnlock(end); - return end - realBegin; - } + if (itsHighestWritten < flagEnd) + itsHighestWritten = flagEnd; - uint BeamletBuffer::getElements(vector<Beamlet *> buffers, SparseSet *flags, TimeStamp begin, uint nElements) - { - ASSERTSTR(buffers.size() == itsNSubbands, "BeamletBuffer received wrong number of buffers to write to (in getElements)."); - TimeStamp end = begin + nElements; - TimeStamp realBegin = itsLockedRange.readLock(begin, end); - - itsReadTimer.start(); + itsLockedRange.writeUnlock(flagEnd); + } +} - uint nInvalid = realBegin - begin; - itsDummyItems += nInvalid * itsNSubbands; - flags->include(0, nInvalid); // set flags later +void BeamletBuffer::writeElements(Beamlet *data, TimeStamp begin, unsigned nElements) +{ + // if this part start beyond itsHighestWritten, there is a gap in the data in the buffer + // so set that data to zero and invalidate it. + //cerr<<"BeamletBuffer checking for skipped data"<<endl; + checkForSkippedData(begin); - // copy the real data - uint startI = mapTime2Index(begin), endI = mapTime2Index(end); + // Now write the normal data + //cerr<<"BeamletBuffer writing normal data"<<endl; + TimeStamp end = begin + nElements; + TimeStamp realBegin = itsLockedRange.writeLock(begin, end); + //cerr<<"BeamletBuffer writelock received"<<endl; - if (endI < startI) { - // the data wraps around the allocated memory, so copy in two parts - uint firstChunk = itsSize - startI; + if (realBegin < end) { + itsDroppedItems += realBegin - begin; + data += realBegin - begin; + itsWriteTimer.start(); - for (uint sb = 0; sb < itsNSubbands; sb++) { - memcpy(&buffers[sb][0] , itsSBBuffers[sb][startI].origin(), sizeof(SampleType[firstChunk][NR_POLARIZATIONS])); - memcpy(&buffers[sb][firstChunk], itsSBBuffers[sb][0].origin(), sizeof(SampleType[endI][NR_POLARIZATIONS])); - } + unsigned startI = mapTime2Index(realBegin), endI = mapTime2Index(end); - mutex::scoped_lock sl(itsFlagsMutex); - *flags |= (itsFlags.subset(0, endI) += firstChunk); - *flags |= (itsFlags.subset(startI, itsSize) -= startI); - } else { - for (uint sb = 0; sb < itsNSubbands; sb++) { - memcpy(&buffers[sb][0], itsSBBuffers[sb][startI].origin(), sizeof(SampleType[endI - startI][NR_POLARIZATIONS])); - } - mutex::scoped_lock sl(itsFlagsMutex); - *flags |= (itsFlags.subset(startI, endI) -= startI); + //cerr<<"BeamletBuffer: write from "<<realBegin<<" instead of "<<begin<<endl; + //cerr<<"BeamletBuffer: Writing from "<<startI<<" to "<<endI<<" timestamp "<<begin<<endl; + if (endI < startI) { + // the data wraps around the allocated memory, so do it in two parts + + unsigned chunk1 = itsSize - startI; + for (unsigned sb = 0; sb < itsNSubbands; sb ++) { + memcpy(itsSBBuffers[sb][startI].origin(), &data[0] , sizeof(SampleType[chunk1][NR_POLARIZATIONS])); + memcpy(itsSBBuffers[sb][0].origin() , &data[chunk1], sizeof(SampleType[endI][NR_POLARIZATIONS])); + data += nElements; } - //cout<<"BeamletBuffer: getting elements "<<begin<<" - "<<begin+nElements<<": "<<*flags<<endl; - - // limit the size of the sparse set - const std::vector<struct SparseSet::range> &ranges = flags->getRanges(); - - if (ranges.size() > 16) - flags->include(ranges[15].begin, ranges[ranges.size() - 1].end); + mutex::scoped_lock sl(itsFlagsMutex); + itsFlags.exclude(startI, itsSize).exclude(0, endI); + } else { + for (unsigned sb = 0; sb < itsNSubbands; sb ++) { + memcpy(itsSBBuffers[sb][startI].origin(), data, sizeof(SampleType[endI - startI][NR_POLARIZATIONS])); + data += nElements; + } - itsReadTimer.stop(); - itsLockedRange.readUnlock(end); - return end - realBegin; + mutex::scoped_lock sl(itsFlagsMutex); + itsFlags.exclude(startI, endI); } - TimeStamp BeamletBuffer::startBufferRead() { - TimeStamp oldest = itsLockedRange.getReadStart(); - TimeStamp fixPoint(oldest.getSeqId() + 1, 0); - - TimeStamp realBegin = itsLockedRange.readLock(fixPoint, fixPoint); - ASSERTSTR(realBegin == fixPoint, "Error in starting up buffer"); - itsLockedRange.readUnlock(fixPoint); - return fixPoint; - } + itsWriteTimer.stop(); + + if (itsHighestWritten < end) + itsHighestWritten = end; + } + + itsLockedRange.writeUnlock(end); +} + +void BeamletBuffer::getElements(boost::multi_array_ref<SampleType, 3> &buffers, SparseSet &flags, TimeStamp begin, unsigned nElements) +{ + //ASSERTSTR(buffers.size() == itsNSubbands, "BeamletBuffer received wrong number of buffers to write to (in getElements)."); + TimeStamp end = begin + nElements; + TimeStamp realBegin = itsLockedRange.readLock(begin, end); + + itsReadTimer.start(); + + unsigned nInvalid = realBegin - begin; + itsDummyItems += nInvalid * itsNSubbands; + flags.include(0, nInvalid); // set flags later - TimeStamp BeamletBuffer::startBufferRead(TimeStamp begin) { - TimeStamp oldest = itsLockedRange.getReadStart(); - TimeStamp realBegin = itsLockedRange.readLock(begin, begin); - ASSERTSTR(realBegin == begin, "Error in starting up buffer"); + // copy the real data + unsigned startI = mapTime2Index(begin), endI = mapTime2Index(end); - // if begin is no longer in the buffer the oldest possible beginning is returned - // if begin is not yet in the buffer readLock waits for it + if (endI < startI) { + // the data wraps around the allocated memory, so copy in two parts + unsigned firstChunk = itsSize - startI; - itsLockedRange.readUnlock(realBegin); - return realBegin; + for (unsigned sb = 0; sb < itsNSubbands; sb ++) { + memcpy(buffers[sb].origin() , itsSBBuffers[sb][startI].origin(), sizeof(SampleType[firstChunk][NR_POLARIZATIONS])); + memcpy(buffers[sb][firstChunk].origin(), itsSBBuffers[sb][0].origin(), sizeof(SampleType[endI][NR_POLARIZATIONS])); } - } // namespace CS1 + mutex::scoped_lock sl(itsFlagsMutex); + flags |= (itsFlags.subset(0, endI) += firstChunk); + flags |= (itsFlags.subset(startI, itsSize) -= startI); + } else { + for (unsigned sb = 0; sb < itsNSubbands; sb ++) { + memcpy(buffers[sb].origin(), itsSBBuffers[sb][startI].origin(), sizeof(SampleType[endI - startI][NR_POLARIZATIONS])); + } + mutex::scoped_lock sl(itsFlagsMutex); + flags |= (itsFlags.subset(startI, endI) -= startI); + } + + //cout<<"BeamletBuffer: getting elements "<<begin<<" - "<<begin+nElements<<": "<<flags<<endl; + + itsReadTimer.stop(); + itsLockedRange.readUnlock(end); +} + +TimeStamp BeamletBuffer::startBufferRead() { + TimeStamp oldest = itsLockedRange.getReadStart(); + TimeStamp fixPoint(oldest.getSeqId() + 1, 0); + + TimeStamp realBegin = itsLockedRange.readLock(fixPoint, fixPoint); + ASSERTSTR(realBegin == fixPoint, "Error in starting up buffer"); + itsLockedRange.readUnlock(fixPoint); + return fixPoint; +} + +TimeStamp BeamletBuffer::startBufferRead(TimeStamp begin) { + TimeStamp oldest = itsLockedRange.getReadStart(); + TimeStamp realBegin = itsLockedRange.readLock(begin, begin); + ASSERTSTR(realBegin == begin, "Error in starting up buffer"); + + // if begin is no longer in the buffer the oldest possible beginning is returned + // if begin is not yet in the buffer readLock waits for it + + itsLockedRange.readUnlock(realBegin); + return realBegin; +} + +} // namespace CS1 } // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_InputSection/src/InputThread.cc b/Appl/CEP/CS1/CS1_InputSection/src/InputThread.cc index 29a55c6f619118c816b343dec5391264c817bfc8..c46989215dea8d9db46c8697a7c44f0f534124f4 100644 --- a/Appl/CEP/CS1/CS1_InputSection/src/InputThread.cc +++ b/Appl/CEP/CS1/CS1_InputSection/src/InputThread.cc @@ -134,7 +134,7 @@ retry: // until valid packet received writeTimer.start(); try { - itsArgs.BBuffer->writeElements((Beamlet *) &recvframe[itsArgs.frameHeaderSize], actualstamp, itsArgs.nTimesPerFrame, itsArgs.nSubbandsPerFrame); + itsArgs.BBuffer->writeElements((Beamlet *) &recvframe[itsArgs.frameHeaderSize], actualstamp, itsArgs.nTimesPerFrame); } catch (Exception& e) { LOG_TRACE_FLOW_STR("WriteToBufferThread couldn't write to BeamletBuffer(" << e.what() << ", stopping thread"); break; diff --git a/Appl/CEP/CS1/CS1_InputSection/src/Makefile.am b/Appl/CEP/CS1/CS1_InputSection/src/Makefile.am index 84aad83ac325a7caf697cf52f6f894665367234f..c10988bf433eacd0003e8dcc1640f8ca58fd5c73 100644 --- a/Appl/CEP/CS1/CS1_InputSection/src/Makefile.am +++ b/Appl/CEP/CS1/CS1_InputSection/src/Makefile.am @@ -8,8 +8,7 @@ libcs1_inputsection_la_SOURCES = Connector.cc \ InputThread.cc \ BeamletBuffer.cc \ AH_InputSection.cc \ - WH_SBCollect.cc \ - WH_RSPInput.cc + WH_InputSection.cc # AM_YFLAGS = -d -p KeyParse # AM_LFLAGS = -PKeyTokenize diff --git a/Appl/CEP/CS1/CS1_InputSection/src/WH_InputSection.cc b/Appl/CEP/CS1/CS1_InputSection/src/WH_InputSection.cc new file mode 100644 index 0000000000000000000000000000000000000000..0be9c3c13e98adc88a8b232f018be907061af05c --- /dev/null +++ b/Appl/CEP/CS1/CS1_InputSection/src/WH_InputSection.cc @@ -0,0 +1,424 @@ +//# WH_InputSection.cc: Catch RSP ethernet frames and synchronize RSP inputs +//# +//# Copyright (C) 2006 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id$ + +//# Always #include <lofar_config.h> first! +#include <lofar_config.h> + +//# Includes +#include <Common/LofarLogger.h> +#include <AMCBase/Epoch.h> +#include <CS1_InputSection/WH_InputSection.h> +#include <CS1_Interface/DH_Delay.h> +#include <APS/ParameterSet.h> +#include <Transport/TransportHolder.h> +#include <CS1_Interface/RSPTimeStamp.h> +#include <CS1_InputSection/BeamletBuffer.h> +#include <CS1_InputSection/InputThread.h> +#include <tinyCEP/Sel_RoundRobin.h> + +#include <algorithm> + +#if defined HAVE_MPI +#include <Transport/TH_MPI.h> +#include <mpi.h> +#endif + +#include <signal.h> +#include <sys/time.h> + + +#undef USE_TIMER + +namespace LOFAR { +namespace CS1 { + +bool WH_InputSection::signalReceived; + + +WH_InputSection::WH_InputSection(const string &name, + ACC::APS::ParameterSet &ps, + TransportHolder *inputTH, + unsigned stationNr, + unsigned nrInputChannels, + unsigned nrOutputChannels, + const std::vector<unsigned> &inputNodes, + const std::vector<unsigned> &outputNodes) +: + WorkHolder(nrInputChannels, nrOutputChannels, name, "WH_InputSection"), + itsInputNodes(inputNodes), + itsOutputNodes(outputNodes), + itsInputTH(inputTH), + itsStationNr(stationNr), + itsPS(ps), + itsBBuffer(0), + itsPrePostTimer("pre/post"), + itsProcessTimer("process"), + itsGetElemTimer("getElem") +{ + LOG_TRACE_FLOW_STR("WH_InputSection constructor"); + + // get parameters + itsNSubbandsPerCell = ps.getUint32("General.SubbandsPerPset") * ps.getUint32("BGLProc.PsetsPerCell"); + itsNSamplesPerSec = ps.getUint32("Observation.NSubbandSamples"); + itsNHistorySamples = (ps.getUint32("BGLProc.NPPFTaps") - 1) * ps.getUint32("Observation.NChannels"); + + // create incoming dataholder holding the delay information + if (nrInputChannels > 0) + getDataManager().addInDataHolder(0, new DH_Delay("DH_Delay", ps.getInt32("Input.NRSPBoards"))); + + // create a outgoing dataholder for each subband + if (nrOutputChannels > 0) { + vector<int> channels; + + for (int i = 0; i < itsNoutputs; i ++) { + getDataManager().addOutDataHolder(i, new DH_Subband("DH_Subband", ps)); + getDataManager().setAutoTriggerOut(i, false); + channels.push_back(i); + } + + getDataManager().setOutputSelector(new Sel_RoundRobin(channels)); + } +} + + +WH_InputSection::~WH_InputSection() +{ +} + + +WH_InputSection *WH_InputSection::make(const string& name) +{ + return new WH_InputSection(name, itsPS, itsInputTH, itsStationNr, itsNinputs, itsNoutputs, itsInputNodes, itsOutputNodes); +} + + +void WH_InputSection::startThread() +{ + /* start up thread which writes RSP data from ethernet link + into cyclic buffers */ + LOG_TRACE_FLOW_STR("WH_InputSection starting thread"); + + ThreadArgs args; + args.BBuffer = itsBBuffer; + args.th = itsInputTH; + args.ipHeaderSize = itsPS.getInt32("Input.IPHeaderSize"); + args.frameHeaderSize = itsPS.getInt32("Input.SzEPAheader"); + args.nTimesPerFrame = itsPS.getInt32("Input.NTimesInFrame"); + args.nSubbandsPerFrame = itsPS.getInt32("Input.NSubbandsPerFrame"); + + args.frameSize = args.frameHeaderSize + args.nSubbandsPerFrame * args.nTimesPerFrame * sizeof(Beamlet); + args.ID = itsStationNr; + + + if (itsInputTH->getType() == "TH_File" || itsInputTH->getType() == "TH_Null") { + // if we are reading from file, overwriting the buffer should not be allowed + // this way we can work with smaller files + itsBBuffer->setAllowOverwrite(false); + } + + itsInputThreadObject = new InputThread(args); + itsInputThread = new boost::thread(*itsInputThreadObject); +} + +void WH_InputSection::preprocess() +{ + itsPrePostTimer.start(); + + itsIsInput = std::find(itsInputNodes.begin(), itsInputNodes.end(), (unsigned) getNode()) != itsInputNodes.end(); + itsIsOutput = std::find(itsOutputNodes.begin(), itsOutputNodes.end(), (unsigned) getNode()) != itsOutputNodes.end(); + + if (itsIsInput) { + // create the buffer controller. + int cyclicBufferSize = itsPS.getInt32("Input.NSamplesToBuffer"); + int subbandsToReadFromFrame = itsPS.getInt32("Observation.NSubbands") * itsPS.getInt32("Observation.NStations") / itsPS.getInt32("Input.NRSPBoards"); + ASSERTSTR(subbandsToReadFromFrame <= itsPS.getInt32("Input.NSubbandsPerFrame"), subbandsToReadFromFrame << " < " << itsPS.getInt32("Input.NSubbandsPerFrame")); + + itsBBuffer = new BeamletBuffer(cyclicBufferSize, subbandsToReadFromFrame, cyclicBufferSize/6, cyclicBufferSize/6); + startThread(); + + itsDelayCompensation = itsPS.getBool("Observation.DelayCompensation"); + + // determine starttime + double startTime = itsPS.getDouble("Observation.StartTime"); + +#if 1 + // interpret the time as utc + double utc = startTime; +#else + double utc = AMC::Epoch(startTime).utc(); +#endif + int sampleFreq = itsPS.getInt32("Observation.SampleRate"); + int seconds = (int)floor(utc); + int samples = (int)((utc - floor(utc)) * sampleFreq); + + itsSyncedStamp = TimeStamp(seconds, samples); + + cout<<"Starting buffer at "<<itsSyncedStamp<<endl;cout.flush(); + itsBBuffer->startBufferRead(itsSyncedStamp); + + unsigned nrCells = itsPS.getUint32("Observation.NSubbands") / itsNSubbandsPerCell; + + itsInputData = new boost::multi_array<SampleType, 4>(boost::extents[nrCells][itsNSubbandsPerCell][itsNSamplesPerSec + itsNHistorySamples][NR_POLARIZATIONS]); + itsInputMetaData = new struct metaData[nrCells]; + } + + if (itsIsOutput) { + unsigned nrStations = itsInputNodes.size(); + + itsOutputData = new boost::multi_array<SampleType, 4>(boost::extents[nrStations][itsNSubbandsPerCell][itsNSamplesPerSec + itsNHistorySamples][NR_POLARIZATIONS]); + itsOutputMetaData = new struct metaData[nrStations]; + +#if defined USE_TIMER + sighandler_t ret = signal(SIGALRM, *WH_InputSection::timerSignal); + ASSERTSTR(ret != SIG_ERR, "WH_InputSection couldn't set signal handler for timer"); + struct itimerval value; + + double interval = itsNSamplesPerSec / itsPS.getDouble("Observation.SampleRate") / itsNSubbandsPerCell; + __time_t secs = static_cast<__time_t>(floor(interval)); + __time_t usecs = static_cast<__time_t>(1e6 * (interval - secs)); + + value.it_interval.tv_sec = value.it_value.tv_sec = secs; + value.it_interval.tv_usec = value.it_value.tv_usec = usecs; + cout << "Setting timer interval to " << secs << "secs and " << usecs << "ms" << endl; + + setitimer(ITIMER_REAL, &value, 0); +#endif + } +} + +void WH_InputSection::doInput(SparseSet &flags) +{ + TimeStamp delayedStamp = itsSyncedStamp - itsNHistorySamples; + itsSyncedStamp += itsNSamplesPerSec; + + if (itsDelayCompensation) { + DH_Delay *dh = static_cast<DH_Delay *>(getDataManager().getInHolder(0)); + delayedStamp += (*dh)[itsStationNr].coarseDelay; + } + + // get the data from the cyclic buffer + itsGetElemTimer.start(); + boost::multi_array_ref<SampleType, 3> inputData(itsInputData->origin(), boost::extents[itsOutputNodes.size() * itsNSubbandsPerCell][itsNSamplesPerSec + itsNHistorySamples][NR_POLARIZATIONS]); + itsBBuffer->getElements(inputData, flags, delayedStamp, itsNSamplesPerSec + itsNHistorySamples); + itsGetElemTimer.stop(); + + std::clog << "WH_InputSection out " << itsStationNr << " " << delayedStamp << " flags: " << flags << std::endl; +} + + +void WH_InputSection::limitFlagsLength(SparseSet &flags) +{ + const std::vector<struct SparseSet::range> &ranges = flags.getRanges(); + + if (ranges.size() > 16) + flags.include(ranges[15].begin, ranges[ranges.size() - 1].end); +} + + +void WH_InputSection::transposeData() +{ +#if defined HAVE_MPI + int nrNodes = TH_MPI::getNumberOfNodes(); +#else + int nrNodes = 1; +#endif + + int sendCounts[nrNodes], sendDisplacements[nrNodes]; + int receiveCounts[nrNodes], receiveDisplacements[nrNodes]; + + memset(sendCounts, 0, sizeof sendCounts); + memset(receiveCounts, 0, sizeof receiveCounts); + + if (itsIsInput) + for (unsigned output = 0; output < itsOutputNodes.size(); output ++) { + sendCounts[itsOutputNodes[output]] = (*itsInputData)[output].num_elements() * sizeof(SampleType); + sendDisplacements[itsOutputNodes[output]] = reinterpret_cast<char *>((*itsInputData)[output].origin()) - reinterpret_cast<char *>(itsInputData->origin()); + } + + if (itsIsOutput) + for (unsigned input = 0; input < itsInputNodes.size(); input ++) { + receiveCounts[itsInputNodes[input]] = (*itsOutputData)[input].num_elements() * sizeof(SampleType); + receiveDisplacements[itsInputNodes[input]] = reinterpret_cast<char *>((*itsOutputData)[input].origin()) - reinterpret_cast<char *>(itsOutputData->origin()); + } + +#if defined HAVE_MPI + if (MPI_Alltoallv(itsIsInput ? itsInputData->origin() : 0, + sendCounts, sendDisplacements, MPI_BYTE, + itsIsOutput ? itsOutputData->origin() : 0, + receiveCounts, receiveDisplacements, MPI_BYTE, + MPI_COMM_WORLD) != MPI_SUCCESS) { + std::cerr << "MPI_Alltoallv() failed" << std::endl; + exit(1); + } +#endif +} + + +void WH_InputSection::transposeMetaData(const SparseSet &flags) +{ +#if defined HAVE_MPI + int nrNodes = TH_MPI::getNumberOfNodes(); +#else + int nrNodes = 1; +#endif + + int sendCounts[nrNodes], sendDisplacements[nrNodes]; + int receiveCounts[nrNodes], receiveDisplacements[nrNodes]; + + memset(sendCounts, 0, sizeof sendCounts); + memset(receiveCounts, 0, sizeof receiveCounts); + + if (itsIsInput) { + DH_Delay *delayDHp = static_cast<DH_Delay *>(getDataManager().getInHolder(0)); + + for (unsigned output = 0; output < itsOutputNodes.size(); output++) { + itsInputMetaData[output].fineDelayAtBegin = (*delayDHp)[itsStationNr].fineDelayAtBegin; + itsInputMetaData[output].fineDelayAfterEnd = (*delayDHp)[itsStationNr].fineDelayAfterEnd; + + if (flags.marshall(itsInputMetaData[output].flagsBuffer, sizeof itsInputMetaData[output].flagsBuffer) < 0) { + std::cerr << "Too many flags!" << std::endl; + std::exit(1); + } + + sendCounts[itsOutputNodes[output]] = sizeof(struct metaData); + sendDisplacements[itsOutputNodes[output]] = reinterpret_cast<char *>(&itsInputMetaData[output]) - reinterpret_cast<char *>(itsInputMetaData); + } + } + + if (itsIsOutput) + for (unsigned input = 0; input < itsInputNodes.size(); input ++) { + receiveCounts[itsInputNodes[input]] = sizeof(struct metaData); + receiveDisplacements[itsInputNodes[input]] = reinterpret_cast<char *>(&itsOutputMetaData[input]) - reinterpret_cast<char *>(itsOutputMetaData); + } + +#if defined HAVE_MPI + if (MPI_Alltoallv(itsIsInput ? itsInputMetaData : 0, + sendCounts, sendDisplacements, MPI_BYTE, + itsIsOutput ? itsOutputMetaData : 0, + receiveCounts, receiveDisplacements, MPI_BYTE, + MPI_COMM_WORLD) != MPI_SUCCESS) { + std::cerr << "MPI_Alltoallv() failed" << std::endl; + exit(1); + } +#endif +} + + +void WH_InputSection::doOutput() +{ + // Copy every subband to one BG/L core + Selector *selector = getDataManager().getOutputSelector(); + + for (unsigned subband = 0; subband < itsNSubbandsPerCell; subband ++) { + // ask the round robin selector for the next output + DH_Subband *outHolder = static_cast<DH_Subband *>(getDataManager().getOutHolder(selector->getCurrentSelection())); + + // Copy one subband from every input + for (unsigned station = 0; station < itsInputNodes.size(); station ++) { + ASSERT(outHolder->getSamples3D()[station].num_elements() == (*itsOutputData)[station][subband].num_elements()); + + memcpy(outHolder->getSamples3D()[station].origin(), + (*itsOutputData)[station][subband].origin(), + outHolder->getSamples3D()[station].num_elements() * sizeof(DH_Subband::SampleType)); + + // copy other information (delayInfo, flags etc) + outHolder->getDelays()[station].delayAtBegin = itsOutputMetaData[station].fineDelayAtBegin; + outHolder->getDelays()[station].delayAfterEnd = itsOutputMetaData[station].fineDelayAfterEnd; + outHolder->getFlags()[station].unmarshall(itsOutputMetaData[station].flagsBuffer); + } + + outHolder->fillExtraData(); + + getDataManager().readyWithOutHolder(selector->getCurrentSelection()); + selector->selectNext(); + } +} + + +void WH_InputSection::process() +{ + itsProcessTimer.start(); + SparseSet flags; + + if (itsIsInput) { + doInput(flags); + limitFlagsLength(flags); + } + + transposeData(); + transposeMetaData(flags); + + if (itsIsOutput) + doOutput(); + + itsProcessTimer.stop(); + +#if defined USE_TIMER + while (!signalReceived) + pause(); + + signalReceived = false; +#endif +} + +void WH_InputSection::postprocess() +{ + if (itsIsInput) { + InputThread::stopThreads(); + itsBBuffer->clear(); + itsInputThread->join(); + delete itsInputThread; + delete itsInputThreadObject; + delete itsBBuffer; + delete itsInputData; + delete [] itsInputMetaData; + } + + if (itsIsOutput) { + delete itsOutputData; + delete [] itsOutputMetaData; + } + +#if defined USE_TIMER + // unset timer + struct itimerval value; + memset(&value, 0, sizeof value); + setitimer(ITIMER_REAL, &value, 0); + // remove sig handler + sighandler_t ret = signal(SIGALRM, SIG_DFL); + ASSERTSTR(ret != SIG_ERR, "WH_InputSection couldn't unset signal handler for timer"); +#endif + + itsPrePostTimer.stop(); + + itsPrePostTimer.print(clog); + itsProcessTimer.print(clog); + itsGetElemTimer.print(clog); +} + +void WH_InputSection::timerSignal(int) +{ + signalReceived = true; +} + +} // namespace CS1 +} // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc b/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc deleted file mode 100644 index 31915b119bc97678e645496b4245363275e79451..0000000000000000000000000000000000000000 --- a/Appl/CEP/CS1/CS1_InputSection/src/WH_RSPInput.cc +++ /dev/null @@ -1,325 +0,0 @@ -//# WH_RSPInput.cc: Catch RSP ethernet frames and synchronize RSP inputs -//# -//# Copyright (C) 2006 -//# ASTRON (Netherlands Foundation for Research in Astronomy) -//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl -//# -//# This program is free software; you can redistribute it and/or modify -//# it under the terms of the GNU General Public License as published by -//# the Free Software Foundation; either version 2 of the License, or -//# (at your option) any later version. -//# -//# This program is distributed in the hope that it will be useful, -//# but WITHOUT ANY WARRANTY; without even the implied warranty of -//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//# GNU General Public License for more details. -//# -//# You should have received a copy of the GNU General Public License -//# along with this program; if not, write to the Free Software -//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -//# -//# $Id$ - -//# Always #include <lofar_config.h> first! -#include <lofar_config.h> - -//# Includes -#include <Common/LofarLogger.h> -#include <AMCBase/Epoch.h> -#include <CS1_InputSection/WH_RSPInput.h> -#include <CS1_Interface/DH_RSP.h> -#include <CS1_Interface/DH_Delay.h> -#include <Common/hexdump.h> -#include <APS/ParameterSet.h> -#include <Transport/TransportHolder.h> -#include <CS1_Interface/RSPTimeStamp.h> -#include <CS1_InputSection/BeamletBuffer.h> -#include <CS1_InputSection/InputThread.h> -#include <tinyCEP/Sel_RoundRobin.h> - - -// for timer -#include <signal.h> -#include <sys/time.h> -// for rand -#include <stdlib.h> -// for sleep (yield) -#include <boost/thread.hpp> - -namespace LOFAR { - namespace CS1 { - - int WH_RSPInput::theirNoRunningWHs = 0; - int WH_RSPInput::theirNoAlarms = 0; - bool WH_RSPInput::theirTimerSet = 0; - - WH_RSPInput::WH_RSPInput(const string& name, - ACC::APS::ParameterSet& ps, - TransportHolder& th, - uint stationNr) - : WorkHolder (1, - ps.getInt32("Observation.NSubbands") * ps.getInt32("Observation.NStations") / (ps.getInt32("Input.NRSPBoards") * ps.getInt32("General.SubbandsPerPset") * ps.getInt32("BGLProc.PsetsPerCell")), - name, - "WH_RSPInput"), - itsTH(th), - itsStationNr(stationNr), - itsPS (ps), - itsBBuffer(0), - itsPrePostTimer("pre/post"), - itsProcessTimer("process"), - itsGetElemTimer("getElem") - { - LOG_TRACE_FLOW_STR("WH_RSPInput constructor"); - - char str[32]; - - // get parameters - itsNSubbandsPerCell = ps.getInt32("General.SubbandsPerPset") * ps.getInt32("BGLProc.PsetsPerCell"); - itsNSamplesPerSec = ps.getInt32("Observation.NSubbandSamples"); - itsNHistorySamples = (ps.getInt32("BGLProc.NPPFTaps") - 1) * ps.getInt32("Observation.NChannels"); - - // create incoming dataholder holding the delay information - getDataManager().addInDataHolder(0, new DH_Delay("DH_Delay", ps.getInt32("Input.NRSPBoards"))); - - // create a outgoing dataholder for each subband - for (int s=0; s < itsNoutputs; s++) { - snprintf(str, 32, "DH_RSP_out_%d", s); - getDataManager().addOutDataHolder(s, new DH_RSP(str, itsPS)); - } - } - - - WH_RSPInput::~WH_RSPInput() - { - } - - - WorkHolder* WH_RSPInput::construct(const string& name, - ACC::APS::ParameterSet& ps, - TransportHolder& th, - uint stationNr) - { - return new WH_RSPInput(name, ps, th, stationNr); - } - - - WH_RSPInput* WH_RSPInput::make(const string& name) - { - return new WH_RSPInput(name, itsPS, itsTH, itsStationNr); - } - - - void WH_RSPInput::startThread() - { - - /* start up thread which writes RSP data from ethernet link - into cyclic buffers */ - LOG_TRACE_FLOW_STR("WH_RSPInput starting thread"); - - ThreadArgs args; - args.BBuffer = itsBBuffer; - args.th = &itsTH; - args.ipHeaderSize = itsPS.getInt32("Input.IPHeaderSize"); - args.frameHeaderSize = itsPS.getInt32("Input.SzEPAheader"); - args.nTimesPerFrame = itsPS.getInt32("Input.NTimesInFrame"); - args.nSubbandsPerFrame = itsPS.getInt32("Input.NSubbandsPerFrame"); - - args.frameSize = args.frameHeaderSize + args.nSubbandsPerFrame * args.nTimesPerFrame * sizeof(Beamlet); - args.ID = itsStationNr; - - - if ((itsTH.getType() == "TH_File") || (itsTH.getType() == "TH_Null")) { - // if we are reading from file, overwriting the buffer should not be allowed - // this way we can work with smaller files - itsBBuffer->setAllowOverwrite(false); - } - - itsInputThreadObject = new InputThread(args); - itsInputThread = new boost::thread(*itsInputThreadObject); - } - - void WH_RSPInput::preprocess() - { - // create the buffer controller. - int cyclicBufferSize = itsPS.getInt32("Input.NSamplesToBuffer"); - int subbandsToReadFromFrame = itsPS.getInt32("Observation.NSubbands") * itsPS.getInt32("Observation.NStations") / itsPS.getInt32("Input.NRSPBoards"); - ASSERTSTR(subbandsToReadFromFrame <= itsPS.getInt32("Input.NSubbandsPerFrame"), subbandsToReadFromFrame << " < " << itsPS.getInt32("Input.NSubbandsPerFrame")); - - itsBBuffer = new BeamletBuffer(cyclicBufferSize, subbandsToReadFromFrame, cyclicBufferSize/6, cyclicBufferSize/6); - startThread(); - itsPrePostTimer.start(); - - itsDelayCompensation = itsPS.getBool("Observation.DelayCompensation"); - - // determine starttime - double startTime = itsPS.getDouble("Observation.StartTime"); - double utc = 0; - -#if 1 - // interpret the time as utc - utc = startTime; -#else - utc = AMC::Epoch(startTime).utc(); -#endif - int sampleFreq = itsPS.getInt32("Observation.SampleRate"); - int seconds = (int)floor(utc); - int samples = (int)((utc - floor(utc)) * sampleFreq); - - itsSyncedStamp = TimeStamp(seconds, samples); - - cout<<"Starting buffer at "<<itsSyncedStamp<<endl;cout.flush(); - itsBBuffer->startBufferRead(itsSyncedStamp); - cout<<"end of WH_RSPInput::preprocess"<<endl;cout.flush(); - - - if (!theirTimerSet) { -#define USE_TIMER 0 -#if USE_TIMER - sighandler_t ret = signal(SIGALRM, *WH_RSPInput::timerSignal); - ASSERTSTR(ret != SIG_ERR, "WH_RSPInput couldn't set signal handler for timer"); - struct itimerval value; - memset (&value, 0, sizeof(itimerval)); - - __time_t secs = 1; - __time_t usecs = 0; - // this means 1MHz is the highest frequency - value.it_interval.tv_sec = secs; - value.it_interval.tv_usec = usecs; - value.it_value.tv_sec = sec; - value.it_value.tv_usec = usecs; - cout << "Setting timer interval to " << secs << "secs and " << usecs << "ms" << endl; - - setitimer(ITIMER_REAL, &value, 0); -#else - theirNoAlarms = -1; //This will make sure data is written at maximum speed -#endif - - theirTimerSet = true; - } - theirNoRunningWHs++; - - } - - void WH_RSPInput::process() - { - //cout<<"begin of WH_RSPInput::process"<<endl;cout.flush(); - itsProcessTimer.start(); - - // delay control - DH_Delay *delayDHp = static_cast<DH_Delay *>(getDataManager().getInHolder(0)); - // Get delay from the delay controller - timestamp_t delayedstamp = itsSyncedStamp; - - if (itsDelayCompensation) { - delayedstamp += (*delayDHp)[itsStationNr].coarseDelay; - } - - /* startstamp is the synced and delay-controlled timestamp to - start from in cyclic buffer */ - vector<Beamlet *> subbandbuffer; - SparseSet flags; - - // collect pointers to subbands in output dataholders - for (int output = 0; output < itsNoutputs; output ++) { - DH_RSP *rspDHp = static_cast<DH_RSP *>(getDataManager().getOutHolder(output)); - - for (int subband = 0; subband < itsNSubbandsPerCell; subband++) { - subbandbuffer.push_back(reinterpret_cast<Beamlet *>(rspDHp->getSamples()[subband].origin())); - } - } - - // get the data from the cyclic buffer - itsGetElemTimer.start(); - - itsBBuffer->getElements(subbandbuffer, - &flags, - delayedstamp - itsNHistorySamples, - itsNSamplesPerSec + itsNHistorySamples); - itsGetElemTimer.stop(); - - // print flags - cout<<"WH_RSP out "<<itsStationNr<<" "<<delayedstamp<<" flags: "<< flags <<endl; - // printsamples - - // fill in the outgoing dataholders - for (int output = 0; output < itsNoutputs; output++) { - DH_RSP *rspDHp = static_cast<DH_RSP *>(getDataManager().getOutHolder(output)); - - // fill in the data - rspDHp->getFlags() = flags; - rspDHp->setStationID(itsStationNr); - rspDHp->setTimeStamp(delayedstamp - itsNHistorySamples); - rspDHp->fillExtraData(); - rspDHp->setFineDelayAtBegin((*delayDHp)[itsStationNr].fineDelayAtBegin); - rspDHp->setFineDelayAfterEnd((*delayDHp)[itsStationNr].fineDelayAfterEnd); - } - - - - itsSyncedStamp += itsNSamplesPerSec; - itsProcessTimer.stop(); - while (theirNoAlarms == 0) - { - // wait for alarm to go off - boost::thread::yield(); - }; - - // we handled one alarm, so decrease it - theirNoAlarms--; - } - - void WH_RSPInput::postprocess() - { - theirNoRunningWHs--; - if (theirNoRunningWHs == 0) - { - theirTimerSet = false; - if (itsFrequency != 0) { -#if USE_TIMER - // unset timer - struct itimerval value; - memset (&value, 0, sizeof(itimerval)); - setitimer(ITIMER_REAL, &value, 0); - // remove sig handler - sighandler_t ret = signal(SIGALRM, SIG_DFL); - ASSERTSTR(ret != SIG_ERR, "WH_RSPInput couldn't unset signal handler for timer"); -#endif - } - } - - - sleep(1); - itsPrePostTimer.stop(); - - cout<<"\nWH_Timers:"<<endl; - - itsPrePostTimer.print(clog); - itsProcessTimer.print(clog); - itsGetElemTimer.print(clog); - - cout<<"in WH_RSPInput postprocess"<<endl; - //args.Connection = 0; - // stop writer thread - InputThread::stopThreads(); - itsBBuffer->clear(); - cout<<"buffer cleared"<<endl; - itsInputThread->join(); - delete itsInputThread; - delete itsInputThreadObject; - - delete itsBBuffer; - sleep(2); - } - - void WH_RSPInput::dump() const - { - } - - void WH_RSPInput::timerSignal(int) - { - // set the number of frames that can be sent - theirNoAlarms += theirNoRunningWHs; - } - - } // namespace CS1 -} // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_InputSection/src/WH_SBCollect.cc b/Appl/CEP/CS1/CS1_InputSection/src/WH_SBCollect.cc deleted file mode 100644 index 4fa38fd8eee561dd69b9d7b738cdec4ceb9b233d..0000000000000000000000000000000000000000 --- a/Appl/CEP/CS1/CS1_InputSection/src/WH_SBCollect.cc +++ /dev/null @@ -1,192 +0,0 @@ -//# WH_SBCollect.cc: Joins all data (stations, pols) for a subband -//# -//# Copyright (C) 2006 -//# ASTRON (Netherlands Foundation for Research in Astronomy) -//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl -//# -//# This program is free software; you can redistribute it and/or modify -//# it under the terms of the GNU General Public License as published by -//# the Free Software Foundation; either version 2 of the License, or -//# (at your option) any later version. -//# -//# This program is distributed in the hope that it will be useful, -//# but WITHOUT ANY WARRANTY; without even the implied warranty of -//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//# GNU General Public License for more details. -//# -//# You should have received a copy of the GNU General Public License -//# along with this program; if not, write to the Free Software -//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -//# -//# $Id$ - -//# Always #include <lofar_config.h> first! -#include <lofar_config.h> - -//# Includes -#include <Common/LofarLogger.h> -#include <CS1_InputSection/WH_SBCollect.h> -#include <CS1_Interface/DH_RSP.h> -#include <CS1_Interface/DH_Subband.h> -#include <Common/hexdump.h> -#include <tinyCEP/Sel_RoundRobin.h> -#include <CEPFrame/DataManager.h> - -// for timer -#include <signal.h> -#include <sys/time.h> -// for rand -#include <stdlib.h> -// for sleep (yield) -#include <boost/thread.hpp> - -namespace LOFAR -{ - namespace CS1 - { - - int WH_SBCollect::theirNoRunningWHs = 0; - int WH_SBCollect::theirNoAlarms = 0; - bool WH_SBCollect::theirTimerSet = 0; - - WH_SBCollect::WH_SBCollect(const string& name, - const ACC::APS::ParameterSet pset, - const int noutputs) - : WorkHolder (pset.getInt32("Observation.NStations"), - noutputs, - name, - "WH_SBCollect"), - itsPS (pset), - itsNStations (pset.getInt32("Observation.NStations")), - itsNSubbandsPerCell (pset.getInt32("General.SubbandsPerPset") * pset.getInt32("BGLProc.PsetsPerCell")) - { - char str[32]; - for (int i=0; i<itsNinputs; i++) - { - sprintf(str, "DH_in_%d", i); - getDataManager().addInDataHolder(i, new DH_RSP(str, itsPS)); - } - vector<int> channels; - for(int i=0;i<itsNoutputs; i++) - { - sprintf(str, "DH_out_%d", i); - getDataManager().addOutDataHolder(i, new DH_Subband(str, itsPS)); - getDataManager().setAutoTriggerOut(i, false); - channels.push_back(i); - } - // Set a round robin output selector - getDataManager().setOutputSelector(new Sel_RoundRobin(channels)); - } - - WH_SBCollect::~WH_SBCollect() { - } - - WorkHolder* WH_SBCollect::construct(const string& name, - const ACC::APS::ParameterSet pset, - const int noutputs) - { - return new WH_SBCollect(name, pset, noutputs); - } - - WH_SBCollect* WH_SBCollect::make(const string& name) - { - return new WH_SBCollect(name, itsPS, itsNoutputs); - } - - void WH_SBCollect::preprocess() { - sleep(6); - if (!theirTimerSet) { -#define USE_TIMER 0 -#if USE_TIMER - sighandler_t ret = signal(SIGALRM, *WH_SBCollect::timerSignal); - ASSERTSTR(ret != SIG_ERR, "WH_SBCollect couldn't set signal handler for timer"); - struct itimerval value; - memset (&value, 0, sizeof(itimerval)); - - double interval = itsPS.getUint32("Observation.NSubbandSamples") / itsPS.getDouble("Observation.SampleRate") / itsNSubbandsPerCell; - __time_t secs = static_cast<__time_t>(floor(interval)); - __time_t usecs = static_cast<__time_t>(1e6 * (interval - secs)); - // this means 1MHz is the highest frequency - value.it_interval.tv_sec = secs; - value.it_interval.tv_usec = usecs; - value.it_value.tv_sec = secs; - value.it_value.tv_usec = usecs; - clog << "Setting timer interval to " << secs << "secs and " << usecs << "ms" << endl; - - setitimer(ITIMER_REAL, &value, 0); -#else - theirNoAlarms = -1; //This will make sure data is written at maximum speed -#endif - theirTimerSet = true; - } - theirNoRunningWHs++; - } - - void WH_SBCollect::process() - { - // Copy every subband to one BG/L core - for (unsigned subband = 0; subband < itsNSubbandsPerCell; subband ++) { - // ask the round robin selector for the next output - DH_Subband *outHolder = static_cast<DH_Subband *>(getDataManager().getOutHolder(getDataManager().getOutputSelector()->getCurrentSelection())); - - // Copy one subband from every input - for (unsigned station = 0; station < itsNStations; station ++) { - DH_RSP *inHolder = static_cast<DH_RSP *>(getDataManager().getInHolder(station)); - - if (subband == 0) - inHolder->getExtraData(); - - memcpy(outHolder->getSamples3D()[station].origin(), - inHolder->getSamples()[subband].origin(), - outHolder->getSamples3D()[station].num_elements() * sizeof(DH_Subband::SampleType)); - - // copy other information (delayInfo, flags etc) - outHolder->getDelays()[station].delayAtBegin = inHolder->getFineDelayAtBegin(); - outHolder->getDelays()[station].delayAfterEnd = inHolder->getFineDelayAfterEnd(); - outHolder->getFlags()[station] = inHolder->getFlags(); - } - - outHolder->fillExtraData(); - - while (theirNoAlarms == 0) - { - // wait for alarm to go off - boost::thread::yield(); - }; - // we handled one alarm, so decrease it - theirNoAlarms--; - getDataManager().readyWithOutHolder(getDataManager().getOutputSelector()->getCurrentSelection()); - getDataManager().getOutputSelector()->selectNext(); - } - } - - void WH_SBCollect::postprocess() - { - theirNoRunningWHs--; - if (theirNoRunningWHs == 0) - { - theirTimerSet = false; - if (itsFrequency != 0) { - // unset timer -#if USE_TIMER - struct itimerval value; - memset (&value, 0, sizeof(itimerval)); - setitimer(ITIMER_REAL, &value, 0); - // remove sig handler - sighandler_t ret = signal(SIGALRM, SIG_DFL); - ASSERTSTR(ret != SIG_ERR, "WH_SBCollect couldn't unset signal handler for timer"); -#endif - } - } - cout<<"Sleeping to enable shutdown of threads"<<endl; - sleep(30); - } - - void WH_SBCollect::timerSignal(int sig) - { - // set the number of frames that can be sent - theirNoAlarms += theirNoRunningWHs; - } - } // namespace CS1 - -} // namespace LOFAR diff --git a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/SparseSet.h b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/SparseSet.h index c67fc288ef3e83d904ad4e4fc62faef7e473f8a1..c5c435feea3f77f03804a7cdd8b7d4d97bf5e5b1 100644 --- a/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/SparseSet.h +++ b/Appl/CEP/CS1/CS1_Interface/include/CS1_Interface/SparseSet.h @@ -62,6 +62,9 @@ class SparseSet { void write(BlobOStream &) const; void read(BlobIStream &); + ssize_t marshall(void *ptr, size_t maxSize) const; + void unmarshall(const void *ptr); + private: std::vector<struct range> ranges; }; diff --git a/Appl/CEP/CS1/CS1_Interface/src/SparseSet.cc b/Appl/CEP/CS1/CS1_Interface/src/SparseSet.cc index 6a3f42d0065e8a864ff08cce3ace4a4bd8c56bdf..144cc2ed1ac67729b9211e35c2b0bda1ed19c8bc 100644 --- a/Appl/CEP/CS1/CS1_Interface/src/SparseSet.cc +++ b/Appl/CEP/CS1/CS1_Interface/src/SparseSet.cc @@ -27,6 +27,7 @@ #include <algorithm> #include <cassert> +#include <cstring> #include <iostream> @@ -204,6 +205,27 @@ void SparseSet::read(BlobIStream &bis) } +ssize_t SparseSet::marshall(void *ptr, size_t maxSize) const +{ + size_t size = sizeof(uint32) + ranges.size() * sizeof(range) > maxSize; + + if (size > maxSize) + return -1; + + * (uint32 *) ptr = ranges.size(); + memcpy((uint32 *) ptr + 1, &ranges[0], ranges.size() * sizeof(range)); + + return size; +} + + +void SparseSet::unmarshall(const void *ptr) +{ + ranges.resize(* (uint32 *) ptr); + memcpy(&ranges[0], (uint32 *) ptr + 1, ranges.size() * sizeof(range)); +} + + std::ostream &operator << (std::ostream &str, const SparseSet &set) { for (std::vector<SparseSet::range>::const_iterator it = set.getRanges().begin(); it != set.getRanges().end(); it ++) diff --git a/Appl/CEP/CS1/CS1_Run/src/CS1_Sections.py b/Appl/CEP/CS1/CS1_Run/src/CS1_Sections.py index c7713208b9445f18e9774a854fad482fa731769b..4bc964e1c2bcaf43f3a7cf6a3a43245dfd974606 100644 --- a/Appl/CEP/CS1/CS1_Run/src/CS1_Sections.py +++ b/Appl/CEP/CS1/CS1_Run/src/CS1_Sections.py @@ -3,6 +3,7 @@ from LOFAR_Jobs import * import time import os import copy +import sys class Section(object): """ @@ -79,28 +80,31 @@ class InputSection(Section): if not nSubbands % nSubbandsPerCell == 0: raise Exception('Not a integer number of compute cells (nSubbands = %d and nSubbandsPerCell = %d)' % (nSubbands, nSubbandsPerCell)) self.nCells = int(nCells) - self.noProcesses = self.nrsp + self.nCells host = copy.deepcopy(myhost) - slaves = host.getSlaves() - inputNodes = parset.getInputNodes() + [2, 3, 5, 6, 8, 9, 11, 12] - # for sbCollect use only the nodes that are used less than 2 times - sbcollectNodes = [node for node in range(1, 13) if inputNodes.count(node) < 2] - newslaves = [slaves[ind - 1] for ind in inputNodes + sbcollectNodes] - - host.setSlaves(newslaves) - Section.__init__(self, parset, \ - 'Appl/CEP/CS1/CS1_InputSection', \ - host = host, \ - buildvar = 'gnu64_mpich-opt') + inputNodes = parset.getInputNodes() + outputNodes = range(1, self.nCells + 1) + allNodes = inputNodes + [node for node in outputNodes if not node in inputNodes] + + inputIndices = range(len(inputNodes)) + outputIndices = [allNodes.index(node) for node in outputNodes] + + newslaves = [slaves[ind - 1] for ind in allNodes] + host.setSlaves(newslaves) + self.noProcesses = len(newslaves) + + Section.__init__(self, parset, \ + 'Appl/CEP/CS1/CS1_InputSection', \ + host = host, \ + buildvar = 'gnu64_mpich-opt') + + self.parset['Input.InputNodes'] = inputIndices + self.parset['Input.OutputNodes'] = outputIndices - myslaves = self.host.getSlaves()[self.nrsp : self.nrsp + self.nCells] - transposeIPs = [s.getIntName() for s in myslaves] - bglprocIPs = [s.getExtIP() for s in myslaves] - #self.parset['TransposeHosts'] = '[' + ','.join(transposeIPs) + ']' - self.parset['Connections.Input_BGLProc.ServerHosts'] = '[' + ','.join(bglprocIPs) + ']' + bglprocIPs = [newslaves[j].getExtIP() for j in outputIndices] + self.parset['Connections.Input_BGLProc.ServerHosts'] = '[' + ','.join(bglprocIPs) + ']' def run(self, runlog, noRuns, runCmd = None): Section.run(self, runlog, noRuns, runCmd)