diff --git a/RTCP/CNProc/src/CN_Processing.cc b/RTCP/CNProc/src/CN_Processing.cc index 5bef2862673b9e45550b3f1a78afe79b2584c6f0..4cac60ca1545e61b56014b90666fc1089357b11e 100644 --- a/RTCP/CNProc/src/CN_Processing.cc +++ b/RTCP/CNProc/src/CN_Processing.cc @@ -169,7 +169,7 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::preprocess(CN_C itsNrSubbands = configuration.nrSubbands(); itsNrSubbandsPerPset = configuration.nrSubbandsPerPset(); itsNrSubbandsPerBeam = configuration.nrSubbandsPerBeam(); - itsNrFilesPerStokes = configuration.nrFilesPerStokes(); + itsNrPartsPerStokes = configuration.nrPartsPerStokes(); itsNrBeamsPerPset = configuration.nrBeamsPerPset(); itsCenterFrequencies = configuration.refFreqs(); itsFlysEye = configuration.flysEye(); @@ -359,7 +359,7 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u // the phase 3 psets are dedicated to phase 3 myBeam = *itsCurrentBeam; - beamToProcess = myBeam < itsNrBeams * itsNrStokes * itsNrFilesPerStokes; + beamToProcess = myBeam < itsNrBeams * itsNrStokes * itsNrPartsPerStokes; //LOG_DEBUG_STR(itsLogPrefix << "transpose: my beam = " << myBeam << " process? " << beamToProcess << " my coreindex = " << itsCurrentBeam->core ); @@ -379,20 +379,20 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u myBeam = firstBeamOfPset + relativeCoreIndex; - beamToProcess = myBeam < itsNrBeams * itsNrStokes * itsNrFilesPerStokes && relativeCoreIndex < itsNrBeamsPerPset; + beamToProcess = myBeam < itsNrBeams * itsNrStokes * itsNrPartsPerStokes && relativeCoreIndex < itsNrBeamsPerPset; } if (beamToProcess) { - unsigned stokesFile = myBeam % itsNrFilesPerStokes; - unsigned firstSubband = stokesFile * itsNrSubbandsPerBeam; - unsigned lastSubband = std::min( (stokesFile+1) * itsNrSubbandsPerBeam, itsNrSubbands ); + unsigned partNr = myBeam % itsNrPartsPerStokes; + unsigned firstSubband = partNr * itsNrSubbandsPerBeam; + unsigned lastSubband = std::min( (partNr+1) * itsNrSubbandsPerBeam, itsNrSubbands ); for (unsigned sb = firstSubband; sb < lastSubband; sb ++) { // calculate which (pset,core) produced subband sb unsigned pset = sb / itsNrSubbandsPerPset; unsigned core = (block * itsNrSubbandsPerPset + sb % itsNrSubbandsPerPset) % itsNrPhaseOneTwoCores; - //LOG_DEBUG_STR(itsLogPrefix << "transpose: receive subband " << sb << " of beam " << myBeam << " part " << stokesFile << " from pset " << pset << " core " << core); + //LOG_DEBUG_STR(itsLogPrefix << "transpose: receive subband " << sb << " of beam " << myBeam << " part " << partNr << " from pset " << pset << " core " << core); if (itsPlan->calculate( itsPlan->itsTransposedCoherentStokesData )) { itsAsyncTransposeBeams->postReceive(itsPlan->itsTransposedCoherentStokesData, sb - firstSubband, sb, myBeam, pset, core); } else { @@ -422,7 +422,7 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u static NSTimer asyncSendTimer("async beam send", true, true); - unsigned stokesFile = *itsCurrentSubband / itsNrSubbandsPerBeam; + unsigned partNr = *itsCurrentSubband / itsNrSubbandsPerBeam; #if 1 /* overlap computation and transpose */ @@ -436,11 +436,11 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u asyncSendTimer.start(); for (unsigned j = 0; j < itsNrStokes; j++) { // calculate which (pset,core) needs beam i - unsigned beam = (i * itsNrStokes + j) * itsNrFilesPerStokes + stokesFile; + unsigned beam = (i * itsNrStokes + j) * itsNrPartsPerStokes + partNr; unsigned pset = beam / itsNrBeamsPerPset; unsigned core = (firstCore + beam % itsNrBeamsPerPset) % itsNrPhaseThreeCores; - //LOG_DEBUG_STR(itsLogPrefix << "transpose: send subband " << *itsCurrentSubband << " of beam " << i << " pol/sgtokes " << j << " part " << stokesFile << " to pset " << pset << " core " << core); + //LOG_DEBUG_STR(itsLogPrefix << "transpose: send subband " << *itsCurrentSubband << " of beam " << i << " pol/sgtokes " << j << " part " << partNr << " to pset " << pset << " core " << core); if (itsPlan->calculate( itsPlan->itsCoherentStokesData )) { itsAsyncTransposeBeams->asyncSend(pset, core, *itsCurrentSubband, i, j, beam, itsPlan->itsCoherentStokesData); // Asynchronously send one beam to another pset. } else { @@ -463,11 +463,11 @@ template <typename SAMPLE_TYPE> int CN_Processing<SAMPLE_TYPE>::transposeBeams(u asyncSendTimer.start(); for (unsigned j = 0; j < itsNrStokes; j++) { // calculate which (pset,core) needs beam i - unsigned beam = (i * itsNrStokes + j) * itsNrFilesPerStokes + stokesFile; + unsigned beam = (i * itsNrStokes + j) * itsNrPartsPerStokes + partNr; unsigned pset = beam / itsNrBeamsPerPset; unsigned core = (firstCore + beam % itsNrBeamsPerPset) % itsNrPhaseThreeCores; - //LOG_DEBUG_STR(itsLogPrefix << "transpose: send subband " << *itsCurrentSubband << " of beam " << i << " pol/sgtokes " << j << " part " << stokesFile << " to pset " << pset << " core " << core); + //LOG_DEBUG_STR(itsLogPrefix << "transpose: send subband " << *itsCurrentSubband << " of beam " << i << " pol/sgtokes " << j << " part " << partNr << " to pset " << pset << " core " << core); if (itsPlan->calculate( itsPlan->itsCoherentStokesData )) { itsAsyncTransposeBeams->asyncSend(pset, core, *itsCurrentSubband, i, j, beam, itsPlan->itsCoherentStokesData); // Asynchronously send one beam to another pset. } else { @@ -670,9 +670,9 @@ template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::finishSendingBe template <typename SAMPLE_TYPE> void CN_Processing<SAMPLE_TYPE>::receiveBeam( unsigned beamToProcess ) { - unsigned stokesFile = beamToProcess % itsNrFilesPerStokes; - unsigned firstSubband = stokesFile * itsNrSubbandsPerBeam; - unsigned lastSubband = std::min( (stokesFile+1) * itsNrSubbandsPerBeam, itsNrSubbands ); + unsigned partNr = beamToProcess % itsNrPartsPerStokes; + unsigned firstSubband = partNr * itsNrSubbandsPerBeam; + unsigned lastSubband = std::min( (partNr+1) * itsNrSubbandsPerBeam, itsNrSubbands ); #if defined HAVE_MPI static NSTimer asyncReceiveTimer("wait for any async beam receive", true, true); diff --git a/RTCP/CNProc/src/CN_Processing.h b/RTCP/CNProc/src/CN_Processing.h index 786ef5d885ecf9e7506d43373a2aaa8b7385d923..5998e91044de908a5694b001dcb1e8c828148ae9 100644 --- a/RTCP/CNProc/src/CN_Processing.h +++ b/RTCP/CNProc/src/CN_Processing.h @@ -108,7 +108,7 @@ template <typename SAMPLE_TYPE> class CN_Processing : public CN_Processing_Base, unsigned itsNrSubbands; unsigned itsNrSubbandsPerPset; unsigned itsNrSubbandsPerBeam; - unsigned itsNrFilesPerStokes; + unsigned itsNrPartsPerStokes; unsigned itsNrBeams; unsigned itsNrStokes; // the number of polarizations/stokes that will be split off per beam during the transpose unsigned itsNrBeamsPerPset; diff --git a/RTCP/IONProc/src/ControlPhase3Cores.cc b/RTCP/IONProc/src/ControlPhase3Cores.cc index fe5f5868a85e290703ed704b51c357ce012c0fb7..d69747e24e846b5057759f30ac3a5e849b603a46 100644 --- a/RTCP/IONProc/src/ControlPhase3Cores.cc +++ b/RTCP/IONProc/src/ControlPhase3Cores.cc @@ -37,8 +37,8 @@ namespace RTCP { ControlPhase3Cores::ControlPhase3Cores(const Parset *ps, const std::vector<Stream *> &phaseThreeStreams ) : - itsPhaseThreeStreams(phaseThreeStreams), itsLogPrefix(str(format("[obs %u] ") % ps->observationID())), + itsPhaseThreeStreams(phaseThreeStreams), itsNrBeamsPerPset(ps->nrBeamsPerPset()), itsThread(0) { diff --git a/RTCP/IONProc/src/Job.cc b/RTCP/IONProc/src/Job.cc index e8c83b4a39a99051a08052178d0b7de580272489..d7a7cce84c5d58dc62827cdd6f889e5b7894b816 100644 --- a/RTCP/IONProc/src/Job.cc +++ b/RTCP/IONProc/src/Job.cc @@ -272,9 +272,7 @@ void Job::joinSSH(int childPID, const std::string &hostName, unsigned &timeout) void Job::startStorageProcesses() { - //itsStorageHostNames = itsParset.getStringVector("Observation.VirtualInstrument.storageNodeList"); -// use IP addresses, since the I/O node cannot resolve host names - itsStorageHostNames = itsParset.getStringVector("OLAP.OLAP_Conn.IONProc_Storage_ServerHosts"); + itsStorageHostNames = itsParset.getStringVector("OLAP.Storage.hosts"); std::string userName = itsParset.getString("OLAP.Storage.userName"); std::string sshKey = itsParset.getString("OLAP.Storage.sshIdentityFile"); @@ -529,6 +527,9 @@ template <typename SAMPLE_TYPE> void Job::doObservation() plan.removeNonOutputs(); unsigned nrOutputTypes = plan.nrOutputTypes(); std::vector<OutputSection *> outputSections(nrOutputTypes, 0); + unsigned nrparts = itsParset.nrPartsPerStokes(); + unsigned nrbeams = itsParset.flysEye() ? itsParset.nrMergedStations() : itsParset.nrPencilBeams(); + LOG_INFO_STR(itsLogPrefix << "----- Observation start"); @@ -549,52 +550,43 @@ template <typename SAMPLE_TYPE> void Job::doObservation() unsigned phase, maxlistsize; int psetIndex; - std::vector<unsigned> list; // list of subbands or beams + std::vector<std::pair<unsigned,std::string> > list; // list of filenames std::vector<unsigned> cores; - std::string type; - - unsigned nrstokes = 0; - - if (itsParset.outputBeamFormedData() || itsParset.outputTrigger()) - nrstokes = NR_POLARIZATIONS; - else if (itsParset.outputCoherentStokes()) - nrstokes = itsParset.nrStokes(); - unsigned nrbeams = (itsParset.flysEye() ? itsParset.nrMergedStations() : itsParset.nrPencilBeams()) * nrstokes * itsParset.nrFilesPerStokes(); + std::string mask = itsParset.fileNameMask( p.info.storageFilenamesSetKey ); - switch (p.distribution) { + switch (p.info.distribution) { case ProcessingPlan::DIST_SUBBAND: phase = 2; - type = "subbands"; cores = itsParset.phaseOneTwoCores(); psetIndex = itsParset.phaseTwoPsetIndex(myPsetNumber); if (psetIndex < 0) { // this pset does not participate for this output - LOG_DEBUG_STR(itsLogPrefix << "Not setting up output " << p.outputNr << " (" << p.name << ") because " << myPsetNumber << " is not in " << itsParset.phaseTwoPsets()); continue; } maxlistsize = itsParset.nrSubbandsPerPset(); + for (unsigned sb = 0; sb < itsParset.nrSubbandsPerPset(); sb ++) { - unsigned subbandNumber = psetIndex * itsParset.nrSubbandsPerPset() + sb; + unsigned s = psetIndex * itsParset.nrSubbandsPerPset() + sb; - if (subbandNumber < itsParset.nrSubbands()) - list.push_back(subbandNumber); + if (s < itsParset.nrSubbands()) { + std::string filename = itsParset.constructSubbandFilename( mask, s ); + list.push_back(std::pair<unsigned,std::string>(s,filename)); + } } break; case ProcessingPlan::DIST_BEAM: phase = 3; - type = "beams"; cores = itsParset.phaseThreeCores(); psetIndex = itsParset.phaseThreePsetIndex(myPsetNumber); if (psetIndex < 0) { // this pset does not participate for this outputlist - LOG_DEBUG_STR(itsLogPrefix << "Not setting up output " << p.outputNr << " (" << p.name << ") because " << myPsetNumber << " is not in " << itsParset.phaseThreePsets()); continue; } @@ -614,10 +606,18 @@ template <typename SAMPLE_TYPE> void Job::doObservation() } for (unsigned beam = 0; beam < itsParset.nrBeamsPerPset(); beam ++) { + unsigned nrstokes = p.info.nrStokes; + unsigned beamNumber = psetIndex * itsParset.nrBeamsPerPset() + beam; - if (beamNumber < nrbeams) - list.push_back(beamNumber); + unsigned b = beamNumber / nrparts / nrstokes; + unsigned s = beamNumber / nrparts % nrstokes; + unsigned q = beamNumber % nrparts; + + if (b < nrbeams) { + std::string filename = itsParset.constructBeamFormedFilename( mask, b, s, q ); + list.push_back(std::pair<unsigned,std::string>(beamNumber,filename)); + } } break; @@ -626,7 +626,7 @@ template <typename SAMPLE_TYPE> void Job::doObservation() continue; } - LOG_DEBUG_STR(itsLogPrefix << "Setting up output " << p.outputNr << " (" << p.name << ") for " << type << " " << list); + LOG_DEBUG_STR(itsLogPrefix << "Setting up output " << p.outputNr << " (" << p.name << ")"); outputSections[output] = new OutputSection(itsParset, cores, list, maxlistsize, p, &createCNstream); } diff --git a/RTCP/IONProc/src/OutputSection.cc b/RTCP/IONProc/src/OutputSection.cc index 3b99218a417bf71b45df7c7b19996a9e5b557c9c..beb7f73dff4e9511997ccb8fc857bfd6a0203da6 100644 --- a/RTCP/IONProc/src/OutputSection.cc +++ b/RTCP/IONProc/src/OutputSection.cc @@ -52,7 +52,7 @@ using boost::format; namespace LOFAR { namespace RTCP { -OutputSection::OutputSection(const Parset &parset, std::vector<unsigned> &coreList, std::vector<unsigned> &itemList, unsigned nrUsedCores, const ProcessingPlan::planlet &outputConfig, Stream *(*createStream)(unsigned,unsigned)) +OutputSection::OutputSection(const Parset &parset, std::vector<unsigned> &coreList, std::vector<std::pair<unsigned,std::string> > &itemList, unsigned nrUsedCores, const ProcessingPlan::planlet &outputConfig, Stream *(*createStream)(unsigned,unsigned)) : itsItemList(itemList), itsOutputNr(outputConfig.outputNr), @@ -88,7 +88,7 @@ OutputSection::OutputSection(const Parset &parset, std::vector<unsigned> &coreLi // create an output thread for this subband for (unsigned i = 0; i < itsItemList.size(); i++ ) { - itsOutputThreads.push_back(new OutputThread(parset, itsItemList[i], outputConfig)); + itsOutputThreads.push_back(new OutputThread(parset, outputConfig, itsItemList[i].first, itsItemList[i].second)); } LOG_DEBUG_STR(itsLogPrefix << "] Creating streams between compute nodes and OutputSection..."); @@ -164,14 +164,14 @@ void OutputSection::noMoreIterations() void OutputSection::droppingData(unsigned subband) { if (itsDroppedCount[subband] ++ == 0) - LOG_WARN_STR(itsLogPrefix << " subband " << std::setw(3) << itsItemList[subband] << "] Dropping data"); + LOG_WARN_STR(itsLogPrefix << " index " << setw(3) << itsItemList[subband].second << "] Dropping data"); } void OutputSection::notDroppingData(unsigned subband) { if (itsDroppedCount[subband] > 0) { - LOG_WARN_STR(itsLogPrefix << " subband " << std::setw(3) << itsItemList[subband] << "] Dropped " << itsDroppedCount[subband] << " integration time(s)" ); + LOG_WARN_STR(itsLogPrefix << " index " << setw(3) << itsItemList[subband].second << "] Dropped " << itsDroppedCount[subband] << " integration time(s)" ); itsDroppedCount[subband] = 0; } } diff --git a/RTCP/IONProc/src/OutputSection.h b/RTCP/IONProc/src/OutputSection.h index 444dbe158fad177266f26eeb01bffdc11a296b02..dbff7bbed7911d9dd8a2c13305f2b94d11bf1ec0 100644 --- a/RTCP/IONProc/src/OutputSection.h +++ b/RTCP/IONProc/src/OutputSection.h @@ -37,7 +37,7 @@ namespace RTCP { class OutputSection { public: - OutputSection(const Parset &, std::vector<unsigned> &coreList, std::vector<unsigned> &itemList, unsigned nrUsedCores, const ProcessingPlan::planlet &outputConfig, Stream * (*createStream)(unsigned, unsigned)); + OutputSection(const Parset &, std::vector<unsigned> &coreList, std::vector<std::pair<unsigned,std::string> > &itemList, unsigned nrUsedCores, const ProcessingPlan::planlet &outputConfig, Stream * (*createStream)(unsigned, unsigned)); ~OutputSection(); void addIterations(unsigned count); @@ -63,7 +63,7 @@ class OutputSection unsigned itsSequenceNumber; Semaphore itsNrIterationsToDo; - std::vector<unsigned> itsItemList; // list of either subbands or beams + std::vector<std::pair<unsigned,std::string> > itsItemList; // list of (index,filename)s const unsigned itsOutputNr; const unsigned itsNrComputeCores; unsigned itsCurrentComputeCore, itsNrUsedCores; diff --git a/RTCP/IONProc/src/OutputThread.cc b/RTCP/IONProc/src/OutputThread.cc index 15458b620fdde76dd70ebcf5d9cd9fd468c31602..4ced256d92a0e64097eef557e69d33371a7b39db 100644 --- a/RTCP/IONProc/src/OutputThread.cc +++ b/RTCP/IONProc/src/OutputThread.cc @@ -45,15 +45,14 @@ namespace LOFAR { namespace RTCP { -OutputThread::OutputThread(const Parset &parset, const unsigned subband, const ProcessingPlan::planlet &outputConfig) +OutputThread::OutputThread(const Parset &parset, const ProcessingPlan::planlet &outputConfig, unsigned index, const string &filename) : itsDone(false), itsParset(parset), - itsSubband(subband), - itsOutput(outputConfig.outputNr), - itsDistribution(outputConfig.distribution) + itsFilename(filename), + itsServer(parset.targetHost( outputConfig.info.storageLocationKey, outputConfig.info.storageFilenamesSetKey, filename )) { - itsLogPrefix = str(format("[obs %u output %u subband %3u] ") % parset.observationID() % outputConfig.outputNr % subband); + itsLogPrefix = str(format("[obs %u output %u index %3u] ") % parset.observationID() % outputConfig.outputNr % index); // transpose the data holders: create queues streams for the output streams // itsPlans is the owner of the pointers to sample data structures @@ -143,7 +142,7 @@ void OutputThread::mainLoop() #endif std::auto_ptr<Stream> streamToStorage; - std::string outputDescriptor = getStreamDescriptorBetweenIONandStorage(itsParset, itsSubband, itsOutput, itsDistribution == ProcessingPlan::DIST_SUBBAND); + std::string outputDescriptor = getStreamDescriptorBetweenIONandStorage(itsParset, itsServer, itsFilename); LOG_INFO_STR(itsLogPrefix << "Creating connection to " << outputDescriptor << "..."); diff --git a/RTCP/IONProc/src/OutputThread.h b/RTCP/IONProc/src/OutputThread.h index 2a9db140603b28e08c3e56e67996717ef6133c8b..dd8960ca2e3e482875e9d91c0f541df8dc1df9e0 100644 --- a/RTCP/IONProc/src/OutputThread.h +++ b/RTCP/IONProc/src/OutputThread.h @@ -45,13 +45,13 @@ namespace RTCP { class OutputThread { public: - OutputThread(const Parset &ps, const unsigned subband, const ProcessingPlan::planlet &outputConfig); + OutputThread(const Parset &ps, const ProcessingPlan::planlet &outputConfig, unsigned index, const std::string &filename); ~OutputThread(); bool waitForDone(const struct timespec ×pec); void abort(); - static const unsigned maxSendQueueSize = 3; // use 2 if you run out of memory, but test carefully to avoid data loss + static const unsigned maxSendQueueSize = 2; // use 2 if you run out of memory, but test carefully to avoid data loss Queue<StreamableData *> itsFreeQueue, itsSendQueue; @@ -65,8 +65,8 @@ class OutputThread Mutex itsDoneMutex; const Parset &itsParset; - const unsigned itsSubband, itsOutput; - const ProcessingPlan::distribution_t itsDistribution; + const std::string itsFilename; + const std::string itsServer; InterruptibleThread *itsThread; }; diff --git a/RTCP/Interface/include/Interface/CN_Configuration.h b/RTCP/Interface/include/Interface/CN_Configuration.h index d74d1a857a08fdafcafb01f61dd3760c9545544a..4ec1564cfdf3bde51b970b52683d77b3f09a5bcc 100644 --- a/RTCP/Interface/include/Interface/CN_Configuration.h +++ b/RTCP/Interface/include/Interface/CN_Configuration.h @@ -56,7 +56,7 @@ class CN_Configuration unsigned &nrSamplesToCNProc(); unsigned &nrSubbandsPerPset(); unsigned &nrSubbandsPerBeam(); - unsigned &nrFilesPerStokes(); + unsigned &nrPartsPerStokes(); unsigned &nrBeamsPerPset(); bool &delayCompensation(); bool &correctBandPass(); @@ -119,7 +119,7 @@ class CN_Configuration unsigned itsNrPhaseThreeCores; unsigned itsNrSubbandsPerPset; unsigned itsNrSubbandsPerBeam; - unsigned itsNrFilesPerStokes; + unsigned itsNrPartsPerStokes; unsigned itsNrBeamsPerPset; bool itsDelayCompensation; bool itsCorrectBandPass; @@ -213,9 +213,9 @@ inline unsigned &CN_Configuration::nrSubbandsPerBeam() return itsMarshalledData.itsNrSubbandsPerBeam; } -inline unsigned &CN_Configuration::nrFilesPerStokes() +inline unsigned &CN_Configuration::nrPartsPerStokes() { - return itsMarshalledData.itsNrFilesPerStokes; + return itsMarshalledData.itsNrPartsPerStokes; } inline unsigned &CN_Configuration::nrBeamsPerPset() diff --git a/RTCP/Interface/include/Interface/Parset.h b/RTCP/Interface/include/Interface/Parset.h index d3790dec0a17001bafe2d8c17f1b6760f75ba2df..c58243916228d2cdb466b6cf32662300ee6d665c 100644 --- a/RTCP/Interface/include/Interface/Parset.h +++ b/RTCP/Interface/include/Interface/Parset.h @@ -33,6 +33,7 @@ //# Includes #include <Common/ParameterSet.h> #include <Common/StreamUtil.h> +#include <Common/StringUtil.h> #include <Common/lofar_datetime.h> #include <Common/LofarLogger.h> #include <Interface/Config.h> @@ -89,7 +90,7 @@ public: uint32 nrSubbandSamples() const; uint32 nrSubbandsPerPset() const; uint32 nrSubbandsPerBeam() const; - uint32 nrFilesPerStokes() const; + uint32 nrPartsPerStokes() const; uint32 nrBeamsPerPset() const; uint32 nrHistorySamples() const; uint32 nrSamplesToCNProc() const; @@ -109,8 +110,6 @@ public: bool correctBandPass() const; bool hasStorage() const; string stationName(int index) const; - vector<unsigned> subbandStorageList() const; - vector<unsigned> beamStorageList() const; uint32 nrPsetsPerStorage() const; unsigned getLofarStManVersion() const; vector<uint32> phaseOnePsets() const; @@ -176,7 +175,16 @@ public: string contactName() const; vector<double> itsStPositions; - + + vector<string> fileNames(const string &filesSet) const; + vector<string> fileLocations(const string &locationKey) const; + string fileNameMask(const string &filesSet) const; + string targetDirectory(const string &locationKey, const string &filesSet, const string &fileName) const; + string targetHost(const string &locationKey, const string &filesSet, const string &fileName) const; + + string constructSubbandFilename( const string &mask, unsigned subband ) const; + string constructBeamFormedFilename( const string &mask, unsigned beam, unsigned stokes, unsigned file ) const; + private: const std::string itsName; @@ -236,16 +244,6 @@ inline bool Parset::hasStorage() const return getString("OLAP.OLAP_Conn.IONProc_Storage_Transport") != "NULL"; } -inline vector<unsigned> Parset::subbandStorageList() const -{ - return getUint32Vector("OLAP.storageNodeList",true); -} - -inline vector<unsigned> Parset::beamStorageList() const -{ - return getUint32Vector("OLAP.PencilInfo.storageNodeList",true); -} - inline string Parset::getTransportType(const string& prefix) const { return getString(prefix + "_Transport"); @@ -420,9 +418,9 @@ inline uint32 Parset::nrSubbandsPerBeam() const return getUint32("OLAP.Storage.nrSubbandsPerBeam"); } -inline uint32 Parset::nrFilesPerStokes() const +inline uint32 Parset::nrPartsPerStokes() const { - return getUint32("OLAP.Storage.nrFilesPerStokes"); + return getUint32("OLAP.Storage.nrPartsPerStokes"); } inline uint32 Parset::nrBeamsPerPset() const @@ -643,6 +641,60 @@ inline string Parset::antennaSet() const return getString("Observation.antennaSet"); } +inline vector<string> Parset::fileNames(const string &filesSet) const +{ + return getStringVector(filesSet + ".fileNames",true); +} + +inline vector<string> Parset::fileLocations(const string &locationKey) const +{ + return getStringVector(locationKey,true); +} + +inline string Parset::fileNameMask(const string &filesSet) const +{ + return getString(filesSet + ".nameMask"); +} + +inline string Parset::targetDirectory(const string &locationKey, const string &filesSet, const string &fileName) const +{ + vector<string> locations = fileLocations( locationKey ); + vector<string> filenames = fileNames( filesSet ); + + // TODO: cache and use a map or hashtable + + for (unsigned i = 0; i < filenames.size(); i++ ) + if (filenames[i] == fileName) { + const string &location = locations[i]; + vector<string> parts; + + parts = StringUtil::split(location, ':'); + + return parts[1]; + } + + return "none"; +} + +inline string Parset::targetHost(const string &locationKey, const string &filesSet, const string &fileName) const +{ + vector<string> locations = fileLocations( locationKey ); + vector<string> filenames = fileNames( filesSet ); + + // TODO: cache and use a map or hashtable + + for (unsigned i = 0; i < filenames.size(); i++ ) + if (filenames[i] == fileName) { + const string &location = locations[i]; + vector<string> parts; + + parts = StringUtil::split(location, ':'); + + return parts[0]; + } + + return "none"; +} } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Interface/include/Interface/PrintVector.h b/RTCP/Interface/include/Interface/PrintVector.h index c1f3cf518b8b213e08e3023a413811af16e9814a..6fdc3f6cf1f497991eed2099f2859ad09dfc7484 100644 --- a/RTCP/Interface/include/Interface/PrintVector.h +++ b/RTCP/Interface/include/Interface/PrintVector.h @@ -43,6 +43,13 @@ template<typename T> inline std::ostream &operator << (std::ostream &str, const return str << ']'; } + +template<typename T, typename U> inline std::ostream &operator << (std::ostream &str, const std::pair<T,U> &p) +{ + return str << '(' << p.first << ',' << p.second << ')'; +} + + } // namespace RTCP } // namespace LOFAR diff --git a/RTCP/Interface/include/Interface/ProcessingPlan.h b/RTCP/Interface/include/Interface/ProcessingPlan.h index 2839b11b273c9689394493363f7bf644c1954b80..9c456c92dd4a3b446de049ecaef81f7918cf83ab 100644 --- a/RTCP/Interface/include/Interface/ProcessingPlan.h +++ b/RTCP/Interface/include/Interface/ProcessingPlan.h @@ -61,6 +61,14 @@ class ProcessingPlan public: enum distribution_t { DIST_UNKNOWN = 0, DIST_STATION, DIST_SUBBAND, DIST_BEAM }; + struct datainfo { + const char *storageLocationKey; // i.e. "OLAP.Storage.beamFormed" + const char *storageFilenamesSetKey; // i.e. "Observation.BeamFormed" + + distribution_t distribution; + unsigned nrStokes; + }; + struct planlet { StreamableData *set; // 0: barrier, require this source until here StreamableData *source; // 0: depend on nothing @@ -70,10 +78,8 @@ class ProcessingPlan int arena; // -1: not allocated, >= 0: allocated const char *name; // name of planlet or data set, for logging purposes - const char *filename; // for outputs: filename to use for this output - distribution_t distribution; - unsigned nrFilesPerBeam; + struct datainfo info; bool isOutput() const { return output; } // for filtering }; @@ -87,7 +93,7 @@ class ProcessingPlan void require( StreamableData *source ); // send set (i.e. as output) to be stored in a file or directory with a certain filename - void send( int outputNr, StreamableData *set, const char *filename, distribution_t distribution, unsigned nrFilesPerBeam = 1 ); + void send( int outputNr, StreamableData *set, const struct datainfo info ); // ----- Construct the plan: assign an arena to all // products that have to be calculated. @@ -144,7 +150,7 @@ inline void ProcessingPlan::transform( StreamableData *source, StreamableData *s p.outputNr = -1; p.arena = -1; p.name = name; - p.distribution = ProcessingPlan::DIST_UNKNOWN; + p.info.distribution = ProcessingPlan::DIST_UNKNOWN; plan.push_back( p ); } @@ -175,16 +181,14 @@ inline void ProcessingPlan::require( StreamableData *source ) { } } -inline void ProcessingPlan::send( int outputNr, StreamableData *set, const char *filename, ProcessingPlan::distribution_t distribution, unsigned nrFilesPerBeam ) { +inline void ProcessingPlan::send( int outputNr, StreamableData *set, const struct datainfo info ) { require( set ); // fake planlet to indicate we need this set // the entry we just created is an output -- configure it as such plan.back().output = true; plan.back().outputNr = outputNr; plan.back().name = find( set )->name; - plan.back().filename = filename; - plan.back().distribution = distribution; - plan.back().nrFilesPerBeam = nrFilesPerBeam; + plan.back().info = info; } inline void ProcessingPlan::assignArenas( bool assignAll ) { diff --git a/RTCP/Interface/include/Interface/Stream.h b/RTCP/Interface/include/Interface/Stream.h index 427bcba2c3452d5942e027778f7beb6d845e55af..2d2f36590f49b2c36fb0f85cd49bf53a4bd0d7dc 100644 --- a/RTCP/Interface/include/Interface/Stream.h +++ b/RTCP/Interface/include/Interface/Stream.h @@ -30,6 +30,8 @@ #include <Interface/Parset.h> #endif +#include <string> + namespace LOFAR { namespace RTCP { @@ -40,7 +42,7 @@ Stream *createStream(const std::string &descriptor, bool asReader); std::string getStreamDescriptorBetweenIONandCN(const char *streamType, unsigned pset, unsigned core, unsigned numpsets, unsigned numcores, unsigned channel); #ifndef HAVE_BGP_CN -std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, unsigned subband, unsigned output, bool perSubband = true); +std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, const std::string &host, const std::string &filename); #endif } // namespace RTCP diff --git a/RTCP/Interface/src/CN_Configuration.cc b/RTCP/Interface/src/CN_Configuration.cc index f580b09f13cfaa4a68610cce510df6cf126de5c6..cf949667a174eb1d6957353badc211d9cbbe7b71 100644 --- a/RTCP/Interface/src/CN_Configuration.cc +++ b/RTCP/Interface/src/CN_Configuration.cc @@ -59,7 +59,7 @@ CN_Configuration::CN_Configuration(const Parset &parset) nrSamplesToCNProc() = parset.nrSamplesToCNProc(); nrSubbandsPerPset() = parset.nrSubbandsPerPset(); nrSubbandsPerBeam() = parset.nrSubbandsPerBeam(); - nrFilesPerStokes() = parset.nrFilesPerStokes(); + nrPartsPerStokes() = parset.nrPartsPerStokes(); nrBeamsPerPset() = parset.nrBeamsPerPset(); delayCompensation() = parset.delayCompensation() || parset.nrPencilBeams() > 1 || parset.correctClocks(); correctBandPass() = parset.correctBandPass(); diff --git a/RTCP/Interface/src/CN_ProcessingPlan.cc b/RTCP/Interface/src/CN_ProcessingPlan.cc index 79d3136d013da36ed94c44c7c6044353a4263a4f..9968c4c10d542e9adb13889332dd660b428facfb 100644 --- a/RTCP/Interface/src/CN_ProcessingPlan.cc +++ b/RTCP/Interface/src/CN_ProcessingPlan.cc @@ -52,8 +52,6 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla const unsigned nrBaselines = configuration.nrMergedStations() * (configuration.nrMergedStations() + 1)/2; - const bool multipleBeamFiles = configuration.nrFilesPerStokes() > 1; - if (hasPhaseOne) { std::vector<unsigned> &phaseTwoPsets = configuration.phaseTwoPsets(); @@ -116,13 +114,31 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla // send all requested outputs if( configuration.outputFilteredData() ) { - send( 0, itsFilteredData, "L${MSNUMBER}_SB${SUBBAND}.filtered", ProcessingPlan::DIST_SUBBAND, 1 ); + struct datainfo info = { + "OLAP.Storage.filtered", + "Observation.Filtered", + DIST_SUBBAND, + 1 + }; + send( 0, itsFilteredData, info ); } if( configuration.outputCorrelatedData() ) { - send( 1, itsCorrelatedData, "L${MSNUMBER}_SB${SUBBAND}_uv.MS", ProcessingPlan::DIST_SUBBAND ); + struct datainfo info = { + "OLAP.Storage.correlated", + "Observation.Correlated", + DIST_SUBBAND, + 1 + }; + send( 1, itsCorrelatedData, info ); } if( configuration.outputIncoherentStokes() ) { - send( 2, itsIncoherentStokesData, "L${MSNUMBER}_SB${SUBBAND}_bf.incoherentstokes", ProcessingPlan::DIST_SUBBAND, 1 ); + struct datainfo info = { + "OLAP.Storage.incoherentStokes", + "Observation.IncoherentStokes", + DIST_SUBBAND, + 1 + }; + send( 2, itsIncoherentStokesData, info ); } } @@ -203,27 +219,36 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla TRANSFORM( itsTransposedCoherentStokesData, itsFinalCoherentStokesData ); if( configuration.outputBeamFormedData() ) { - if (multipleBeamFiles) { - send( 4, itsFinalBeamFormedData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_P${BEAMFILE}_bf.raw", ProcessingPlan::DIST_BEAM, NR_POLARIZATIONS ); - } else { - send( 4, itsFinalBeamFormedData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.raw", ProcessingPlan::DIST_BEAM, NR_POLARIZATIONS ); - } + struct datainfo info = { + "OLAP.Storage.beamFormed", + "Observation.BeamFormed", + DIST_BEAM, + NR_POLARIZATIONS + }; + + send( 4, itsFinalBeamFormedData, info ); } if( configuration.outputTrigger() ) { - if (multipleBeamFiles) { - send( 5, itsTriggerData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_P${BEAMFILE}_bf.trigger", ProcessingPlan::DIST_BEAM, configuration.nrStokes() ); - } else { - send( 5, itsTriggerData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.trigger", ProcessingPlan::DIST_BEAM, configuration.nrStokes() ); - } + struct datainfo info = { + "OLAP.Storage.trigger", + "Observation.Trigger", + DIST_BEAM, + NR_POLARIZATIONS + }; + + send( 5, itsTriggerData, info ); } if( configuration.outputCoherentStokes() ) { - if (multipleBeamFiles) { - send( 6, itsFinalCoherentStokesData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_P${BEAMFILE}_bf.raw", ProcessingPlan::DIST_BEAM, configuration.nrStokes() ); - } else { - send( 6, itsFinalCoherentStokesData, "L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.raw", ProcessingPlan::DIST_BEAM, configuration.nrStokes() ); - } + struct datainfo info = { + "OLAP.Storage.coherentStokes", + "Observation.CoherentStokes", + DIST_BEAM, + configuration.nrStokes() + }; + + send( 6, itsFinalCoherentStokesData, info ); } } } diff --git a/RTCP/Interface/src/Parset.cc b/RTCP/Interface/src/Parset.cc index 9186dd2dcd4995a94e06b543d694fea025329109..7638bbafabc03de671216de979949eaccc5529a2 100644 --- a/RTCP/Interface/src/Parset.cc +++ b/RTCP/Interface/src/Parset.cc @@ -40,6 +40,8 @@ #include <boost/algorithm/string/classification.hpp> #include <boost/format.hpp> #include <boost/lexical_cast.hpp> +#include <boost/algorithm/string/split.hpp> +#include <boost/algorithm/string.hpp> using boost::format; @@ -184,6 +186,56 @@ string Parset::getInputStreamName(const string &stationName, unsigned rspBoardNu } +string Parset::constructBeamFormedFilename( const string &mask, unsigned beam, unsigned stokes, unsigned file ) const +{ + using namespace boost; + + string name = mask; + string startTime = getString("Observation.startTime"); + vector<string> splitStartTime; + split(splitStartTime, startTime, is_any_of("- :")); + + replace_all(name, "${YEAR}", splitStartTime[0]); + replace_all(name, "${MONTH}", splitStartTime[1]); + replace_all(name, "${DAY}", splitStartTime[2]); + replace_all(name, "${HOURS}", splitStartTime[3]); + replace_all(name, "${MINUTES}", splitStartTime[4]); + replace_all(name, "${SECONDS}", splitStartTime[5]); + + replace_all(name, "${MSNUMBER}", str(format("%05u") % observationID())); + //replace_all(name, "${SAP}", str(format("%02u") % subbandToSAPmapping()[beamnr])); // station beams not supported yet + replace_all(name, "${PART}", str(format("%03u") % file)); + replace_all(name, "${BEAM}", str(format("%03u") % beam)); + replace_all(name, "${STOKES}", str(format("%u") % stokes)); + + return name; +} + + +string Parset::constructSubbandFilename( const string &mask, unsigned subband ) const +{ + using namespace boost; + + string name = mask; + string startTime = getString("Observation.startTime"); + vector<string> splitStartTime; + split(splitStartTime, startTime, is_any_of("- :")); + + replace_all(name, "${YEAR}", splitStartTime[0]); + replace_all(name, "${MONTH}", splitStartTime[1]); + replace_all(name, "${DAY}", splitStartTime[2]); + replace_all(name, "${HOURS}", splitStartTime[3]); + replace_all(name, "${MINUTES}", splitStartTime[4]); + replace_all(name, "${SECONDS}", splitStartTime[5]); + + replace_all(name, "${MSNUMBER}", str(format("%05u") % observationID())); + replace_all(name, "${SAP}", str(format("%02u") % subbandToSAPmapping()[subband])); + replace_all(name, "${SUBBAND}", str(format("%03u") % subband)); + + return name; +} + + unsigned Parset::nyquistZone() const { string bandFilter = getString("Observation.bandFilter"); diff --git a/RTCP/Interface/src/Stream.cc b/RTCP/Interface/src/Stream.cc index d9b866284ea2ce1ad4f62de0ae4902883e5d3edd..efac4ff16ba4b789d7810eca58ccf152258ae86d 100644 --- a/RTCP/Interface/src/Stream.cc +++ b/RTCP/Interface/src/Stream.cc @@ -99,22 +99,15 @@ std::string getStreamDescriptorBetweenIONandCN(const char *streamType, unsigned #ifndef HAVE_BGP_CN -std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, unsigned subband, unsigned output, bool perSubband) +std::string getStreamDescriptorBetweenIONandStorage(const Parset &parset, const string &host, const std::string &filename) { - std::string prefix = "OLAP.OLAP_Conn.IONProc_Storage"; - std::string connectionType = parset.getString(prefix + "_Transport"); + std::string connectionType = parset.getString("OLAP.OLAP_Conn.IONProc_Storage_Transport"); if (connectionType == "NULL") { return "null:"; } else if (connectionType == "TCP") { - std::string nodelist = perSubband ? "OLAP.storageNodeList" : "OLAP.PencilInfo.storageNodeList"; - unsigned serverIndex = parset.getUint32Vector(nodelist,true)[subband]; - std::string server = parset.getStringVector(prefix + "_ServerHosts")[serverIndex]; - - return str(format("tcpkey:%s:ion-storage-%s-output-%s-subband-%s") % server % parset.observationID() % output % subband); + return str(format("tcpkey:%s:ion-storage-obs-%s-file-%s") % host % parset.observationID() % filename); } else if (connectionType == "FILE") { - std::string filename = str(format("%s.%u") % parset.getString(prefix + "_BaseFileName") % subband); - return str(format("file:%s") % filename ); } else { THROW(InterfaceException, "unsupported ION->Storage stream type: " << connectionType); diff --git a/RTCP/Run/src/LOFAR/Parset.py b/RTCP/Run/src/LOFAR/Parset.py index a80eef9cf4b2a701d841c92849aad210b7140518..1afa3bc889f3e2e9ee9c39f28d7597098d52c3dc 100644 --- a/RTCP/Run/src/LOFAR/Parset.py +++ b/RTCP/Run/src/LOFAR/Parset.py @@ -114,6 +114,13 @@ class Parset(util.Parset.Parset): self.setdefault("OLAP.Correlator.integrationTime",1); + self.setdefault('Observation.Filtered.nameMask','L${MSNUMBER}_SB${SUBBAND}.filtered') + self.setdefault('Observation.BeamFormed.nameMask','L${MSNUMBER}_B${BEAM}_S${STOKES}_P${PART}_bf.raw') + self.setdefault('Observation.Correlated.nameMask','L${MSNUMBER}_SB${SUBBAND}_uv.MS') + self.setdefault('Observation.CoherentStokes.nameMask','L${MSNUMBER}_B${BEAM}_S${STOKES}_P${PART}_bf.raw') + self.setdefault('Observation.IncoherentStokes.nameMask','L${MSNUMBER}_SB${SUBBAND}_bf.incoherentstokes') + self.setdefault('Observation.Trigger.nameMask','L${MSNUMBER}_B${BEAM}_S${STOKES}_P${PART}_bf.trigger') + def convertDepricatedKeys(self): """ Converts some new keys to old ones to help old CEP code cope with new SAS code. """ @@ -274,7 +281,7 @@ class Parset(util.Parset.Parset): self['OLAP.IONProc.psetList'] = self.psets self.setdefault('OLAP.Storage.nrSubbandsPerBeam', nrSubbands); - self['OLAP.Storage.nrFilesPerStokes'] = int( math.ceil( 1.0 * nrSubbands / int(self["OLAP.Storage.nrSubbandsPerBeam"]) ) ) + self['OLAP.Storage.nrPartsPerStokes'] = int( math.ceil( 1.0 * nrSubbands / int(self["OLAP.Storage.nrSubbandsPerBeam"]) ) ) nrPsets = len(self.psets) nrStorageNodes = self.getNrUsedStorageNodes() @@ -283,7 +290,7 @@ class Parset(util.Parset.Parset): # set and resolve storage hostnames # sort them since mpirun will as well, messing with our indexing schemes! - self["OLAP.OLAP_Conn.IONProc_Storage_ServerHosts"] = [Hosts.resolve( s, "back") for s in self.storagenodes] + self["OLAP.Storage.hosts"] = self.storagenodes[:] self.setdefault('OLAP.nrPsets', nrPsets) self.setdefault('OLAP.CNProc.phaseOnePsets', [s.getPsetIndex(self.partition) for s in self.stations]) @@ -312,6 +319,57 @@ class Parset(util.Parset.Parset): else: self.setdefault('OLAP.PencilInfo.storageNodeList',[i//int(math.ceil(1.0 * nrBeamFiles/nrStorageNodes)) for i in xrange(nrBeamFiles)]) + self.setdefault('OLAP.Storage.targetDirectory',self.parseMask('/data1/L${YEAR}_${MSNUMBER}')); + + # generate filenames to produce - phase 2 + nodelist = self.getInt32Vector( "OLAP.storageNodeList" ); + products = ["filtered","correlated","incoherentStokes"] + outputkeys = ["FilteredData","CorrelatedData","IncoherentStokes"] + + def capfirst( s ): + return s[0].capitalize()+s[1:] + + for p,o in zip(products,outputkeys): + outputkey = "OLAP.output%s" % (o,) + if not self.getBool(outputkey): + continue + + maskkey = "Observation.%s.nameMask" % capfirst(p) + mask = self["OLAP.Storage.targetDirectory"] + "/" + self[maskkey] + locationkey = "OLAP.Storage.%s" % p + filenameskey = "Observation.%s.fileNames" % capfirst(p) + + paths = [ self.parseMask( mask, subband = i ) for i in xrange(nrSubbands) ] + filenames = map( os.path.basename, paths ) + dirnames = map( os.path.dirname, paths ) + locations = [ "%s:%s" % (self.storagenodes[nodelist[i]], dirnames[i]) for i in xrange(nrSubbands) ] + + self.setdefault( locationkey, locations ) + self.setdefault( filenameskey, filenames ) + + # generate filenames to produce - phase 3 + nodelist = self.getInt32Vector( "OLAP.PencilInfo.storageNodeList" ); + products = ["beamFormed","coherentStokes","trigger"] + outputkeys = ["BeamFormedData","CoherentStokes","Trigger"] + + for p,o in zip(products,outputkeys): + outputkey = "OLAP.output%s" % (o,) + if not self.getBool(outputkey): + continue + + maskkey = "Observation.%s.nameMask" % capfirst(p) + mask = self["OLAP.Storage.targetDirectory"] + "/" + self[maskkey] + locationkey = "OLAP.Storage.%s" % p + filenameskey = "Observation.%s.fileNames" % capfirst(p) + + paths = [ self.parseMask( mask, beam = b, stokes = s, file = f ) for f in xrange(self.getNrPartsPerStokes()) for s in xrange(self.getNrStokes()) for b in xrange(self.getNrBeams()) ] + filenames = map( os.path.basename, paths ) + dirnames = map( os.path.dirname, paths ) + locations = [ "%s:%s" % (self.storagenodes[nodelist[i]], dirnames[i]) for i in xrange(self.getNrPartsPerStokes() * self.getNrStokes() * self.getNrBeams()) ] + + self.setdefault( locationkey, locations ) + self.setdefault( filenameskey, filenames ) + # calculation configuration # integration times of CNProc and IONProc, based on self.integrationtime @@ -382,7 +440,7 @@ class Parset(util.Parset.Parset): self.storagenodes = sorted(storagenodes) # OLAP needs IP addresses from the backend - self["OLAP.OLAP_Conn.IONProc_Storage_ServerHosts"] = self.storagenodes + self["OLAP.Storage.hosts"] = self.storagenodes[:] def setObsID(self,obsid): self.setdefault("Observation.ObsID", obsid) @@ -407,14 +465,10 @@ class Parset(util.Parset.Parset): self["OLAP.OLAP_Conn.IONProc_Storage_Transport"] = "NULL" self.setStorageNodes([]) - def parseMask( self, mask = None ): - """ Fills a mask, by default the Observation.MSNameMask. """ + def parseMask( self, mask, subband = 0, beam = 0, stokes = 0, file = 0 ): + """ Fills a mask. """ assert "Observation.ObsID" in self, "Observation ID not generated yet." - if mask is None: - assert "Observation.MSNameMask" in self, "Observation.MSNameMask not defined in parset." - - mask = mask or self["Observation.MSNameMask"] # obtain settings date = parse( self["Observation.startTime"] ).timetuple() @@ -425,8 +479,11 @@ class Parset(util.Parset.Parset): mask = mask.replace( "${%s}" % d, "%02d" % (date[index],) ) mask = mask.replace( "${MSNUMBER}", "%05d" % (self.getObsID(),) ) - mask = mask.replace( "${SUBBAND}", "*" ) - mask = mask.replace( "${RAID}", "*" ) + mask = mask.replace( "${SUBBAND}", "%03d" % (subband,) ) + mask = mask.replace( "${SAP}", "%03d" % (beam,) ) + mask = mask.replace( "${PART}", "%03d" % (file,) ) + mask = mask.replace( "${BEAM}", "%03d" % (beam,) ) + mask = mask.replace( "${STOKES}", "%01d" % (stokes,) ) return mask @@ -471,23 +528,27 @@ class Parset(util.Parset.Parset): return max(tabList) + 1 - def getNrBeamFiles( self ): - nrSubbands = len(self.getInt32Vector("Observation.subbandList")) - nrFilesPerStokes = int(self["OLAP.Storage.nrFilesPerStokes"]) - + def getNrStokes( self ): if self.getBool("OLAP.outputBeamFormedData") or self.getBool("OLAP.outputTrigger"): - nrStokes = 2 + return 2 elif self.getBool("OLAP.outputCoherentStokes"): - nrStokes = len(self["OLAP.Stokes.which"]) + return len(self["OLAP.Stokes.which"]) else: - nrStokes = 0 + return 0 + + def getNrPartsPerStokes( self ): + return int(self["OLAP.Storage.nrPartsPerStokes"]) + + def getNrBeamFiles( self ): + nrPartsPerStokes = self.getNrPartsPerStokes() + nrStokes = self.getNrStokes() if self.getBool("OLAP.PencilInfo.flysEye"): nrBeams = self.getNrMergedStations() else: nrBeams = self.getNrBeams() - return nrBeams * nrStokes * nrFilesPerStokes + return nrBeams * nrStokes * nrPartsPerStokes def phaseThreeExists( self ): # NO support for mixing with Observation.mode and Observation.outputIncoherentStokesI diff --git a/RTCP/Run/src/RTCP.parset b/RTCP/Run/src/RTCP.parset index 8625e43f28f5a9e82658a9755b3b81880c745d32..9d2a40ece81bfbc1ba909fc076bd748cb391c8db 100644 --- a/RTCP/Run/src/RTCP.parset +++ b/RTCP/Run/src/RTCP.parset @@ -46,7 +46,6 @@ Observation.Beam[0].angle2 = 0.71094251447010637 Observation.Beam[0].directionType = J2000 # ----- Output streams -Observation.MSNameMask = /data1/D${YEAR}_${MSNUMBER}/SB${SUBBAND}.MS Observation.VirtualInstrument.storageNodeList = [lse019,lse020,lse021] OLAP.OLAP_Conn.rawDataOutputs = [tcp:10.174.0.1:4000] # output data go to lse001 @@ -59,6 +58,15 @@ OLAP.outputCoherentStokes = F OLAP.outputIncoherentStokes = F OLAP.outputTrigger = F +OLAP.Storage.targetDirectory = /data1/L${YEAR}_${MSNUMBER} + +Observation.Filtered.nameMask = L${MSNUMBER}_SB${SUBBAND}.filtered +Observation.BeamFormed.nameMask = L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.raw +Observation.Correlated.nameMask = L${MSNUMBER}_SB${SUBBAND}_uv.MS +Observation.CoherentStokes.nameMask = L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.raw +Observation.IncoherentStokes.nameMask = L${MSNUMBER}_SB${SUBBAND}_bf.incoherentstokes +Observation.Trigger.nameMask = L${MSNUMBER}_B${PBEAM}_S${SUBBEAM}_bf.trigger + Observation.channelsPerSubband = 256 OLAP.delayCompensation = T diff --git a/RTCP/Storage/include/Storage/InputThread.h b/RTCP/Storage/include/Storage/InputThread.h index a5f522d06c733a79aa06a475436583c712ff6b12..347642846bd292e48ba9e6d66b48e439b01fa369 100644 --- a/RTCP/Storage/include/Storage/InputThread.h +++ b/RTCP/Storage/include/Storage/InputThread.h @@ -40,7 +40,7 @@ namespace RTCP { class InputThread { public: - InputThread(const Parset &, unsigned subbandNumber, ProcessingPlan::planlet &outputConfig, /*const std::string &inputDescription,*/ Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue); + InputThread(const Parset &parset, const ProcessingPlan::planlet &outputConfig, unsigned index, const std::string &host, const std::string &filename, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue); ~InputThread(); private: @@ -49,7 +49,7 @@ class InputThread const std::string itsLogPrefix; const Parset &itsParset; - const unsigned itsSubbandNumber; + const std::string itsFilename; const unsigned itsOutputNumber; const ProcessingPlan::distribution_t itsDistribution; const std::string itsInputDescription; @@ -57,6 +57,8 @@ class InputThread Queue<StreamableData *> &itsFreeQueue, &itsReceiveQueue; + const std::string itsServer; + InterruptibleThread itsThread; }; diff --git a/RTCP/Storage/include/Storage/OutputThread.h b/RTCP/Storage/include/Storage/OutputThread.h index cd0daa1c7a42609afb0099231ade638ac3cfcb5e..c116d64cd216d12c55954b3f3ed00d07835af084 100644 --- a/RTCP/Storage/include/Storage/OutputThread.h +++ b/RTCP/Storage/include/Storage/OutputThread.h @@ -47,14 +47,13 @@ namespace RTCP { class OutputThread { public: - OutputThread(const Parset &, unsigned subbandNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue, bool isBigEndian); + OutputThread(const Parset &, const ProcessingPlan::planlet &outputConfig, unsigned index, const std::string &dir, const std::string &filename, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue, bool isBigEndian); ~OutputThread(); // report any writes that take longer than this (seconds) static const float reportWriteDelay = 0.05; private: - string getMSname() const; void writeLogMessage(unsigned sequenceNumber); void flushSequenceNumbers(); void checkForDroppedData(StreamableData *data); @@ -67,7 +66,6 @@ class OutputThread Thread *itsThread; - const unsigned itsSubbandNumber; const unsigned itsOutputNumber; const unsigned itsObservationID; diff --git a/RTCP/Storage/include/Storage/SubbandWriter.h b/RTCP/Storage/include/Storage/SubbandWriter.h index 0a18d432ee58f44c95015b26e93e406512a064d6..b51ebe71e252074f5f9ddce5dcd2a0cccc29fcda 100644 --- a/RTCP/Storage/include/Storage/SubbandWriter.h +++ b/RTCP/Storage/include/Storage/SubbandWriter.h @@ -37,6 +37,8 @@ #include <Storage/MSWriter.h> #include <Stream/Stream.h> +#include <string> + namespace LOFAR { namespace RTCP { @@ -45,7 +47,7 @@ namespace RTCP { class SubbandWriter { public: - SubbandWriter(const Parset &parset, unsigned subband, ProcessingPlan::planlet &outputConfig, bool isBigEndian); + SubbandWriter(const Parset &parset, const ProcessingPlan::planlet &outputConfig, unsigned index, const std::string &host, const std::string &dir, const std::string &filename, bool isBigEndian); ~SubbandWriter(); private: diff --git a/RTCP/Storage/src/InputThread.cc b/RTCP/Storage/src/InputThread.cc index 4b9c3628dd30b78684b897de22fc7aaafbd80671..bcc5de0ae7ab8bb6635a9643c1fb98875382003c 100644 --- a/RTCP/Storage/src/InputThread.cc +++ b/RTCP/Storage/src/InputThread.cc @@ -36,17 +36,18 @@ using boost::format; namespace LOFAR { namespace RTCP { -InputThread::InputThread(const Parset &parset, unsigned subbandNumber, ProcessingPlan::planlet &outputConfig, /*const std::string &inputDescription,*/ Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue) +InputThread::InputThread(const Parset &parset, const ProcessingPlan::planlet &outputConfig, unsigned index, const string &host, const string &filename, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue) : - itsLogPrefix(str(format("[obs %u output %u subband %3u] ") % parset.observationID() % outputConfig.outputNr % subbandNumber)), + itsLogPrefix(str(format("[obs %u output %u index %3u] ") % parset.observationID() % outputConfig.outputNr % index)), itsParset(parset), - itsSubbandNumber(subbandNumber), + itsFilename(filename), itsOutputNumber(outputConfig.outputNr), - itsDistribution(outputConfig.distribution), + itsDistribution(outputConfig.info.distribution), //itsInputDescription(inputDescription), itsObservationID(parset.observationID()), itsFreeQueue(freeQueue), itsReceiveQueue(receiveQueue), + itsServer(host), itsThread(this, &InputThread::mainLoop, itsLogPrefix + "[InputThread] ") { } @@ -63,7 +64,7 @@ void InputThread::mainLoop() std::string inputDescriptor; try { - inputDescriptor = getStreamDescriptorBetweenIONandStorage(itsParset, itsSubbandNumber, itsOutputNumber, itsDistribution == ProcessingPlan::DIST_SUBBAND); + inputDescriptor = getStreamDescriptorBetweenIONandStorage(itsParset, itsServer, itsFilename); LOG_INFO_STR(itsLogPrefix << "Creating connection from " << inputDescriptor << "..." ); std::auto_ptr<Stream> streamFromION(createStream(inputDescriptor, true)); diff --git a/RTCP/Storage/src/OutputThread.cc b/RTCP/Storage/src/OutputThread.cc index 2be6ae70c55d058d8c870595c6944641a6dda5bb..ab4e53bbba31be91bace1122e2662c7c0e50b49d 100644 --- a/RTCP/Storage/src/OutputThread.cc +++ b/RTCP/Storage/src/OutputThread.cc @@ -34,8 +34,6 @@ #include <stdio.h> #include <boost/format.hpp> -#include <boost/algorithm/string/split.hpp> -#include <boost/algorithm/string.hpp> #include <sys/types.h> #include <sys/stat.h> @@ -48,6 +46,7 @@ using boost::format; namespace LOFAR { namespace RTCP { +#if 0 static string dirName( const string filename ) { using namespace boost; @@ -62,6 +61,7 @@ static string dirName( const string filename ) } return basedir; } +#endif static void makeDir( const string &dirname, const string &logPrefix ) { @@ -106,12 +106,11 @@ static void recursiveMakeDir( const string &dirname, const string &logPrefix ) } -OutputThread::OutputThread(const Parset &parset, unsigned subbandNumber, const ProcessingPlan::planlet &outputConfig, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue, bool isBigEndian) +OutputThread::OutputThread(const Parset &parset, const ProcessingPlan::planlet &outputConfig, unsigned index, const string &dir, const string &filename, Queue<StreamableData *> &freeQueue, Queue<StreamableData *> &receiveQueue, bool isBigEndian) : - itsLogPrefix(str(format("[obs %u output %u subband %3u] ") % parset.observationID() % outputConfig.outputNr % subbandNumber)), + itsLogPrefix(str(format("[obs %u output %u index %3u] ") % parset.observationID() % outputConfig.outputNr % index)), itsParset(parset), itsOutputConfig(outputConfig), - itsSubbandNumber(subbandNumber), itsOutputNumber(outputConfig.outputNr), itsObservationID(parset.observationID()), itsNextSequenceNumber(0), @@ -120,15 +119,22 @@ OutputThread::OutputThread(const Parset &parset, unsigned subbandNumber, const P itsSequenceNumbersFile(0), itsHaveCaughtException(false) { - std::string filename = getMSname(); + string fullfilename = dir + "/" + filename; - recursiveMakeDir( dirName(filename), itsLogPrefix ); + recursiveMakeDir( dir, itsLogPrefix ); if (dynamic_cast<CorrelatedData *>(outputConfig.source)) { - filename = str(format("%s/table.f0data") % getMSname()); +#if defined HAVE_AIPSPP + MeasurementSetFormat myFormat(&parset, 512); + + /// Make MeasurementSet filestructures and required tables + myFormat.addSubband(fullfilename, index, isBigEndian); + + LOG_INFO_STR(itsLogPrefix << "MeasurementSet created"); +#endif // defined HAVE_AIPSPP if (parset.getLofarStManVersion() == 2) { - string seqfilename = str(format("%s/table.f0seqnr") % getMSname()); + string seqfilename = str(format("%s/table.f0seqnr") % fullfilename); try { itsSequenceNumbersFile = new FileStream(seqfilename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); @@ -138,23 +144,15 @@ OutputThread::OutputThread(const Parset &parset, unsigned subbandNumber, const P } } -#if defined HAVE_AIPSPP - MeasurementSetFormat myFormat(&parset, 512); - - /// Make MeasurementSet filestructures and required tables - myFormat.addSubband(getMSname(), subbandNumber, isBigEndian); - - LOG_INFO_STR(itsLogPrefix << "MeasurementSet created"); -#endif // defined HAVE_AIPSPP + fullfilename = str(format("%s/table.f0data") % fullfilename); } - LOG_DEBUG_STR(itsLogPrefix << "Writing to " << filename); + LOG_INFO_STR(itsLogPrefix << "Writing to " << fullfilename); try { - itsWriter = new MSWriterFile(filename.c_str()); - + itsWriter = new MSWriterFile(fullfilename.c_str()); } catch (SystemCallException &ex) { - LOG_ERROR_STR(itsLogPrefix << "Cannot open " << filename << ": " << ex); + LOG_ERROR_STR(itsLogPrefix << "Cannot open " << fullfilename << ": " << ex); itsWriter = new MSWriterNull(); } @@ -175,50 +173,6 @@ OutputThread::~OutputThread() LOG_WARN_STR(itsLogPrefix << "OutputThread caught non-fatal exception(s).") ; } - -string OutputThread::getMSname() const -{ - using namespace boost; - - const char pols[] = "XY"; - const char stokes[] = "IQUV"; - - const unsigned nrBeamFiles = itsParset.nrFilesPerStokes(); - const int beam = itsSubbandNumber / itsOutputConfig.nrFilesPerBeam / nrBeamFiles; - const int subbeam = (itsSubbandNumber / nrBeamFiles) % itsOutputConfig.nrFilesPerBeam; - const int beamfile = itsSubbandNumber % nrBeamFiles; - - string name = dirName( itsParset.getString("Observation.MSNameMask") ) + itsOutputConfig.filename; - string startTime = itsParset.getString("Observation.startTime"); - vector<string> splitStartTime; - split(splitStartTime, startTime, is_any_of("- :")); - - replace_all(name, "${YEAR}", splitStartTime[0]); - replace_all(name, "${MONTH}", splitStartTime[1]); - replace_all(name, "${DAY}", splitStartTime[2]); - replace_all(name, "${HOURS}", splitStartTime[3]); - replace_all(name, "${MINUTES}", splitStartTime[4]); - replace_all(name, "${SECONDS}", splitStartTime[5]); - - replace_all(name, "${MSNUMBER}", str(format("%05u") % itsParset.observationID())); - replace_all(name, "${BEAM}", str(format("%02u") % itsParset.subbandToSAPmapping()[itsSubbandNumber])); - replace_all(name, "${SUBBAND}", str(format("%03u") % itsSubbandNumber)); - replace_all(name, "${BEAMFILE}", str(format("%03u") % beamfile)); - replace_all(name, "${PBEAM}", str(format("%03u") % beam)); - replace_all(name, "${POL}", str(format("%c") % pols[subbeam])); - replace_all(name, "${STOKES}", str(format("%c") % stokes[subbeam])); - replace_all(name, "${SUBBEAM}", str(format("%u") % subbeam)); - - string raidlistkey = itsOutputConfig.distribution == ProcessingPlan::DIST_SUBBAND - ? "OLAP.storageRaidList" : "OLAP.PencilInfo.storageRaidList"; - - if (itsParset.isDefined(raidlistkey)) - replace_all(name, "${RAID}", str(format("%s") % itsParset.getStringVector(raidlistkey, true)[itsSubbandNumber])); - - return name; -} - - void OutputThread::writeLogMessage(unsigned sequenceNumber) { LOG_INFO_STR(itsLogPrefix << "Written block with seqno = " << sequenceNumber); diff --git a/RTCP/Storage/src/Storage_main.cc b/RTCP/Storage/src/Storage_main.cc index 173e7f09661f336774f0c3668719169cfbca9545..5e14ec01753d76141039aa3d6a713d23f2707caf 100644 --- a/RTCP/Storage/src/Storage_main.cc +++ b/RTCP/Storage/src/Storage_main.cc @@ -30,8 +30,13 @@ #include <stdexcept> #include <cstdio> +#include <cmath> #include <cstdlib> +#include <map> +#include <vector> +#include <string> + #include <boost/format.hpp> using boost::format; @@ -232,15 +237,59 @@ int main(int argc, char *argv[]) logPrefix = str(format("[obs %u] ") % parset.observationID()); + vector<string> hosts = parset.getStringVector("OLAP.Storage.hosts"); + ASSERT( myRank < hosts.size() ); + const string myhost = hosts[myRank]; + unsigned nrparts = parset.nrPartsPerStokes(); + unsigned nrbeams = parset.flysEye() ? parset.nrMergedStations() : parset.nrPencilBeams(); + // start all writers for (unsigned output = 0; output < plan.nrOutputTypes(); output ++) { ProcessingPlan::planlet &p = plan.plan[output]; - ProcessingPlan::distribution_t distribution = p.distribution; - std::vector<unsigned> nodelist = distribution == ProcessingPlan::DIST_SUBBAND ? parset.subbandStorageList() : parset.beamStorageList(); - - for (unsigned n = 0; n < nodelist.size(); n ++) - if (nodelist[n] == myRank) - subbandWriters.push_back(new SubbandWriter(parset, n, p, isBigEndian)); + string mask = parset.fileNameMask( p.info.storageFilenamesSetKey ); + + switch (p.info.distribution) { + case ProcessingPlan::DIST_SUBBAND: + for (unsigned s = 0; s < parset.nrSubbands(); s++) { + string filename = parset.constructSubbandFilename( mask, s ); + string host = parset.targetHost( p.info.storageLocationKey, p.info.storageFilenamesSetKey, filename ); + + if (host == myhost) { + string dir = parset.targetDirectory( p.info.storageLocationKey, p.info.storageFilenamesSetKey, filename ); + unsigned index = s; + + subbandWriters.push_back(new SubbandWriter(parset, p, index, host, dir, filename, isBigEndian)); + } + } + + break; + + case ProcessingPlan::DIST_BEAM: + + for (unsigned b = 0; b < nrbeams; b++) { + unsigned nrstokes = p.info.nrStokes; + + for (unsigned s = 0; s < nrstokes; s++) { + for (unsigned q = 0; q < nrparts; q++) { + string filename = parset.constructBeamFormedFilename( mask, b, s, q ); + string host = parset.targetHost( p.info.storageLocationKey, p.info.storageFilenamesSetKey, filename ); + + if (host == myhost) { + string dir = parset.targetDirectory( p.info.storageLocationKey, p.info.storageFilenamesSetKey, filename ); + unsigned index = (b * nrstokes + s ) * nrparts + q; + + subbandWriters.push_back(new SubbandWriter(parset, p, index, host, dir, filename, isBigEndian)); + } + } + } + } + + break; + + case ProcessingPlan::DIST_UNKNOWN: + case ProcessingPlan::DIST_STATION: + continue; + } } ExitOnClosedStdin stdinWatcher; diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc index 2aaccd2943756338dbf8fcfde2a716d6d75f7b5b..dcb50e0ff2f7ccbaf0ed83832378d7addab86b80 100644 --- a/RTCP/Storage/src/SubbandWriter.cc +++ b/RTCP/Storage/src/SubbandWriter.cc @@ -31,9 +31,6 @@ #include <Interface/CN_ProcessingPlan.h> #include <Storage/SubbandWriter.h> -#include <boost/lexical_cast.hpp> - - #include <time.h> #include <boost/format.hpp> @@ -43,9 +40,9 @@ namespace LOFAR { namespace RTCP { -SubbandWriter::SubbandWriter(const Parset &parset, unsigned subband, ProcessingPlan::planlet &outputConfig, bool isBigEndian) +SubbandWriter::SubbandWriter(const Parset &parset, const ProcessingPlan::planlet &outputConfig, unsigned index, const string &host, const string &dir, const string &filename, bool isBigEndian) { - const std::string logPrefix = str(format("[obs %u output %u subband %3u] ") % parset.observationID() % outputConfig.outputNr % subband); + const std::string logPrefix = str(format("[obs %u output %u index %u] ") % parset.observationID() % outputConfig.outputNr % index); StreamableData *dataTemplate = outputConfig.source; @@ -56,8 +53,8 @@ SubbandWriter::SubbandWriter(const Parset &parset, unsigned subband, ProcessingP itsFreeQueue.append(data); } - itsInputThread = new InputThread(parset, subband, outputConfig, itsFreeQueue, itsReceiveQueue); - itsOutputThread = new OutputThread(parset, subband, outputConfig, itsFreeQueue, itsReceiveQueue, isBigEndian); + itsInputThread = new InputThread(parset, outputConfig, index, host, filename, itsFreeQueue, itsReceiveQueue); + itsOutputThread = new OutputThread(parset, outputConfig, index, dir, filename, itsFreeQueue, itsReceiveQueue, isBigEndian); }