From df9dbba6e36162df2745d6c48a69f5a01f4a5266 Mon Sep 17 00:00:00 2001
From: Rob van Nieuwpoort <nieuwpoort@astron.nl>
Date: Thu, 23 Dec 2010 12:47:19 +0000
Subject: [PATCH] Bug 1198: working on online flagger and broken station
 detector

---
 .gitattributes                            |   6 +
 RTCP/CNProc/src/CMakeLists.txt            |   3 +
 RTCP/CNProc/src/Flagger.cc                |  45 +++++++
 RTCP/CNProc/src/Flagger.h                 |  32 +++++
 RTCP/CNProc/src/PostCorrelationFlagger.cc | 148 ++++++++++++++++++++++
 RTCP/CNProc/src/PostCorrelationFlagger.h  |  39 ++++++
 RTCP/CNProc/src/PreCorrelationFlagger.cc  | 100 +++++++++++++++
 RTCP/CNProc/src/PreCorrelationFlagger.h   |  33 +++++
 8 files changed, 406 insertions(+)
 create mode 100644 RTCP/CNProc/src/Flagger.cc
 create mode 100644 RTCP/CNProc/src/Flagger.h
 create mode 100644 RTCP/CNProc/src/PostCorrelationFlagger.cc
 create mode 100644 RTCP/CNProc/src/PostCorrelationFlagger.h
 create mode 100644 RTCP/CNProc/src/PreCorrelationFlagger.cc
 create mode 100644 RTCP/CNProc/src/PreCorrelationFlagger.h

diff --git a/.gitattributes b/.gitattributes
index b23b7e4e58a..49f9db19d3a 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1994,8 +1994,14 @@ RTCP/CNProc/src/FIR_OriginalCepPPFWeights.h -text
 RTCP/CNProc/src/FIR_OriginalStationPPFWeights.h -text
 RTCP/CNProc/src/FilterBank.cc -text
 RTCP/CNProc/src/FilterBank.h -text
+RTCP/CNProc/src/Flagger.cc -text
+RTCP/CNProc/src/Flagger.h -text
 RTCP/CNProc/src/InversePPF.cc -text
 RTCP/CNProc/src/InversePPF.h -text
+RTCP/CNProc/src/PostCorrelationFlagger.cc -text
+RTCP/CNProc/src/PostCorrelationFlagger.h -text
+RTCP/CNProc/src/PreCorrelationFlagger.cc -text
+RTCP/CNProc/src/PreCorrelationFlagger.h -text
 RTCP/CNProc/src/Ring.h -text
 RTCP/CNProc/src/Stokes.cc -text
 RTCP/CNProc/src/Stokes.h -text
diff --git a/RTCP/CNProc/src/CMakeLists.txt b/RTCP/CNProc/src/CMakeLists.txt
index e455faabc6d..2e08f8ee239 100644
--- a/RTCP/CNProc/src/CMakeLists.txt
+++ b/RTCP/CNProc/src/CMakeLists.txt
@@ -26,6 +26,9 @@ set(cnproc_LIB_SRCS
   FilterBank.cc
   LocationInfo.cc
   PPF.cc
+  Flagger.cc
+  PreCorrelationFlagger.cc
+  PostCorrelationFlagger.cc
   Stokes.cc)
 
 # Maybe we shoud use LOFAR_COMPILER_SUITE, because ASM-BGP_COMPILER_WORKS
diff --git a/RTCP/CNProc/src/Flagger.cc b/RTCP/CNProc/src/Flagger.cc
new file mode 100644
index 00000000000..c12c78211a4
--- /dev/null
+++ b/RTCP/CNProc/src/Flagger.cc
@@ -0,0 +1,45 @@
+//# Always #include <lofar_config.h> first!
+#include <lofar_config.h>
+
+#include <Flagger.h>
+#include <math.h>
+#include <algorithm>
+
+namespace LOFAR {
+namespace RTCP {
+
+Flagger::  Flagger(const unsigned nrStations, const unsigned nrChannels, const unsigned totalNrSamples, const float cutoffThreshold) :
+  itsNrStations(nrStations),
+  itsNrChannels(nrChannels),
+  itsTotalNrSamples(totalNrSamples),
+  itsCutoffThreshold(cutoffThreshold)
+{
+}
+
+
+float Flagger::calculateStdDev(const std::vector<float>& data, const float mean) {
+  float stdDev = 0.0f;
+
+  for(unsigned i = 0; i<data.size(); i++) {
+    float diff = data[i] - mean;
+    stdDev += diff * diff;
+  }
+  stdDev /= data.size();
+  stdDev = sqrt(stdDev);
+
+  return stdDev;
+}
+
+
+float Flagger::calculateMedian(const std::vector<float>& origData) {
+  // we have to copy the vector, nth_element changes the ordering.
+  std::vector<float> data = origData;
+
+  // calculate median, expensive, but nth_element is guaranteed to be O(n)
+  std::vector<float>::iterator it = data.begin() + (data.size() / 2);
+  std::nth_element(data.begin(), it, data.end());
+  return *it;
+}
+
+} // namespace RTCP
+} // namespace LOFAR
diff --git a/RTCP/CNProc/src/Flagger.h b/RTCP/CNProc/src/Flagger.h
new file mode 100644
index 00000000000..70695c59a09
--- /dev/null
+++ b/RTCP/CNProc/src/Flagger.h
@@ -0,0 +1,32 @@
+#ifndef LOFAR_CNPROC_FLAGGER_H
+#define LOFAR_CNPROC_FLAGGER_H
+
+#include <vector>
+
+namespace LOFAR {
+namespace RTCP {
+
+class Flagger
+{
+  public:
+
+  Flagger(const unsigned nrStations, const unsigned nrChannels, const unsigned itsTotalNrSamples, const float cutoffThreshold);
+
+  static float calculateStdDev(const std::vector<float>& data, const float mean);
+  static float calculateMedian(const std::vector<float>& origData);
+
+
+  const unsigned itsNrStations, itsNrChannels;
+  const unsigned itsTotalNrSamples;
+  const float itsCutoffThreshold;
+
+  std::vector<float> itsPowers;
+  float itsPowerMean;
+  float itsPowerStdDev;
+  float itsPowerMedian;
+};
+
+} // namespace RTCP
+} // namespace LOFAR
+
+#endif // LOFAR_CNPROC_FLAGGER_H
diff --git a/RTCP/CNProc/src/PostCorrelationFlagger.cc b/RTCP/CNProc/src/PostCorrelationFlagger.cc
new file mode 100644
index 00000000000..f567a4e1192
--- /dev/null
+++ b/RTCP/CNProc/src/PostCorrelationFlagger.cc
@@ -0,0 +1,148 @@
+//# Always #include <lofar_config.h> first!
+#include <lofar_config.h>
+
+#include <Common/Timer.h>
+
+#include <PostCorrelationFlagger.h>
+#include <Correlator.h>
+
+namespace LOFAR {
+namespace RTCP {
+
+static NSTimer RFIStatsTimer("RFI statistics calculations (post correlation)", true, true);
+static NSTimer thresholdingFlaggerTimer("Thresholding flagger (post correlation)", true, true);
+static NSTimer detectBrokenStationsTimer("DetectBrokenStations (post correlation)", true, true);
+
+  // CorrelatedData samples: [nrBaselines][nrChannels][NR_POLARIZATIONS][NR_POLARIZATIONS]
+
+  PostCorrelationFlagger::PostCorrelationFlagger(const unsigned nrStations, unsigned nrBaselines, const unsigned nrChannels, float cutoffThreshold)
+:
+  Flagger(nrStations, nrChannels, nrBaselines * nrChannels * NR_POLARIZATIONS * NR_POLARIZATIONS, cutoffThreshold),
+  itsNrBaselines(nrBaselines)
+{
+  itsPowers.resize(itsTotalNrSamples);
+  itsSummedBaselinePowers.resize(itsNrBaselines);
+  itsSummedStationPowers.resize(itsNrStations);
+}
+
+
+void PostCorrelationFlagger::flag(CorrelatedData* correlatedData)
+{
+  calculateGlobalStatistics(correlatedData);
+  thresholdingFlagger(correlatedData);
+}
+
+
+void PostCorrelationFlagger::thresholdingFlagger(CorrelatedData* correlatedData)
+{
+  thresholdingFlaggerTimer.start();
+  
+  float threshold = itsPowerMean + itsCutoffThreshold * itsPowerStdDev;
+
+  unsigned index = 0;
+  unsigned totalSamplesFlagged = 0;
+
+  for(unsigned baseline = 0; baseline < itsNrBaselines; baseline++) {
+    for(unsigned channel = 0; channel < itsNrChannels; channel++) {
+      for(unsigned pol1 = 0; pol1< NR_POLARIZATIONS; pol1++) {
+	for(unsigned pol2 = 0; pol2< NR_POLARIZATIONS; pol2++) {
+	  const float power = itsPowers[index++];
+
+	  if(power > threshold) {
+	    // flag this sample, both polarizations.
+	    // TODO: currently, we can only flag all channels at once! This is a limitation in CorrelatedData.
+	    //	    correlatedData->flags[station].include(time, time);
+
+#ifdef LOFAR_STMAN_V2      
+	    correlatedData->nrValidSamplesV2[baseline] = 0;
+#else
+	    correlatedData->nrValidSamples[baseline][channel] = 0;
+#endif
+
+	    totalSamplesFlagged++;
+	  }
+	}
+      }
+    }
+  }
+
+  thresholdingFlaggerTimer.stop();
+
+  float percentageFlagged = totalSamplesFlagged * 100.0f / itsPowers.size();
+
+  std::cerr << "thresholdingFlagger: flagged " << totalSamplesFlagged << " samples, " << percentageFlagged << " %" << std::endl;
+}
+
+
+void PostCorrelationFlagger::detectBrokenStations()
+{
+  detectBrokenStationsTimer.start();
+
+  // Sum all baselines that involve a station (both horizontally and vertically).
+  float total = 0.0f;
+
+  for(unsigned station=0; station < itsNrStations; station++) {
+    float sum = 0.0f;
+    for(unsigned stationH=station; stationH < itsNrStations; stationH++) {
+      for(unsigned stationV=0; stationV < station; stationV++) {
+	unsigned baseline = Correlator::baseline(stationH, stationV);
+	sum += itsSummedBaselinePowers[baseline];
+      }
+    } 
+    itsSummedStationPowers[station] = sum;
+    total += sum;
+  }
+
+  float mean = total / itsNrStations;
+  float stdDev = calculateStdDev(itsSummedStationPowers, mean);
+  float median = calculateMedian(itsSummedStationPowers);
+  float threshold = mean + itsCutoffThreshold * stdDev;
+
+  std::cout << "detectBrokenStations: mean = " << mean << ", median = " << median << " stdDev = " << stdDev << ", threshold = " << threshold << std::endl;
+
+  for(unsigned station=0; station < itsNrStations; station++) {
+    if(itsSummedStationPowers[station] > threshold) {
+      std::cerr << "WARNING, station " << station << " seems to be corrupted, total summed power = " << itsSummedStationPowers[station] << std::endl;
+    }
+  }
+
+  detectBrokenStationsTimer.stop();
+}
+
+
+void PostCorrelationFlagger::calculateGlobalStatistics(CorrelatedData* correlatedData)
+{
+  RFIStatsTimer.start();
+
+  float sum = 0.0f;
+  unsigned index = 0;
+
+  for(unsigned baseline = 0; baseline < itsNrBaselines; baseline++) {
+    float baselineSum = 0.0f;
+    for(unsigned channel = 0; channel < itsNrChannels; channel++) {
+      for(unsigned pol1 = 0; pol1< NR_POLARIZATIONS; pol1++) {
+	for(unsigned pol2 = 0; pol2< NR_POLARIZATIONS; pol2++) {
+	  fcomplex sample = correlatedData->visibilities[baseline][channel][pol1][pol2];
+	  float power = real(sample) * real(sample) + imag(sample) * imag(sample);
+	  itsPowers[index++] = power;
+	  baselineSum += power;
+	}
+      }
+    }
+    itsSummedBaselinePowers[baseline] = baselineSum;
+    sum += baselineSum;
+  }
+ 
+  itsPowerMean = sum / itsTotalNrSamples;
+
+  itsPowerStdDev = calculateStdDev(itsPowers, itsPowerMean);
+  itsPowerMedian = calculateMedian(itsPowers);
+
+  RFIStatsTimer.stop();
+
+  std::cerr << "global RFI stats: mean = " << itsPowerMean << ", median = " << itsPowerMedian << ", stddev = " << itsPowerStdDev << std::endl;
+}
+
+
+} // namespace RTCP
+} // namespace LOFAR
diff --git a/RTCP/CNProc/src/PostCorrelationFlagger.h b/RTCP/CNProc/src/PostCorrelationFlagger.h
new file mode 100644
index 00000000000..3954a29ba1a
--- /dev/null
+++ b/RTCP/CNProc/src/PostCorrelationFlagger.h
@@ -0,0 +1,39 @@
+#ifndef LOFAR_CNPROC_POST_CORRELATION_FLAGGER_H
+#define LOFAR_CNPROC_POST_CORRELATION_FLAGGER_H
+
+#include <Flagger.h>
+#include <Interface/CorrelatedData.h>
+
+namespace LOFAR {
+namespace RTCP {
+
+  class PostCorrelationFlagger : public Flagger
+{
+  public:
+  PostCorrelationFlagger(const unsigned nrStations, unsigned nrBaselines, const unsigned nrChannels, float cutoffThreshold = 7.0f);
+
+  void flag(CorrelatedData* correlatedData);
+
+  // Does simple thresholding
+  void thresholdingFlagger(CorrelatedData* correlatedData);
+
+  // Tries to detect broken stations
+  void detectBrokenStations();
+
+  // calculates mean, stddev, and median.
+  void calculateGlobalStatistics(CorrelatedData* correlatedData);
+
+  private:
+  const unsigned itsNrBaselines;
+  
+  std::vector<float> itsSummedBaselinePowers; // [nrBaselines]
+
+  std::vector<float> itsSummedStationPowers; // [nrStations]
+
+};
+
+
+} // namespace RTCP
+} // namespace LOFAR
+
+#endif // LOFAR_CNPROC_POST_CORRELATION_FLAGGER_H
diff --git a/RTCP/CNProc/src/PreCorrelationFlagger.cc b/RTCP/CNProc/src/PreCorrelationFlagger.cc
new file mode 100644
index 00000000000..dc068cea321
--- /dev/null
+++ b/RTCP/CNProc/src/PreCorrelationFlagger.cc
@@ -0,0 +1,100 @@
+//# Always #include <lofar_config.h> first!
+#include <lofar_config.h>
+
+#include <Common/Timer.h>
+
+#include <PreCorrelationFlagger.h>
+
+
+namespace LOFAR {
+namespace RTCP {
+
+static NSTimer RFIStatsTimer("RFI statistics calculations (pre correlation)", true, true);
+static NSTimer thresholdingFlaggerTimer("Thresholding flagger (pre correlation)", true, true);
+
+  // FilteredData samples: [nrChannels][nrStations][nrSamplesPerIntegration][NR_POLARIZATIONS]
+  // FilteredData flags:   std::vector<SparseSet<unsigned> >  flags;
+
+  PreCorrelationFlagger::PreCorrelationFlagger(const unsigned nrStations, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, const float cutoffThreshold)
+:
+  Flagger(nrStations, nrChannels, nrStations * nrChannels * nrSamplesPerIntegration * NR_POLARIZATIONS, cutoffThreshold),
+  itsNrSamplesPerIntegration(nrSamplesPerIntegration)
+{
+  itsPowers.resize(itsTotalNrSamples);
+}
+
+
+void PreCorrelationFlagger::flag(FilteredData* filteredData)
+{
+  calculateGlobalStatistics(filteredData);
+  thresholdingFlagger(filteredData);
+}
+
+
+void PreCorrelationFlagger::thresholdingFlagger(FilteredData* filteredData)
+{
+  thresholdingFlaggerTimer.start();
+  
+  float threshold = itsPowerMean + itsCutoffThreshold * itsPowerStdDev;
+
+  unsigned index = 0;
+  unsigned totalSamplesFlagged = 0;
+
+  for(unsigned channel = 0; channel < itsNrChannels; channel++) {
+    for(unsigned station = 0; station < itsNrStations; station++) {
+      for(unsigned time = 0; time < itsNrSamplesPerIntegration; time++) {
+	for(unsigned pol = 0; pol< NR_POLARIZATIONS; pol++) {
+	  const float power = itsPowers[index++];
+
+	  if(power > threshold) {
+	    // flag this sample, both polarizations.
+	    // TODO: currently, we can only flag all channels at once! This is a limitation in FilteredData.
+	    filteredData->flags[station].include(time, time);
+	    totalSamplesFlagged++;
+	  }
+	}
+      }
+    }
+  }
+
+  thresholdingFlaggerTimer.stop();
+
+  float percentageFlagged = totalSamplesFlagged * 100.0f / itsPowers.size();
+
+  std::cerr << "thresholdingFlagger: flagged " << totalSamplesFlagged << " samples, " << percentageFlagged << " %" << std::endl;
+}
+
+
+void PreCorrelationFlagger::calculateGlobalStatistics(FilteredData* filteredData)
+{
+  RFIStatsTimer.start();
+
+  float sum = 0.0f;
+  unsigned index = 0;
+
+  for(unsigned channel = 0; channel < itsNrChannels; channel++) {
+    for(unsigned station = 0; station < itsNrStations; station++) {
+      for(unsigned time = 0; time < itsNrSamplesPerIntegration; time++) {
+	for(unsigned pol = 0; pol< NR_POLARIZATIONS; pol++) {
+	  fcomplex sample = filteredData->samples[channel][station][time][pol];
+	  float power = real(sample) * real(sample) + imag(sample) * imag(sample);
+	  sum += power;
+	  itsPowers[index++] = power;
+	}
+      }
+    }
+  }
+
+  itsPowerMean = sum / itsTotalNrSamples;
+
+  itsPowerStdDev = calculateStdDev(itsPowers, itsPowerMean);
+  itsPowerMedian = calculateMedian(itsPowers);
+
+  RFIStatsTimer.stop();
+
+  std::cerr << "global RFI stats: mean = " << itsPowerMean << ", median = " << itsPowerMedian << ", stddev = " << itsPowerStdDev << std::endl;
+}
+
+
+} // namespace RTCP
+} // namespace LOFAR
diff --git a/RTCP/CNProc/src/PreCorrelationFlagger.h b/RTCP/CNProc/src/PreCorrelationFlagger.h
new file mode 100644
index 00000000000..e065fdb8c8e
--- /dev/null
+++ b/RTCP/CNProc/src/PreCorrelationFlagger.h
@@ -0,0 +1,33 @@
+#ifndef LOFAR_CNPROC_PRE_CORRELATION_FLAGGER_H
+#define LOFAR_CNPROC_PRE_CORRELATION_FLAGGER_H
+
+#include <Flagger.h>
+#include <Interface/FilteredData.h>
+
+namespace LOFAR {
+namespace RTCP {
+
+  class PreCorrelationFlagger : public Flagger
+{
+  public:
+  PreCorrelationFlagger(const unsigned nrStations, const unsigned nrChannels, const unsigned nrSamplesPerIntegration, float cutoffThreshold = 7.0f);
+
+  void flag(FilteredData* filteredData);
+
+  private:
+
+  // Does simple thresholding
+  void thresholdingFlagger(FilteredData* filteredData);
+
+  // calculates mean, stddev, and median.
+  void calculateGlobalStatistics(FilteredData* filteredData);
+
+
+  const unsigned itsNrSamplesPerIntegration;
+};
+
+
+} // namespace RTCP
+} // namespace LOFAR
+
+#endif // LOFAR_CNPROC_PRE_CORRELATION_FLAGGER_H
-- 
GitLab