From 244705f31c8536d89bd4a71f46da05ce7cea80c7 Mon Sep 17 00:00:00 2001 From: Jan David Mol <mol@astron.nl> Date: Fri, 9 Jan 2009 11:12:21 +0000 Subject: [PATCH] Bug 1303: Allow Storage to write any data to file. --- .gitattributes | 2 + RTCP/Interface/include/Interface/StokesData.h | 3 +- RTCP/Storage/include/Storage/MSWriter.h | 4 +- RTCP/Storage/include/Storage/MSWriterCasa.h | 2 +- RTCP/Storage/include/Storage/MSWriterFile.h | 89 +++++++++++++++++++ RTCP/Storage/include/Storage/MSWriterNull.h | 2 +- RTCP/Storage/include/Storage/Makefile.am | 3 +- RTCP/Storage/src/MSWriter.cc | 2 +- RTCP/Storage/src/MSWriterCasa.cc | 12 +-- RTCP/Storage/src/MSWriterFile.cc | 85 ++++++++++++++++++ RTCP/Storage/src/MSWriterNull.cc | 2 +- RTCP/Storage/src/Makefile.am | 3 +- RTCP/Storage/src/SubbandWriter.cc | 31 ++++--- 13 files changed, 214 insertions(+), 26 deletions(-) create mode 100644 RTCP/Storage/include/Storage/MSWriterFile.h create mode 100644 RTCP/Storage/src/MSWriterFile.cc diff --git a/.gitattributes b/.gitattributes index fa3a89a0d30..3246361bc7c 100644 --- a/.gitattributes +++ b/.gitattributes @@ -908,8 +908,10 @@ RTCP/Run/bootstrap -text RTCP/Run/src/getStats -text RTCP/Storage/bootstrap -text RTCP/Storage/include/Storage/MSWriterCasa.h -text +RTCP/Storage/include/Storage/MSWriterFile.h -text RTCP/Storage/include/Storage/MSWriterNull.h -text RTCP/Storage/src/MSWriterCasa.cc -text +RTCP/Storage/src/MSWriterFile.cc -text RTCP/Storage/src/MSWriterNull.cc -text RTCP/Storage/src/Storage.machinefile -text SAS/OTB/MoM-OTDB-adapter/.cvsignore -text diff --git a/RTCP/Interface/include/Interface/StokesData.h b/RTCP/Interface/include/Interface/StokesData.h index cc891c60fad..4850451f29b 100644 --- a/RTCP/Interface/include/Interface/StokesData.h +++ b/RTCP/Interface/include/Interface/StokesData.h @@ -65,7 +65,8 @@ inline void StokesData::readData(Stream *str) inline void StokesData::writeData(Stream *str) const { #if !defined WORDS_BIGENDIAN - THROW(AssertError, "not implemented: think about endianness"); + std::clog << "Warning: writing data in little endian." << std::endl; + //THROW(AssertError, "not implemented: think about endianness"); #endif str->write(samples.origin(), samples.num_elements() * sizeof(float)); diff --git a/RTCP/Storage/include/Storage/MSWriter.h b/RTCP/Storage/include/Storage/MSWriter.h index 6a7809f4299..b5217ccd87f 100644 --- a/RTCP/Storage/include/Storage/MSWriter.h +++ b/RTCP/Storage/include/Storage/MSWriter.h @@ -28,7 +28,7 @@ #include <Common/LofarTypes.h> #include <Common/lofar_vector.h> -#include <Interface/CorrelatedData.h> +#include <Interface/StreamableData.h> namespace LOFAR { @@ -43,7 +43,7 @@ namespace LOFAR virtual int addBand(int, int, double, double); virtual int addBand(int, int, double, const double*, const double*); virtual void addField(double, double, unsigned); - virtual void write(int, int, int, CorrelatedData*); + virtual void write(int, int, int, StreamableData*); private: diff --git a/RTCP/Storage/include/Storage/MSWriterCasa.h b/RTCP/Storage/include/Storage/MSWriterCasa.h index cc9a4d6d727..fc11899b1ed 100644 --- a/RTCP/Storage/include/Storage/MSWriterCasa.h +++ b/RTCP/Storage/include/Storage/MSWriterCasa.h @@ -93,7 +93,7 @@ namespace LOFAR // The flag array is optional. If not given, all flags are False. // All data will be written with sigma=0 and weight=1. void write (int bandId, int channelId, int nrChannels, - CorrelatedData *correlatedData); + StreamableData *data); // Get the number of antennas. int nrAntennas() const diff --git a/RTCP/Storage/include/Storage/MSWriterFile.h b/RTCP/Storage/include/Storage/MSWriterFile.h new file mode 100644 index 00000000000..9d95de2d7d4 --- /dev/null +++ b/RTCP/Storage/include/Storage/MSWriterFile.h @@ -0,0 +1,89 @@ +// MSMriterNull.h: null implementation of MSWriter +// +// Copyright (C) 2001 +// ASTRON (Netherlands Foundation for Research in Astronomy) +// P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// $Id: MSWriterImpl.h 11891 2008-10-14 13:43:51Z gels $ +// +////////////////////////////////////////////////////////////////////// + + +#ifndef LOFAR_STORAGE_MSWRITERFILE_H +#define LOFAR_STORAGE_MSWRITERFILE_H + + +//# Includes +#include <Common/LofarTypes.h> +#include <Common/lofar_vector.h> + +#include <Storage/MSWriter.h> + +#include <Stream/FileStream.h> + +//# Forward declarations + +namespace LOFAR +{ + + namespace RTCP + { + class MSWriterFile : public MSWriter + { + public: + MSWriterFile(const char* msName, double startTime, double timeStep, + int nfreq, int ncorr, int nantennas, const vector<double>& antPos, + const vector<std::string>& storageStationNames, float weightFactor); + ~MSWriterFile(); + + int addBand(int, int, double, double); + int addBand(int, int, double, const double*, const double*); + void addField(double, double, unsigned); + void write(int, int, int, StreamableData *data); + + + inline int nrAntennas() const + { return itsNrAnt; } + + inline int nrBands() const + { return itsNrBand; } + + inline int nrFields() const + { return itsNrField; } + + inline int nrPolarizations() const + { return itsNrPol; } + + inline int nrTimes() const + { return itsNrTimes; } + + + private: + int itsNrBand; + int itsNrField; + int itsNrAnt; + int itsNrFreq; + int itsNrCorr; + int itsNrTimes; + int itsNrPol; + int itsNrChan; + FileStream itsFile; + }; + } +} + +#endif diff --git a/RTCP/Storage/include/Storage/MSWriterNull.h b/RTCP/Storage/include/Storage/MSWriterNull.h index aa5cd592940..f051c14b2ad 100644 --- a/RTCP/Storage/include/Storage/MSWriterNull.h +++ b/RTCP/Storage/include/Storage/MSWriterNull.h @@ -51,7 +51,7 @@ namespace LOFAR int addBand(int, int, double, double); int addBand(int, int, double, const double*, const double*); void addField(double, double, unsigned); - void write(int, int, int, CorrelatedData*); + void write(int, int, int, StreamableData*); inline int nrAntennas() const diff --git a/RTCP/Storage/include/Storage/Makefile.am b/RTCP/Storage/include/Storage/Makefile.am index 53a7e891d01..adb044d5f45 100644 --- a/RTCP/Storage/include/Storage/Makefile.am +++ b/RTCP/Storage/include/Storage/Makefile.am @@ -5,6 +5,7 @@ noinst_HEADERS = \ SubbandWriter.h \ MSWriter.h \ MSWriterCasa.h \ - MSWriterNull.h + MSWriterNull.h \ + MSWriterFile.h include $(top_srcdir)/Makefile.common diff --git a/RTCP/Storage/src/MSWriter.cc b/RTCP/Storage/src/MSWriter.cc index 3314e92cfbd..aafbf1efc4b 100644 --- a/RTCP/Storage/src/MSWriter.cc +++ b/RTCP/Storage/src/MSWriter.cc @@ -49,7 +49,7 @@ namespace LOFAR void MSWriter::addField(double, double, unsigned) {} - void MSWriter::write(int, int, int, CorrelatedData*) + void MSWriter::write(int, int, int, StreamableData*) {} } diff --git a/RTCP/Storage/src/MSWriterCasa.cc b/RTCP/Storage/src/MSWriterCasa.cc index 9debdffd8da..1e4360a7a5f 100644 --- a/RTCP/Storage/src/MSWriterCasa.cc +++ b/RTCP/Storage/src/MSWriterCasa.cc @@ -32,6 +32,7 @@ #include <Common/Timer.h> #include <Storage/MSWriterCasa.h> +#include <Interface/CorrelatedData.h> #include <ms/MeasurementSets.h> #include <tables/Tables/IncrementalStMan.h> @@ -629,12 +630,13 @@ namespace LOFAR } void MSWriterCasa::write (int bandId, int channelId, int nrChannels, - CorrelatedData *correlatedData) + StreamableData *data) { - const fcomplex* data = correlatedData->visibilities.origin(); + CorrelatedData *correlatedData = dynamic_cast<CorrelatedData*>(data); + const fcomplex* visibilityData = correlatedData->visibilities.origin(); ASSERT(bandId >= 0 && bandId < itsNrBand); - ASSERT(data != 0); + ASSERT(visibilityData != 0); //std::cout << "write" << std::endl; @@ -746,7 +748,7 @@ namespace LOFAR // Write all polarisations and nrChannels for each baseline. // The input data array has shape nrant,nrant,nchan(subs),npol. // So we can form an AIPS++ array for each baseline. - Array<Complex> dataArray(dShape, (Complex*)data, SHARE); + Array<Complex> dataArray(dShape, (Complex*)visibilityData, SHARE); IPosition start(2, 0, channelId); IPosition leng(2, shape[0], nrChannels); dataArray.apply(std::conj); // Temporary fix, necessary to prevent flipping of the sky, since @@ -766,7 +768,7 @@ namespace LOFAR IPosition leng(2, shape[0], nrChannels); // Length: ncorr, nchan itsMSCol->flag().putSlice(rowNumber, Slicer(start, leng), flagArray); } - data += nrel*nrChannels; // Go to next baseline data + visibilityData += nrel*nrChannels; // Go to next baseline data if (flags != 0) { flags += nrel*nrChannels; } diff --git a/RTCP/Storage/src/MSWriterFile.cc b/RTCP/Storage/src/MSWriterFile.cc new file mode 100644 index 00000000000..babd1646e77 --- /dev/null +++ b/RTCP/Storage/src/MSWriterFile.cc @@ -0,0 +1,85 @@ +//# MSWriterNull: a null MSWriter +//# +//# Copyright (C) 2001 +//# ASTRON (Netherlands Foundation for Research in Astronomy) +//# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands, seg@astron.nl +//# +//# This program is free software; you can redistribute it and/or modify +//# it under the terms of the GNU General Public License as published by +//# the Free Software Foundation; either version 2 of the License, or +//# (at your option) any later version. +//# +//# This program is distributed in the hope that it will be useful, +//# but WITHOUT ANY WARRANTY; without even the implied warranty of +//# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//# GNU General Public License for more details. +//# +//# You should have received a copy of the GNU General Public License +//# along with this program; if not, write to the Free Software +//# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +//# +//# $Id: $ + +#include <lofar_config.h> + +#include <AMCBase/Epoch.h> +#include <Common/LofarLogger.h> +#include <Storage/MSWriter.h> +#include <Storage/MSWriterFile.h> + +#if defined HAVE_MPI +#include <mpi.h> +#endif + + +namespace LOFAR +{ + + namespace RTCP + { + + MSWriterFile::MSWriterFile (const char*msName, double , double , + int nfreq, int ncorr, int nantennas, const vector<double>& , + const vector<string>&, float) + : itsNrBand (0), + itsNrField (0), + itsNrAnt (nantennas), + itsNrFreq (nfreq), + itsNrCorr (ncorr), + itsNrTimes (0), + itsNrPol (0), + itsNrChan (0), + itsFile (msName,00666) + { + } + + MSWriterFile::~MSWriterFile() + { + } + + int MSWriterFile::addBand(int, int, double, double) + { + itsNrBand++; + return itsNrBand; + } + + int MSWriterFile::addBand(int, int, double, const double*, const double*) + { + itsNrBand++; + return itsNrBand; + } + + void MSWriterFile::addField(double, double, unsigned) + { + itsNrField++; + } + + void MSWriterFile::write(int, int, int, StreamableData *data) + { + data->write( &itsFile, true ); + } + + + } // namespace RTCP +} // namespace LOFAR + diff --git a/RTCP/Storage/src/MSWriterNull.cc b/RTCP/Storage/src/MSWriterNull.cc index e62cf280e85..ecd74f738e3 100644 --- a/RTCP/Storage/src/MSWriterNull.cc +++ b/RTCP/Storage/src/MSWriterNull.cc @@ -74,7 +74,7 @@ namespace LOFAR itsNrField++; } - void MSWriterNull::write(int, int, int, CorrelatedData*) + void MSWriterNull::write(int, int, int, StreamableData*) { //nothing } diff --git a/RTCP/Storage/src/Makefile.am b/RTCP/Storage/src/Makefile.am index 6bda101cc00..1d781bbd423 100644 --- a/RTCP/Storage/src/Makefile.am +++ b/RTCP/Storage/src/Makefile.am @@ -5,7 +5,8 @@ libstorage_la_SOURCES = Package__Version.cc \ SubbandWriter.cc \ MSWriter.cc \ MSWriterCasa.cc \ - MSWriterNull.cc + MSWriterNull.cc \ + MSWriterFile.cc bin_PROGRAMS = versionstorage Storage diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc index 03350c8e79f..3197b5ae4e9 100644 --- a/RTCP/Storage/src/SubbandWriter.cc +++ b/RTCP/Storage/src/SubbandWriter.cc @@ -29,6 +29,7 @@ #include <Storage/MSWriter.h> #include <Storage/MSWriterNull.h> #include <Storage/MSWriterCasa.h> +#include <Storage/MSWriterFile.h> #include <Stream/FileStream.h> #include <Stream/NullStream.h> #include <Stream/SocketStream.h> @@ -168,11 +169,21 @@ void SubbandWriter::preprocess() unsigned currentSubband = itsRank * itsNrSubbandsPerStorage + i; #if 1 - itsWriters[i] = new MSWriterCasa( - itsPS->getMSname(currentSubband).c_str(), - startTime, itsPS->IONintegrationTime(), itsNChannels, - itsNPolSquared, itsNStations, antPos, - stationNames, itsWeightFactor); + if( itsPS->mode().outputDataType() == CN_Mode::CORRELATEDDATA ) { + // use CasaCore for CorrelatedData + itsWriters[i] = new MSWriterCasa( + itsPS->getMSname(currentSubband).c_str(), + startTime, itsPS->IONintegrationTime(), itsNChannels, + itsNPolSquared, itsNStations, antPos, + stationNames, itsWeightFactor); + } else { + // write to disk otherwise + itsWriters[i] = new MSWriterFile( + itsPS->getMSname(currentSubband).c_str(), + startTime, itsPS->IONintegrationTime(), itsNChannels, + itsNPolSquared, itsNStations, antPos, + stationNames, itsWeightFactor); + } #else itsWriters[i] = new MSWriterNull( itsPS->getMSname(currentSubband).c_str(), @@ -259,13 +270,9 @@ bool SubbandWriter::processSubband(unsigned sb) checkForDroppedData(data, sb); #if defined HAVE_AIPSPP - if( itsPS->mode().outputDataType() == CN_Mode::CORRELATEDDATA ) { - itsWriteTimer.start(); - itsWriters[sb]->write(itsBandIDs[sb], 0, itsNChannels, static_cast<CorrelatedData*>(data)); - itsWriteTimer.stop(); - } else { - std::clog << "Warning: no support for writing non-CorrelatedData. dropping." << std::endl; - } + itsWriteTimer.start(); + itsWriters[sb]->write(itsBandIDs[sb], 0, itsNChannels, data); + itsWriteTimer.stop(); #endif itsInputThreads[sb]->itsFreeQueue.append(data); -- GitLab