diff --git a/RTCP/Cobalt/OutputProc/src/MSWriter.cc b/RTCP/Cobalt/OutputProc/src/MSWriter.cc index 3f7078668fd0b374ed70ab1065131394bf3d2221..826c6b422157e1ef18d21c0eb1d1c169cf7ea8c5 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriter.cc +++ b/RTCP/Cobalt/OutputProc/src/MSWriter.cc @@ -42,6 +42,10 @@ namespace LOFAR { } + void MSWriter::createMetaData() + { + } + void MSWriter::augment(const FinalMetaData &finalMetaData) { (void)finalMetaData; diff --git a/RTCP/Cobalt/OutputProc/src/MSWriter.h b/RTCP/Cobalt/OutputProc/src/MSWriter.h index 0cf9257b77d97efffe124bbb523dd9d1f40614de..82b6ed55750040dffa8d0b44121c60927dbe5e02 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriter.h +++ b/RTCP/Cobalt/OutputProc/src/MSWriter.h @@ -39,6 +39,8 @@ namespace LOFAR virtual void write(StreamableData *) = 0; + virtual void createMetaData(); + virtual void augment(const FinalMetaData &finalMetaData); virtual size_t getDataSize(); diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.cc b/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.cc index 5fbd6da2aa83ee586440ac39ae0153b94cf83815..bcac57dd52d0089224e6b7a871a233fa23c413fe 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.cc +++ b/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.cc @@ -27,7 +27,6 @@ #include <boost/format.hpp> #include <boost/lexical_cast.hpp> -#include <Common/SystemUtil.h> #include <MSLofar/FailedTileInfo.h> #include <CoInterface/CorrelatedData.h> #include <CoInterface/LTAFeedback.h> @@ -48,18 +47,18 @@ namespace LOFAR MSWriterCorrelated::MSWriterCorrelated (const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex) : - MSWriterFile( - (makeMeasurementSet(logPrefix, msName, parset, subbandIndex), - str(format("%s/table.f0data") % msName))), + MSWriterFile(str(format("%s/table.f0data") % msName)), itsLogPrefix(logPrefix), itsMSname(msName), - itsParset(parset) + itsParset(parset), + itsSubbandIndex(subbandIndex) { // Add file-specific processing feedback LTAFeedback fb(itsParset.settings); - itsConfiguration.adoptCollection(fb.correlatedFeedback(subbandIndex)); - itsConfigurationPrefix = fb.correlatedPrefix(subbandIndex); + itsConfiguration.adoptCollection(fb.correlatedFeedback(itsSubbandIndex)); + itsConfigurationPrefix = fb.correlatedPrefix(itsSubbandIndex); + // Create Sequence file if (LofarStManVersion > 1) { string seqfilename = str(format("%s/table.f0seqnr") % msName); @@ -69,23 +68,19 @@ namespace LOFAR LOG_WARN_STR(itsLogPrefix << "Could not open sequence numbers file " << seqfilename); } } + } -#if 0 - // derive baseline names - std::vector<std::string> stationNames = parset.mergedStationNames(); - std::vector<std::string> baselineNames(parset.nrBaselines()); - unsigned nrStations = stationNames.size(); - // order of baselines as station indices: - // 0-0, 1-0, 1-1, 2-0, 2-1, 2-2 ... (see RTCP/CNProc/Correlator.cc) + void MSWriterCorrelated::createMetaData() + { + // Creaate MeasurementSet +#if defined HAVE_AIPSPP + MeasurementSetFormat myFormat(itsParset, 512); - unsigned bl = 0; + myFormat.addSubband(itsMSname, itsSubbandIndex); - for(unsigned s1 = 0; s1 < nrStations; s1++) - for(unsigned s2 = 0; s2 <= s1; s2++) - //bl = s1 * (s1 + 1) / 2 + stat2 ; - baselineNames[bl++] = str(format("%s_%s") % stationNames[s1] % stationNames[s2]); -#endif + LOG_DEBUG_STR(itsLogPrefix << "MeasurementSet created"); +#endif // defined HAVE_AIPSPP } @@ -94,18 +89,6 @@ namespace LOFAR } - void MSWriterCorrelated::makeMeasurementSet(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex) - { -#if defined HAVE_AIPSPP - MeasurementSetFormat myFormat(parset, 512); - - myFormat.addSubband(msName, subbandIndex); - - LOG_DEBUG_STR(logPrefix << "MeasurementSet created"); -#endif // defined HAVE_AIPSPP - } - - void MSWriterCorrelated::write(StreamableData *data) { CorrelatedData *cdata = dynamic_cast<CorrelatedData*>(data); diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.h b/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.h index 50167c4af1a3e1d1d5ec27a1625e53c1fc4bccb2..243e60d567c3c5f306f18641ecfd43162693c784 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.h +++ b/RTCP/Cobalt/OutputProc/src/MSWriterCorrelated.h @@ -43,6 +43,8 @@ namespace LOFAR MSWriterCorrelated(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex); ~MSWriterCorrelated(); + virtual void createMetaData(); + virtual void write(StreamableData *data); virtual void augment(const FinalMetaData &finalMetaData); @@ -51,11 +53,9 @@ namespace LOFAR const std::string itsLogPrefix; const std::string itsMSname; const Parset &itsParset; + const unsigned itsSubbandIndex; SmartPtr<FileStream> itsSequenceNumbersFile; - - private: - void makeMeasurementSet(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex); }; diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc index 114b14e042b1eb6bb55e3323eee9d1e760d08de5..5c7399f39cdbec7f9769e2b60bdfa5180cf40b7e 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc +++ b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.cc @@ -83,20 +83,41 @@ namespace LOFAR const Parset &parset, unsigned fileno) : - MSWriterFile(forceextension(string(filename),".raw")), + MSWriterFile(forceextension(string(filename), ".raw")), + itsFilename(filename), itsParset(parset), itsNextSeqNr(0), itsFileNr(fileno) { + itsNrExpectedBlocks = itsParset.settings.nrBlocks(); + + const struct ObservationSettings::BeamFormer::File &f = itsParset.settings.beamFormer.files[itsFileNr]; + + const struct ObservationSettings::BeamFormer::StokesSettings &stokesSet = + f.coherent ? itsParset.settings.beamFormer.coherentSettings + : itsParset.settings.beamFormer.incoherentSettings; + + // All subbands in the SAP that we store in this file. + // We could have multiple SAPs and/or have split up the subbands over multiple files (parts). + unsigned nrSubbands = f.lastSubbandIdx - f.firstSubbandIdx; + + itsNrChannels = stokesSet.nrChannels * nrSubbands; + itsNrSamples = itsParset.settings.blockSize / + stokesSet.nrChannels / stokesSet.timeIntegrationFactor; + + itsBlockSize = itsNrSamples * itsNrChannels; + // Add file-specific processing feedback LTAFeedback fb(itsParset.settings); itsConfiguration.adoptCollection(fb.beamFormedFeedback(itsFileNr)); itsConfigurationPrefix = fb.beamFormedPrefix(itsFileNr); + } - itsNrExpectedBlocks = itsParset.settings.nrBlocks(); - - string h5filename = forceextension(string(filename),".h5"); - string rawfilename = forceextension(string(filename),".raw"); + template <typename T,unsigned DIM> + void MSWriterDAL<T,DIM>::createMetaData() + { + string h5filename = forceextension(itsFilename, ".h5"); + string rawfilename = forceextension(itsFilename, ".raw"); ScopedLock sl(HDF5Mutex); @@ -105,15 +126,15 @@ namespace LOFAR H5Eset_auto_stack(H5E_DEFAULT, my_hdf5_error_handler, NULL); #endif - const struct ObservationSettings::BeamFormer::File &f = parset.settings.beamFormer.files[fileno]; + const struct ObservationSettings::BeamFormer::File &f = itsParset.settings.beamFormer.files[itsFileNr]; const unsigned sapNr = f.sapNr; const unsigned beamNr = f.tabNr; const unsigned stokesNr = f.stokesNr; const struct ObservationSettings::BeamFormer::StokesSettings &stokesSet = - f.coherent ? parset.settings.beamFormer.coherentSettings - : parset.settings.beamFormer.incoherentSettings; + f.coherent ? itsParset.settings.beamFormer.coherentSettings + : itsParset.settings.beamFormer.incoherentSettings; //******************************* @@ -122,12 +143,6 @@ namespace LOFAR unsigned firstSubbandIdx = f.firstSubbandIdx; unsigned nrSubbands = f.lastSubbandIdx - f.firstSubbandIdx; - itsNrChannels = stokesSet.nrChannels * nrSubbands; - itsNrSamples = parset.settings.blockSize / - stokesSet.nrChannels / stokesSet.timeIntegrationFactor; - - itsBlockSize = itsNrSamples * itsNrChannels; - //******************************* vector<string> stokesVars; @@ -164,30 +179,30 @@ namespace LOFAR return; } - LOG_DEBUG_STR("MSWriterDAL: opening " << filename); + LOG_DEBUG_STR("MSWriterDAL: opening " << h5filename); // create the top structure BF_File file(h5filename, BF_File::CREATE); // Common Attributes - writeCommonLofarAttributes(file, parset); + writeCommonLofarAttributes(file, itsParset); // BF_File specific root group parameters - file.createOfflineOnline().value = parset.settings.realTime ? "Online" : "Offline"; + file.createOfflineOnline().value = itsParset.settings.realTime ? "Online" : "Offline"; file.BFFormat().value = "TAB"; file.BFVersion().value = str(format("Cobalt/OutputProc %s r%s using DAL %s and HDF5 %s") % OutputProcVersion::getVersion() % OutputProcVersion::getRevision() % dal::version().to_string() % dal::version_hdf5().to_string()); - file.totalIntegrationTime().value = itsNrExpectedBlocks * parset.settings.blockDuration(); + file.totalIntegrationTime().value = itsNrExpectedBlocks * itsParset.settings.blockDuration(); file.totalIntegrationTimeUnit().value = "s"; //file.subArrayPointingDiameter().value = 0.0; //file.subArrayPointingDiameterUnit().value = "arcmin"; - file.bandwidth().value = parset.settings.subbands.size() * parset.settings.subbandWidth() / 1e6; + file.bandwidth().value = itsParset.settings.subbands.size() * itsParset.settings.subbandWidth() / 1e6; file.bandwidthUnit().value = "MHz"; //file.beamDiameter() .value = 0.0; //file.beamDiameterUnit() .value = "arcmin"; - file.observationNofSubArrayPointings().value = parset.settings.SAPs.size(); + file.observationNofSubArrayPointings().value = itsParset.settings.SAPs.size(); file.nofSubArrayPointings().value = 1; // SysLog group -- empty for now @@ -198,30 +213,30 @@ namespace LOFAR sap.create(); sap.groupType().value = "SubArrayPointing"; - sap.expTimeStartUTC().value = toUTC(parset.settings.startTime); - sap.expTimeStartMJD().value = toMJD(parset.settings.startTime); + sap.expTimeStartUTC().value = toUTC(itsParset.settings.startTime); + sap.expTimeStartMJD().value = toMJD(itsParset.settings.startTime); - sap.expTimeEndUTC().value = toUTC(parset.getRealStopTime()); - sap.expTimeEndMJD().value = toMJD(parset.getRealStopTime()); + sap.expTimeEndUTC().value = toUTC(itsParset.getRealStopTime()); + sap.expTimeEndMJD().value = toMJD(itsParset.getRealStopTime()); - // TODO: fix the system to use the parset.beamDuration(sapNr), but OLAP + // TODO: fix the system to use the itsParset.beamDuration(sapNr), but OLAP // does not work that way yet (beamDuration is currently unsupported). - sap.totalIntegrationTime().value = itsNrExpectedBlocks * parset.settings.blockDuration(); + sap.totalIntegrationTime().value = itsNrExpectedBlocks * itsParset.settings.blockDuration(); sap.totalIntegrationTimeUnit().value = "s"; // TODO: non-J2000 pointings. // Idem for TABs: now we subtract absolute angles to store TAB offsets. Also see TODO below. - if ( parset.settings.SAPs[sapNr].direction.type != "J2000" ) { + if ( itsParset.settings.SAPs[sapNr].direction.type != "J2000" ) { LOG_WARN("HDF5 writer does not record positions of non-J2000 observations yet."); } - const struct ObservationSettings::Direction &beamDir = parset.settings.SAPs[sapNr].direction; + const struct ObservationSettings::Direction &beamDir = itsParset.settings.SAPs[sapNr].direction; sap.pointRA().value = beamDir.angle1 * 180.0 / M_PI; sap.pointRAUnit().value = "deg"; sap.pointDEC().value = beamDir.angle2 * 180.0 / M_PI; sap.pointDECUnit().value = "deg"; - sap.observationNofBeams().value = parset.settings.beamFormer.SAPs[sapNr].TABs.size(); + sap.observationNofBeams().value = itsParset.settings.beamFormer.SAPs[sapNr].TABs.size(); sap.nofBeams().value = 1; BF_ProcessingHistory sapHistory = sap.processHistory(); @@ -229,10 +244,10 @@ namespace LOFAR sapHistory.groupType().value = "ProcessingHistory"; Attribute<string> sapObservationParset(sapHistory, "OBSERVATION_PARSET"); - string parsetAsString; - parset.writeBuffer(parsetAsString); + string itsParsetAsString; + itsParset.writeBuffer(itsParsetAsString); - sapObservationParset.value = parsetAsString; + sapObservationParset.value = itsParsetAsString; // information about the pencil beam @@ -240,20 +255,20 @@ namespace LOFAR beam.create(); beam.groupType().value = "Beam"; - if (parset.settings.beamFormer.doFlysEye) { + if (itsParset.settings.beamFormer.doFlysEye) { beam.nofStations().value = 1; - beam.stationsList().value = vector<string>(1, parset.settings.antennaFields[beamNr].name); + beam.stationsList().value = vector<string>(1, itsParset.settings.antennaFields[beamNr].name); } else { - beam.nofStations().value = parset.settings.antennaFields.size(); - beam.stationsList().value = parset.allStationNames(); + beam.nofStations().value = itsParset.settings.antennaFields.size(); + beam.stationsList().value = itsParset.allStationNames(); } - const vector<string> beamtargets(1, parset.settings.SAPs[sapNr].target); + const vector<string> beamtargets(1, itsParset.settings.SAPs[sapNr].target); beam.targets().value = beamtargets; - beam.tracking().value = parset.settings.SAPs[sapNr].direction.type; + beam.tracking().value = itsParset.settings.SAPs[sapNr].direction.type; - const struct ObservationSettings::Direction &tabDir = parset.settings.beamFormer.SAPs[sapNr].TABs[beamNr].direction; + const struct ObservationSettings::Direction &tabDir = itsParset.settings.beamFormer.SAPs[sapNr].TABs[beamNr].direction; beam.pointRA().value = tabDir.angle1 * 180.0 / M_PI; beam.pointRAUnit().value = "deg"; beam.pointDEC().value = tabDir.angle2 * 180.0 / M_PI; @@ -264,7 +279,7 @@ namespace LOFAR beam.pointOffsetDECUnit().value = "deg"; - beam.subbandWidth().value = parset.settings.subbandWidth(); + beam.subbandWidth().value = itsParset.settings.subbandWidth(); beam.subbandWidthUnit().value = "Hz"; beam.beamDiameterRA().value = 0; @@ -273,20 +288,20 @@ namespace LOFAR beam.beamDiameterDECUnit().value = "arcmin"; beam.nofSamples().value = itsNrSamples * itsNrExpectedBlocks; - beam.samplingRate().value = parset.settings.subbandWidth() / stokesSet.nrChannels / stokesSet.timeIntegrationFactor; + beam.samplingRate().value = itsParset.settings.subbandWidth() / stokesSet.nrChannels / stokesSet.timeIntegrationFactor; beam.samplingRateUnit().value = "Hz"; - beam.samplingTime().value = parset.settings.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor; + beam.samplingTime().value = itsParset.settings.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor; beam.samplingTimeUnit().value = "s"; beam.channelsPerSubband().value = stokesSet.nrChannels; - const double channelBandwidth = parset.settings.subbandWidth() / stokesSet.nrChannels; + const double channelBandwidth = itsParset.settings.subbandWidth() / stokesSet.nrChannels; beam.channelWidth().value = channelBandwidth; beam.channelWidthUnit().value = "Hz"; // First, init tmp vector for the whole obs, regardless which SAP and subbands (parts) this file contains. - vector<double> subbandCenterFrequencies(parset.settings.subbands.size()); + vector<double> subbandCenterFrequencies(itsParset.settings.subbands.size()); for (unsigned sb = 0; sb < subbandCenterFrequencies.size(); ++sb) { - subbandCenterFrequencies[sb] = parset.settings.subbands[sb].centralFrequency; + subbandCenterFrequencies[sb] = itsParset.settings.subbands[sb].centralFrequency; } vector<double> beamCenterFrequencies(nrSubbands, 0.0); @@ -296,12 +311,12 @@ namespace LOFAR const double beamCenterFrequencySum = accumulate(beamCenterFrequencies.begin(), beamCenterFrequencies.end(), 0.0); const double frequencyOffsetPPF = stokesSet.nrChannels > 1 ? // See getFrequencyOffsetPPF() for why/how. - getFrequencyOffsetPPF(parset.settings.subbandWidth(), stokesSet.nrChannels) : + getFrequencyOffsetPPF(itsParset.settings.subbandWidth(), stokesSet.nrChannels) : 0.0; beam.beamFrequencyCenter().value = (beamCenterFrequencySum / nrSubbands - frequencyOffsetPPF) / 1e6; beam.beamFrequencyCenterUnit().value = "MHz"; - const double DM = parset.settings.corrections.dedisperse ? parset.settings.beamFormer.SAPs[sapNr].TABs[beamNr].dispersionMeasure : 0.0; + const double DM = itsParset.settings.corrections.dedisperse ? itsParset.settings.beamFormer.SAPs[sapNr].TABs[beamNr].dispersionMeasure : 0.0; beam.foldedData().value = false; beam.foldPeriod().value = 0.0; @@ -331,17 +346,17 @@ namespace LOFAR Attribute<string> beamObservationParset(beamHistory, "OBSERVATION_PARSET"); - beamObservationParset.value = parsetAsString; + beamObservationParset.value = itsParsetAsString; CoordinatesGroup coordinates = beam.coordinates(); coordinates.create(); coordinates.groupType().value = "Coordinates"; - coordinates.refLocationValue().value = parset.settings.delayCompensation.referencePhaseCenter; + coordinates.refLocationValue().value = itsParset.settings.delayCompensation.referencePhaseCenter; coordinates.refLocationUnit().value = vector<string>(3, "m"); coordinates.refLocationFrame().value = "ITRF"; - coordinates.refTimeValue().value = toMJD(parset.settings.startTime); + coordinates.refTimeValue().value = toMJD(itsParset.settings.startTime); coordinates.refTimeUnit().value = "d"; coordinates.refTimeFrame().value = "MJD"; @@ -373,7 +388,7 @@ namespace LOFAR timeCoordinate.get()->referenceValue().value = 0; timeCoordinate.get()->referencePixel().value = 0; - timeCoordinate.get()->increment().value = parset.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor; + timeCoordinate.get()->increment().value = itsParset.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor; timeCoordinate.get()->pc().value = unitvector; timeCoordinate.get()->axisValuesPixel().value = vector<unsigned>(1, 0); // not used @@ -402,7 +417,7 @@ namespace LOFAR vector<double> spectralWorld; for (unsigned sb = 0; sb < nrSubbands; sb++) { - const double subbandBeginFreq = parset.channel0Frequency( firstSubbandIdx + sb, stokesSet.nrChannels ); + const double subbandBeginFreq = itsParset.channel0Frequency( firstSubbandIdx + sb, stokesSet.nrChannels ); // NOTE: channel 0 will be wrongly annotated if nrChannels > 1, because it is a combination of the // highest and the lowest frequencies (half a channel each). diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.h b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.h index 4e9108dea2086377a3a7df5ccb68354ab9820f95..dff266f738c99dd4d2b4c5c14efe8a7e2b3e1cd8 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterDAL.h +++ b/RTCP/Cobalt/OutputProc/src/MSWriterDAL.h @@ -44,13 +44,18 @@ namespace LOFAR public: MSWriterDAL(const std::string &filename, const Parset &parset, unsigned fileno); ~MSWriterDAL(); + + virtual void createMetaData(); + virtual void write(StreamableData *data); + private: + const std::string itsFilename; const Parset &itsParset; unsigned itsNrChannels; unsigned itsNrSamples; unsigned itsNextSeqNr; - unsigned itsFileNr; + const unsigned itsFileNr; unsigned itsBlockSize; // the size of StreamableData::samples, in T }; } diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterFile.cc b/RTCP/Cobalt/OutputProc/src/MSWriterFile.cc index f509abc2b0bb175767096fc95b2cd6429cc9ca5f..ebbea6c74b6ba4794414fd889a32fcd6ecb7a3c1 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterFile.cc +++ b/RTCP/Cobalt/OutputProc/src/MSWriterFile.cc @@ -21,19 +21,64 @@ #include <lofar_config.h> #include "MSWriterFile.h" +#include <Common/Thread/Mutex.h> +#include <Common/SystemUtil.h> #include <sys/types.h> +#include <sys/stat.h> #include <fcntl.h> +#include <cerrno> +#include <boost/algorithm/string.hpp> namespace LOFAR { namespace Cobalt { + static Mutex makeDirMutex; + + static void makeDir(const string &dirname, const string &logPrefix) + { + ScopedLock scopedLock(makeDirMutex); + struct stat s; + + if (stat(dirname.c_str(), &s) == 0) { + // path already exists + if ((s.st_mode & S_IFMT) != S_IFDIR) { + LOG_WARN_STR(logPrefix << "Not a directory: " << dirname); + } + } else if (errno == ENOENT) { + // create directory + LOG_DEBUG_STR(logPrefix << "Creating directory " << dirname); + + if (mkdir(dirname.c_str(), 0777) != 0 && errno != EEXIST) { + THROW_SYSCALL(string("mkdir ") + dirname); + } + } else { + // something else went wrong + THROW_SYSCALL(string("stat ") + dirname); + } + } + + /* create a directory as well as all its parent directories */ + static void recursiveMakeDir(const string &dirname, const string &logPrefix) + { + using namespace boost; + + string curdir; + vector<string> splitName; + + boost::split(splitName, dirname, boost::is_any_of("/")); + + for (unsigned i = 0; i < splitName.size(); i++) { + curdir += splitName[i] + '/'; + makeDir(curdir, logPrefix); + } + } MSWriterFile::MSWriterFile (const std::string &msName) : - itsFile(msName, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) + itsFile((recursiveMakeDir(dirname(msName), ""), msName), O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) { } diff --git a/RTCP/Cobalt/OutputProc/src/MSWriterFile.h b/RTCP/Cobalt/OutputProc/src/MSWriterFile.h index 08a37224bb6cc363938a88201020a2c5d3c301be..9bbaba94bd4f776ca16aab26445c0abdaf0df396 100644 --- a/RTCP/Cobalt/OutputProc/src/MSWriterFile.h +++ b/RTCP/Cobalt/OutputProc/src/MSWriterFile.h @@ -36,6 +36,11 @@ namespace LOFAR class MSWriterFile : public MSWriter { public: + /* + * Write data to the provided file name. + * + * Any parent directories are automatically created. + */ MSWriterFile(const std::string &msName); ~MSWriterFile(); diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.cc b/RTCP/Cobalt/OutputProc/src/OutputThread.cc index 88a761c4bc89864e6b7ae49576e3a80639be87a1..6fb2e8230a66e9332f2b720b5a8e998053e388fd 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.cc +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.cc @@ -23,17 +23,11 @@ #include "OutputThread.h" -#include <cerrno> -#include <ctime> -#include <sys/types.h> -#include <sys/stat.h> #include <unistd.h> #include <iomanip> #include <boost/lexical_cast.hpp> #include <boost/format.hpp> -#include <boost/algorithm/string.hpp> -#include <Common/StringUtil.h> #include <Common/SystemCallException.h> #include <Common/Thread/Mutex.h> #include <Common/Thread/Cancellation.h> @@ -57,53 +51,11 @@ namespace LOFAR namespace Cobalt { - static Mutex makeDirMutex; static Mutex casacoreMutex; using namespace std; using boost::lexical_cast; - static void makeDir(const string &dirname, const string &logPrefix) - { - ScopedLock scopedLock(makeDirMutex); - struct stat s; - - if (stat(dirname.c_str(), &s) == 0) { - // path already exists - if ((s.st_mode & S_IFMT) != S_IFDIR) { - LOG_WARN_STR(logPrefix << "Not a directory: " << dirname); - } - } else if (errno == ENOENT) { - // create directory - LOG_DEBUG_STR(logPrefix << "Creating directory " << dirname); - - if (mkdir(dirname.c_str(), 0777) != 0 && errno != EEXIST) { - THROW_SYSCALL(string("mkdir ") + dirname); - } - } else { - // something else went wrong - THROW_SYSCALL(string("stat ") + dirname); - } - } - - - /* create a directory as well as all its parent directories */ - static void recursiveMakeDir(const string &dirname, const string &logPrefix) - { - using namespace boost; - - string curdir; - vector<string> splitName; - - boost::split(splitName, dirname, boost::is_any_of("/")); - - for (unsigned i = 0; i < splitName.size(); i++) { - curdir += splitName[i] + '/'; - makeDir(curdir, logPrefix); - } - } - - template<typename T> OutputThread<T>::OutputThread(const Parset &parset, unsigned streamNr, Pool<T> &outputPool, RTmetadata &mdLogger, const std::string &mdKeyPrefix, @@ -209,6 +161,19 @@ namespace LOFAR } + template<typename T> void OutputThread<T>::createMetaData() + { + try { + // augment the data product + ASSERT(itsWriter.get()); + + itsWriter->createMetaData(); + } catch (Exception &ex) { + LOG_ERROR_STR(itsLogPrefix << "Could not create meta data: " << ex); + } + } + + template<typename T> void OutputThread<T>::augment( const FinalMetaData &finalMetaData ) { try { @@ -241,8 +206,18 @@ namespace LOFAR LOG_DEBUG_STR(itsLogPrefix << "process() entered"); createMS(); - doWork(); - cleanUp(); + +# pragma omp parallel sections num_threads(2) + { +# pragma omp section + { + doWork(); + cleanUp(); + } + +# pragma omp section + createMetaData(); + } } // Make required instantiations @@ -282,7 +257,6 @@ namespace LOFAR try { - recursiveMakeDir(directoryName, itsLogPrefix); LOG_INFO_STR(itsLogPrefix << "Writing to " << path); itsWriter = new MSWriterCorrelated(itsLogPrefix, path, itsParset, itsStreamNr); @@ -347,7 +321,6 @@ namespace LOFAR try { - recursiveMakeDir(directoryName, itsLogPrefix); LOG_INFO_STR(itsLogPrefix << "Writing to " << path); #ifdef HAVE_DAL diff --git a/RTCP/Cobalt/OutputProc/src/OutputThread.h b/RTCP/Cobalt/OutputProc/src/OutputThread.h index a99bc244f87fe39058a14104f2751acf218a3ccd..717980740873c4ff48fec72536e82fcc1ca2582a 100644 --- a/RTCP/Cobalt/OutputProc/src/OutputThread.h +++ b/RTCP/Cobalt/OutputProc/src/OutputThread.h @@ -69,6 +69,9 @@ namespace LOFAR // Wrap-up the writing. void cleanUp() const; + // Create the metadata files in parallel to writing the data + void createMetaData(); + // Add FinalMetaData to the data container. void augment(const FinalMetaData &finalMetaData);