diff --git a/.gitattributes b/.gitattributes index 2415b8797c3fd0b294fb03b567ca0ef7625d20a1..7659a1ac42e1731505e3bfcb9ace321e98e0daa0 100644 --- a/.gitattributes +++ b/.gitattributes @@ -849,6 +849,7 @@ CEP/DP3/DPPP/include/DPPP/ScaleData.h -text CEP/DP3/DPPP/include/DPPP/Simulate.h -text CEP/DP3/DPPP/include/DPPP/Simulator.h -text CEP/DP3/DPPP/include/DPPP/SourceDBUtil.h -text +CEP/DP3/DPPP/include/DPPP/StManParsetKeys.h -text CEP/DP3/DPPP/include/DPPP/StefCal.h -text CEP/DP3/DPPP/include/DPPP/Stokes.h -text CEP/DP3/DPPP/include/DPPP/SubtractMixed.h -text diff --git a/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt b/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt index 07b6c83e8f27ca040c85187388ceca20bafc15c2..498b197ff2707c1575eeacd8b774b201578d8949 100644 --- a/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt +++ b/CEP/DP3/DPPP/include/DPPP/CMakeLists.txt @@ -17,6 +17,7 @@ set(inst_HEADERS ApplyBeam.h ApplyBeam.tcc Predict.h GainCal.h StefCal.h phasefitter.h + StManParsetKeys.h ) # Create symbolic link to include directory. diff --git a/CEP/DP3/DPPP/include/DPPP/MSUpdater.h b/CEP/DP3/DPPP/include/DPPP/MSUpdater.h index ae2a6c66d1435bb53adefb8336211aecfc521caf..1ce92bdc62e62ed54be34fd014bdbf9374fab9d3 100644 --- a/CEP/DP3/DPPP/include/DPPP/MSUpdater.h +++ b/CEP/DP3/DPPP/include/DPPP/MSUpdater.h @@ -28,6 +28,8 @@ // @brief DPPP step writing to an MS #include <DPPP/DPStep.h> +#include <DPPP/StManParsetKeys.h> + #include <Common/LofarTypes.h> #include <tables/Tables/ColumnDesc.h> #include <tables/Tables/RefRows.h> @@ -122,6 +124,8 @@ namespace LOFAR { bool itsWeightColAdded; //# has weight column been added? bool itsWriteHistory; //# Should history be written? NSTimer itsTimer; + uint itsTileSize; + StManParsetKeys itsStManKeys; }; } //# end namespace diff --git a/CEP/DP3/DPPP/include/DPPP/MSWriter.h b/CEP/DP3/DPPP/include/DPPP/MSWriter.h index 2ef2f60274c0c7178e2634b4ed0cf03e45fdcb3c..bd98edff54e0e4be403720f8e014867dcd3c6196 100644 --- a/CEP/DP3/DPPP/include/DPPP/MSWriter.h +++ b/CEP/DP3/DPPP/include/DPPP/MSWriter.h @@ -29,6 +29,8 @@ #include <DPPP/DPStep.h> #include <DPPP/MSReader.h> +#include <DPPP/StManParsetKeys.h> + #include <tables/Tables/Table.h> #include <tables/Tables/ColumnDesc.h> #include <tables/Tables/ScalarColumn.h> @@ -93,7 +95,7 @@ namespace LOFAR { // Create an array column description and add to table with given // stoage manager (if given). void makeArrayColumn (casa::ColumnDesc desc, const casa::IPosition& shape, - casa::TiledColumnStMan* tsm, casa::Table& table); + casa::DataManager* dm, casa::Table& table, bool makeDirectColumn = false); // Create the MS by cloning all subtables from the input MS. // All output columns in the main table are using normal storage managers. @@ -189,6 +191,7 @@ namespace LOFAR { std::string itsVdsDir; //# directory where to put VDS file std::string itsClusterDesc; //# name of clusterdesc file NSTimer itsTimer; + StManParsetKeys itsStManKeys; }; } //# end namespace diff --git a/CEP/DP3/DPPP/include/DPPP/StManParsetKeys.h b/CEP/DP3/DPPP/include/DPPP/StManParsetKeys.h new file mode 100644 index 0000000000000000000000000000000000000000..f513e128b0779c71b6857513a6dd2b52d7863bdb --- /dev/null +++ b/CEP/DP3/DPPP/include/DPPP/StManParsetKeys.h @@ -0,0 +1,61 @@ +#ifndef DPPP_STMANPARSETKEYS_H +#define DPPP_STMANPARSETKEYS_H + +#include <Common/ParameterSet.h> + +#include <string> + +#include <casa/Containers/Record.h> + +namespace LOFAR { + namespace DPPP { + struct StManParsetKeys + { + casa::String stManName; + uint dyscoDataBitRate; //# Bits per data float, or 0 if data column is not compressed + uint dyscoWeightBitRate; //# Bits per weight float, or 0 if weight column is not compressed + std::string dyscoDistribution; //# Distribution assumed for compression; e.g. "Uniform" or "TruncatedGaussian" + double dyscoDistTruncation; //# For truncated distributions, the truncation point (e.g. 3 for 3 sigma in TruncGaus). + std::string dyscoNormalization; //# Kind of normalization; "AF", "RF" or "Row". + + void Set(const ParameterSet& parset, const std::string& prefix) + { + stManName = toLower(parset.getString(prefix+"storagemanager", string())); + if(stManName == "dysco") { + dyscoDataBitRate = parset.getInt( + prefix+"storagemanager.databitrate", 10); + dyscoWeightBitRate = parset.getInt( + prefix+"storagemanager.weightbitrate", 12); + dyscoDistribution = parset.getString( + prefix+"storagemanager.distribution", + "TruncatedGaussian"); + dyscoDistTruncation = parset.getDouble( + prefix+"storagemanager.disttruncation", 2.5); + dyscoNormalization = parset.getString( + prefix+"storagemanager.normalization", "AF"); + } + } + + casa::Record GetDyscoSpec() const + { + casa::Record dyscoSpec; + dyscoSpec.define ("distribution", dyscoDistribution); + dyscoSpec.define ("normalization", dyscoNormalization); + dyscoSpec.define ("distributionTruncation", dyscoDistTruncation); + // DPPP uses bitrate of 0 to disable compression of the data/weight column. + // However, Dysco does not allow the data or weight bitrates to be set to 0, + // so we set the values to something different. The values are not actually used. + uint dataBitRate = dyscoDataBitRate; + if(dataBitRate == 0) + dataBitRate = 16; + dyscoSpec.define ("dataBitCount", dataBitRate); + uint weightBitRate = dyscoWeightBitRate; + if(weightBitRate == 0) + weightBitRate = 16; + dyscoSpec.define ("weightBitCount", weightBitRate); + return dyscoSpec; + } + }; + } +} +#endif diff --git a/CEP/DP3/DPPP/src/MSUpdater.cc b/CEP/DP3/DPPP/src/MSUpdater.cc index e74f07ac4ce5105e649fe7569cfe2f46ead0f54f..b8d72496af1e65e2063db9570464c941200b6ee1 100644 --- a/CEP/DP3/DPPP/src/MSUpdater.cc +++ b/CEP/DP3/DPPP/src/MSUpdater.cc @@ -32,10 +32,12 @@ #include <tables/Tables/ScalarColumn.h> #include <tables/Tables/ArrColDesc.h> #include <tables/Tables/ColumnDesc.h> +#include <tables/Tables/StandardStMan.h> #include <casa/Containers/Record.h> #include <casa/Utilities/LinearSearch.h> #include <ms/MeasurementSets/MeasurementSet.h> #include <iostream> +#include <limits> using namespace casa; @@ -60,6 +62,8 @@ namespace LOFAR { itsDataColName = parset.getString (prefix+"datacolumn", ""); itsWeightColName = parset.getString (prefix+"weightcolumn",""); itsNrTimesFlush = parset.getUint (prefix+"flush", 0); + itsTileSize = parset.getUint (prefix+"tilesize", 1024); + itsStManKeys.Set(parset, prefix); } MSUpdater::~MSUpdater() @@ -76,24 +80,51 @@ namespace LOFAR { return false; } - TableDesc td; - td.addColumn (cd, colName); - - // Use the same data manager as the DATA column. - // Get the data manager info and find the DATA column in it. - Record dminfo = itsMS.dataManagerInfo(); - Record colinfo; - for (uInt i=0; i<dminfo.nfields(); ++i) { - const Record& subrec = dminfo.subRecord(i); - if (linearSearch1 (Vector<String>(subrec.asArrayString("COLUMNS")), - "DATA") >= 0) { - colinfo = subrec; - break; + if(itsStManKeys.stManName == "dysco" && itsStManKeys.dyscoDataBitRate != 0) + { + casa::Record dyscoSpec = itsStManKeys.GetDyscoSpec(); + DataManagerCtor dyscoConstructor = DataManager::getCtor("DyscoStMan"); + CountedPtr<DataManager> dyscoStMan(dyscoConstructor(colName + "_dm", dyscoSpec)); + ColumnDesc directColumnDesc(cd); + directColumnDesc.setOptions(casacore::ColumnDesc::Direct | casacore::ColumnDesc::FixedShape); + TableDesc td; + td.addColumn (directColumnDesc, colName); + itsMS.addColumn (td, *dyscoStMan); + } + else { + // When no specific storage manager is requested, use the same + // as for the DATA column. + // Get the data manager info and find the DATA column in it. + Record dminfo = itsMS.dataManagerInfo(); + Record colinfo; + for (uInt i=0; i<dminfo.nfields(); ++i) { + const Record& subrec = dminfo.subRecord(i); + if (linearSearch1 (Vector<String>(subrec.asArrayString("COLUMNS")), + "DATA") >= 0) { + colinfo = subrec; + break; + } + } + ASSERT(colinfo.nfields()>0); + // When the storage manager is compressed, do not implicitly (re)compress it. Use TiledStMan instead. + std::string dmType = colinfo.asString("TYPE"); + TableDesc td; + td.addColumn (cd, colName); + if(dmType == "DyscoStMan") + { + IPosition tileShape(3, info().ncorr(), info().nchan(), 1); + tileShape[2] = itsTileSize * 1024 / (8 * tileShape[0] * tileShape[1]); + if (tileShape[2] < 1) { + tileShape[2] = 1; + } + TiledColumnStMan tsm(colName + "_dm", tileShape); + itsMS.addColumn (td, tsm); + } + else { + colinfo.define ("NAME", colName + "_dm"); + itsMS.addColumn (td, colinfo); } } - ASSERT(colinfo.nfields()>0); - colinfo.define ("NAME", colName + "_dm"); - itsMS.addColumn (td, colinfo); return true; } @@ -104,7 +135,22 @@ namespace LOFAR { putFlags (buf.getRowNrs(), buf.getFlags()); } if (itsWriteData) { - putData (buf.getRowNrs(), buf.getData()); + // If compressing, flagged values need to be set to NaN to decrease the dynamic range + if(itsStManKeys.stManName == "dysco") + { + casa::Cube<casa::Complex> dataCopy = buf.getData().copy(); + casa::Cube<casa::Complex>::iterator dataIter = dataCopy.begin(); + for(casa::Cube<bool>::const_iterator flagIter = buf.getFlags().begin(); flagIter != buf.getFlags().end(); ++flagIter) + { + if(*flagIter) + *dataIter = casa::Complex(std::numeric_limits<float>::quiet_NaN(), std::numeric_limits<float>::quiet_NaN()); + ++dataIter; + } + putData (buf.getRowNrs(), dataCopy); + } + else { + putData (buf.getRowNrs(), buf.getData()); + } } if (itsWriteWeights) { if (!buf.getWeights().empty()) { @@ -215,6 +261,17 @@ namespace LOFAR { if (itsWriteWeights) os << " weights"; os << std::endl; } + if(itsStManKeys.stManName == "dysco") { + os + << " Compressed: yes\n" + << " Data bitrate: " << itsStManKeys.dyscoDataBitRate << '\n' + << " Weight bitrate: " << itsStManKeys.dyscoWeightBitRate << '\n' + << " Dysco mode: " << itsStManKeys.dyscoNormalization << ' ' + << itsStManKeys.dyscoDistribution << '(' << itsStManKeys.dyscoDistTruncation << ")\n"; + } + else { + os << " Compressed: no\n"; + } os << std::endl; os << " flush: " << itsNrTimesFlush << std::endl; } diff --git a/CEP/DP3/DPPP/src/MSWriter.cc b/CEP/DP3/DPPP/src/MSWriter.cc index a4cf6ee2fb1ac2782979350f521b1bb84ad711bb..bb708f6d4735eb4f037ec79422041d951a225ce5 100644 --- a/CEP/DP3/DPPP/src/MSWriter.cc +++ b/CEP/DP3/DPPP/src/MSWriter.cc @@ -47,6 +47,7 @@ #include <casa/Containers/Record.h> #include <casa/OS/Path.h> #include <iostream> +#include <limits> using namespace casa; @@ -78,6 +79,8 @@ namespace LOFAR { " can be used as output when writing a new MS"); ASSERTSTR (itsWeightColName == "WEIGHT_SPECTRUM", "Currently only the " "WEIGHT_SPECTRUM column can be used as output when writing a new MS"); + + itsStManKeys.Set(parset, prefix); } MSWriter::~MSWriter() @@ -181,6 +184,17 @@ namespace LOFAR { os << " time interval: " << itsInterval << std::endl; os << " DATA column: " << itsDataColName << std::endl; os << " WEIGHT column: " << itsWeightColName << std::endl; + if(itsStManKeys.stManName == "dysco") { + os + << " Compressed: yes\n" + << " Data bitrate: " << itsStManKeys.dyscoDataBitRate << '\n' + << " Weight bitrate: " << itsStManKeys.dyscoWeightBitRate << '\n' + << " Dysco mode: " << itsStManKeys.dyscoNormalization << ' ' + << itsStManKeys.dyscoDistribution << '(' << itsStManKeys.dyscoDistTruncation << ")\n"; + } + else { + os << " Compressed: no\n"; + } } void MSWriter::showTimings (std::ostream& os, double duration) const @@ -191,19 +205,24 @@ namespace LOFAR { } void MSWriter::makeArrayColumn (ColumnDesc desc, const IPosition& ipos, - TiledColumnStMan* tsm, Table& table) + DataManager* dm, Table& table, bool makeDirectColumn) { desc.setOptions(0); desc.setShape(ipos); - desc.setOptions(ColumnDesc::FixedShape); + if (makeDirectColumn) { + desc.setOptions(ColumnDesc::Direct | ColumnDesc::FixedShape); + } + else { + desc.setOptions(ColumnDesc::FixedShape); + } if (table.tableDesc().isColumn(desc.name())) { table.removeColumn(desc.name()); } - // Use tiled storage manager if given. - if (tsm == 0) { + // Use storage manager if given. + if (dm == 0) { table.addColumn (desc); } else { - table.addColumn (desc, *tsm); + table.addColumn (desc, *dm); } } @@ -300,6 +319,7 @@ namespace LOFAR { // Setup table creation. Exception is thrown if it exists already. Table::TableOption opt = itsOverwrite ? Table::New : Table::NewNoReplace; SetupNewTable newtab(outName, newdesc, opt); + // First bind all columns to SSM. // For all columns defined in dminfo the bindings will be overwritten. // In this way variable columns like ANTENNA1/2 are bound to SSM. @@ -308,22 +328,35 @@ namespace LOFAR { StandardStMan ssm("SSMVar", 32768); newtab.bindAll (ssm); } + // Bind all columns according to dminfo. newtab.bindCreate (dminfo); + DataManagerCtor dyscoConstructor = 0; + Record dyscoSpec; + if(itsStManKeys.stManName == "dysco") { + dyscoSpec = itsStManKeys.GetDyscoSpec(); + dyscoConstructor = DataManager::getCtor("DyscoStMan"); + } itsMS = Table(newtab); - { + + if (itsStManKeys.stManName == "dysco" && itsStManKeys.dyscoDataBitRate != 0) { + // Add DATA column using Dysco stman. + CountedPtr<DataManager> dyscoStMan(dyscoConstructor("DyscoData", dyscoSpec)); + makeArrayColumn (tdesc["DATA"], dataShape, dyscoStMan.get(), itsMS, true); + } + else { // Add DATA column using tsm. TiledColumnStMan tsm("TiledData", tileShape); makeArrayColumn (tdesc["DATA"], dataShape, &tsm, itsMS); } - { - // Add FLAG column using tsm. - // Use larger tile shape because flags are stored as bits. - IPosition tileShapeF(tileShape); - tileShapeF[2] *= 8; - TiledColumnStMan tsmf("TiledFlag", tileShapeF); - makeArrayColumn(tdesc["FLAG"], dataShape, &tsmf, itsMS); - } + + // Add FLAG column using tsm. + // Use larger tile shape because flags are stored as bits. + IPosition tileShapeF(tileShape); + tileShapeF[2] *= 8; + TiledColumnStMan tsmf("TiledFlag", tileShapeF); + makeArrayColumn(tdesc["FLAG"], dataShape, &tsmf, itsMS); + if (itsWriteFullResFlags) { // Add LOFAR_FULL_RES_FLAG column using tsm. // The input can already be averaged and averaging can be done in @@ -337,7 +370,17 @@ namespace LOFAR { dataShapeF, ColumnDesc::FixedShape); makeArrayColumn(padesc, dataShapeF, &tsmf, itsMS); } - { + if (itsStManKeys.stManName == "dysco" && itsStManKeys.dyscoWeightBitRate != 0) { + // Add WEIGHT_SPECTRUM column using Dysco stman. + CountedPtr<DataManager> dyscoStMan(dyscoConstructor( + "DyscoWeightSpectrum", dyscoSpec) + ); + ArrayColumnDesc<float> wsdesc("WEIGHT_SPECTRUM", "weight per corr/chan", + dataShape, + ColumnDesc::FixedShape | ColumnDesc::Direct); + makeArrayColumn (wsdesc, dataShape, dyscoStMan.get(), itsMS, true); + } + else { // Add WEIGHT_SPECTRUM column using tsm. TiledColumnStMan tsmw("TiledWeightSpectrum", tileShape); ArrayColumnDesc<float> wsdesc("WEIGHT_SPECTRUM", "weight per corr/chan", @@ -355,7 +398,7 @@ namespace LOFAR { TiledColumnStMan tsmc("CorrectedData", tileShape); makeArrayColumn(tdesc["CORRECTED_DATA"], dataShape, &tsmc, itsMS); - + IPosition iwShape(1, dataShape[1]); IPosition iwShapeTile(2, tileShape[1], tileShape[2]); TiledColumnStMan tsmw("TiledImagingWeight", iwShapeTile); @@ -513,7 +556,23 @@ namespace LOFAR { return; } - dataCol.putColumn (buf.getData()); + // If compressing, flagged values need to be set to NaN to decrease the dynamic range + if(itsStManKeys.stManName == "dysco") + { + casa::Cube<casa::Complex> dataCopy = buf.getData().copy(); + casa::Cube<casa::Complex>::iterator dataIter = dataCopy.begin(); + for(casa::Cube<bool>::const_iterator flagIter = buf.getFlags().begin(); flagIter != buf.getFlags().end(); ++flagIter) + { + if(*flagIter) + *dataIter = casa::Complex(std::numeric_limits<float>::quiet_NaN(), std::numeric_limits<float>::quiet_NaN()); + ++dataIter; + } + dataCol.putColumn (dataCopy); + } + else { + dataCol.putColumn (buf.getData()); + } + flagCol.putColumn (buf.getFlags()); // A row is flagged if no flags in the row are False. Vector<Bool> rowFlags (partialNFalse(buf.getFlags(), IPosition(2,0,1)) == 0u);