diff --git a/.gitattributes b/.gitattributes index fa3a89a0d308b88ea914c1fd4adf012c54b4dffe..3246361bc7c51100e5c74a4359d94ffb711b92ce 100644 --- a/.gitattributes +++ b/.gitattributes @@ -908,8 +908,10 @@ RTCP/Run/bootstrap -text RTCP/Run/src/getStats -text RTCP/Storage/bootstrap -text RTCP/Storage/include/Storage/MSWriterCasa.h -text +RTCP/Storage/include/Storage/MSWriterFile.h -text RTCP/Storage/include/Storage/MSWriterNull.h -text RTCP/Storage/src/MSWriterCasa.cc -text +RTCP/Storage/src/MSWriterFile.cc -text RTCP/Storage/src/MSWriterNull.cc -text RTCP/Storage/src/Storage.machinefile -text SAS/OTB/MoM-OTDB-adapter/.cvsignore -text diff --git a/RTCP/Interface/include/Interface/StokesData.h b/RTCP/Interface/include/Interface/StokesData.h index cc891c60fad1510b7d837b8ebdec93cf7559a9d9..4850451f29b7c4c6f76947fe8aab3c6e02ff29f9 100644 --- a/RTCP/Interface/include/Interface/StokesData.h +++ b/RTCP/Interface/include/Interface/StokesData.h @@ -65,7 +65,8 @@ inline void StokesData::readData(Stream *str) inline void StokesData::writeData(Stream *str) const { #if !defined WORDS_BIGENDIAN - THROW(AssertError, "not implemented: think about endianness"); + std::clog << "Warning: writing data in little endian." << std::endl; + //THROW(AssertError, "not implemented: think about endianness"); #endif str->write(samples.origin(), samples.num_elements() * sizeof(float)); diff --git a/RTCP/Storage/include/Storage/MSWriter.h b/RTCP/Storage/include/Storage/MSWriter.h index 6a7809f429919877d181ff45328d9f51a6ce744b..b5217ccd87fabf8483d61d1ac117f5f13398c190 100644 --- a/RTCP/Storage/include/Storage/MSWriter.h +++ b/RTCP/Storage/include/Storage/MSWriter.h @@ -28,7 +28,7 @@ #include <Common/LofarTypes.h> #include <Common/lofar_vector.h> -#include <Interface/CorrelatedData.h> +#include <Interface/StreamableData.h> namespace LOFAR { @@ -43,7 +43,7 @@ namespace LOFAR virtual int addBand(int, int, double, double); virtual int addBand(int, int, double, const double*, const double*); virtual void addField(double, double, unsigned); - virtual void write(int, int, int, CorrelatedData*); + virtual void write(int, int, int, StreamableData*); private: diff --git a/RTCP/Storage/include/Storage/MSWriterCasa.h b/RTCP/Storage/include/Storage/MSWriterCasa.h index cc9a4d6d727892ccc9770080d28cd295c4f56fb4..fc11899b1ed716e47f7dd3357b4ea18d4e748334 100644 --- a/RTCP/Storage/include/Storage/MSWriterCasa.h +++ b/RTCP/Storage/include/Storage/MSWriterCasa.h @@ -93,7 +93,7 @@ namespace LOFAR // The flag array is optional. If not given, all flags are False. // All data will be written with sigma=0 and weight=1. void write (int bandId, int channelId, int nrChannels, - CorrelatedData *correlatedData); + StreamableData *data); // Get the number of antennas. int nrAntennas() const diff --git a/RTCP/Storage/include/Storage/MSWriterFile.h b/RTCP/Storage/include/Storage/MSWriterFile.h new file mode 100644 index 0000000000000000000000000000000000000000..9d95de2d7d4caabf78f30df9fc9d696154ae5259 --- /dev/null +++ b/RTCP/Storage/include/Storage/MSWriterFile.h @@ -0,0 +1,89 @@ +// MSMriterNull.h: null implementation of MSWriter +// +// Copyright (C) 2001 +// ASTRON (Netherlands Foundation for Research in Astronomy) +// P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// $Id: MSWriterImpl.h 11891 2008-10-14 13:43:51Z gels $ +// +////////////////////////////////////////////////////////////////////// + + +#ifndef LOFAR_STORAGE_MSWRITERFILE_H +#define LOFAR_STORAGE_MSWRITERFILE_H + + +//# Includes +#include <Common/LofarTypes.h> +#include <Common/lofar_vector.h> + +#include <Storage/MSWriter.h> + +#include <Stream/FileStream.h> + +//# Forward declarations + +namespace LOFAR +{ + + namespace RTCP + { + class MSWriterFile : public MSWriter + { + public: + MSWriterFile(const char* msName, double startTime, double timeStep, + int nfreq, int ncorr, int nantennas, const vector<double>& antPos, + const vector<std::string>& storageStationNames, float weightFactor); + ~MSWriterFile(); + + int addBand(int, int, double, double); + int addBand(int, int, double, const double*, const double*); + void addField(double, double, unsigned); + void write(int, int, int, StreamableData *data); + + + inline int nrAntennas() const + { return itsNrAnt; } + + inline int nrBands() const + { return itsNrBand; } + + inline int nrFields() const + { return itsNrField; } + + inline int nrPolarizations() const + { return itsNrPol; } + + inline int nrTimes() const + { return itsNrTimes; } + + + private: + int itsNrBand; + int itsNrField; + int itsNrAnt; + int itsNrFreq; + int itsNrCorr; + int itsNrTimes; + int itsNrPol; + int itsNrChan; + FileStream itsFile; + }; + } +} + +#endif diff --git a/RTCP/Storage/include/Storage/MSWriterNull.h b/RTCP/Storage/include/Storage/MSWriterNull.h index aa5cd5929407d14441062878d3cab81bb4fcf35f..f051c14b2ad6eca87fcbc65ef743f7bae28bf399 100644 --- a/RTCP/Storage/include/Storage/MSWriterNull.h +++ b/RTCP/Storage/include/Storage/MSWriterNull.h @@ -51,7 +51,7 @@ namespace LOFAR int addBand(int, int, double, double); int addBand(int, int, double, const double*, const double*); void addField(double, double, unsigned); - void write(int, int, int, CorrelatedData*); + void write(int, int, int, StreamableData*); inline int nrAntennas() const diff --git a/RTCP/Storage/include/Storage/Makefile.am b/RTCP/Storage/include/Storage/Makefile.am index 53a7e891d01e99b249fbbc359b4d3eaa84b7d6a3..adb044d5f451876f8beda61af2550d3a53d836b8 100644 --- a/RTCP/Storage/include/Storage/Makefile.am +++ b/RTCP/Storage/include/Storage/Makefile.am @@ -5,6 +5,7 @@ noinst_HEADERS = \ SubbandWriter.h \ MSWriter.h \ MSWriterCasa.h \ - MSWriterNull.h + MSWriterNull.h \ + MSWriterFile.h include $(top_srcdir)/Makefile.common diff --git a/RTCP/Storage/src/MSWriter.cc b/RTCP/Storage/src/MSWriter.cc index 3314e92cfbdb469402f4d8f8143969078bb53ddd..aafbf1efc4bcba5805049e2d502bd5f2c6d4549e 100644 --- a/RTCP/Storage/src/MSWriter.cc +++ b/RTCP/Storage/src/MSWriter.cc @@ -49,7 +49,7 @@ namespace LOFAR void MSWriter::addField(double, double, unsigned) {} - void MSWriter::write(int, int, int, CorrelatedData*) + void MSWriter::write(int, int, int, StreamableData*) {} } diff --git a/RTCP/Storage/src/MSWriterCasa.cc b/RTCP/Storage/src/MSWriterCasa.cc index 9debdffd8dabd668509f15f306f029c76900c2aa..1e4360a7a5f5c326ce9cde88cdc37d914a0a05d0 100644 --- a/RTCP/Storage/src/MSWriterCasa.cc +++ b/RTCP/Storage/src/MSWriterCasa.cc @@ -32,6 +32,7 @@ #include <Common/Timer.h> #include <Storage/MSWriterCasa.h> +#include <Interface/CorrelatedData.h> #include <ms/MeasurementSets.h> #include <tables/Tables/IncrementalStMan.h> @@ -629,12 +630,13 @@ namespace LOFAR } void MSWriterCasa::write (int bandId, int channelId, int nrChannels, - CorrelatedData *correlatedData) + StreamableData *data) { - const fcomplex* data = correlatedData->visibilities.origin(); + CorrelatedData *correlatedData = dynamic_cast<CorrelatedData*>(data); + const fcomplex* visibilityData = correlatedData->visibilities.origin(); ASSERT(bandId >= 0 && bandId < itsNrBand); - ASSERT(data != 0); + ASSERT(visibilityData != 0); //std::cout << "write" << std::endl; @@ -746,7 +748,7 @@ namespace LOFAR // Write all polarisations and nrChannels for each baseline. // The input data array has shape nrant,nrant,nchan(subs),npol. // So we can form an AIPS++ array for each baseline. - Array<Complex> dataArray(dShape, (Complex*)data, SHARE); + Array<Complex> dataArray(dShape, (Complex*)visibilityData, SHARE); IPosition start(2, 0, channelId); IPosition leng(2, shape[0], nrChannels); dataArray.apply(std::conj); // Temporary fix, necessary to prevent flipping of the sky, since @@ -766,7 +768,7 @@ namespace LOFAR IPosition leng(2, shape[0], nrChannels); // Length: ncorr, nchan itsMSCol->flag().putSlice(rowNumber, Slicer(start, leng), flagArray); } - data += nrel*nrChannels; // Go to next baseline data + visibilityData += nrel*nrChannels; // Go to next baseline data if (flags != 0) { flags += nrel*nrChannels; } diff --git a/RTCP/Storage/src/MSWriterFile.cc b/RTCP/Storage/src/MSWriterFile.cc new file mode 100644 index 0000000000000000000000000000000000000000..babd1646e77c5139dc74760b4319fb19b21d0cdb --- /dev/null +++ b/RTCP/Storage/src/MSWriterFile.cc @@ -0,0 +1,85 @@ +//# MSWriterNull: a null MSWriter +//# +//# Copyright (C) 2001 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id: $ + +#include <lofar_config.h> + +#include <AMCBase/Epoch.h> +#include <Common/LofarLogger.h> +#include <Storage/MSWriter.h> +#include <Storage/MSWriterFile.h> + +#if defined HAVE_MPI +#include <mpi.h> +#endif + + +namespace LOFAR +{ + + namespace RTCP + { + + MSWriterFile::MSWriterFile (const char*msName, double , double , + int nfreq, int ncorr, int nantennas, const vector<double>& , + const vector<string>&, float) + : itsNrBand (0), + itsNrField (0), + itsNrAnt (nantennas), + itsNrFreq (nfreq), + itsNrCorr (ncorr), + itsNrTimes (0), + itsNrPol (0), + itsNrChan (0), + itsFile (msName,00666) + { + } + + MSWriterFile::~MSWriterFile() + { + } + + int MSWriterFile::addBand(int, int, double, double) + { + itsNrBand++; + return itsNrBand; + } + + int MSWriterFile::addBand(int, int, double, const double*, const double*) + { + itsNrBand++; + return itsNrBand; + } + + void MSWriterFile::addField(double, double, unsigned) + { + itsNrField++; + } + + void MSWriterFile::write(int, int, int, StreamableData *data) + { + data->write( &itsFile, true ); + } + + + } // namespace RTCP +} // namespace LOFAR + diff --git a/RTCP/Storage/src/MSWriterNull.cc b/RTCP/Storage/src/MSWriterNull.cc index e62cf280e855c5a2a2253442311b290afe6d9fbe..ecd74f738e333a731cca1222ea99d4bbf5d39384 100644 --- a/RTCP/Storage/src/MSWriterNull.cc +++ b/RTCP/Storage/src/MSWriterNull.cc @@ -74,7 +74,7 @@ namespace LOFAR itsNrField++; } - void MSWriterNull::write(int, int, int, CorrelatedData*) + void MSWriterNull::write(int, int, int, StreamableData*) { //nothing } diff --git a/RTCP/Storage/src/Makefile.am b/RTCP/Storage/src/Makefile.am index 6bda101cc005a7935b2f2e40bcd057628bfc5f60..1d781bbd4237b2a201aef48dc0d40f6ed606484c 100644 --- a/RTCP/Storage/src/Makefile.am +++ b/RTCP/Storage/src/Makefile.am @@ -5,7 +5,8 @@ libstorage_la_SOURCES = Package__Version.cc \ SubbandWriter.cc \ MSWriter.cc \ MSWriterCasa.cc \ - MSWriterNull.cc + MSWriterNull.cc \ + MSWriterFile.cc bin_PROGRAMS = versionstorage Storage diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc index 03350c8e79fae4af938a9cdb8ffafb64fc18a92b..3197b5ae4e907314cc71c83eac92c6997aaf2f8d 100644 --- a/RTCP/Storage/src/SubbandWriter.cc +++ b/RTCP/Storage/src/SubbandWriter.cc @@ -29,6 +29,7 @@ #include <Storage/MSWriter.h> #include <Storage/MSWriterNull.h> #include <Storage/MSWriterCasa.h> +#include <Storage/MSWriterFile.h> #include <Stream/FileStream.h> #include <Stream/NullStream.h> #include <Stream/SocketStream.h> @@ -168,11 +169,21 @@ void SubbandWriter::preprocess() unsigned currentSubband = itsRank * itsNrSubbandsPerStorage + i; #if 1 - itsWriters[i] = new MSWriterCasa( - itsPS->getMSname(currentSubband).c_str(), - startTime, itsPS->IONintegrationTime(), itsNChannels, - itsNPolSquared, itsNStations, antPos, - stationNames, itsWeightFactor); + if( itsPS->mode().outputDataType() == CN_Mode::CORRELATEDDATA ) { + // use CasaCore for CorrelatedData + itsWriters[i] = new MSWriterCasa( + itsPS->getMSname(currentSubband).c_str(), + startTime, itsPS->IONintegrationTime(), itsNChannels, + itsNPolSquared, itsNStations, antPos, + stationNames, itsWeightFactor); + } else { + // write to disk otherwise + itsWriters[i] = new MSWriterFile( + itsPS->getMSname(currentSubband).c_str(), + startTime, itsPS->IONintegrationTime(), itsNChannels, + itsNPolSquared, itsNStations, antPos, + stationNames, itsWeightFactor); + } #else itsWriters[i] = new MSWriterNull( itsPS->getMSname(currentSubband).c_str(), @@ -259,13 +270,9 @@ bool SubbandWriter::processSubband(unsigned sb) checkForDroppedData(data, sb); #if defined HAVE_AIPSPP - if( itsPS->mode().outputDataType() == CN_Mode::CORRELATEDDATA ) { - itsWriteTimer.start(); - itsWriters[sb]->write(itsBandIDs[sb], 0, itsNChannels, static_cast<CorrelatedData*>(data)); - itsWriteTimer.stop(); - } else { - std::clog << "Warning: no support for writing non-CorrelatedData. dropping." << std::endl; - } + itsWriteTimer.start(); + itsWriters[sb]->write(itsBandIDs[sb], 0, itsNChannels, data); + itsWriteTimer.stop(); #endif itsInputThreads[sb]->itsFreeQueue.append(data);