Skip to content
Snippets Groups Projects
Commit 244705f3 authored by Jan David Mol's avatar Jan David Mol
Browse files

Bug 1303: Allow Storage to write any data to file.

parent c8f24360
No related branches found
No related tags found
No related merge requests found
......@@ -908,8 +908,10 @@ RTCP/Run/bootstrap -text
RTCP/Run/src/getStats -text
RTCP/Storage/bootstrap -text
RTCP/Storage/include/Storage/MSWriterCasa.h -text
RTCP/Storage/include/Storage/MSWriterFile.h -text
RTCP/Storage/include/Storage/MSWriterNull.h -text
RTCP/Storage/src/MSWriterCasa.cc -text
RTCP/Storage/src/MSWriterFile.cc -text
RTCP/Storage/src/MSWriterNull.cc -text
RTCP/Storage/src/Storage.machinefile -text
SAS/OTB/MoM-OTDB-adapter/.cvsignore -text
......
......@@ -65,7 +65,8 @@ inline void StokesData::readData(Stream *str)
inline void StokesData::writeData(Stream *str) const
{
#if !defined WORDS_BIGENDIAN
THROW(AssertError, "not implemented: think about endianness");
std::clog << "Warning: writing data in little endian." << std::endl;
//THROW(AssertError, "not implemented: think about endianness");
#endif
str->write(samples.origin(), samples.num_elements() * sizeof(float));
......
......@@ -28,7 +28,7 @@
#include <Common/LofarTypes.h>
#include <Common/lofar_vector.h>
#include <Interface/CorrelatedData.h>
#include <Interface/StreamableData.h>
namespace LOFAR
{
......@@ -43,7 +43,7 @@ namespace LOFAR
virtual int addBand(int, int, double, double);
virtual int addBand(int, int, double, const double*, const double*);
virtual void addField(double, double, unsigned);
virtual void write(int, int, int, CorrelatedData*);
virtual void write(int, int, int, StreamableData*);
private:
......
......@@ -93,7 +93,7 @@ namespace LOFAR
// The flag array is optional. If not given, all flags are False.
// All data will be written with sigma=0 and weight=1.
void write (int bandId, int channelId, int nrChannels,
CorrelatedData *correlatedData);
StreamableData *data);
// Get the number of antennas.
int nrAntennas() const
......
// MSMriterNull.h: null implementation of MSWriter
//
// Copyright (C) 2001
// ASTRON (Netherlands Foundation for Research in Astronomy)
// P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// $Id: MSWriterImpl.h 11891 2008-10-14 13:43:51Z gels $
//
//////////////////////////////////////////////////////////////////////
#ifndef LOFAR_STORAGE_MSWRITERFILE_H
#define LOFAR_STORAGE_MSWRITERFILE_H
//# Includes
#include <Common/LofarTypes.h>
#include <Common/lofar_vector.h>
#include <Storage/MSWriter.h>
#include <Stream/FileStream.h>
//# Forward declarations
namespace LOFAR
{
namespace RTCP
{
class MSWriterFile : public MSWriter
{
public:
MSWriterFile(const char* msName, double startTime, double timeStep,
int nfreq, int ncorr, int nantennas, const vector<double>& antPos,
const vector<std::string>& storageStationNames, float weightFactor);
~MSWriterFile();
int addBand(int, int, double, double);
int addBand(int, int, double, const double*, const double*);
void addField(double, double, unsigned);
void write(int, int, int, StreamableData *data);
inline int nrAntennas() const
{ return itsNrAnt; }
inline int nrBands() const
{ return itsNrBand; }
inline int nrFields() const
{ return itsNrField; }
inline int nrPolarizations() const
{ return itsNrPol; }
inline int nrTimes() const
{ return itsNrTimes; }
private:
int itsNrBand;
int itsNrField;
int itsNrAnt;
int itsNrFreq;
int itsNrCorr;
int itsNrTimes;
int itsNrPol;
int itsNrChan;
FileStream itsFile;
};
}
}
#endif
......@@ -51,7 +51,7 @@ namespace LOFAR
int addBand(int, int, double, double);
int addBand(int, int, double, const double*, const double*);
void addField(double, double, unsigned);
void write(int, int, int, CorrelatedData*);
void write(int, int, int, StreamableData*);
inline int nrAntennas() const
......
......@@ -5,6 +5,7 @@ noinst_HEADERS = \
SubbandWriter.h \
MSWriter.h \
MSWriterCasa.h \
MSWriterNull.h
MSWriterNull.h \
MSWriterFile.h
include $(top_srcdir)/Makefile.common
......@@ -49,7 +49,7 @@ namespace LOFAR
void MSWriter::addField(double, double, unsigned)
{}
void MSWriter::write(int, int, int, CorrelatedData*)
void MSWriter::write(int, int, int, StreamableData*)
{}
}
......
......@@ -32,6 +32,7 @@
#include <Common/Timer.h>
#include <Storage/MSWriterCasa.h>
#include <Interface/CorrelatedData.h>
#include <ms/MeasurementSets.h>
#include <tables/Tables/IncrementalStMan.h>
......@@ -629,12 +630,13 @@ namespace LOFAR
}
void MSWriterCasa::write (int bandId, int channelId, int nrChannels,
CorrelatedData *correlatedData)
StreamableData *data)
{
const fcomplex* data = correlatedData->visibilities.origin();
CorrelatedData *correlatedData = dynamic_cast<CorrelatedData*>(data);
const fcomplex* visibilityData = correlatedData->visibilities.origin();
ASSERT(bandId >= 0 && bandId < itsNrBand);
ASSERT(data != 0);
ASSERT(visibilityData != 0);
//std::cout << "write" << std::endl;
......@@ -746,7 +748,7 @@ namespace LOFAR
// Write all polarisations and nrChannels for each baseline.
// The input data array has shape nrant,nrant,nchan(subs),npol.
// So we can form an AIPS++ array for each baseline.
Array<Complex> dataArray(dShape, (Complex*)data, SHARE);
Array<Complex> dataArray(dShape, (Complex*)visibilityData, SHARE);
IPosition start(2, 0, channelId);
IPosition leng(2, shape[0], nrChannels);
dataArray.apply(std::conj); // Temporary fix, necessary to prevent flipping of the sky, since
......@@ -766,7 +768,7 @@ namespace LOFAR
IPosition leng(2, shape[0], nrChannels); // Length: ncorr, nchan
itsMSCol->flag().putSlice(rowNumber, Slicer(start, leng), flagArray);
}
data += nrel*nrChannels; // Go to next baseline data
visibilityData += nrel*nrChannels; // Go to next baseline data
if (flags != 0) {
flags += nrel*nrChannels;
}
......
//# MSWriterNull: a null MSWriter
//#
//# Copyright (C) 2001
//# ASTRON (Netherlands Foundation for Research in Astronomy)
//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl
//#
//# This program is free software; you can redistribute it and/or modify
//# it under the terms of the GNU General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or
//# (at your option) any later version.
//#
//# This program is distributed in the hope that it will be useful,
//# but WITHOUT ANY WARRANTY; without even the implied warranty of
//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//# GNU General Public License for more details.
//#
//# You should have received a copy of the GNU General Public License
//# along with this program; if not, write to the Free Software
//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//#
//# $Id: $
#include <lofar_config.h>
#include <AMCBase/Epoch.h>
#include <Common/LofarLogger.h>
#include <Storage/MSWriter.h>
#include <Storage/MSWriterFile.h>
#if defined HAVE_MPI
#include <mpi.h>
#endif
namespace LOFAR
{
namespace RTCP
{
MSWriterFile::MSWriterFile (const char*msName, double , double ,
int nfreq, int ncorr, int nantennas, const vector<double>& ,
const vector<string>&, float)
: itsNrBand (0),
itsNrField (0),
itsNrAnt (nantennas),
itsNrFreq (nfreq),
itsNrCorr (ncorr),
itsNrTimes (0),
itsNrPol (0),
itsNrChan (0),
itsFile (msName,00666)
{
}
MSWriterFile::~MSWriterFile()
{
}
int MSWriterFile::addBand(int, int, double, double)
{
itsNrBand++;
return itsNrBand;
}
int MSWriterFile::addBand(int, int, double, const double*, const double*)
{
itsNrBand++;
return itsNrBand;
}
void MSWriterFile::addField(double, double, unsigned)
{
itsNrField++;
}
void MSWriterFile::write(int, int, int, StreamableData *data)
{
data->write( &itsFile, true );
}
} // namespace RTCP
} // namespace LOFAR
......@@ -74,7 +74,7 @@ namespace LOFAR
itsNrField++;
}
void MSWriterNull::write(int, int, int, CorrelatedData*)
void MSWriterNull::write(int, int, int, StreamableData*)
{
//nothing
}
......
......@@ -5,7 +5,8 @@ libstorage_la_SOURCES = Package__Version.cc \
SubbandWriter.cc \
MSWriter.cc \
MSWriterCasa.cc \
MSWriterNull.cc
MSWriterNull.cc \
MSWriterFile.cc
bin_PROGRAMS = versionstorage Storage
......
......@@ -29,6 +29,7 @@
#include <Storage/MSWriter.h>
#include <Storage/MSWriterNull.h>
#include <Storage/MSWriterCasa.h>
#include <Storage/MSWriterFile.h>
#include <Stream/FileStream.h>
#include <Stream/NullStream.h>
#include <Stream/SocketStream.h>
......@@ -168,11 +169,21 @@ void SubbandWriter::preprocess()
unsigned currentSubband = itsRank * itsNrSubbandsPerStorage + i;
#if 1
if( itsPS->mode().outputDataType() == CN_Mode::CORRELATEDDATA ) {
// use CasaCore for CorrelatedData
itsWriters[i] = new MSWriterCasa(
itsPS->getMSname(currentSubband).c_str(),
startTime, itsPS->IONintegrationTime(), itsNChannels,
itsNPolSquared, itsNStations, antPos,
stationNames, itsWeightFactor);
} else {
// write to disk otherwise
itsWriters[i] = new MSWriterFile(
itsPS->getMSname(currentSubband).c_str(),
startTime, itsPS->IONintegrationTime(), itsNChannels,
itsNPolSquared, itsNStations, antPos,
stationNames, itsWeightFactor);
}
#else
itsWriters[i] = new MSWriterNull(
itsPS->getMSname(currentSubband).c_str(),
......@@ -259,13 +270,9 @@ bool SubbandWriter::processSubband(unsigned sb)
checkForDroppedData(data, sb);
#if defined HAVE_AIPSPP
if( itsPS->mode().outputDataType() == CN_Mode::CORRELATEDDATA ) {
itsWriteTimer.start();
itsWriters[sb]->write(itsBandIDs[sb], 0, itsNChannels, static_cast<CorrelatedData*>(data));
itsWriters[sb]->write(itsBandIDs[sb], 0, itsNChannels, data);
itsWriteTimer.stop();
} else {
std::clog << "Warning: no support for writing non-CorrelatedData. dropping." << std::endl;
}
#endif
itsInputThreads[sb]->itsFreeQueue.append(data);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment