Skip to content
Snippets Groups Projects
Commit b55f134f authored by Jan David Mol's avatar Jan David Mol
Browse files

bug 1362:

- enable O_DIRECT|O_SYNC only for correlated data
- honor beam and stokes number when creating HDF5 files
- updated DAL code for latest API
parent f02e527c
Branches
Tags
No related merge requests found
...@@ -38,7 +38,10 @@ namespace RTCP { ...@@ -38,7 +38,10 @@ namespace RTCP {
class MSWriterFile : public MSWriter class MSWriterFile : public MSWriter
{ {
public: public:
MSWriterFile(const char *msName); // fastWrite uses O_DIRECT | O_SYNC, which means
// that data should be aligned at 512 bytes and
// a multiple of 512 bytes in size.
MSWriterFile(const char *msName, bool fastWrite);
~MSWriterFile(); ~MSWriterFile();
virtual void write(StreamableData *data); virtual void write(StreamableData *data);
......
...@@ -48,14 +48,20 @@ namespace LOFAR ...@@ -48,14 +48,20 @@ namespace LOFAR
: :
itsNrChannels(parset.nrChannelsPerSubband() * parset.nrSubbands()) itsNrChannels(parset.nrChannelsPerSubband() * parset.nrSubbands())
{ {
unsigned sapNr = 0;
unsigned beamNr;
unsigned stokesNr;
Stokes::Component stokes; Stokes::Component stokes;
switch (outputType) { switch (outputType) {
case COHERENT_STOKES: { case COHERENT_STOKES: {
// assume stokes are either I or IQUV // assume stokes are either I or IQUV
const Stokes::Component stokesVars[] = { Stokes::I, Stokes::Q, Stokes::U, Stokes::V }; const Stokes::Component stokesVars[] = { Stokes::I, Stokes::Q, Stokes::U, Stokes::V };
stokes = stokesVars[fileno % parset.nrCoherentStokes()]; stokesNr = fileno % parset.nrCoherentStokes();
beamNr = fileno / parset.nrCoherentStokes() / parset.nrPartsPerStokes();
itsDatatype = isBigEndian ? H5T_IEEE_F32BE : H5T_IEEE_F32LE; itsDatatype = isBigEndian ? H5T_IEEE_F32BE : H5T_IEEE_F32LE;
stokes = stokesVars[stokesNr];
itsNrSamples = parset.CNintegrationSteps() / parset.coherentStokesTimeIntegrationFactor(); itsNrSamples = parset.CNintegrationSteps() / parset.coherentStokesTimeIntegrationFactor();
break; break;
...@@ -63,10 +69,12 @@ namespace LOFAR ...@@ -63,10 +69,12 @@ namespace LOFAR
case BEAM_FORMED_DATA: { case BEAM_FORMED_DATA: {
const Stokes::Component stokesVars[] = { Stokes::X, Stokes::Y }; const Stokes::Component stokesVars[] = { Stokes::X, Stokes::Y };
stokes = stokesVars[fileno % NR_POLARIZATIONS]; stokesNr = fileno % NR_POLARIZATIONS;
beamNr = fileno / NR_POLARIZATIONS / parset.nrPartsPerStokes();
// emulate fcomplex with a 64-bit bitfield // emulate fcomplex with a 64-bit bitfield
itsDatatype = isBigEndian ? H5T_STD_B64BE : H5T_STD_B64LE; itsDatatype = isBigEndian ? H5T_STD_B64BE : H5T_STD_B64LE;
stokes = stokesVars[stokesNr];
itsNrSamples = parset.CNintegrationSteps(); itsNrSamples = parset.CNintegrationSteps();
break; break;
...@@ -116,16 +124,16 @@ namespace LOFAR ...@@ -116,16 +124,16 @@ namespace LOFAR
BF_RootGroup rootGroup( ca ); BF_RootGroup rootGroup( ca );
// create a hierarchy for one Stokes set // create a hierarchy for one Stokes set
rootGroup.openPrimaryPointing( 0, true ); rootGroup.openSubArrayPointing( sapNr );
BF_SubArrayPointing sap = rootGroup.primaryPointing( 0 ); BF_SubArrayPointing sap = rootGroup.primaryPointing( sapNr );
sap.openBeam( 0, true ); sap.openBeam( beamNr );
BF_BeamGroup bg = sap.getBeamGroup( 0 ); BF_BeamGroup bg = sap.getBeamGroup( beamNr );
bg.openStokesDataset( 0, itsNrSamples, parset.nrSubbands(), parset.nrChannelsPerSubband(), stokes, itsDatatype ); bg.createStokesDataset( stokesNr, itsNrSamples, parset.nrSubbands(), parset.nrChannelsPerSubband(), stokes, itsDatatype );
itsStokesDataset = bg.getStokesDataset( 0 ); itsStokesDataset = bg.getStokesDataset( stokesNr );
} }
template <typename T,unsigned DIM> MSWriterDAL<T,DIM>::~MSWriterDAL() template <typename T,unsigned DIM> MSWriterDAL<T,DIM>::~MSWriterDAL()
......
...@@ -32,12 +32,10 @@ namespace LOFAR { ...@@ -32,12 +32,10 @@ namespace LOFAR {
namespace RTCP { namespace RTCP {
MSWriterFile::MSWriterFile (const char *msName) MSWriterFile::MSWriterFile (const char *msName, bool fastWrite)
: :
itsFile(msName, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH itsFile(msName, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
#if 1 | (fastWrite ? O_SYNC | O_DIRECT : 0)
| O_SYNC | O_DIRECT
#endif
) )
{ {
} }
......
...@@ -117,6 +117,8 @@ void OutputThread::createMS() ...@@ -117,6 +117,8 @@ void OutputThread::createMS()
std::string fileName = itsParset.getFileName(itsOutputType, itsStreamNr); std::string fileName = itsParset.getFileName(itsOutputType, itsStreamNr);
std::string path = directoryName + "/" + fileName; std::string path = directoryName + "/" + fileName;
bool fastWrite = (itsOutputType == CORRELATED_DATA);
recursiveMakeDir(directoryName, itsLogPrefix); recursiveMakeDir(directoryName, itsLogPrefix);
if (itsOutputType == CORRELATED_DATA) { if (itsOutputType == CORRELATED_DATA) {
...@@ -163,10 +165,10 @@ void OutputThread::createMS() ...@@ -163,10 +165,10 @@ void OutputThread::createMS()
THROW(StorageException, "HDF5 not supported for this data type"); THROW(StorageException, "HDF5 not supported for this data type");
} }
} else { } else {
itsWriter = new MSWriterFile(path.c_str()); itsWriter = new MSWriterFile(path.c_str(), fastWrite);
} }
#else #else
itsWriter = new MSWriterFile(path.c_str()); itsWriter = new MSWriterFile(path.c_str(), fastWrite);
#endif #endif
} catch (SystemCallException &ex) { } catch (SystemCallException &ex) {
LOG_ERROR_STR(itsLogPrefix << "Cannot open " << path << ": " << ex); LOG_ERROR_STR(itsLogPrefix << "Cannot open " << path << ": " << ex);
...@@ -214,7 +216,7 @@ void OutputThread::checkForDroppedData(StreamableData *data) ...@@ -214,7 +216,7 @@ void OutputThread::checkForDroppedData(StreamableData *data)
} }
static Semaphore writeSemaphore(3); static Semaphore writeSemaphore(300);
void OutputThread::doWork() void OutputThread::doWork()
......
...@@ -20,9 +20,8 @@ ...@@ -20,9 +20,8 @@
//# //#
//# $Id: $ //# $Id: $
#include <lofar_config.h> //#if 1 && defined HAVE_DAL && defined HAVE_HDF5
#if 1
#if 1 && defined HAVE_DAL && defined HAVE_HDF5
#define FILENAME "test.h5" #define FILENAME "test.h5"
#define SAMPLES 3056 #define SAMPLES 3056
...@@ -46,40 +45,37 @@ using namespace std; ...@@ -46,40 +45,37 @@ using namespace std;
using namespace boost; using namespace boost;
using boost::format; using boost::format;
#endif
int main() int main()
{ {
#if 1 && defined HAVE_DAL && defined HAVE_HDF5
const char * const filename = FILENAME; const char * const filename = FILENAME;
const unsigned nrSamples = SAMPLES; const unsigned nrSamples = SAMPLES;
const unsigned nrChannels = SUBBANDS * CHANNELS; const unsigned nrChannels = SUBBANDS * CHANNELS;
{ {
cout << "Creating file " << filename << endl; CommonAttributes ca;
BF_RootGroup rootGroup( filename ); Filename fn( "12345", "test", Filename::bf, Filename::h5, "");
ca.setFilename( fn );
ca.setTelescope( "LOFAR" );
cout << "Creating file " << endl;
BF_RootGroup rootGroup( fn );
cout << "Creating primary pointing 0" << endl; cout << "Creating primary pointing 0" << endl;
rootGroup.openPrimaryPointing( 0, true ); rootGroup.openSubArrayPointing( 0 );
BF_SubArrayPointing sap = rootGroup.primaryPointing( 0 ); BF_SubArrayPointing sap = rootGroup.primaryPointing( 0 );
cout << "Creating tied-array beam 0" << endl; cout << "Creating tied-array beam 0" << endl;
sap.openBeam( 0, true ); sap.openBeam( 0 );
cout << "Closing file" << endl; BF_BeamGroup bg = sap.getBeamGroup( 0 );
}
{ cout << "Creating stokes group 0" << endl;
cout << "Reopening file " << filename << endl;
hid_t fileID = H5Fcreate( filename, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT ); bg.createStokesDataset( 0, nrSamples, SUBBANDS, CHANNELS, Stokes::I );
cout << "Creating stokes set 0" << endl; BF_StokesDataset stokesDataset = bg.getStokesDataset( 0 );
BF_StokesDataset stokesDataset(
fileID, 0,
nrSamples, SUBBANDS, CHANNELS,
Stokes::I );
cout << "Creating sample multiarray of " << (SAMPLES|2) << " x " << SUBBANDS << " x " << CHANNELS << endl; cout << "Creating sample multiarray of " << (SAMPLES|2) << " x " << SUBBANDS << " x " << CHANNELS << endl;
typedef multi_array<float,3> array; typedef multi_array<float,3> array;
...@@ -105,7 +101,11 @@ int main() ...@@ -105,7 +101,11 @@ int main()
stokesDataset.writeData( samples.origin(), start, block ); stokesDataset.writeData( samples.origin(), start, block );
} }
} }
#endif
return 0; return 0;
} }
#else
int main() {
return 0;
}
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment