diff --git a/RTCP/Storage/include/Storage/MSWriterFile.h b/RTCP/Storage/include/Storage/MSWriterFile.h index a894b5e406ed4c3bbbd0276b7a02b18414939e2e..4dfb3256ace2be754c6a8cb03bd674ed2d889b56 100644 --- a/RTCP/Storage/include/Storage/MSWriterFile.h +++ b/RTCP/Storage/include/Storage/MSWriterFile.h @@ -38,7 +38,10 @@ namespace RTCP { class MSWriterFile : public MSWriter { public: - MSWriterFile(const char *msName); + // fastWrite uses O_DIRECT | O_SYNC, which means + // that data should be aligned at 512 bytes and + // a multiple of 512 bytes in size. + MSWriterFile(const char *msName, bool fastWrite); ~MSWriterFile(); virtual void write(StreamableData *data); diff --git a/RTCP/Storage/src/MSWriterDAL.cc b/RTCP/Storage/src/MSWriterDAL.cc index a1e9288d76ccdcd649304176cc5ca9768213a787..2c683c52e230cd74629f674b90a0263300d125d4 100644 --- a/RTCP/Storage/src/MSWriterDAL.cc +++ b/RTCP/Storage/src/MSWriterDAL.cc @@ -48,14 +48,20 @@ namespace LOFAR : itsNrChannels(parset.nrChannelsPerSubband() * parset.nrSubbands()) { + unsigned sapNr = 0; + unsigned beamNr; + unsigned stokesNr; Stokes::Component stokes; switch (outputType) { case COHERENT_STOKES: { // assume stokes are either I or IQUV const Stokes::Component stokesVars[] = { Stokes::I, Stokes::Q, Stokes::U, Stokes::V }; - stokes = stokesVars[fileno % parset.nrCoherentStokes()]; + stokesNr = fileno % parset.nrCoherentStokes(); + beamNr = fileno / parset.nrCoherentStokes() / parset.nrPartsPerStokes(); + itsDatatype = isBigEndian ? H5T_IEEE_F32BE : H5T_IEEE_F32LE; + stokes = stokesVars[stokesNr]; itsNrSamples = parset.CNintegrationSteps() / parset.coherentStokesTimeIntegrationFactor(); break; @@ -63,10 +69,12 @@ namespace LOFAR case BEAM_FORMED_DATA: { const Stokes::Component stokesVars[] = { Stokes::X, Stokes::Y }; - stokes = stokesVars[fileno % NR_POLARIZATIONS]; + stokesNr = fileno % NR_POLARIZATIONS; + beamNr = fileno / NR_POLARIZATIONS / parset.nrPartsPerStokes(); // emulate fcomplex with a 64-bit bitfield itsDatatype = isBigEndian ? H5T_STD_B64BE : H5T_STD_B64LE; + stokes = stokesVars[stokesNr]; itsNrSamples = parset.CNintegrationSteps(); break; @@ -116,16 +124,16 @@ namespace LOFAR BF_RootGroup rootGroup( ca ); // create a hierarchy for one Stokes set - rootGroup.openPrimaryPointing( 0, true ); - BF_SubArrayPointing sap = rootGroup.primaryPointing( 0 ); + rootGroup.openSubArrayPointing( sapNr ); + BF_SubArrayPointing sap = rootGroup.primaryPointing( sapNr ); - sap.openBeam( 0, true ); + sap.openBeam( beamNr ); - BF_BeamGroup bg = sap.getBeamGroup( 0 ); + BF_BeamGroup bg = sap.getBeamGroup( beamNr ); - bg.openStokesDataset( 0, itsNrSamples, parset.nrSubbands(), parset.nrChannelsPerSubband(), stokes, itsDatatype ); + bg.createStokesDataset( stokesNr, itsNrSamples, parset.nrSubbands(), parset.nrChannelsPerSubband(), stokes, itsDatatype ); - itsStokesDataset = bg.getStokesDataset( 0 ); + itsStokesDataset = bg.getStokesDataset( stokesNr ); } template <typename T,unsigned DIM> MSWriterDAL<T,DIM>::~MSWriterDAL() diff --git a/RTCP/Storage/src/MSWriterFile.cc b/RTCP/Storage/src/MSWriterFile.cc index 26a3e42df4e1ce4e1cbba35916fa0ab611b30af9..9daf7129f14dd5482e08f1609b86b213a1127d9f 100644 --- a/RTCP/Storage/src/MSWriterFile.cc +++ b/RTCP/Storage/src/MSWriterFile.cc @@ -32,12 +32,10 @@ namespace LOFAR { namespace RTCP { -MSWriterFile::MSWriterFile (const char *msName) +MSWriterFile::MSWriterFile (const char *msName, bool fastWrite) : itsFile(msName, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH -#if 1 - | O_SYNC | O_DIRECT -#endif + | (fastWrite ? O_SYNC | O_DIRECT : 0) ) { } diff --git a/RTCP/Storage/src/OutputThread.cc b/RTCP/Storage/src/OutputThread.cc index fd2e6f45be70693a6a0a96e39db8d37c6acc4065..4c12ecc7f7fb0d42b45a2de38b5bc3535929896f 100644 --- a/RTCP/Storage/src/OutputThread.cc +++ b/RTCP/Storage/src/OutputThread.cc @@ -117,6 +117,8 @@ void OutputThread::createMS() std::string fileName = itsParset.getFileName(itsOutputType, itsStreamNr); std::string path = directoryName + "/" + fileName; + bool fastWrite = (itsOutputType == CORRELATED_DATA); + recursiveMakeDir(directoryName, itsLogPrefix); if (itsOutputType == CORRELATED_DATA) { @@ -163,10 +165,10 @@ void OutputThread::createMS() THROW(StorageException, "HDF5 not supported for this data type"); } } else { - itsWriter = new MSWriterFile(path.c_str()); + itsWriter = new MSWriterFile(path.c_str(), fastWrite); } #else - itsWriter = new MSWriterFile(path.c_str()); + itsWriter = new MSWriterFile(path.c_str(), fastWrite); #endif } catch (SystemCallException &ex) { LOG_ERROR_STR(itsLogPrefix << "Cannot open " << path << ": " << ex); @@ -214,7 +216,7 @@ void OutputThread::checkForDroppedData(StreamableData *data) } -static Semaphore writeSemaphore(3); +static Semaphore writeSemaphore(300); void OutputThread::doWork() diff --git a/RTCP/Storage/test/tDAL_HDF5.cc b/RTCP/Storage/test/tDAL_HDF5.cc index b2977ceee2cfb46a3641a532e7626b0fe885ec96..fd6206126f16fb1f1129655e98276297afc92981 100644 --- a/RTCP/Storage/test/tDAL_HDF5.cc +++ b/RTCP/Storage/test/tDAL_HDF5.cc @@ -20,9 +20,8 @@ //# //# $Id: $ -#include <lofar_config.h> - -#if 1 && defined HAVE_DAL && defined HAVE_HDF5 +//#if 1 && defined HAVE_DAL && defined HAVE_HDF5 +#if 1 #define FILENAME "test.h5" #define SAMPLES 3056 @@ -46,40 +45,37 @@ using namespace std; using namespace boost; using boost::format; -#endif - - int main() { -#if 1 && defined HAVE_DAL && defined HAVE_HDF5 const char * const filename = FILENAME; const unsigned nrSamples = SAMPLES; const unsigned nrChannels = SUBBANDS * CHANNELS; { - cout << "Creating file " << filename << endl; - BF_RootGroup rootGroup( filename ); + CommonAttributes ca; + Filename fn( "12345", "test", Filename::bf, Filename::h5, ""); + ca.setFilename( fn ); + + ca.setTelescope( "LOFAR" ); + + cout << "Creating file " << endl; + BF_RootGroup rootGroup( fn ); cout << "Creating primary pointing 0" << endl; - rootGroup.openPrimaryPointing( 0, true ); + rootGroup.openSubArrayPointing( 0 ); BF_SubArrayPointing sap = rootGroup.primaryPointing( 0 ); cout << "Creating tied-array beam 0" << endl; - sap.openBeam( 0, true ); + sap.openBeam( 0 ); - cout << "Closing file" << endl; - } + BF_BeamGroup bg = sap.getBeamGroup( 0 ); - { - cout << "Reopening file " << filename << endl; - hid_t fileID = H5Fcreate( filename, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT ); + cout << "Creating stokes group 0" << endl; + + bg.createStokesDataset( 0, nrSamples, SUBBANDS, CHANNELS, Stokes::I ); - cout << "Creating stokes set 0" << endl; - BF_StokesDataset stokesDataset( - fileID, 0, - nrSamples, SUBBANDS, CHANNELS, - Stokes::I ); + BF_StokesDataset stokesDataset = bg.getStokesDataset( 0 ); cout << "Creating sample multiarray of " << (SAMPLES|2) << " x " << SUBBANDS << " x " << CHANNELS << endl; typedef multi_array<float,3> array; @@ -105,7 +101,11 @@ int main() stokesDataset.writeData( samples.origin(), start, block ); } } -#endif return 0; } +#else +int main() { + return 0; +} +#endif