diff --git a/CEP/DP3/DPPP/include/DPPP/AORFlagger.h b/CEP/DP3/DPPP/include/DPPP/AORFlagger.h index 144e659276b5844b1b132e03a2f2c2fba1969b92..e6808a674be950adff790cdbb003a8ff17251d45 100644 --- a/CEP/DP3/DPPP/include/DPPP/AORFlagger.h +++ b/CEP/DP3/DPPP/include/DPPP/AORFlagger.h @@ -113,15 +113,17 @@ namespace LOFAR { uint itsNTimes; uint itsNTimesToDo; uint itsWindowSize; - uint itsOverlap; // extra time slots on both sides + uint itsOverlap; //# extra time slots on both sides double itsOverlapPerc; + double itsMemory; //# Usable memory in GBytes + double itsMemoryPerc; bool itsPulsarMode; bool itsPedantic; bool itsDoAutoCorr; vector<DPBuffer> itsBuf; FlagCounter itsFlagCounter; NSTimer itsTimer; - NSTimer itsComputeTimer; //# move/median timer + NSTimer itsComputeTimer; //# move/flag timer double itsMoveTime; //# data move timer (sum all threads) double itsFlagTime; //# flag timer (sum of all threads) rfiStrategy::Strategy itsStrategy; diff --git a/CEP/DP3/DPPP/src/AORFlagger.cc b/CEP/DP3/DPPP/src/AORFlagger.cc index d387a99fbf6f57af5f443cfe4b62667e5b4ffdf6..3b59399edcf9ab9266274203cb3e746b028fa442 100644 --- a/CEP/DP3/DPPP/src/AORFlagger.cc +++ b/CEP/DP3/DPPP/src/AORFlagger.cc @@ -79,7 +79,13 @@ namespace LOFAR { itsFlagTime (0) { itsWindowSize = parset.getUint (prefix+"timewindow", 0); - itsOverlap = parset.getUint (prefix+"overlap", 0); + itsMemory = parset.getUint (prefix+"memorymax", 0); + itsMemoryPerc = parset.getUint (prefix+"memoryperc", 0); + itsOverlap = parset.getUint (prefix+"overlapmax", 0); + // Also look for keyword overlap for backward compatibility. + if (itsOverlap == 0) { + itsOverlap = parset.getUint (prefix+"overlap", 0); + } itsOverlapPerc = parset.getDouble (prefix+"overlapperc", -1); itsPulsarMode = parset.getBool (prefix+"pulsar", false); itsPedantic = parset.getBool (prefix+"pedantic", false); @@ -117,8 +123,21 @@ namespace LOFAR { #else uint nthread = 1; #endif - // Determine how much memory is available. - double memory = HostInfo::memoryTotal() * 1024.; + // Determine available memory. + double availMemory = HostInfo::memoryTotal() * 1024.; + // Determine how much memory can be used. + double memoryMax = itsMemory * 1024*1024*1024; + double memory = memoryMax; + if (itsMemoryPerc > 0) { + memory = itsMemoryPerc * availMemory / 100.; + if (memoryMax > 0 && memory > memoryMax) { + memory = memoryMax; + } + } else if (itsMemory <= 0) { + // Nothing given, so use available memory on this machine. + // Set 50% (max 2 GB) aside for other purposes. + memory = availMemory - std::min(0.5 * availMemory, 2.*1024*1024*1024); + } // Determine how much buffer space is needed per time slot. // The flagger needs 3 extra work buffers (data+flags) per thread. double timeSize = (sizeof(Complex) + sizeof(bool)) * @@ -128,9 +147,8 @@ namespace LOFAR { itsOverlapPerc = 1; } // If no time window given, determine it from the available memory. - // Set 2 GB aside for other purposes. if (itsWindowSize == 0) { - double nt = (memory - 2.*1024*1024*1024) / timeSize; + double nt = memory / timeSize; if (itsOverlapPerc > 0) { // Determine the overlap (add 0.5 for rounding). // If itsOverLap is also given, it is the maximum. @@ -158,10 +176,11 @@ namespace LOFAR { itsOverlap = uint(itsOverlapPerc*itsWindowSize/100); } // Check if it all fits in memory. - ASSERTSTR ((itsWindowSize + 2*itsOverlap) * timeSize < memory, + ASSERTSTR ((itsWindowSize + 2*itsOverlap) * timeSize < availMemory, "Timewindow " << itsWindowSize << " and/or overlap " << itsOverlap - << " too large for available memory " << memory); + << ' ' << memory + << " too large for available memory " << availMemory); // Size the buffer (need overlap on both sides). itsBuf.resize (itsWindowSize + 2*itsOverlap); // Initialize the flag counters. @@ -207,6 +226,7 @@ namespace LOFAR { itsTimer.start(); // Accumulate in the time window until the window and overlap are full. itsNTimes++; + /// cout<<"inserted at " << itsBufIndex<<endl; itsBuf[itsBufIndex++] = buf; if (itsBufIndex == itsWindowSize+2*itsOverlap) { flag (2*itsOverlap); @@ -276,6 +296,7 @@ namespace LOFAR { for (uint i=0; i<itsWindowSize; ++i) { getNextStep()->process (itsBuf[i]); itsBuf[i] = DPBuffer(); + ///cout << "cleared buffer " << i << endl; } itsTimer.start(); // Shift the buffers still needed to the beginning of the vector. @@ -283,6 +304,7 @@ namespace LOFAR { // Note it is a cheap operation, because shallow copies are made. for (uint i=0; i<rightOverlap; ++i) { itsBuf[i] = itsBuf[i+itsWindowSize]; + ///cout << "moved buffer " <<i+itsWindowSize<<" to "<< i << endl; } itsBufIndex = rightOverlap; } diff --git a/CEP/DP3/DPPP/src/PhaseShift.cc b/CEP/DP3/DPPP/src/PhaseShift.cc index 833a06dfeb99e5b46e0fa99b82cd3377655f9e5d..a228db72441b233dbcd6586f678e4e4f86b1bb96 100644 --- a/CEP/DP3/DPPP/src/PhaseShift.cc +++ b/CEP/DP3/DPPP/src/PhaseShift.cc @@ -30,6 +30,7 @@ #include <Common/StreamUtil.h> #include <casa/Arrays/ArrayMath.h> #include <casa/Arrays/VectorIter.h> +#include <casa/Arrays/ArrayIO.h> #include <casa/Quanta/Quantum.h> #include <casa/Quanta/MVAngle.h> #include <casa/BasicSL/Constants.h> @@ -66,13 +67,13 @@ namespace LOFAR { newDir = handleCenter(); original = false; } - itsMachine = new casa::UVWMachine(info.phaseCenter(), newDir, false, true); + itsMachine = new casa::UVWMachine(info.phaseCenter(), newDir); info.setPhaseCenter (newDir, original); - // Calculate freq/C. + // Calculate 2*pi*freq/C to get correct phase term (in wavelengths). const Vector<double>& freq = itsInput->chanFreqs(info.nchanAvg()); itsFreqC.reserve (freq.size()); for (uint i=0; i<freq.size(); ++i) { - itsFreqC.push_back (freq[i] / C::c); + itsFreqC.push_back (2. * C::pi * freq[i] / C::c); } } @@ -100,37 +101,60 @@ namespace LOFAR { newBuf.getData().unique(); newBuf.getUVW().unique(); // // Get the uvw-s per station, so we can do convertUVW per station -// // instead of per baseline. Use the first station as reference. +// // instead of per baseline. // const Vector<Int>& ant1 = itsInput->getAnt1(); // const Vector<Int>& ant2 = itsInput->getAnt2(); // vector<Vector<double> > antUVW; // antUVW.resize (1 + std::max (max(ant1), max(ant2))); +// // The first station is taken as reference (has value 0). +// Vector<double> nullUVW(3, 0.); // uint ndone = 0; -// Vector<double> uvw(3, 0.); -// antUVW[ant1[0]] = uvw; -// const Array<double>& blUVW = newBuf.getUVW(); -// VectorIterator<double> iterUVW (blUVW, 1); // bool moreTodo = true; +// bool setFirst = true; // set first unknown uvw to 0? // while (moreTodo) { +// uint oldNdone = ndone; // moreTodo = false; +// VectorIterator<double> iterUVW (newBuf.getUVW(), 0); // for (uint i=0; i<ant1.size(); ++i) { // int a1 = ant1[i]; // int a2 = ant2[i]; +// cout<<a1<<' '<<a2<<' '<<iterUVW.vector()<<endl; // if (antUVW[a1].empty()) { // if (antUVW[a2].empty()) { -// moreTodo = true; +// if (setFirst) { +// antUVW[a1] = nullUVW; +// antUVW[a2] = iterUVW.vector(); +// setFirst = false; +// ndone++; +// } else { +// // Both stations are still unknown, so need a next round. +// moreTodo = true; +// } // } else { // // Get uvw of first station. -// uvw = iterUVW.array() - ; +// antUVW[a1] = antUVW[a2] - iterUVW.vector(); +// ndone++; // } // } else if (antUVW[a2].empty()) { -// uvw = ; +// antUVW[a2] = antUVW[a1] + iterUVW.vector(); +// ndone++; // } else { // // UVW should be about the same. -// ASSERT (near(uvw)); +// ASSERT (allNear(antUVW[a2] - antUVW[a1], iterUVW.vector(), 1e-7)); // } +// cout << " "<<antUVW[a1]<<antUVW[a2]<<endl; // iterUVW.next(); // } +// if (ndone == oldNdone) { +// setFirst = moreTodo; +// moreTodo = true; +// } +// } +// vector<double> phases(nant); +// for (uInt i=0; i<nant; ++i) { +// if (! antUVW[i].empty()) { +// itsMachine->convertUVW (phases[i], antUVW[i]); +// } // } Complex* data = newBuf.getData().data(); uint ncorr = newBuf.getData().shape()[0]; @@ -138,14 +162,15 @@ namespace LOFAR { uint nbl = newBuf.getData().shape()[2]; VectorIterator<double> uvwIter(newBuf.getUVW(), 0); double phase; - //# If ever later a time dependent phase center is used, the machine - //# must be reset for each new time, thus each new call to process. + //# If ever in the future a time dependent phase center is used, + //# the machine must be reset for each new time, thus each new call + //# to process. for (uint i=0; i<nbl; ++i) { // Convert the uvw coordinates and get the phase shift term. itsMachine->convertUVW (phase, uvwIter.vector()); for (uint j=0; j<nchan; ++j) { // Shift the phase of the data of this baseline. - // Convert the phase term to wavelengths. + // Convert the phase term to wavelengths (and apply 2*pi). // u_wvl = u_m / wvl = u_m * freq / c double phasewvl = phase * itsFreqC[j]; Complex phasor(cos(phasewvl), sin(phasewvl)); diff --git a/CEP/DP3/DPPP/test/tAORFlagger.cc b/CEP/DP3/DPPP/test/tAORFlagger.cc index 64ddd2b3207895258e92dd4b1f33433efa31cc5b..43ee2d8f9c02f62b13e9d75c27d466680754526a 100644 --- a/CEP/DP3/DPPP/test/tAORFlagger.cc +++ b/CEP/DP3/DPPP/test/tAORFlagger.cc @@ -217,13 +217,7 @@ void test1(int ntime, int nant, int nchan, int ncorr, bool flag, int threshold, TestInput* in = new TestInput(ntime, nant, nchan, ncorr, flag); DPStep::ShPtr step1(in); ParameterSet parset; - parset.add ("freqwindow", "1"); parset.add ("timewindow", "1"); - parset.add ("threshold", toString(threshold)); - if (shortbl) { - parset.add ("blmin", "0"); - parset.add ("blmax", "145"); - } DPStep::ShPtr step2(new AORFlagger(in, parset, "")); DPStep::ShPtr step3(new TestOutput(ntime, nant, nchan, ncorr, flag, false, shortbl)); @@ -244,14 +238,8 @@ void test2(int ntime, int nant, int nchan, int ncorr, bool flag, int threshold, TestInput* in = new TestInput(ntime, nant, nchan, ncorr, flag); DPStep::ShPtr step1(in); ParameterSet parset; - parset.add ("freqwindow", "4"); - parset.add ("timewindow", "100"); - parset.add ("padding", "10"); - parset.add ("threshold", toString(threshold)); - parset.add ("applyautocorr", "True"); - if (shortbl) { - parset.add ("blmax", "145"); - } + parset.add ("timewindow", "4"); + parset.add ("overlapmax", "1"); DPStep::ShPtr step2(new AORFlagger(in, parset, "")); DPStep::ShPtr step3(new TestOutput(ntime, nant, nchan, ncorr, flag, true, shortbl)); diff --git a/CEP/DP3/DPPP/test/tPhaseShift.cc b/CEP/DP3/DPPP/test/tPhaseShift.cc index 3436916de918347abde30d4a2f2e6ebe4f20c645..da07a7c94446e6ae09d9a0663d04a6ab345025f4 100644 --- a/CEP/DP3/DPPP/test/tPhaseShift.cc +++ b/CEP/DP3/DPPP/test/tPhaseShift.cc @@ -54,7 +54,59 @@ public: // Define the frequencies. itsChanFreqs.resize (nchan); indgen (itsChanFreqs, 1050000., 100000.); + // Fill the baseline stations. + // Determine nr of stations using: na*(na+1)/2 = nbl + // If many baselines, divide into groups of 6 to test if + // PhaseShift disentangles it correctly. + int nant = int(-0.5 + sqrt(0.25 + 2*nbl)); + if (nant*(nant+1)/2 < nbl) ++nant; + int grpszant = 3; + int grpszbl = grpszant*(grpszant+1)/2; + if (nbl > grpszbl) { + nant = grpszant*(nbl+grpszbl-1)/grpszbl; + } else { + grpszant = nant; + grpszbl = nbl; + } + itsAnt1.resize (nbl); + itsAnt2.resize (nbl); + int st1 = 0; + int st2 = 0; + int lastant = grpszant; + for (int i=0; i<nbl; ++i) { + itsAnt1[i] = st1; + itsAnt2[i] = st2; + if (i%grpszbl == grpszbl-1) { + st1 = lastant; + st2 = lastant; + lastant += grpszant; + } else { + if (++st2 == lastant) { + st2 = ++st1; + } + } + } + itsStatUVW.resize (3, nant); + for (int i=0; i<nant; ++i) { + itsStatUVW(0,i) = 0.01 + i*0.02; + itsStatUVW(1,i) = 0.05 + i*0.03; + itsStatUVW(2,i) = 0.015 + i*0.025; + } } + + void fillUVW (Matrix<double>& uvw, int count) + { + for (int i=0; i<itsNBl; ++i) { + uvw(0,i) = (itsStatUVW(0,itsAnt2[i]) + count*0.002 - + (itsStatUVW(0,itsAnt1[i]) + count*0.002)); + uvw(1,i) = (itsStatUVW(1,itsAnt2[i]) + count*0.004 - + (itsStatUVW(1,itsAnt1[i]) + count*0.004)); + uvw(2,i) = (itsStatUVW(2,itsAnt2[i]) + count*0.006 - + (itsStatUVW(2,itsAnt1[i]) + count*0.006)); + cout <<itsAnt1[i]<<' '<<itsAnt2[i]<<' '<<uvw(0,i)<<' '<<uvw(1,i)<<' '<<uvw(2,i)<<endl; + } + } + private: virtual bool process (const DPBuffer&) { @@ -81,7 +133,7 @@ private: fullResFlags = itsFlag; buf.setFullResFlags (fullResFlags); Matrix<double> uvw(3,itsNBl); - indgen (uvw, double(itsCount*100)); + fillUVW (uvw, itsCount); buf.setUVW (uvw); getNextStep()->process (buf); ++itsCount; @@ -100,14 +152,17 @@ private: int itsCount, itsNTime, itsNBl, itsNChan, itsNCorr; bool itsFlag; + Matrix<double> itsStatUVW; }; // Class to check result of null phase-shifted TestInput. class TestOutput: public DPStep { public: - TestOutput(int ntime, int nbl, int nchan, int ncorr, bool flag) - : itsCount(0), itsNTime(ntime), itsNBl(nbl), itsNChan(nchan), + TestOutput(TestInput* input, + int ntime, int nbl, int nchan, int ncorr, bool flag) + : itsInput(input), + itsCount(0), itsNTime(ntime), itsNBl(nbl), itsNChan(nchan), itsNCorr(ncorr), itsFlag(flag) {} private: @@ -122,14 +177,14 @@ private: result.data()[i] = Complex(i+itsCount*10,i-1000+itsCount*6); } Matrix<double> uvw(3,itsNBl); - indgen (uvw, double(itsCount*100)); + itsInput->fillUVW (uvw, itsCount); // Check the result. - ASSERT (allNear(real(buf.getData()), real(result), 1e-5)); + ASSERT (allNear(real(buf.getData()), real(result), 1e-7)); ///cout << imag(buf.getData()) << endl<<imag(result); - ASSERT (allNear(imag(buf.getData()), imag(result), 1e-5)); + ASSERT (allNear(imag(buf.getData()), imag(result), 1e-7)); ASSERT (allEQ(buf.getFlags(), itsFlag)); ASSERT (near(buf.getTime(), 2.+5*itsCount)); - ASSERT (allNear(buf.getUVW(), uvw, 1e-5)); + ASSERT (allNear(buf.getUVW(), uvw, 1e-7)); ++itsCount; return true; } @@ -143,6 +198,7 @@ private: ASSERT (near(dir.getLat("deg").getValue(), 30.)); } + TestInput* itsInput; int itsCount; int itsNTime, itsNBl, itsNChan, itsNCorr; bool itsFlag; @@ -152,8 +208,10 @@ private: class TestOutput1: public DPStep { public: - TestOutput1(int ntime, int nbl, int nchan, int ncorr, bool flag) - : itsCount(0), itsNTime(ntime), itsNBl(nbl), itsNChan(nchan), + TestOutput1(TestInput* input, + int ntime, int nbl, int nchan, int ncorr, bool flag) + : itsInput(input), + itsCount(0), itsNTime(ntime), itsNBl(nbl), itsNChan(nchan), itsNCorr(ncorr), itsFlag(flag) {} private: @@ -168,7 +226,7 @@ private: result.data()[i] = Complex(i+itsCount*10,i-1000+itsCount*6); } Matrix<double> uvw(3,itsNBl); - indgen (uvw, double(itsCount*100)); + itsInput->fillUVW (uvw, itsCount); // Check the result. ASSERT (! allNear(real(buf.getData()), real(result), 1e-5)); ASSERT (! allEQ(real(buf.getData()), real(result))); @@ -187,10 +245,11 @@ private: virtual void updateInfo (DPInfo& info) { MVDirection dir = info.phaseCenter().getValue(); - ASSERT (near(dir.getLong("deg").getValue(), 45.)); - ASSERT (near(dir.getLat("deg").getValue(), 31.)); + ASSERT (near(dir.getLong("deg").getValue(), 50.)); + ASSERT (near(dir.getLat("deg").getValue(), 35.)); } + TestInput* itsInput; int itsCount; int itsNTime, itsNBl, itsNChan, itsNCorr; bool itsFlag; @@ -225,7 +284,7 @@ void test1(int ntime, int nbl, int nchan, int ncorr, bool flag) // Keep phase center the same to be able to check if data are correct. parset.add ("phasecenter", "[45deg, 30deg]"); DPStep::ShPtr step2(new PhaseShift(in, parset, "")); - DPStep::ShPtr step3(new TestOutput(ntime, nbl, nchan, ncorr, flag)); + DPStep::ShPtr step3(new TestOutput(in, ntime, nbl, nchan, ncorr, flag)); step1->setNextStep (step2); step2->setNextStep (step3); execute (step1); @@ -241,13 +300,13 @@ void test2(int ntime, int nbl, int nchan, int ncorr, bool flag) DPStep::ShPtr step1(in); // First shift to another center, then back to original. ParameterSet parset; - parset.add ("phasecenter", "[45deg, 31deg]"); + parset.add ("phasecenter", "[50deg, 35deg]"); ParameterSet parset1; parset1.add ("phasecenter", "[]"); DPStep::ShPtr step2(new PhaseShift(in, parset, "")); - DPStep::ShPtr step3(new TestOutput1(ntime, nbl, nchan, ncorr, flag)); + DPStep::ShPtr step3(new TestOutput1(in, ntime, nbl, nchan, ncorr, flag)); DPStep::ShPtr step4(new PhaseShift(in, parset1, "")); - DPStep::ShPtr step5(new TestOutput(ntime, nbl, nchan, ncorr, flag)); + DPStep::ShPtr step5(new TestOutput(in, ntime, nbl, nchan, ncorr, flag)); step1->setNextStep (step2); step2->setNextStep (step3); step3->setNextStep (step4); @@ -260,7 +319,7 @@ int main() { try { test1(10, 3, 32, 4, false); - test1(10, 3, 30, 1, true); + test1(10, 10, 30, 1, true); test2(10, 6, 32, 4, false); test2(10, 6, 30, 1, true); } catch (std::exception& x) {