Skip to content
Snippets Groups Projects
Commit ff488e53 authored by Jan David Mol's avatar Jan David Mol
Browse files

Task #9052: Create measurementsets in parallel to writing data, for faster startup

parent 4af24511
No related branches found
No related tags found
No related merge requests found
...@@ -42,6 +42,10 @@ namespace LOFAR ...@@ -42,6 +42,10 @@ namespace LOFAR
{ {
} }
void MSWriter::createMetaData()
{
}
void MSWriter::augment(const FinalMetaData &finalMetaData) void MSWriter::augment(const FinalMetaData &finalMetaData)
{ {
(void)finalMetaData; (void)finalMetaData;
......
...@@ -39,6 +39,8 @@ namespace LOFAR ...@@ -39,6 +39,8 @@ namespace LOFAR
virtual void write(StreamableData *) = 0; virtual void write(StreamableData *) = 0;
virtual void createMetaData();
virtual void augment(const FinalMetaData &finalMetaData); virtual void augment(const FinalMetaData &finalMetaData);
virtual size_t getDataSize(); virtual size_t getDataSize();
......
...@@ -27,7 +27,6 @@ ...@@ -27,7 +27,6 @@
#include <boost/format.hpp> #include <boost/format.hpp>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <Common/SystemUtil.h>
#include <MSLofar/FailedTileInfo.h> #include <MSLofar/FailedTileInfo.h>
#include <CoInterface/CorrelatedData.h> #include <CoInterface/CorrelatedData.h>
#include <CoInterface/LTAFeedback.h> #include <CoInterface/LTAFeedback.h>
...@@ -48,18 +47,18 @@ namespace LOFAR ...@@ -48,18 +47,18 @@ namespace LOFAR
MSWriterCorrelated::MSWriterCorrelated (const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex) MSWriterCorrelated::MSWriterCorrelated (const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex)
: :
MSWriterFile( MSWriterFile(str(format("%s/table.f0data") % msName)),
(makeMeasurementSet(logPrefix, msName, parset, subbandIndex),
str(format("%s/table.f0data") % msName))),
itsLogPrefix(logPrefix), itsLogPrefix(logPrefix),
itsMSname(msName), itsMSname(msName),
itsParset(parset) itsParset(parset),
itsSubbandIndex(subbandIndex)
{ {
// Add file-specific processing feedback // Add file-specific processing feedback
LTAFeedback fb(itsParset.settings); LTAFeedback fb(itsParset.settings);
itsConfiguration.adoptCollection(fb.correlatedFeedback(subbandIndex)); itsConfiguration.adoptCollection(fb.correlatedFeedback(itsSubbandIndex));
itsConfigurationPrefix = fb.correlatedPrefix(subbandIndex); itsConfigurationPrefix = fb.correlatedPrefix(itsSubbandIndex);
// Create Sequence file
if (LofarStManVersion > 1) { if (LofarStManVersion > 1) {
string seqfilename = str(format("%s/table.f0seqnr") % msName); string seqfilename = str(format("%s/table.f0seqnr") % msName);
...@@ -69,23 +68,19 @@ namespace LOFAR ...@@ -69,23 +68,19 @@ namespace LOFAR
LOG_WARN_STR(itsLogPrefix << "Could not open sequence numbers file " << seqfilename); LOG_WARN_STR(itsLogPrefix << "Could not open sequence numbers file " << seqfilename);
} }
} }
}
#if 0
// derive baseline names
std::vector<std::string> stationNames = parset.mergedStationNames();
std::vector<std::string> baselineNames(parset.nrBaselines());
unsigned nrStations = stationNames.size();
// order of baselines as station indices: void MSWriterCorrelated::createMetaData()
// 0-0, 1-0, 1-1, 2-0, 2-1, 2-2 ... (see RTCP/CNProc/Correlator.cc) {
// Creaate MeasurementSet
#if defined HAVE_AIPSPP
MeasurementSetFormat myFormat(itsParset, 512);
unsigned bl = 0; myFormat.addSubband(itsMSname, itsSubbandIndex);
for(unsigned s1 = 0; s1 < nrStations; s1++) LOG_DEBUG_STR(itsLogPrefix << "MeasurementSet created");
for(unsigned s2 = 0; s2 <= s1; s2++) #endif // defined HAVE_AIPSPP
//bl = s1 * (s1 + 1) / 2 + stat2 ;
baselineNames[bl++] = str(format("%s_%s") % stationNames[s1] % stationNames[s2]);
#endif
} }
...@@ -94,18 +89,6 @@ namespace LOFAR ...@@ -94,18 +89,6 @@ namespace LOFAR
} }
void MSWriterCorrelated::makeMeasurementSet(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex)
{
#if defined HAVE_AIPSPP
MeasurementSetFormat myFormat(parset, 512);
myFormat.addSubband(msName, subbandIndex);
LOG_DEBUG_STR(logPrefix << "MeasurementSet created");
#endif // defined HAVE_AIPSPP
}
void MSWriterCorrelated::write(StreamableData *data) void MSWriterCorrelated::write(StreamableData *data)
{ {
CorrelatedData *cdata = dynamic_cast<CorrelatedData*>(data); CorrelatedData *cdata = dynamic_cast<CorrelatedData*>(data);
......
...@@ -43,6 +43,8 @@ namespace LOFAR ...@@ -43,6 +43,8 @@ namespace LOFAR
MSWriterCorrelated(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex); MSWriterCorrelated(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex);
~MSWriterCorrelated(); ~MSWriterCorrelated();
virtual void createMetaData();
virtual void write(StreamableData *data); virtual void write(StreamableData *data);
virtual void augment(const FinalMetaData &finalMetaData); virtual void augment(const FinalMetaData &finalMetaData);
...@@ -51,11 +53,9 @@ namespace LOFAR ...@@ -51,11 +53,9 @@ namespace LOFAR
const std::string itsLogPrefix; const std::string itsLogPrefix;
const std::string itsMSname; const std::string itsMSname;
const Parset &itsParset; const Parset &itsParset;
const unsigned itsSubbandIndex;
SmartPtr<FileStream> itsSequenceNumbersFile; SmartPtr<FileStream> itsSequenceNumbersFile;
private:
void makeMeasurementSet(const std::string &logPrefix, const std::string &msName, const Parset &parset, unsigned subbandIndex);
}; };
......
...@@ -83,20 +83,41 @@ namespace LOFAR ...@@ -83,20 +83,41 @@ namespace LOFAR
const Parset &parset, const Parset &parset,
unsigned fileno) unsigned fileno)
: :
MSWriterFile(forceextension(string(filename),".raw")), MSWriterFile(forceextension(string(filename), ".raw")),
itsFilename(filename),
itsParset(parset), itsParset(parset),
itsNextSeqNr(0), itsNextSeqNr(0),
itsFileNr(fileno) itsFileNr(fileno)
{ {
itsNrExpectedBlocks = itsParset.settings.nrBlocks();
const struct ObservationSettings::BeamFormer::File &f = itsParset.settings.beamFormer.files[itsFileNr];
const struct ObservationSettings::BeamFormer::StokesSettings &stokesSet =
f.coherent ? itsParset.settings.beamFormer.coherentSettings
: itsParset.settings.beamFormer.incoherentSettings;
// All subbands in the SAP that we store in this file.
// We could have multiple SAPs and/or have split up the subbands over multiple files (parts).
unsigned nrSubbands = f.lastSubbandIdx - f.firstSubbandIdx;
itsNrChannels = stokesSet.nrChannels * nrSubbands;
itsNrSamples = itsParset.settings.blockSize /
stokesSet.nrChannels / stokesSet.timeIntegrationFactor;
itsBlockSize = itsNrSamples * itsNrChannels;
// Add file-specific processing feedback // Add file-specific processing feedback
LTAFeedback fb(itsParset.settings); LTAFeedback fb(itsParset.settings);
itsConfiguration.adoptCollection(fb.beamFormedFeedback(itsFileNr)); itsConfiguration.adoptCollection(fb.beamFormedFeedback(itsFileNr));
itsConfigurationPrefix = fb.beamFormedPrefix(itsFileNr); itsConfigurationPrefix = fb.beamFormedPrefix(itsFileNr);
}
itsNrExpectedBlocks = itsParset.settings.nrBlocks(); template <typename T,unsigned DIM>
void MSWriterDAL<T,DIM>::createMetaData()
string h5filename = forceextension(string(filename),".h5"); {
string rawfilename = forceextension(string(filename),".raw"); string h5filename = forceextension(itsFilename, ".h5");
string rawfilename = forceextension(itsFilename, ".raw");
ScopedLock sl(HDF5Mutex); ScopedLock sl(HDF5Mutex);
...@@ -105,15 +126,15 @@ namespace LOFAR ...@@ -105,15 +126,15 @@ namespace LOFAR
H5Eset_auto_stack(H5E_DEFAULT, my_hdf5_error_handler, NULL); H5Eset_auto_stack(H5E_DEFAULT, my_hdf5_error_handler, NULL);
#endif #endif
const struct ObservationSettings::BeamFormer::File &f = parset.settings.beamFormer.files[fileno]; const struct ObservationSettings::BeamFormer::File &f = itsParset.settings.beamFormer.files[itsFileNr];
const unsigned sapNr = f.sapNr; const unsigned sapNr = f.sapNr;
const unsigned beamNr = f.tabNr; const unsigned beamNr = f.tabNr;
const unsigned stokesNr = f.stokesNr; const unsigned stokesNr = f.stokesNr;
const struct ObservationSettings::BeamFormer::StokesSettings &stokesSet = const struct ObservationSettings::BeamFormer::StokesSettings &stokesSet =
f.coherent ? parset.settings.beamFormer.coherentSettings f.coherent ? itsParset.settings.beamFormer.coherentSettings
: parset.settings.beamFormer.incoherentSettings; : itsParset.settings.beamFormer.incoherentSettings;
//******************************* //*******************************
...@@ -122,12 +143,6 @@ namespace LOFAR ...@@ -122,12 +143,6 @@ namespace LOFAR
unsigned firstSubbandIdx = f.firstSubbandIdx; unsigned firstSubbandIdx = f.firstSubbandIdx;
unsigned nrSubbands = f.lastSubbandIdx - f.firstSubbandIdx; unsigned nrSubbands = f.lastSubbandIdx - f.firstSubbandIdx;
itsNrChannels = stokesSet.nrChannels * nrSubbands;
itsNrSamples = parset.settings.blockSize /
stokesSet.nrChannels / stokesSet.timeIntegrationFactor;
itsBlockSize = itsNrSamples * itsNrChannels;
//******************************* //*******************************
vector<string> stokesVars; vector<string> stokesVars;
...@@ -164,30 +179,30 @@ namespace LOFAR ...@@ -164,30 +179,30 @@ namespace LOFAR
return; return;
} }
LOG_DEBUG_STR("MSWriterDAL: opening " << filename); LOG_DEBUG_STR("MSWriterDAL: opening " << h5filename);
// create the top structure // create the top structure
BF_File file(h5filename, BF_File::CREATE); BF_File file(h5filename, BF_File::CREATE);
// Common Attributes // Common Attributes
writeCommonLofarAttributes(file, parset); writeCommonLofarAttributes(file, itsParset);
// BF_File specific root group parameters // BF_File specific root group parameters
file.createOfflineOnline().value = parset.settings.realTime ? "Online" : "Offline"; file.createOfflineOnline().value = itsParset.settings.realTime ? "Online" : "Offline";
file.BFFormat().value = "TAB"; file.BFFormat().value = "TAB";
file.BFVersion().value = str(format("Cobalt/OutputProc %s r%s using DAL %s and HDF5 %s") % OutputProcVersion::getVersion() % OutputProcVersion::getRevision() % dal::version().to_string() % dal::version_hdf5().to_string()); file.BFVersion().value = str(format("Cobalt/OutputProc %s r%s using DAL %s and HDF5 %s") % OutputProcVersion::getVersion() % OutputProcVersion::getRevision() % dal::version().to_string() % dal::version_hdf5().to_string());
file.totalIntegrationTime().value = itsNrExpectedBlocks * parset.settings.blockDuration(); file.totalIntegrationTime().value = itsNrExpectedBlocks * itsParset.settings.blockDuration();
file.totalIntegrationTimeUnit().value = "s"; file.totalIntegrationTimeUnit().value = "s";
//file.subArrayPointingDiameter().value = 0.0; //file.subArrayPointingDiameter().value = 0.0;
//file.subArrayPointingDiameterUnit().value = "arcmin"; //file.subArrayPointingDiameterUnit().value = "arcmin";
file.bandwidth().value = parset.settings.subbands.size() * parset.settings.subbandWidth() / 1e6; file.bandwidth().value = itsParset.settings.subbands.size() * itsParset.settings.subbandWidth() / 1e6;
file.bandwidthUnit().value = "MHz"; file.bandwidthUnit().value = "MHz";
//file.beamDiameter() .value = 0.0; //file.beamDiameter() .value = 0.0;
//file.beamDiameterUnit() .value = "arcmin"; //file.beamDiameterUnit() .value = "arcmin";
file.observationNofSubArrayPointings().value = parset.settings.SAPs.size(); file.observationNofSubArrayPointings().value = itsParset.settings.SAPs.size();
file.nofSubArrayPointings().value = 1; file.nofSubArrayPointings().value = 1;
// SysLog group -- empty for now // SysLog group -- empty for now
...@@ -198,30 +213,30 @@ namespace LOFAR ...@@ -198,30 +213,30 @@ namespace LOFAR
sap.create(); sap.create();
sap.groupType().value = "SubArrayPointing"; sap.groupType().value = "SubArrayPointing";
sap.expTimeStartUTC().value = toUTC(parset.settings.startTime); sap.expTimeStartUTC().value = toUTC(itsParset.settings.startTime);
sap.expTimeStartMJD().value = toMJD(parset.settings.startTime); sap.expTimeStartMJD().value = toMJD(itsParset.settings.startTime);
sap.expTimeEndUTC().value = toUTC(parset.getRealStopTime()); sap.expTimeEndUTC().value = toUTC(itsParset.getRealStopTime());
sap.expTimeEndMJD().value = toMJD(parset.getRealStopTime()); sap.expTimeEndMJD().value = toMJD(itsParset.getRealStopTime());
// TODO: fix the system to use the parset.beamDuration(sapNr), but OLAP // TODO: fix the system to use the itsParset.beamDuration(sapNr), but OLAP
// does not work that way yet (beamDuration is currently unsupported). // does not work that way yet (beamDuration is currently unsupported).
sap.totalIntegrationTime().value = itsNrExpectedBlocks * parset.settings.blockDuration(); sap.totalIntegrationTime().value = itsNrExpectedBlocks * itsParset.settings.blockDuration();
sap.totalIntegrationTimeUnit().value = "s"; sap.totalIntegrationTimeUnit().value = "s";
// TODO: non-J2000 pointings. // TODO: non-J2000 pointings.
// Idem for TABs: now we subtract absolute angles to store TAB offsets. Also see TODO below. // Idem for TABs: now we subtract absolute angles to store TAB offsets. Also see TODO below.
if ( parset.settings.SAPs[sapNr].direction.type != "J2000" ) { if ( itsParset.settings.SAPs[sapNr].direction.type != "J2000" ) {
LOG_WARN("HDF5 writer does not record positions of non-J2000 observations yet."); LOG_WARN("HDF5 writer does not record positions of non-J2000 observations yet.");
} }
const struct ObservationSettings::Direction &beamDir = parset.settings.SAPs[sapNr].direction; const struct ObservationSettings::Direction &beamDir = itsParset.settings.SAPs[sapNr].direction;
sap.pointRA().value = beamDir.angle1 * 180.0 / M_PI; sap.pointRA().value = beamDir.angle1 * 180.0 / M_PI;
sap.pointRAUnit().value = "deg"; sap.pointRAUnit().value = "deg";
sap.pointDEC().value = beamDir.angle2 * 180.0 / M_PI; sap.pointDEC().value = beamDir.angle2 * 180.0 / M_PI;
sap.pointDECUnit().value = "deg"; sap.pointDECUnit().value = "deg";
sap.observationNofBeams().value = parset.settings.beamFormer.SAPs[sapNr].TABs.size(); sap.observationNofBeams().value = itsParset.settings.beamFormer.SAPs[sapNr].TABs.size();
sap.nofBeams().value = 1; sap.nofBeams().value = 1;
BF_ProcessingHistory sapHistory = sap.processHistory(); BF_ProcessingHistory sapHistory = sap.processHistory();
...@@ -229,10 +244,10 @@ namespace LOFAR ...@@ -229,10 +244,10 @@ namespace LOFAR
sapHistory.groupType().value = "ProcessingHistory"; sapHistory.groupType().value = "ProcessingHistory";
Attribute<string> sapObservationParset(sapHistory, "OBSERVATION_PARSET"); Attribute<string> sapObservationParset(sapHistory, "OBSERVATION_PARSET");
string parsetAsString; string itsParsetAsString;
parset.writeBuffer(parsetAsString); itsParset.writeBuffer(itsParsetAsString);
sapObservationParset.value = parsetAsString; sapObservationParset.value = itsParsetAsString;
// information about the pencil beam // information about the pencil beam
...@@ -240,20 +255,20 @@ namespace LOFAR ...@@ -240,20 +255,20 @@ namespace LOFAR
beam.create(); beam.create();
beam.groupType().value = "Beam"; beam.groupType().value = "Beam";
if (parset.settings.beamFormer.doFlysEye) { if (itsParset.settings.beamFormer.doFlysEye) {
beam.nofStations().value = 1; beam.nofStations().value = 1;
beam.stationsList().value = vector<string>(1, parset.settings.antennaFields[beamNr].name); beam.stationsList().value = vector<string>(1, itsParset.settings.antennaFields[beamNr].name);
} else { } else {
beam.nofStations().value = parset.settings.antennaFields.size(); beam.nofStations().value = itsParset.settings.antennaFields.size();
beam.stationsList().value = parset.allStationNames(); beam.stationsList().value = itsParset.allStationNames();
} }
const vector<string> beamtargets(1, parset.settings.SAPs[sapNr].target); const vector<string> beamtargets(1, itsParset.settings.SAPs[sapNr].target);
beam.targets().value = beamtargets; beam.targets().value = beamtargets;
beam.tracking().value = parset.settings.SAPs[sapNr].direction.type; beam.tracking().value = itsParset.settings.SAPs[sapNr].direction.type;
const struct ObservationSettings::Direction &tabDir = parset.settings.beamFormer.SAPs[sapNr].TABs[beamNr].direction; const struct ObservationSettings::Direction &tabDir = itsParset.settings.beamFormer.SAPs[sapNr].TABs[beamNr].direction;
beam.pointRA().value = tabDir.angle1 * 180.0 / M_PI; beam.pointRA().value = tabDir.angle1 * 180.0 / M_PI;
beam.pointRAUnit().value = "deg"; beam.pointRAUnit().value = "deg";
beam.pointDEC().value = tabDir.angle2 * 180.0 / M_PI; beam.pointDEC().value = tabDir.angle2 * 180.0 / M_PI;
...@@ -264,7 +279,7 @@ namespace LOFAR ...@@ -264,7 +279,7 @@ namespace LOFAR
beam.pointOffsetDECUnit().value = "deg"; beam.pointOffsetDECUnit().value = "deg";
beam.subbandWidth().value = parset.settings.subbandWidth(); beam.subbandWidth().value = itsParset.settings.subbandWidth();
beam.subbandWidthUnit().value = "Hz"; beam.subbandWidthUnit().value = "Hz";
beam.beamDiameterRA().value = 0; beam.beamDiameterRA().value = 0;
...@@ -273,20 +288,20 @@ namespace LOFAR ...@@ -273,20 +288,20 @@ namespace LOFAR
beam.beamDiameterDECUnit().value = "arcmin"; beam.beamDiameterDECUnit().value = "arcmin";
beam.nofSamples().value = itsNrSamples * itsNrExpectedBlocks; beam.nofSamples().value = itsNrSamples * itsNrExpectedBlocks;
beam.samplingRate().value = parset.settings.subbandWidth() / stokesSet.nrChannels / stokesSet.timeIntegrationFactor; beam.samplingRate().value = itsParset.settings.subbandWidth() / stokesSet.nrChannels / stokesSet.timeIntegrationFactor;
beam.samplingRateUnit().value = "Hz"; beam.samplingRateUnit().value = "Hz";
beam.samplingTime().value = parset.settings.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor; beam.samplingTime().value = itsParset.settings.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor;
beam.samplingTimeUnit().value = "s"; beam.samplingTimeUnit().value = "s";
beam.channelsPerSubband().value = stokesSet.nrChannels; beam.channelsPerSubband().value = stokesSet.nrChannels;
const double channelBandwidth = parset.settings.subbandWidth() / stokesSet.nrChannels; const double channelBandwidth = itsParset.settings.subbandWidth() / stokesSet.nrChannels;
beam.channelWidth().value = channelBandwidth; beam.channelWidth().value = channelBandwidth;
beam.channelWidthUnit().value = "Hz"; beam.channelWidthUnit().value = "Hz";
// First, init tmp vector for the whole obs, regardless which SAP and subbands (parts) this file contains. // First, init tmp vector for the whole obs, regardless which SAP and subbands (parts) this file contains.
vector<double> subbandCenterFrequencies(parset.settings.subbands.size()); vector<double> subbandCenterFrequencies(itsParset.settings.subbands.size());
for (unsigned sb = 0; sb < subbandCenterFrequencies.size(); ++sb) { for (unsigned sb = 0; sb < subbandCenterFrequencies.size(); ++sb) {
subbandCenterFrequencies[sb] = parset.settings.subbands[sb].centralFrequency; subbandCenterFrequencies[sb] = itsParset.settings.subbands[sb].centralFrequency;
} }
vector<double> beamCenterFrequencies(nrSubbands, 0.0); vector<double> beamCenterFrequencies(nrSubbands, 0.0);
...@@ -296,12 +311,12 @@ namespace LOFAR ...@@ -296,12 +311,12 @@ namespace LOFAR
const double beamCenterFrequencySum = accumulate(beamCenterFrequencies.begin(), beamCenterFrequencies.end(), 0.0); const double beamCenterFrequencySum = accumulate(beamCenterFrequencies.begin(), beamCenterFrequencies.end(), 0.0);
const double frequencyOffsetPPF = stokesSet.nrChannels > 1 ? // See getFrequencyOffsetPPF() for why/how. const double frequencyOffsetPPF = stokesSet.nrChannels > 1 ? // See getFrequencyOffsetPPF() for why/how.
getFrequencyOffsetPPF(parset.settings.subbandWidth(), stokesSet.nrChannels) : getFrequencyOffsetPPF(itsParset.settings.subbandWidth(), stokesSet.nrChannels) :
0.0; 0.0;
beam.beamFrequencyCenter().value = (beamCenterFrequencySum / nrSubbands - frequencyOffsetPPF) / 1e6; beam.beamFrequencyCenter().value = (beamCenterFrequencySum / nrSubbands - frequencyOffsetPPF) / 1e6;
beam.beamFrequencyCenterUnit().value = "MHz"; beam.beamFrequencyCenterUnit().value = "MHz";
const double DM = parset.settings.corrections.dedisperse ? parset.settings.beamFormer.SAPs[sapNr].TABs[beamNr].dispersionMeasure : 0.0; const double DM = itsParset.settings.corrections.dedisperse ? itsParset.settings.beamFormer.SAPs[sapNr].TABs[beamNr].dispersionMeasure : 0.0;
beam.foldedData().value = false; beam.foldedData().value = false;
beam.foldPeriod().value = 0.0; beam.foldPeriod().value = 0.0;
...@@ -331,17 +346,17 @@ namespace LOFAR ...@@ -331,17 +346,17 @@ namespace LOFAR
Attribute<string> beamObservationParset(beamHistory, "OBSERVATION_PARSET"); Attribute<string> beamObservationParset(beamHistory, "OBSERVATION_PARSET");
beamObservationParset.value = parsetAsString; beamObservationParset.value = itsParsetAsString;
CoordinatesGroup coordinates = beam.coordinates(); CoordinatesGroup coordinates = beam.coordinates();
coordinates.create(); coordinates.create();
coordinates.groupType().value = "Coordinates"; coordinates.groupType().value = "Coordinates";
coordinates.refLocationValue().value = parset.settings.delayCompensation.referencePhaseCenter; coordinates.refLocationValue().value = itsParset.settings.delayCompensation.referencePhaseCenter;
coordinates.refLocationUnit().value = vector<string>(3, "m"); coordinates.refLocationUnit().value = vector<string>(3, "m");
coordinates.refLocationFrame().value = "ITRF"; coordinates.refLocationFrame().value = "ITRF";
coordinates.refTimeValue().value = toMJD(parset.settings.startTime); coordinates.refTimeValue().value = toMJD(itsParset.settings.startTime);
coordinates.refTimeUnit().value = "d"; coordinates.refTimeUnit().value = "d";
coordinates.refTimeFrame().value = "MJD"; coordinates.refTimeFrame().value = "MJD";
...@@ -373,7 +388,7 @@ namespace LOFAR ...@@ -373,7 +388,7 @@ namespace LOFAR
timeCoordinate.get()->referenceValue().value = 0; timeCoordinate.get()->referenceValue().value = 0;
timeCoordinate.get()->referencePixel().value = 0; timeCoordinate.get()->referencePixel().value = 0;
timeCoordinate.get()->increment().value = parset.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor; timeCoordinate.get()->increment().value = itsParset.sampleDuration() * stokesSet.nrChannels * stokesSet.timeIntegrationFactor;
timeCoordinate.get()->pc().value = unitvector; timeCoordinate.get()->pc().value = unitvector;
timeCoordinate.get()->axisValuesPixel().value = vector<unsigned>(1, 0); // not used timeCoordinate.get()->axisValuesPixel().value = vector<unsigned>(1, 0); // not used
...@@ -402,7 +417,7 @@ namespace LOFAR ...@@ -402,7 +417,7 @@ namespace LOFAR
vector<double> spectralWorld; vector<double> spectralWorld;
for (unsigned sb = 0; sb < nrSubbands; sb++) { for (unsigned sb = 0; sb < nrSubbands; sb++) {
const double subbandBeginFreq = parset.channel0Frequency( firstSubbandIdx + sb, stokesSet.nrChannels ); const double subbandBeginFreq = itsParset.channel0Frequency( firstSubbandIdx + sb, stokesSet.nrChannels );
// NOTE: channel 0 will be wrongly annotated if nrChannels > 1, because it is a combination of the // NOTE: channel 0 will be wrongly annotated if nrChannels > 1, because it is a combination of the
// highest and the lowest frequencies (half a channel each). // highest and the lowest frequencies (half a channel each).
......
...@@ -44,13 +44,18 @@ namespace LOFAR ...@@ -44,13 +44,18 @@ namespace LOFAR
public: public:
MSWriterDAL(const std::string &filename, const Parset &parset, unsigned fileno); MSWriterDAL(const std::string &filename, const Parset &parset, unsigned fileno);
~MSWriterDAL(); ~MSWriterDAL();
virtual void createMetaData();
virtual void write(StreamableData *data); virtual void write(StreamableData *data);
private: private:
const std::string itsFilename;
const Parset &itsParset; const Parset &itsParset;
unsigned itsNrChannels; unsigned itsNrChannels;
unsigned itsNrSamples; unsigned itsNrSamples;
unsigned itsNextSeqNr; unsigned itsNextSeqNr;
unsigned itsFileNr; const unsigned itsFileNr;
unsigned itsBlockSize; // the size of StreamableData::samples, in T unsigned itsBlockSize; // the size of StreamableData::samples, in T
}; };
} }
......
...@@ -21,19 +21,64 @@ ...@@ -21,19 +21,64 @@
#include <lofar_config.h> #include <lofar_config.h>
#include "MSWriterFile.h" #include "MSWriterFile.h"
#include <Common/Thread/Mutex.h>
#include <Common/SystemUtil.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
#include <cerrno>
#include <boost/algorithm/string.hpp>
namespace LOFAR namespace LOFAR
{ {
namespace Cobalt namespace Cobalt
{ {
static Mutex makeDirMutex;
static void makeDir(const string &dirname, const string &logPrefix)
{
ScopedLock scopedLock(makeDirMutex);
struct stat s;
if (stat(dirname.c_str(), &s) == 0) {
// path already exists
if ((s.st_mode & S_IFMT) != S_IFDIR) {
LOG_WARN_STR(logPrefix << "Not a directory: " << dirname);
}
} else if (errno == ENOENT) {
// create directory
LOG_DEBUG_STR(logPrefix << "Creating directory " << dirname);
if (mkdir(dirname.c_str(), 0777) != 0 && errno != EEXIST) {
THROW_SYSCALL(string("mkdir ") + dirname);
}
} else {
// something else went wrong
THROW_SYSCALL(string("stat ") + dirname);
}
}
/* create a directory as well as all its parent directories */
static void recursiveMakeDir(const string &dirname, const string &logPrefix)
{
using namespace boost;
string curdir;
vector<string> splitName;
boost::split(splitName, dirname, boost::is_any_of("/"));
for (unsigned i = 0; i < splitName.size(); i++) {
curdir += splitName[i] + '/';
makeDir(curdir, logPrefix);
}
}
MSWriterFile::MSWriterFile (const std::string &msName) MSWriterFile::MSWriterFile (const std::string &msName)
: :
itsFile(msName, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) itsFile((recursiveMakeDir(dirname(msName), ""), msName), O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
{ {
} }
......
...@@ -36,6 +36,11 @@ namespace LOFAR ...@@ -36,6 +36,11 @@ namespace LOFAR
class MSWriterFile : public MSWriter class MSWriterFile : public MSWriter
{ {
public: public:
/*
* Write data to the provided file name.
*
* Any parent directories are automatically created.
*/
MSWriterFile(const std::string &msName); MSWriterFile(const std::string &msName);
~MSWriterFile(); ~MSWriterFile();
......
...@@ -23,17 +23,11 @@ ...@@ -23,17 +23,11 @@
#include "OutputThread.h" #include "OutputThread.h"
#include <cerrno>
#include <ctime>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <iomanip> #include <iomanip>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <boost/format.hpp> #include <boost/format.hpp>
#include <boost/algorithm/string.hpp>
#include <Common/StringUtil.h>
#include <Common/SystemCallException.h> #include <Common/SystemCallException.h>
#include <Common/Thread/Mutex.h> #include <Common/Thread/Mutex.h>
#include <Common/Thread/Cancellation.h> #include <Common/Thread/Cancellation.h>
...@@ -57,53 +51,11 @@ namespace LOFAR ...@@ -57,53 +51,11 @@ namespace LOFAR
namespace Cobalt namespace Cobalt
{ {
static Mutex makeDirMutex;
static Mutex casacoreMutex; static Mutex casacoreMutex;
using namespace std; using namespace std;
using boost::lexical_cast; using boost::lexical_cast;
static void makeDir(const string &dirname, const string &logPrefix)
{
ScopedLock scopedLock(makeDirMutex);
struct stat s;
if (stat(dirname.c_str(), &s) == 0) {
// path already exists
if ((s.st_mode & S_IFMT) != S_IFDIR) {
LOG_WARN_STR(logPrefix << "Not a directory: " << dirname);
}
} else if (errno == ENOENT) {
// create directory
LOG_DEBUG_STR(logPrefix << "Creating directory " << dirname);
if (mkdir(dirname.c_str(), 0777) != 0 && errno != EEXIST) {
THROW_SYSCALL(string("mkdir ") + dirname);
}
} else {
// something else went wrong
THROW_SYSCALL(string("stat ") + dirname);
}
}
/* create a directory as well as all its parent directories */
static void recursiveMakeDir(const string &dirname, const string &logPrefix)
{
using namespace boost;
string curdir;
vector<string> splitName;
boost::split(splitName, dirname, boost::is_any_of("/"));
for (unsigned i = 0; i < splitName.size(); i++) {
curdir += splitName[i] + '/';
makeDir(curdir, logPrefix);
}
}
template<typename T> OutputThread<T>::OutputThread(const Parset &parset, template<typename T> OutputThread<T>::OutputThread(const Parset &parset,
unsigned streamNr, Pool<T> &outputPool, unsigned streamNr, Pool<T> &outputPool,
RTmetadata &mdLogger, const std::string &mdKeyPrefix, RTmetadata &mdLogger, const std::string &mdKeyPrefix,
...@@ -209,6 +161,19 @@ namespace LOFAR ...@@ -209,6 +161,19 @@ namespace LOFAR
} }
template<typename T> void OutputThread<T>::createMetaData()
{
try {
// augment the data product
ASSERT(itsWriter.get());
itsWriter->createMetaData();
} catch (Exception &ex) {
LOG_ERROR_STR(itsLogPrefix << "Could not create meta data: " << ex);
}
}
template<typename T> void OutputThread<T>::augment( const FinalMetaData &finalMetaData ) template<typename T> void OutputThread<T>::augment( const FinalMetaData &finalMetaData )
{ {
try { try {
...@@ -241,8 +206,18 @@ namespace LOFAR ...@@ -241,8 +206,18 @@ namespace LOFAR
LOG_DEBUG_STR(itsLogPrefix << "process() entered"); LOG_DEBUG_STR(itsLogPrefix << "process() entered");
createMS(); createMS();
doWork();
cleanUp(); # pragma omp parallel sections num_threads(2)
{
# pragma omp section
{
doWork();
cleanUp();
}
# pragma omp section
createMetaData();
}
} }
// Make required instantiations // Make required instantiations
...@@ -282,7 +257,6 @@ namespace LOFAR ...@@ -282,7 +257,6 @@ namespace LOFAR
try try
{ {
recursiveMakeDir(directoryName, itsLogPrefix);
LOG_INFO_STR(itsLogPrefix << "Writing to " << path); LOG_INFO_STR(itsLogPrefix << "Writing to " << path);
itsWriter = new MSWriterCorrelated(itsLogPrefix, path, itsParset, itsStreamNr); itsWriter = new MSWriterCorrelated(itsLogPrefix, path, itsParset, itsStreamNr);
...@@ -347,7 +321,6 @@ namespace LOFAR ...@@ -347,7 +321,6 @@ namespace LOFAR
try try
{ {
recursiveMakeDir(directoryName, itsLogPrefix);
LOG_INFO_STR(itsLogPrefix << "Writing to " << path); LOG_INFO_STR(itsLogPrefix << "Writing to " << path);
#ifdef HAVE_DAL #ifdef HAVE_DAL
......
...@@ -69,6 +69,9 @@ namespace LOFAR ...@@ -69,6 +69,9 @@ namespace LOFAR
// Wrap-up the writing. // Wrap-up the writing.
void cleanUp() const; void cleanUp() const;
// Create the metadata files in parallel to writing the data
void createMetaData();
// Add FinalMetaData to the data container. // Add FinalMetaData to the data container.
void augment(const FinalMetaData &finalMetaData); void augment(const FinalMetaData &finalMetaData);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment