Commit f2be9716 authored by Mattia Mancini

SSB-44: bug fixes and first implementation of the gains solutions

parent 48ddc0cc
1 merge request: !44 Merge back holography to master
Showing with 762 additions and 69 deletions
@@ -40,13 +40,16 @@ CAL/CalibrationProcessing/lib/__init__.py -text
CAL/CalibrationProcessing/lib/process_holography_dataset.py -text
CAL/CalibrationProcessing/lib/processing/__init__.py -text
CAL/CalibrationProcessing/lib/processing/averaging.py -text
-CAL/CalibrationProcessing/lib/processing/matrices.py -text
+CAL/CalibrationProcessing/lib/processing/normalize.py -text
+CAL/CalibrationProcessing/lib/processing/solver.py -text
CAL/CalibrationProcessing/test/CMakeLists.txt -text
+CAL/CalibrationProcessing/test/__init__.py -text
+CAL/CalibrationProcessing/test/processing/CMakeLists.txt -text
+CAL/CalibrationProcessing/test/processing/t_processing.py -text
+CAL/CalibrationProcessing/test/processing/t_processing.run -text
+CAL/CalibrationProcessing/test/processing/t_processing.sh -text
CAL/CalibrationProcessing/test/profiling_processing.py -text
-CAL/CalibrationProcessing/test/t_processing.py -text
-CAL/CalibrationProcessing/test/t_processing.run -text
-CAL/CalibrationProcessing/test/t_processing.sh -text
-CAL/CalibrationProcessing/test/test_utils.py -text
+CAL/CalibrationProcessing/test/utils.py -text
CEP/Calibration/BBSControl/include/BBSControl/CommandHandlerEstimator.h -text
CEP/Calibration/BBSControl/include/BBSControl/CommandHandlerReducer.h -text
CEP/Calibration/BBSControl/include/BBSControl/OptionParser.h -text
...
+__all__ = ['HolographyDataset', 'HolographyObservation', 'HolographySpecification', 'HolographyMeasurementSet']
from .holography_specification import HolographySpecification
from .holography_observation import HolographyObservation
from .holography_measurementset import HolographyMeasurementSet
from .holography_dataset import HolographyDataset
-__all__ = ['HolographyDataset', 'HolographyObservation', 'HolographySpecification', 'HolographyMeasurementSet']
\ No newline at end of file
@@ -20,7 +20,8 @@
python_install(
    __init__.py
    processing/averaging.py
-   processing/matrices.py
+   processing/normalize.py
+   processing/solver.py
    processing/__init__.py
    DESTINATION lofar/calibration)
...
from .averaging import *
-from .matrices import *
+from .normalize import *
\ No newline at end of file
import logging
import numpy
from lofar.calibration.common.datacontainers import HolographyDataset
+from lofar.calibration.common.datacontainers.holography_dataset_definitions \
+    import HDS_data_sample_type

logger = logging.getLogger(__name__)
@@ -178,7 +179,7 @@ def average_datatable_by_time_interval(data_table_in, time_interval):
def _average_dataset_by_function(dataset, function, *parameters):
    out_data = {}
-    for reference_station, data_per_reference_station in dataset.data.items():
+    for reference_station, data_per_reference_station in dataset.items():
        if reference_station not in out_data:
            out_data[reference_station] = dict()
@@ -234,7 +235,8 @@ def average_dataset_by_sample(input_data_set, window_size):
        logger.error(text)
        raise ValueError(text)
    else:
-        text = "Cannot average data by samples! The input data is of the invalid type %s." % (type(input_data_set))
+        text = "Cannot average data by samples! The input data is of the invalid type %s." % (
+            type(input_data_set))
        logger.error(text)
        raise ValueError(text)
@@ -243,7 +245,7 @@ def average_dataset_by_sample(input_data_set, window_size):
        logger.error(text)
        raise ValueError(text)
-    out_data = _average_dataset_by_function(dataset, average_datatable_by_sample, window_size)
+    out_data = _average_dataset_by_function(dataset.data, average_datatable_by_sample, window_size)
    if dataset.derived_data is None:
        dataset.derived_data = dict()
@@ -279,7 +281,8 @@ def average_dataset_by_time(input_data_set, time_interval):
        logger.error(text)
        raise ValueError(text)
    else:
-        text = "Cannot average data by time! The input data is of the invalid type %s." % (type(input_data_set))
+        text = "Cannot average data by time! The input data is of the invalid type %s." % (
+            type(input_data_set))
        logger.error(text)
        raise ValueError(text)
@@ -288,7 +291,7 @@ def average_dataset_by_time(input_data_set, time_interval):
        logger.error(text)
        raise ValueError(text)
-    out_data = _average_dataset_by_function(dataset, average_datatable_by_time_interval,
+    out_data = _average_dataset_by_function(dataset.data, average_datatable_by_time_interval,
                                            time_interval)
    if dataset.derived_data is None:
        dataset.derived_data = dict()
@@ -298,6 +301,12 @@ def average_dataset_by_time(input_data_set, time_interval):
    return dataset

+def average_data_by_sample(input_data_table, samples):
+    out_data = _average_dataset_by_function(input_data_table, average_datatable_by_sample,
+                                            samples)
+    return out_data

def _compute_size_of_new_array(array_size, window_size):
    """
    Round up the array size given a window size.
@@ -315,3 +324,87 @@ def _compute_size_of_new_array(array_size, window_size):
    # add one if the len of the data_array is not divisible by the sample_width
    new_array_size += 1 if array_size % window_size else 0
    return new_array_size
def _weigthed_average_data_per_polarization(input_dataset, output_dataset,
real_weights, imag_weights, polarization):
mask = input_dataset['flag'] == False
try:
output_dataset[polarization] = numpy.average(
input_dataset[polarization][mask].real,
weights=real_weights[polarization][mask]) \
+ 1.j * numpy.average(
input_dataset[polarization][mask].imag,
weights=imag_weights[polarization][mask])
except ZeroDivisionError as e:
logger.warning('error in weighted average : %s', e)
output_dataset['flag'] = True
def weighted_average_dataset_per_station(dataset, input_data_table):
"""
:param dataset:
:type dataset: HolographyDataset
:param input_data_table:
:return:
"""
stations = dataset.reference_stations
n_stations = len(stations)
frequencies = dataset.frequencies
beams = dataset.beamlets
result_ALL = dict()
result = dict(all=result_ALL)
for frequency in frequencies:
result_per_frequency = dict()
result_ALL[frequency] = result_per_frequency
for beam in beams:
beam_str = str(beam)
result_per_beam = numpy.zeros(1, dtype=HDS_data_sample_type)
average_per_beam_station = numpy.zeros(n_stations, dtype=HDS_data_sample_type)
std_real_per_beam_station = numpy.zeros(n_stations, dtype=HDS_data_sample_type)
std_imag_per_beam_station = numpy.zeros(n_stations, dtype=HDS_data_sample_type)
for i, station in enumerate(stations):
data_per_station_frequency_beam = \
input_data_table[station][str(frequency)][beam_str]
average_per_beam_station['XX'][i] = data_per_station_frequency_beam['mean']['XX']
average_per_beam_station['XY'][i] = data_per_station_frequency_beam['mean']['XY']
average_per_beam_station['YX'][i] = data_per_station_frequency_beam['mean']['YX']
average_per_beam_station['YY'][i] = data_per_station_frequency_beam['mean']['YY']
average_per_beam_station['l'][i] = data_per_station_frequency_beam['mean']['l']
average_per_beam_station['m'][i] = data_per_station_frequency_beam['mean']['m']
std_real_per_beam_station['XX'][i] = \
data_per_station_frequency_beam['std']['XX_real']
std_real_per_beam_station['XY'][i] = \
data_per_station_frequency_beam['std']['XY_real']
std_real_per_beam_station['YX'][i] = \
data_per_station_frequency_beam['std']['YX_real']
std_real_per_beam_station['YY'][i] = \
data_per_station_frequency_beam['std']['YY_real']
std_imag_per_beam_station['XX'][i] = \
data_per_station_frequency_beam['std']['XX_imag']
std_imag_per_beam_station['XY'][i] = \
data_per_station_frequency_beam['std']['XY_imag']
std_imag_per_beam_station['YX'][i] = \
data_per_station_frequency_beam['std']['YX_imag']
std_imag_per_beam_station['YY'][i] = \
data_per_station_frequency_beam['std']['YY_imag']
for polarization in ['XX', 'XY', 'YX', 'YY']:
_weigthed_average_data_per_polarization(average_per_beam_station,
result_per_beam,
std_real_per_beam_station,
std_imag_per_beam_station,
polarization)
result_per_beam['l'] = numpy.average(average_per_beam_station['l'])
result_per_beam['m'] = numpy.average(average_per_beam_station['m'])
result_per_frequency[beam_str] = dict(mean=result_per_beam)
return result
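The weighted station average above reduces, per beam and polarization, to averaging the real and imaginary parts separately, each with its own weights. A minimal sketch of that core step on toy arrays (illustrative names, not the committed API; note that the code above passes the standard deviations themselves as weights, where an inverse-variance scheme would use 1/std**2):

import numpy

# Toy per-station means for one beam/polarization, with per-part spreads.
means = numpy.array([1.0 + 2.0j, 1.2 + 1.8j, 0.9 + 2.2j])
std_real = numpy.array([0.1, 0.2, 0.4])
std_imag = numpy.array([0.2, 0.1, 0.3])
flags = numpy.array([False, False, True])  # flagged stations are excluded

mask = ~flags
# Average the real and imaginary parts separately, each with its own weights,
# mirroring _weigthed_average_data_per_polarization above.
average = (numpy.average(means[mask].real, weights=std_real[mask])
           + 1.j * numpy.average(means[mask].imag, weights=std_imag[mask]))
print(average)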
import logging
import numpy
logger = logging.getLogger(__name__)
def extract_crosscorrelation_matrices_from_HDS_datatable(datatable):
new_shape = (2, 2, datatable.shape[0])
new_array = numpy.zeros(shape=new_shape, dtype=numpy.complex)
new_array[0, 0, :] = datatable['XX']
new_array[0, 1, :] = datatable['XY']
new_array[1, 0, :] = datatable['YX']
new_array[1, 1, :] = datatable['YY']
return new_array
def invert_crosscorrelation_matrices(cross_correlation_matrices):
"""
Invert the matrices along the last axis
:param cross_correlation_matrices: matrices to invert
:type cross_correlation_matrices: numpy.ndarray
:return:
"""
assert cross_correlation_matrices.ndim >= 3
return numpy.linalg.inv(cross_correlation_matrices)
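numpy.linalg.inv inverts over the trailing two axes of a stacked array, which is why the replacement code in normalize.py below packs the 2x2 blocks as shape (N, 2, 2) before inverting; the (2, 2, N) layout built above would not be accepted for N != 2. A small self-contained check of the broadcast inversion on toy data:

import numpy

# Stack of N cross-correlation matrices with the 2x2 blocks in the trailing
# axes, the layout numpy.linalg.inv operates on.
rng = numpy.random.RandomState(0)
matrices = rng.randn(5, 2, 2) + 1.j * rng.randn(5, 2, 2)  # shape (N, 2, 2)
inverses = numpy.linalg.inv(matrices)

# Each product should be (numerically) the 2x2 identity.
identity = numpy.broadcast_to(numpy.eye(2), matrices.shape)
numpy.testing.assert_allclose(matrices @ inverses, identity, atol=1e-10)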
import logging
import numpy
logger = logging.getLogger(__name__)
__POLARIZATION_TO_INDEX = {
'XX': (0,0),
'XY': (0,1),
'YX': (1,0),
'YY': (1,1)
}
def normalize_beams_by_central_one(input_data, central_beamlet_number):
"""
    :param central_beamlet_number: number of the central beamlet
:type central_beamlet_number: str
:param input_data: input data to be processed
:type input_data: dict(dict(numpy.ndarray))
:return:
"""
normalized_data = dict()
for station in input_data:
normalized_data_per_station = dict()
normalized_data[station] = normalized_data_per_station
data_per_station = input_data[station]
for frequency in data_per_station:
data_per_station_per_frequency = data_per_station[frequency]
normalized_data_per_station_frequency = \
normalize_crosscorrelation_beams(data_per_station_per_frequency,
central_beamlet_number)
normalized_data_per_station[frequency] = normalized_data_per_station_frequency
return normalized_data
def normalize_crosscorrelation_beams(data_per_station_frequency,
central_beamlet):
"""
It normalize the crosscorrelation per each beam given the normalization array computed
from the central beam
:param data_per_station_frequency:
:param normalization_array:
:return:
"""
"""
TO avoid copying the data in a different array and then copy the data back
I compute directly the product of the two array of matrices
the resulting cross polarization as a function of time are:
XX = XX_b * XX_n + XY_b * YX_n
XY = XX_b * XY_n + XY_b * YY_n
YX = YX_b * XX_n + YY_b * YX_n
YY = YX_b * XY_n + YY_b * YY_n
where b is the beam and n is the normalization array
"""
is_invertion_matrix_valid = True
normalization_array = None
try:
normalization_array = invert_central_beam_crosscorrelations(data_per_station_frequency,
central_beamlet)
except numpy.linalg.LinAlgError as e:
logger.warning('error inverting central beamlet crosscorrelation for beamlet %s: %s',
central_beamlet,
e)
print('error inverting central beamlet crosscorrelation for beamlet %s: %s' % (
central_beamlet,
e))
is_invertion_matrix_valid = False
normalized_data_per_beam = dict()
for beam in data_per_station_frequency:
data_per_beam = data_per_station_frequency[beam]
normalized_data = numpy.array(data_per_beam)
normalized_data_per_beam[beam] = normalized_data
if is_invertion_matrix_valid == False:
normalized_data['flag'] = True
continue
normalized_data['XX'] = \
data_per_beam['XX'] * normalization_array[__POLARIZATION_TO_INDEX['XX']] + \
data_per_beam['XY'] * normalization_array[__POLARIZATION_TO_INDEX['YX']]
normalized_data['XY'] = \
data_per_beam['XX'] * normalization_array[__POLARIZATION_TO_INDEX['XY']] + \
data_per_beam['XY'] * normalization_array[__POLARIZATION_TO_INDEX['YY']]
normalized_data['YX'] = \
data_per_beam['YX'] * normalization_array[__POLARIZATION_TO_INDEX['XX']] + \
data_per_beam['YY'] * normalization_array[__POLARIZATION_TO_INDEX['YX']]
normalized_data['YY'] = \
data_per_beam['YX'] * normalization_array[__POLARIZATION_TO_INDEX['XY']] + \
data_per_beam['YY'] * normalization_array[__POLARIZATION_TO_INDEX['YY']]
return normalized_data_per_beam
def invert_central_beam_crosscorrelations(data_per_station_per_frequency, central_beamlet_number):
data_per_central_beamlet = __extract_crosscorrelation_matrices_from_data(
data_per_station_per_frequency[central_beamlet_number])
return __invert_crosscorrelation_matrices(data_per_central_beamlet)
def __extract_crosscorrelation_matrices_from_data(data_per_station_frequency_beam):
new_shape = (data_per_station_frequency_beam.shape[0], 2, 2)
new_array = numpy.zeros(shape=new_shape, dtype=numpy.complex)
new_array[:, 0, 0] = data_per_station_frequency_beam['XX']
new_array[:, 0, 1] = data_per_station_frequency_beam['XY']
new_array[:, 1, 0] = data_per_station_frequency_beam['YX']
new_array[:, 1, 1] = data_per_station_frequency_beam['YY']
return new_array
def __invert_crosscorrelation_matrices(cross_correlation_matrices):
"""
Invert the matrices along the last axis
:param cross_correlation_matrices: matrices to invert
:type cross_correlation_matrices: numpy.ndarray
:return:
"""
assert cross_correlation_matrices.ndim >= 3
result_array = numpy.rollaxis(numpy.linalg.inv(cross_correlation_matrices), 0, 3)
return result_array
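The entrywise expansion in normalize_crosscorrelation_beams is exactly the matrix product B @ N, with N the inverse of the central-beam crosscorrelation, so the central beam normalized by itself must come out as the identity. A quick sketch of both properties on plain 2x2 arrays (toy data, not the HDS record type):

import numpy

rng = numpy.random.RandomState(1)
central = rng.randn(2, 2) + 1.j * rng.randn(2, 2)  # central-beam crosscorrelation
beam = rng.randn(2, 2) + 1.j * rng.randn(2, 2)     # some other beam

normalization = numpy.linalg.inv(central)

# Entrywise expansion as used above: XX = XX_b*XX_n + XY_b*YX_n, etc.
xx = beam[0, 0] * normalization[0, 0] + beam[0, 1] * normalization[1, 0]
xy = beam[0, 0] * normalization[0, 1] + beam[0, 1] * normalization[1, 1]
yx = beam[1, 0] * normalization[0, 0] + beam[1, 1] * normalization[1, 0]
yy = beam[1, 0] * normalization[0, 1] + beam[1, 1] * normalization[1, 1]
numpy.testing.assert_allclose(numpy.array([[xx, xy], [yx, yy]]), beam @ normalization)

# The central beam normalized by its own inverse is the identity.
numpy.testing.assert_allclose(central @ normalization, numpy.eye(2), atol=1e-10)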
import logging
import lofar.calibration.common.datacontainers as datacontainers
from scipy.constants import c as LIGHT_SPEED
logger = logging.getLogger(__name__)
import numpy
__FRQ_TO_PHASE = 2. * numpy.pi / LIGHT_SPEED
def _rotate_antenna_coordinates(dataset):
"""
:param dataset:
:type dataset: datacontainers.HolographyDataset
:return:
"""
station_position = numpy.array(dataset.target_station_position)
antenna_field_position = numpy.array(dataset.antenna_field_position)
antenna_position_offset = numpy.array([station_position - antenna_position for
antenna_position in antenna_field_position])
station_rotation_matrix = dataset.rotation_matrix
return numpy.dot(station_rotation_matrix, antenna_position_offset.T).T
def _compute_expected_phase_delay(l, m, x, y, frequency):
"""
Convert the antenna position (x,y) and the pointing cosines (l,m) into a phase delay for
the given frequency
:param l:
:param m:
:param x:
:param y:
:param frequency:
:return:
"""
phase = (x * l + y * m) * __FRQ_TO_PHASE * frequency
return phase
def _compute_pointing_matrices_per_station_frequency_beam(dataset, datatable, frequency):
"""
:param dataset:
:type dataset: datacontainers.HolographyDataset
    :param datatable: the data table to reduce
    :type datatable: dict
:param frequency:
:type frequency: float
:return:
"""
rotated_coordinates = _rotate_antenna_coordinates(dataset)
n_antennas = len(dataset.antenna_field_position)
one_over_n_antennas = 1. / float(n_antennas)
n_beams = len(datatable)
pointing_matrix = numpy.matrix(numpy.zeros((n_beams, n_antennas), dtype=complex))
for i in range(n_beams):
for j in range(n_antennas):
l = datatable[str(i)]['mean']['l'][0]
m = datatable[str(i)]['mean']['m'][0]
x, y = rotated_coordinates[j, 0:2]
phase = _compute_expected_phase_delay(l, m, x, y, frequency)
pointing_matrix[i, j] = numpy.exp(-1.j * phase) * one_over_n_antennas
return pointing_matrix
def _solve_gains_complex(visibilities, matrix):
    """
    To solve a complex linear system of equations it is possible to rewrite it as an
    equivalent real system, expanding each complex entry into a 2x2 real block:
    [ re1 + i*im1, re2 + i*im2 ;      [ re1, -im1, re2, -im2 ]
                                   ~  [ im1,  re1, im2,  re2 ]
      re3 + i*im3, re4 + i*im4 ]      [ re3, -im3, re4, -im4 ]
                                      [ im3,  re3, im4,  re4 ]
    :param matrix:
    :param visibilities:
    :return:
    """
matrix_real = numpy.matrix(numpy.zeros((matrix.shape[0]*2, matrix.shape[1]*2)))
    # Each complex entry maps to the block [[re, -im], [im, re]]:
    # even rows carry the real part of each equation, odd rows the imaginary part.
    matrix_real[0::2, 0::2] = matrix.real
    matrix_real[0::2, 1::2] = -matrix.imag
    matrix_real[1::2, 0::2] = matrix.imag
    matrix_real[1::2, 1::2] = matrix.real
visibilities_real = numpy.matrix(numpy.zeros(visibilities.shape[0] * 2)).T
visibilities_real[0::2] = visibilities.real
visibilities_real[1::2] = visibilities.imag
result = _solve_gains(visibilities_real, matrix_real)
if result['flag'] == False:
noise = result['relative_error']
noise = numpy.sqrt(noise[0::2].A1**2. + noise[1::2].A1**2.)
result["relative_error"] = noise
return result
def _solve_gains(visibilities, matrix):
try:
gains, residual, _, _ = numpy.linalg.lstsq(matrix, visibilities, rcond=None)
noise = abs(matrix * gains - visibilities) / abs(visibilities)
return dict(gains=gains, residual=residual, relative_error=noise,
flag=False)
except Exception as e:
logger.warning('error solving for the gains: %s', e)
__empty = dict(gains=numpy.array(numpy.nan),
residual=numpy.array(numpy.nan),
relative_error=numpy.array(numpy.nan),
flag=numpy.array(True))
return __empty
def _solve_gains_least_square(dataset, datatable, frequency, direct_complex=False):
"""
    Solve the equation M * G = V for G.
:param dataset:
:type dataset: datacontainers.HolographyDataset
:param datatable:
:param frequency:
:type frequency: float
:return:
"""
matrix = _compute_pointing_matrices_per_station_frequency_beam(dataset, datatable, frequency)
n_beams = len(datatable)
result = dict()
for polarization in ['XX', 'XY', 'YX', 'YY']:
visibilities = numpy.matrix([datatable[str(i)]['mean'][polarization] for i in range(n_beams)])
if direct_complex == True:
result[polarization] = _solve_gains(visibilities, matrix)
else:
result[polarization] = _solve_gains_complex(visibilities, matrix)
return result
def solve_gains_per_datatable(dataset, datatable, **kwargs):
"""
:param dataset:
:type dataset: datacontainers.HolographyDataset
:param datatable:
    :type datatable: dict(dict(numpy.ndarray))
:return:
"""
result = dict()
for station in datatable:
result_per_station = dict()
result[station] = result_per_station
data_per_station = datatable[station]
for frequency in data_per_station:
data_per_frequency = data_per_station[frequency]
result_per_station[str(frequency)] = \
_solve_gains_least_square(dataset, data_per_frequency, float(frequency), **kwargs)
return result
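The real embedding used by _solve_gains_complex can be sanity-checked against the exact gains of a noiseless toy system: each complex entry a maps to the 2x2 block [[Re a, -Im a], [Im a, Re a]], and the interleaved real solve must recover the complex solution. A minimal sketch (illustrative sizes, plain ndarrays rather than numpy.matrix):

import numpy

rng = numpy.random.RandomState(2)
n_beams, n_antennas = 8, 3
matrix = rng.randn(n_beams, n_antennas) + 1.j * rng.randn(n_beams, n_antennas)
gains_true = rng.randn(n_antennas) + 1.j * rng.randn(n_antennas)
visibilities = matrix @ gains_true

# Interleaved real embedding: even rows/columns real, odd imaginary.
matrix_real = numpy.zeros((2 * n_beams, 2 * n_antennas))
matrix_real[0::2, 0::2] = matrix.real
matrix_real[0::2, 1::2] = -matrix.imag
matrix_real[1::2, 0::2] = matrix.imag
matrix_real[1::2, 1::2] = matrix.real
vis_real = numpy.zeros(2 * n_beams)
vis_real[0::2] = visibilities.real
vis_real[1::2] = visibilities.imag

solution_real, _, _, _ = numpy.linalg.lstsq(matrix_real, vis_real, rcond=None)
gains = solution_real[0::2] + 1.j * solution_real[1::2]
numpy.testing.assert_allclose(gains, gains_true, atol=1e-10)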
@@ -19,6 +19,11 @@
include(LofarCTest)
-lofar_add_test(t_processing)
+python_install(
+    __init__.py
+    utils.py
+    DESTINATION lofar/calibration/testing)
+add_subdirectory(processing)
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id$
include(LofarCTest)
lofar_add_test(t_processing)
import logging
import unittest
+import os

from lofar.calibration.common.datacontainers import HolographyDataset
from lofar.calibration.processing import average_dataset_by_sample, average_values_by_sample
from lofar.calibration.common.datacontainers.holography_dataset_definitions import HDS_data_sample_type
-from lofar.calibration.processing.matrices import extract_crosscorrelation_matrices_from_HDS_datatable
+from lofar.calibration.processing.normalize import __extract_crosscorrelation_matrices_from_data
+from lofar.calibration.testing.utils import create_holography_dataset
+import lofar.calibration.testing.utils as utils

logger = logging.getLogger('t_processing')
import numpy
+import matplotlib.pyplot as plt

# READ doc/Holography_Data_Set.md! It contains the location from which the
# test data must be downloaded and this file generated.
SAMPLE_DATASET = '/data/test/HolographyObservation/CS001HBA0.hdf5'
def is_sample_dataset_available():
return os.path.exists(SAMPLE_DATASET)
class TestSignalGenerator(unittest.TestCase):
def test_gaussian_noise_generation(self):
test_average_value = 5. + 12j
test_real_std = 12
test_imag_std = 3
test_sample_size = 100
gaussian_generator = utils.TestComplexGaussianNoise(test_average_value,
test_real_std,
test_imag_std)
sample_noise = gaussian_generator.generate(test_sample_size, seed=5)
        self.assertGreater(numpy.std(sample_noise.real)/test_real_std, .9)
        self.assertGreater(numpy.std(sample_noise.imag)/test_imag_std, .9)
self.assertGreater(abs(numpy.average(sample_noise).real / test_average_value.real), .9)
self.assertGreater(abs(numpy.average(sample_noise).imag / test_average_value.imag), .9)
def test_signal_generator(self):
test_imag_coeff = 3.j
test_real_coeff = 2.
expected_dependencies_imag_real = test_imag_coeff/test_real_coeff
linear_function = lambda x: 2 * x + 3.j * x
test_average_value = 5. + 12j
test_real_std = 12
test_imag_std = 3
test_time_samples = numpy.linspace(0, 100, 1000).tolist()
gaussian_generator = utils.TestComplexGaussianNoise(test_average_value,
test_real_std,
test_imag_std)
signal_generator = utils.TestSignal(linear_function, gaussian_generator)
sample_signal = signal_generator.generate(test_time_samples)
coefficients = numpy.polyfit(sample_signal.real, sample_signal.imag, 1)
m, q = coefficients[0], coefficients[1]
self.assertGreater(abs(m/expected_dependencies_imag_real), .9 )
class TestProcessing(unittest.TestCase):
+    sample_dataset = None

    def setUp(self):
-        pass
+        self.sample_dataset = create_holography_dataset()

+    def test_sample_dataset(self):
+        for reference_station in self.sample_dataset.reference_stations:
+            self.assertIn(reference_station, self.sample_dataset.data)

    def test_average_array_by_sample_real_values(self):
        """
@@ -51,6 +102,7 @@ class TestProcessing(unittest.TestCase):
        numpy.testing.assert_almost_equal(result['std_real'], expected_result_std.real)
        numpy.testing.assert_almost_equal(result['std_imag'], expected_result_std.real)

+    @unittest.skipUnless(is_sample_dataset_available(), 'missing test dataset')
    def test_average_datatable_by_sample(self):
        holography_dataset = HolographyDataset.load_from_file(SAMPLE_DATASET)
        averaged_data = average_dataset_by_sample(SAMPLE_DATASET, 2).derived_data[
@@ -85,7 +137,7 @@ class TestProcessing(unittest.TestCase):
        test_data['YX'] = numpy.random.randn(3) + 1.j * numpy.random.randn(3)
        test_data['YY'] = numpy.random.randn(3) + 1.j * numpy.random.randn(3)
-        matrices_from_data = extract_crosscorrelation_matrices_from_HDS_datatable(test_data)
+        matrices_from_data = __extract_crosscorrelation_matrices_from_data(test_data)
        numpy.testing.assert_almost_equal(test_data['XX'], matrices_from_data[0, 0, :])
        numpy.testing.assert_almost_equal(test_data['XY'], matrices_from_data[0, 1, :])
...
@@ -54,6 +54,7 @@ class profile_averaging_per_sample(ProfilingTest):
    def process(self):
        processing.average_dataset_by_sample(SAMPLE_DATASET, 2)

if __name__ == '__main__':
    profile_holography_dataset_read().run().print_results()
    profile_averaging_per_sample().run().print_results()
from lofar.calibration.common.datacontainers.holography_dataset import HolographyDataset
def create_holography_dataset(target_station='CS001',
reference_stations=('RS301', 'RS302'),
pointings=(
)
):
"""
Creates a mock Holography Dataset that can be used for testing
:param target_station:
:param reference_stations:
:return:
"""
\ No newline at end of file
import numpy
from collections import defaultdict
from lofar.calibration.common.datacontainers.holography_dataset import HolographyDataset
from lofar.calibration.common.datacontainers.holography_dataset_definitions import HDS_coordinate_type, HDS_data_sample_type
DEFAULT_TARGET_STATION = 'CS001'
DEFAULT_TARGET_STATION_POSITION = numpy.array([0, 0, 0])
DEFAULT_REFERENCE_STATIONS = ('RS301', 'RS302')
DEFAULT_SOURCE_NAME = 'test source'
DEFAULT_SOURCE_POINTING = numpy.array([(1.5, 0.9, 'J2000')], HDS_coordinate_type)
DEFAULT_SAS_IDS = numpy.array([123456])
DEFAULT_SOURCE_POSITION = numpy.array([3826896.19174 , 460979.502113, 5064658.231])
DEFAULT_BEAMLETS = numpy.array([0, 1, 2, 3 , 4])
DEFAULT_FREQUENCIES = [150000.0]
DEFAULT_POINTINGS = {str(DEFAULT_FREQUENCIES[0]): {
'0': numpy.array([(1.5, 0.9, 'J2000')], HDS_coordinate_type),
'1': numpy.array([(2.0, 0.9, 'J2000')], HDS_coordinate_type),
'2': numpy.array([(1.0, 0.9, 'J2000')], HDS_coordinate_type),
'3': numpy.array([(1.0, 0.8, 'J2000')], HDS_coordinate_type),
'4': numpy.array([(1.0, 1, 'J2000')], HDS_coordinate_type)}}
DEFAULT_ANTENNA_FIELD_POSITION = numpy.array([[1, 0, 1],
[-1, 0, 1],
[+1, 0, 1]])
DEFAULT_ROTATION_MATRIX = numpy.array([[-1.19595e-01, 9.92823e-01, 3.30000e-05],
[-7.91954e-01, -9.54190e-02, 6.03078e-01],
[ 5.98753e-01, 7.20990e-02, 7.97682e-01]])
DEFAULT_MODE = 5
DEFAULT_TIME_SAMPLE = [1, 2, 3, 4, 5, 6]
class TestComplexGaussianNoise():
def __init__(self, average, std_real, std_imag):
"""
:param average: average value
:type average: complex
:param std_real: standard deviation for the real part
:type std_real: float
:param std_imag: standard deviation for the imaginary part
:type std_imag: float
"""
self.average = average
self.std_real = std_real
self.std_imag = std_imag
def generate(self, sample_size, seed=None):
"""
Generate a series of random values
:param sample_size:
:type sample_size: int
:param seed:
:type seed: int
:return:
"""
if seed:
numpy.random.seed(seed)
real = self.std_real * numpy.random.randn(sample_size)
imag = self.std_imag * numpy.random.randn(sample_size) * 1.j
return real + imag + self.average
class TestSignal():
def __init__(self, function, noise):
"""
:param function:
:type function: function
:param noise:
:type noise: TestComplexGaussianNoise
"""
self.function = function
self.noise = noise
def generate(self, timesamples, seed=None):
"""
:param timesamples: list of time samples in which to compute the signal
:type timesamples: Iterable[float]
:param seed: seed for the gaussian signal generator
:type seed: None|int
:return: the generated sample signal
"""
n_samples = len(timesamples)
result = numpy.zeros(n_samples, dtype=complex)
for i, timesample in enumerate(timesamples):
result[i] = self.function(timesample)
result += self.noise.generate(n_samples, seed)
return result
DEFAULT_AVERAGE_VALUES_PER_STATION_AND_POINTING = {
"RS301": {
'150000.0': {
'0': {
"mean": 1+5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
},
'1': {
"mean": 1 + 5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
},
'2': {
"mean": 1 + 5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
},
'3': {
"mean": 1 + 5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
},
'4': {
"mean": 1 + 5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
}
}
},
"RS302": {
'150000.0': {
'0': {
"mean": 1+5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
},
'1': {
"mean": 1 + 5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
},
'2': {
"mean": 1 + 5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
},
'3': {
"mean": 1 + 5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
},
'4': {
"mean": 1 + 5j,
"std_imag": 1,
"std_real": 2,
"real_coeff": 0,
"imag_coeff": 0
}
}
}
}
def recursive_default_dict_generator():
return defaultdict(recursive_default_dict_generator)
def generate_single_pointing_dataset(timesamples, parameters):
"""
:param timesamples:
:type timesamples: list
    :param parameters: statistical characteristics of the samples (not used yet)
    :return:
"""
dataset = numpy.zeros(len(timesamples), dtype=HDS_data_sample_type)
return dataset
def generate_sample_data(holography_dataset,
timesamples = DEFAULT_TIME_SAMPLE,
data_statistical_characteristics=DEFAULT_AVERAGE_VALUES_PER_STATION_AND_POINTING):
"""
Generates a sample data to start testing the processing step
:param holography_dataset: dataset prefilled with the header parameters
:type holography_dataset: HolographyDataset
:param timesamples:
:return:
"""
holography_dataset.data = recursive_default_dict_generator()
for reference_station in holography_dataset.reference_stations:
for frequency in holography_dataset.frequencies:
frequency_string = str(frequency)
for beamlet in holography_dataset.beamlets:
beamlet_string = str(beamlet)
holography_dataset.data[reference_station][frequency_string][beamlet_string] =\
generate_single_pointing_dataset(timesamples,
data_statistical_characteristics[reference_station][frequency_string][beamlet_string])
def create_holography_dataset(target_station_name=DEFAULT_TARGET_STATION,
target_station_position = DEFAULT_TARGET_STATION_POSITION,
reference_stations=DEFAULT_REFERENCE_STATIONS,
timesamples = DEFAULT_TIME_SAMPLE,
pointings=DEFAULT_POINTINGS,
sas_ids = DEFAULT_SAS_IDS,
source_name=DEFAULT_SOURCE_NAME,
source_position=DEFAULT_SOURCE_POINTING,
mode=DEFAULT_MODE,
beamlets=DEFAULT_BEAMLETS,
antenna_field_positions=DEFAULT_ANTENNA_FIELD_POSITION,
frequencies=DEFAULT_FREQUENCIES,
data_statistical_characteristics=DEFAULT_AVERAGE_VALUES_PER_STATION_AND_POINTING
):
"""
Creates a mock Holography Dataset that can be used for testing
"""
dataset = HolographyDataset()
dataset.mode = mode
dataset.target_station_position = target_station_position
dataset.target_station_name = target_station_name
dataset.beamlets = beamlets
dataset.source_position = source_position
dataset.reference_stations = reference_stations
dataset.antenna_field_position = antenna_field_positions
dataset.source_name = source_name
dataset.sas_ids = sas_ids
dataset.ra_dec = pointings
dataset.central_beamlets = dataset.find_central_beamlets(source_position,
pointings,
frequencies,
beamlets)
dataset.frequencies = frequencies
generate_sample_data(dataset, timesamples)
return dataset
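Assuming the install layout prescribed by the CMake rules above (lofar.calibration.testing.utils), the mock dataset plugs directly into the averaging entry point; a sketch of the intended usage:

from lofar.calibration.testing.utils import create_holography_dataset
from lofar.calibration.processing import average_dataset_by_sample

# Build the mock dataset with the defaults defined above and average the
# samples pairwise; the result is stored in dataset.derived_data.
dataset = create_holography_dataset()
dataset = average_dataset_by_sample(dataset, 2)
print(sorted(dataset.derived_data.keys()))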