diff --git a/RTCP/Storage/include/Storage/MSWriter.h b/RTCP/Storage/include/Storage/MSWriter.h index d29d3daf0b62a25b92363ccf0eef53eced7b5ba4..4b9b4ad9c616314115598add73334c056aae0db5 100644 --- a/RTCP/Storage/include/Storage/MSWriter.h +++ b/RTCP/Storage/include/Storage/MSWriter.h @@ -51,7 +51,6 @@ namespace LOFAR int itsNrFreq; int itsNrCorr; int itsNrTimes; - int itsTimesToIntegrate; int itsNrPol; int itsNrChan; diff --git a/RTCP/Storage/include/Storage/MSWriterCasa.h b/RTCP/Storage/include/Storage/MSWriterCasa.h index 22c8495cda9300c20ecaae0c437d81d438b26f93..99548cc1bce92cda08eae014977bd78c925a2dd2 100644 --- a/RTCP/Storage/include/Storage/MSWriterCasa.h +++ b/RTCP/Storage/include/Storage/MSWriterCasa.h @@ -66,8 +66,7 @@ namespace LOFAR // must have shape [3,nantennas]. MSWriterCasa (const char* msName, double startTime, double timeStep, int nfreq, int ncorr, int nantennas, const vector<double>& antPos, - const vector<std::string>& storageStationNames, - int timesToIntegrate); + const vector<std::string>& storageStationNames); // Destructor ~MSWriterCasa(); diff --git a/RTCP/Storage/include/Storage/MSWriterNull.h b/RTCP/Storage/include/Storage/MSWriterNull.h index bd677486b13b4e918d0657c74a598ff399849279..9c6b3e91ec8d12bc65c268a3c4b4b6268e8ca704 100644 --- a/RTCP/Storage/include/Storage/MSWriterNull.h +++ b/RTCP/Storage/include/Storage/MSWriterNull.h @@ -45,7 +45,7 @@ namespace LOFAR public: MSWriterNull(const char* msName, double startTime, double timeStep, int nfreq, int ncorr, int nantennas, const vector<double>& antPos, - const vector<std::string>& storageStationNames, int timesToIntegrate); + const vector<std::string>& storageStationNames); ~MSWriterNull(); int addBand(int, int, double, double); @@ -77,7 +77,6 @@ namespace LOFAR int itsNrFreq; int itsNrCorr; int itsNrTimes; - int itsTimesToIntegrate; int itsNrPol; int itsNrChan; }; diff --git a/RTCP/Storage/include/Storage/SubbandWriter.h b/RTCP/Storage/include/Storage/SubbandWriter.h index 457be77076961945ff4a29e4d62cea5b35581d1e..cdf72f1f5c77a5692eff0361264f256d1419ffa1 100644 --- a/RTCP/Storage/include/Storage/SubbandWriter.h +++ b/RTCP/Storage/include/Storage/SubbandWriter.h @@ -85,7 +85,6 @@ class SubbandWriter vector<unsigned> itsBandIDs; unsigned itsTimeCounter; - unsigned itsTimesToIntegrate; bool *itsFlagsBuffers;//[NR_SUBBANDS][NR_BASELINES][NR_SUBBAND_CHANNELS][NR_POLARIZATIONS][NR_POLARIZATIONS]; float *itsWeightsBuffers;//[NR_SUBBANDS][NR_BASELINES][NR_SUBBAND_CHANNELS]; fcomplex *itsVisibilities;//[NR_SUBBANDS][NR_BASELINES][NR_SUBBAND_CHANNELS][NR_POLARIZATIONS][NR_POLARIZATIONS]; diff --git a/RTCP/Storage/src/MSWriterCasa.cc b/RTCP/Storage/src/MSWriterCasa.cc index f67bba2a1adbdd35c394f958c548039bde724178..64faec0d8f24b0f30ed5d9ac8f705319395c0271 100644 --- a/RTCP/Storage/src/MSWriterCasa.cc +++ b/RTCP/Storage/src/MSWriterCasa.cc @@ -87,8 +87,7 @@ namespace LOFAR MSWriterCasa::MSWriterCasa (const char* msName, double startTime, double timeStep, int nfreq, int ncorr, int nantennas, const vector<double>& antPos, - const vector<string>& storageStationNames, - int timesToIntegrate) + const vector<string>& storageStationNames) : itsNrBand (0), itsNrField (0), itsNrAnt (nantennas), @@ -96,7 +95,6 @@ namespace LOFAR itsNrCorr (ncorr), itsNrTimes (0), itsTimeStep (timeStep), - itsTimesToIntegrate(timesToIntegrate), itsStartTime(0), itsField (), itsNrPol (0), @@ -582,7 +580,7 @@ namespace LOFAR void MSWriterCasa::updateTimes() { // Calculate the interval, end, and central time. - Double interval = (itsNrTimes/itsTimesToIntegrate)*itsTimeStep; + Double interval = itsNrTimes*itsTimeStep; Double endTime = itsStartTime + interval; Double midTime = (itsStartTime + endTime) / 2; @@ -612,7 +610,7 @@ namespace LOFAR MSObservationColumns msobsCol(msobs); Vector<Double> timeRange(2); timeRange(0) = itsStartTime; - timeRange(1) = itsStartTime + ((itsNrTimes/itsTimesToIntegrate)*itsTimeStep); + timeRange(1) = itsStartTime + (itsNrTimes*itsTimeStep); for (uInt i=0; i<msobs.nrow(); i++) { msobsCol.timeRange().put (i, timeRange); } @@ -636,7 +634,7 @@ namespace LOFAR if (timeCounter >= itsNrTimes) itsNrTimes = timeCounter + 1; - Double time = itsStartTime + (itsNrTimes/itsTimesToIntegrate - .5) * itsTimeStep; + Double time = itsStartTime + (itsNrTimes - .5) * itsTimeStep; // Find the shape of the data array in each table row. IPosition shape(2, (*itsNrPol)[bandId], (*itsNrChan)[bandId]); diff --git a/RTCP/Storage/src/MSWriterNull.cc b/RTCP/Storage/src/MSWriterNull.cc index 30bc918c1f3520e9f601e029dce0039130319e2e..0ffde100fe6b49d1f077a048e2aae4461e3440a2 100644 --- a/RTCP/Storage/src/MSWriterNull.cc +++ b/RTCP/Storage/src/MSWriterNull.cc @@ -40,14 +40,13 @@ namespace LOFAR MSWriterNull::MSWriterNull (const char* , double , double , int nfreq, int ncorr, int nantennas, const vector<double>& , - const vector<string>& , int timesToIntegrate) + const vector<string>&) : itsNrBand (0), itsNrField (0), itsNrAnt (nantennas), itsNrFreq (nfreq), itsNrCorr (ncorr), itsNrTimes (0), - itsTimesToIntegrate (timesToIntegrate), itsNrPol (0), itsNrChan (0) { diff --git a/RTCP/Storage/src/SubbandWriter.cc b/RTCP/Storage/src/SubbandWriter.cc index aca36aa75f56064822d739da9cba1af03985deed..02447a4b316b26a9e5e24d388115e85aeab51a4d 100644 --- a/RTCP/Storage/src/SubbandWriter.cc +++ b/RTCP/Storage/src/SubbandWriter.cc @@ -51,7 +51,6 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank) itsPS(ps), itsRank(rank), itsTimeCounter(0), - itsTimesToIntegrate(ps->storageIntegrationSteps()), itsFlagsBuffers(0), itsWeightsBuffers(0), itsVisibilities(0), @@ -60,9 +59,6 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank) ,itsPropertySet(0) #endif { - if (itsTimesToIntegrate != 1) // FIXME: does not work with dropped blocks - THROW(StorageException, "integration on storage nodes is broken"); - #ifdef USE_MAC_PI itsWriteToMAC = itsPS.getBool("Storage.WriteToMAC"); #endif @@ -78,7 +74,7 @@ SubbandWriter::SubbandWriter(const Parset *ps, unsigned rank) itsNPolSquared = pols*pols; // itsWeightFactor = the inverse of maximum number of valid samples - itsWeightFactor = 1.0 / (ps->CNintegrationSteps() * ps->IONintegrationSteps() * ps->storageIntegrationSteps()); + itsWeightFactor = 1.0 / (ps->CNintegrationSteps() * ps->IONintegrationSteps()); itsNVisibilities = itsNBaselines * itsNChannels * itsNPolSquared; } @@ -177,15 +173,15 @@ void SubbandWriter::preprocess() #if 1 itsWriters[i] = new MSWriterCasa( itsPS->getMSname(currentSubband).c_str(), - startTime, itsPS->storageIntegrationTime(), itsNChannels, + startTime, itsPS->IONintegrationTime(), itsNChannels, itsNPolSquared, itsNStations, antPos, - stationNames, itsTimesToIntegrate); + stationNames); #else itsWriters[i] = new MSWriterNull( itsPS->getMSname(currentSubband).c_str(), - startTime, itsPS->storageIntegrationTime(), itsNChannels, + startTime, itsPS->IONintegrationTime(), itsNChannels, itsNPolSquared, itsNStations, antPos, - stationNames, itsTimesToIntegrate); + stationNames); #endif unsigned beam = subbandToBeamMapping[currentSubband]; @@ -211,15 +207,8 @@ void SubbandWriter::preprocess() } // Allocate buffers - if (itsTimesToIntegrate > 1) { - itsFlagsBuffers = new bool[itsNrSubbandsPerStorage * itsNVisibilities]; - itsWeightsBuffers = new float[itsNrSubbandsPerStorage * itsNBaselines * itsNChannels]; - itsVisibilities = new fcomplex[itsNrSubbandsPerStorage * itsNVisibilities]; - clearAllSums(); - } else { - itsFlagsBuffers = new bool[itsNVisibilities]; - itsWeightsBuffers = new float[itsNBaselines * itsNChannels]; - } + itsFlagsBuffers = new bool[itsNVisibilities]; + itsWeightsBuffers = new float[itsNBaselines * itsNChannels]; #endif // defined HAVE_AIPSPP itsPreviousSequenceNumbers.resize(itsNrSubbandsPerStorage, -1); @@ -230,17 +219,6 @@ void SubbandWriter::preprocess() } -void SubbandWriter::clearAllSums() -{ - assert(itsTimesToIntegrate > 1); - memset(itsWeightsBuffers, 0, itsNrSubbandsPerStorage * itsNBaselines * itsNChannels * sizeof(float)); - memset(itsVisibilities, 0, itsNrSubbandsPerStorage * itsNVisibilities * sizeof(fcomplex)); - for (unsigned i = 0; i < itsNrSubbandsPerStorage * itsNVisibilities; i++) { - itsFlagsBuffers[i] = true; - } -} - - void SubbandWriter::writeLogMessage() { static int counter = 0; @@ -294,47 +272,20 @@ bool SubbandWriter::processSubband(unsigned sb) unsigned short *valSamples = data->nrValidSamples.origin(); fcomplex *newVis = data->visibilities.origin(); - if (itsTimesToIntegrate > 1) { - for (unsigned i = 0; i < itsNBaselines * itsNChannels; i ++) { - itsWeightsBuffers[sb * itsNBaselines * itsNChannels + i] += itsWeightFactor * valSamples[i]; - bool flagged = valSamples[i] == 0; - itsFlagsBuffers[sb * itsNVisibilities + 4 * i ] &= flagged; - itsFlagsBuffers[sb * itsNVisibilities + 4 * i + 1] &= flagged; - itsFlagsBuffers[sb * itsNVisibilities + 4 * i + 2] &= flagged; - itsFlagsBuffers[sb * itsNVisibilities + 4 * i + 3] &= flagged; - // Currently we just add the samples, this way the time centroid stays in place - // We could also divide by the weight and multiple the sum by the total weight. - itsVisibilities[sb * itsNVisibilities + 4 * i ] += newVis[4 * i ]; - itsVisibilities[sb * itsNVisibilities + 4 * i + 1] += newVis[4 * i + 1]; - itsVisibilities[sb * itsNVisibilities + 4 * i + 2] += newVis[4 * i + 2]; - itsVisibilities[sb * itsNVisibilities + 4 * i + 3] += newVis[4 * i + 3]; - } - - if ((itsTimeCounter + 1) % itsTimesToIntegrate == 0) { - itsWriteTimer.start(); - itsWriters[sb]->write(itsBandIDs[sb], 0, itsNChannels, itsTimeCounter, - itsNVisibilities, - &itsVisibilities[sb * itsNVisibilities], - &itsFlagsBuffers[sb * itsNVisibilities], - &itsWeightsBuffers[sb * itsNBaselines * itsNChannels]); - itsWriteTimer.stop(); - } - } else { - for (unsigned i = 0; i < itsNBaselines * itsNChannels; i ++) { - itsWeightsBuffers[i] = itsWeightFactor * valSamples[i]; - bool flagged = valSamples[i] == 0; - itsFlagsBuffers[4 * i ] = flagged; - itsFlagsBuffers[4 * i + 1] = flagged; - itsFlagsBuffers[4 * i + 2] = flagged; - itsFlagsBuffers[4 * i + 3] = flagged; - } - - itsWriteTimer.start(); - itsWriters[sb]->write(itsBandIDs[sb], 0, itsNChannels, data->sequenceNumber, - itsNVisibilities, newVis, itsFlagsBuffers, - itsWeightsBuffers); - itsWriteTimer.stop(); + for (unsigned i = 0; i < itsNBaselines * itsNChannels; i ++) { + itsWeightsBuffers[i] = itsWeightFactor * valSamples[i]; + bool flagged = valSamples[i] == 0; + itsFlagsBuffers[4 * i ] = flagged; + itsFlagsBuffers[4 * i + 1] = flagged; + itsFlagsBuffers[4 * i + 2] = flagged; + itsFlagsBuffers[4 * i + 3] = flagged; } + + itsWriteTimer.start(); + itsWriters[sb]->write(itsBandIDs[sb], 0, itsNChannels, data->sequenceNumber, + itsNVisibilities, newVis, itsFlagsBuffers, + itsWeightsBuffers); + itsWriteTimer.stop(); #endif itsInputThreads[sb]->itsFreeQueue.append(data); @@ -350,11 +301,6 @@ void SubbandWriter::process() while (finishedSubbandsCount < itsNrSubbandsPerStorage) { writeLogMessage(); -#if defined HAVE_AIPSPP - if (itsTimesToIntegrate > 1 && itsTimeCounter % itsTimesToIntegrate == 0) - clearAllSums(); -#endif - for (unsigned sb = 0; sb < itsNrSubbandsPerStorage; ++ sb) if (!finishedSubbands[sb]) if (!processSubband(sb)) {