diff --git a/RTCP/IONProc/src/OutputThread.cc b/RTCP/IONProc/src/OutputThread.cc index 4ced256d92a0e64097eef557e69d33371a7b39db..aa2bc60e735e496232289ffed410beb7df5fe254 100644 --- a/RTCP/IONProc/src/OutputThread.cc +++ b/RTCP/IONProc/src/OutputThread.cc @@ -50,7 +50,7 @@ OutputThread::OutputThread(const Parset &parset, const ProcessingPlan::planlet & itsDone(false), itsParset(parset), itsFilename(filename), - itsServer(parset.targetHost( outputConfig.info.storageLocationKey, outputConfig.info.storageFilenamesSetKey, filename )) + itsServer(parset.targetHost( outputConfig.info.storageFilenamesSetKey, filename )) { itsLogPrefix = str(format("[obs %u output %u index %3u] ") % parset.observationID() % outputConfig.outputNr % index); diff --git a/RTCP/Interface/include/Interface/Parset.h b/RTCP/Interface/include/Interface/Parset.h index cde36b37c0e2ddcf0338b727d2f29876f8285a57..b6a0da6494aa7a1cbcf736f2a0f31ec6e0565b01 100644 --- a/RTCP/Interface/include/Interface/Parset.h +++ b/RTCP/Interface/include/Interface/Parset.h @@ -177,10 +177,10 @@ public: vector<double> itsStPositions; vector<string> fileNames(const string &filesSet) const; - vector<string> fileLocations(const string &locationKey) const; + vector<string> fileLocations(const string &filesSet) const; string fileNameMask(const string &filesSet) const; - string targetDirectory(const string &locationKey, const string &filesSet, const string &fileName) const; - string targetHost(const string &locationKey, const string &filesSet, const string &fileName) const; + string targetDirectory(const string &filesSet, const string &filename) const; + string targetHost(const string &filesSet, const string &filename) const; string constructSubbandFilename( const string &mask, unsigned subband ) const; string constructBeamFormedFilename( const string &mask, unsigned beam, unsigned stokes, unsigned file ) const; @@ -643,28 +643,28 @@ inline string Parset::antennaSet() const inline vector<string> Parset::fileNames(const string &filesSet) const { - return getStringVector(filesSet + ".fileNames",true); + return getStringVector(filesSet + ".filenames",true); } -inline vector<string> Parset::fileLocations(const string &locationKey) const +inline vector<string> Parset::fileLocations(const string &filesSet) const { - return getStringVector(locationKey,true); + return getStringVector(filesSet + ".locations",true); } inline string Parset::fileNameMask(const string &filesSet) const { - return getString(filesSet + ".nameMask"); + return getString(filesSet + ".namemask"); } -inline string Parset::targetDirectory(const string &locationKey, const string &filesSet, const string &fileName) const +inline string Parset::targetDirectory(const string &filesSet, const string &filename) const { - vector<string> locations = fileLocations( locationKey ); + vector<string> locations = fileLocations( filesSet ); vector<string> filenames = fileNames( filesSet ); // TODO: cache and use a map or hashtable for (unsigned i = 0; i < filenames.size(); i++ ) - if (filenames[i] == fileName) { + if (filenames[i] == filename) { const string &location = locations[i]; vector<string> parts; @@ -676,15 +676,15 @@ inline string Parset::targetDirectory(const string &locationKey, const string &f return "none"; } -inline string Parset::targetHost(const string &locationKey, const string &filesSet, const string &fileName) const +inline string Parset::targetHost(const string &filesSet, const string &filename) const { - vector<string> locations = fileLocations( locationKey ); + vector<string> locations = fileLocations( filesSet ); vector<string> filenames = fileNames( filesSet ); // TODO: cache and use a map or hashtable for (unsigned i = 0; i < filenames.size(); i++ ) - if (filenames[i] == fileName) { + if (filenames[i] == filename) { const string &location = locations[i]; vector<string> parts; diff --git a/RTCP/Interface/include/Interface/ProcessingPlan.h b/RTCP/Interface/include/Interface/ProcessingPlan.h index 9c456c92dd4a3b446de049ecaef81f7918cf83ab..83348c29837ceb78136f641438f6231a74568e06 100644 --- a/RTCP/Interface/include/Interface/ProcessingPlan.h +++ b/RTCP/Interface/include/Interface/ProcessingPlan.h @@ -62,8 +62,7 @@ class ProcessingPlan enum distribution_t { DIST_UNKNOWN = 0, DIST_STATION, DIST_SUBBAND, DIST_BEAM }; struct datainfo { - const char *storageLocationKey; // i.e. "OLAP.Storage.beamFormed" - const char *storageFilenamesSetKey; // i.e. "Observation.BeamFormed" + const char *storageFilenamesSetKey; // i.e. "Observation.BeamFormed", the parset prefix for filename/location info distribution_t distribution; unsigned nrStokes; diff --git a/RTCP/Interface/src/CN_ProcessingPlan.cc b/RTCP/Interface/src/CN_ProcessingPlan.cc index e543e0b75149ee1f5bba203e285e2f7d5e50f9f3..a7292c975c2a7f83cfbdbedcfc5bf5a4e007f016 100644 --- a/RTCP/Interface/src/CN_ProcessingPlan.cc +++ b/RTCP/Interface/src/CN_ProcessingPlan.cc @@ -115,8 +115,7 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla // send all requested outputs if( configuration.outputFilteredData() ) { struct datainfo info = { - "OLAP.Storage.filtered", - "Observation.Filtered", + "OLAP.Storage.Filtered", DIST_SUBBAND, 1 }; @@ -124,8 +123,7 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla } if( configuration.outputCorrelatedData() ) { struct datainfo info = { - "OLAP.Storage.correlated", - "Observation.Correlated", + "OLAP.Storage.Correlated", DIST_SUBBAND, 1 }; @@ -133,8 +131,7 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla } if( configuration.outputIncoherentStokes() ) { struct datainfo info = { - "OLAP.Storage.incoherentStokes", - "Observation.IncoherentStokes", + "OLAP.Storage.IncoherentStokes", DIST_SUBBAND, 1 }; @@ -220,8 +217,7 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla if( configuration.outputBeamFormedData() ) { struct datainfo info = { - "OLAP.Storage.beamformed", - "Observation.Beamformed", + "OLAP.Storage.Beamformed", DIST_BEAM, NR_POLARIZATIONS }; @@ -231,8 +227,7 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla if( configuration.outputTrigger() ) { struct datainfo info = { - "OLAP.Storage.trigger", - "Observation.Trigger", + "OLAP.Storage.Trigger", DIST_BEAM, NR_POLARIZATIONS }; @@ -242,8 +237,7 @@ template <typename SAMPLE_TYPE> CN_ProcessingPlan<SAMPLE_TYPE>::CN_ProcessingPla if( configuration.outputCoherentStokes() ) { struct datainfo info = { - "OLAP.Storage.coherentStokes", - "Observation.CoherentStokes", + "OLAP.Storage.CoherentStokes", DIST_BEAM, configuration.nrStokes() }; diff --git a/RTCP/Run/src/LOFAR/Parset.py b/RTCP/Run/src/LOFAR/Parset.py index 01951d58f9d515c821a7f8d50ce95fb74cd5dcad..f4370385aa8f5ede520fa3db6bc806b160868eba 100644 --- a/RTCP/Run/src/LOFAR/Parset.py +++ b/RTCP/Run/src/LOFAR/Parset.py @@ -114,12 +114,19 @@ class Parset(util.Parset.Parset): self.setdefault("OLAP.Correlator.integrationTime",1); - self.setdefault('Observation.Filtered.nameMask','L${OBSID}_SB${SUBBAND}.filtered') - self.setdefault('Observation.Beamformed.nameMask','L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.raw') - self.setdefault('Observation.Correlated.nameMask','L${OBSID}_SB${SUBBAND}_uv.MS') - self.setdefault('Observation.CoherentStokes.nameMask','L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.raw') - self.setdefault('Observation.IncoherentStokes.nameMask','L${OBSID}_SB${SUBBAND}_bf.incoherentstokes') - self.setdefault('Observation.Trigger.nameMask','L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.trigger') + self.setdefault('OLAP.Storage.Filtered.namemask','L${OBSID}_SB${SUBBAND}.filtered') + self.setdefault('OLAP.Storage.Beamformed.namemask','L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.raw') + self.setdefault('OLAP.Storage.Correlated.namemask','L${OBSID}_SB${SUBBAND}_uv.MS') + self.setdefault('OLAP.Storage.CoherentStokes.namemask','L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.raw') + self.setdefault('OLAP.Storage.IncoherentStokes.namemask','L${OBSID}_SB${SUBBAND}_bf.incoherentstokes') + self.setdefault('OLAP.Storage.Trigger.namemask','L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.trigger') + + self.setdefault('OLAP.Storage.Filtered.dirmask','L${YEAR}_${OBSID}') + self.setdefault('OLAP.Storage.Beamformed.dirmask','L${YEAR}_${OBSID}') + self.setdefault('OLAP.Storage.Correlated.dirmask','L${YEAR}_${OBSID}') + self.setdefault('OLAP.Storage.CoherentStokes.dirmask','L${YEAR}_${OBSID}') + self.setdefault('OLAP.Storage.IncoherentStokes.dirmask','L${YEAR}_${OBSID}') + self.setdefault('OLAP.Storage.Trigger.dirmask','L${YEAR}_${OBSID}') # default beamlet settings, derived from subbandlist, for development if "Observation.subbandList" in self: @@ -330,37 +337,34 @@ class Parset(util.Parset.Parset): else: self.setdefault('OLAP.PencilInfo.storageNodeList',[i//int(math.ceil(1.0 * nrBeamFiles/nrStorageNodes)) for i in xrange(nrBeamFiles)]) - self.setdefault('OLAP.Storage.targetDirectory',self.parseMask('/data1/L${YEAR}_${OBSID}')); + self.setdefault('OLAP.Storage.targetDirectory','/data1') # generate filenames to produce - phase 2 nodelist = self.getInt32Vector( "OLAP.storageNodeList" ); - products = ["filtered","correlated","incoherentStokes"] + products = ["Filtered","Correlated","IncoherentStokes"] outputkeys = ["FilteredData","CorrelatedData","IncoherentStokes"] - def capfirst( s ): - return s[0].capitalize()+s[1:] - for p,o in zip(products,outputkeys): outputkey = "OLAP.output%s" % (o,) if not self.getBool(outputkey): continue - maskkey = "Observation.%s.nameMask" % capfirst(p) - mask = self["OLAP.Storage.targetDirectory"] + "/" + self[maskkey] - locationkey = "OLAP.Storage.%s" % p - filenameskey = "Observation.%s.fileNames" % capfirst(p) + maskkey = "OLAP.Storage.%s.namemask" % p + mask = self["OLAP.Storage.targetDirectory"] + "/" + self["OLAP.Storage.%s.dirmask" % p] + "/" + self[maskkey] + locationkey = "OLAP.Storage.%s.locations" % p + filenameskey = "OLAP.Storage.%s.filenames" % p paths = [ self.parseMask( mask, subband = i ) for i in xrange(nrSubbands) ] filenames = map( os.path.basename, paths ) dirnames = map( os.path.dirname, paths ) - locations = [ "%s:%s" % (self.storagenodes[nodelist[i]], dirnames[i]) for i in xrange(nrSubbands) ] + locations = [ "%s:%s/" % (self.storagenodes[nodelist[i]], dirnames[i]) for i in xrange(nrSubbands) ] self.setdefault( locationkey, locations ) self.setdefault( filenameskey, filenames ) # generate filenames to produce - phase 3 nodelist = self.getInt32Vector( "OLAP.PencilInfo.storageNodeList" ); - products = ["beamformed","coherentStokes","trigger"] + products = ["Beamformed","CoherentStokes","Trigger"] outputkeys = ["BeamFormedData","CoherentStokes","Trigger"] for p,o in zip(products,outputkeys): @@ -368,15 +372,15 @@ class Parset(util.Parset.Parset): if not self.getBool(outputkey): continue - maskkey = "Observation.%s.nameMask" % capfirst(p) - mask = self["OLAP.Storage.targetDirectory"] + "/" + self[maskkey] - locationkey = "OLAP.Storage.%s" % p - filenameskey = "Observation.%s.fileNames" % capfirst(p) + maskkey = "OLAP.Storage.%s.namemask" % p + mask = self["OLAP.Storage.targetDirectory"] + "/" + self["OLAP.Storage.%s.dirmask" % p] + "/" + self[maskkey] + locationkey = "OLAP.Storage.%s.locations" % p + filenameskey = "OLAP.Storage.%s.filenames" % p paths = [ self.parseMask( mask, beam = b, stokes = s, file = f ) for f in xrange(self.getNrPartsPerStokes()) for s in xrange(self.getNrStokes()) for b in xrange(self.getNrBeams()) ] filenames = map( os.path.basename, paths ) dirnames = map( os.path.dirname, paths ) - locations = [ "%s:%s" % (self.storagenodes[nodelist[i]], dirnames[i]) for i in xrange(self.getNrPartsPerStokes() * self.getNrStokes() * self.getNrBeams()) ] + locations = [ "%s:%s/" % (self.storagenodes[nodelist[i]], dirnames[i]) for i in xrange(self.getNrPartsPerStokes() * self.getNrStokes() * self.getNrBeams()) ] self.setdefault( locationkey, locations ) self.setdefault( filenameskey, filenames ) diff --git a/RTCP/Run/src/RTCP.parset b/RTCP/Run/src/RTCP.parset index 9cecf271c6c54a3abbffa9b1e63502b4f98c3260..71ae367711b5641cccc902b2715ee8e053a383d7 100644 --- a/RTCP/Run/src/RTCP.parset +++ b/RTCP/Run/src/RTCP.parset @@ -45,7 +45,7 @@ Observation.Beam[0].directionType = J2000 # ----- Output streams Observation.VirtualInstrument.storageNodeList = [lse019,lse020,lse021] # hosts -OLAP.Storage.targetDirectory = /data1/L${YEAR}_${MSNUMBER} # directory +OLAP.Storage.targetDirectory = /data1 # will be appended with dirmask and filenammask OLAP.OLAP_Conn.rawDataOutputs = [tcp:10.174.0.1:4000] # output data go to lse001 OLAP.OLAP_Conn.rawDataOutputOnly = F @@ -57,13 +57,13 @@ OLAP.outputCoherentStokes = F OLAP.outputIncoherentStokes = F OLAP.outputTrigger = F -Observation.Filtered.nameMask = L${OBSID}_SB${SUBBAND}.filtered -Observation.Correlated.nameMask = L${OBSID}_SB${SUBBAND}_uv.MS -Observation.IncoherentStokes.nameMask = L${OBSID}_SB${SUBBAND}_bf.incoherentstokes +OLAP.Storage.Filtered.namemask = L${OBSID}_SB${SUBBAND}.filtered +OLAP.Storage.Correlated.namemask = L${OBSID}_SB${SUBBAND}_uv.MS +OLAP.Storage.IncoherentStokes.namemask = L${OBSID}_SB${SUBBAND}_bf.incoherentstokes -Observation.BeamFormed.nameMask = L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.raw -Observation.CoherentStokes.nameMask = L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.raw -Observation.Trigger.nameMask = L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.trigger +OLAP.Storage.BeamFormed.namemask = L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.raw +OLAP.Storage.CoherentStokes.namemask = L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.raw +OLAP.Storage.Trigger.namemask = L${OBSID}_B${BEAM}_S${STOKES}_P${PART}_bf.trigger Observation.channelsPerSubband = 256 diff --git a/RTCP/Storage/src/Storage_main.cc b/RTCP/Storage/src/Storage_main.cc index 5e14ec01753d76141039aa3d6a713d23f2707caf..1ac5edb312f9fda0f9e9926df8b798f1497122cb 100644 --- a/RTCP/Storage/src/Storage_main.cc +++ b/RTCP/Storage/src/Storage_main.cc @@ -252,10 +252,10 @@ int main(int argc, char *argv[]) case ProcessingPlan::DIST_SUBBAND: for (unsigned s = 0; s < parset.nrSubbands(); s++) { string filename = parset.constructSubbandFilename( mask, s ); - string host = parset.targetHost( p.info.storageLocationKey, p.info.storageFilenamesSetKey, filename ); + string host = parset.targetHost( p.info.storageFilenamesSetKey, filename ); if (host == myhost) { - string dir = parset.targetDirectory( p.info.storageLocationKey, p.info.storageFilenamesSetKey, filename ); + string dir = parset.targetDirectory( p.info.storageFilenamesSetKey, filename ); unsigned index = s; subbandWriters.push_back(new SubbandWriter(parset, p, index, host, dir, filename, isBigEndian)); @@ -272,10 +272,10 @@ int main(int argc, char *argv[]) for (unsigned s = 0; s < nrstokes; s++) { for (unsigned q = 0; q < nrparts; q++) { string filename = parset.constructBeamFormedFilename( mask, b, s, q ); - string host = parset.targetHost( p.info.storageLocationKey, p.info.storageFilenamesSetKey, filename ); + string host = parset.targetHost( p.info.storageFilenamesSetKey, filename ); if (host == myhost) { - string dir = parset.targetDirectory( p.info.storageLocationKey, p.info.storageFilenamesSetKey, filename ); + string dir = parset.targetDirectory( p.info.storageFilenamesSetKey, filename ); unsigned index = (b * nrstokes + s ) * nrparts + q; subbandWriters.push_back(new SubbandWriter(parset, p, index, host, dir, filename, isBigEndian));