Commit 26752503 authored by Mattia Mancini

SSB-43: merging to epich branch

parents e9cddfa0 fa2a4db4
1 merge request: !44 Merge back holography to master
Showing 917 additions and 106 deletions
...@@ -33,6 +33,19 @@ CAL/CalibrationCommon/test/t_holography_ms_class.sh -text
CAL/CalibrationCommon/test/t_holography_observation.py -text
CAL/CalibrationCommon/test/t_holography_observation.run -text
CAL/CalibrationCommon/test/t_holography_observation.sh -text
CAL/CalibrationProcessing/CMakeLists.txt -text
CAL/CalibrationProcessing/bin/CMakeLists.txt -text
CAL/CalibrationProcessing/lib/CMakeLists.txt -text
CAL/CalibrationProcessing/lib/__init__.py -text
CAL/CalibrationProcessing/lib/process_holography_dataset.py -text
CAL/CalibrationProcessing/lib/processing/__init__.py -text
CAL/CalibrationProcessing/lib/processing/averaging.py -text
CAL/CalibrationProcessing/lib/processing/matrices.py -text
CAL/CalibrationProcessing/test/CMakeLists.txt -text
CAL/CalibrationProcessing/test/profiling_processing.py -text
CAL/CalibrationProcessing/test/t_processing.py -text
CAL/CalibrationProcessing/test/t_processing.run -text
CAL/CalibrationProcessing/test/t_processing.sh -text
CEP/Calibration/BBSControl/include/BBSControl/CommandHandlerEstimator.h -text
CEP/Calibration/BBSControl/include/BBSControl/CommandHandlerReducer.h -text
CEP/Calibration/BBSControl/include/BBSControl/OptionParser.h -text
...
...@@ -18,3 +18,4 @@
# $Id$
lofar_add_package(CalibrationCommon)
lofar_add_package(CalibrationProcessing)
...@@ -39,8 +39,9 @@ Below is the list of defined attributes in this version of HDS:
- **SAS_ids**: An array that contains the SAS ids that were created for all observation runs of the holography observation. *Unit: [n.a.]*
- **Target_station_name**: Name of the LOFAR target station. *Unit: [n.a.]*
- **Target_station_position**: Position in cartesian coordinates. *Unit: [m, m, m]*
- **Central_beamlets**: An array that contains the central beamlet number for each frequency. *Unit: [n.a.]*
- **Source_name**: Name of the source observed during the holography observation. *Unit: [n.a.]*
- **Source_position**: Celestial coordinates (RA, DEC, EPOCH) of the source. *Unit: [rad, rad, n.a.]*
- **Observation_time**: Planned start and end time-stamps of the entire holography observation in MJD. *Unit: [s]*
- **Rotation_matrix**: Rotation matrix used to convert (RA, DEC) coordinates to (l, m) coordinates and vice versa. *Unit: [n.a.]*
- **Antenna_field_position**: Cartesian coordinates, with reference to ITRF, of the centre of each antenna field that will be used in the holography observation. *Unit: [m, m, m]*
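For illustration, below is a minimal sketch of reading a few of these attributes back with h5py; it assumes a file produced by the MS to HDS conversion utility, and the file name is hypothetical:
```
import h5py

# A minimal sketch, assuming an existing HDS file; the file name is hypothetical.
with h5py.File('CS001HBA0.hdf5', 'r') as hds:
    print(hds.attrs['Target_station_name'])   # name of the LOFAR target station
    print(hds.attrs['Source_position'])       # (RA, DEC, EPOCH)
    print(hds.attrs['Observation_time'])      # planned start and end in MJD [s]
```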
...@@ -60,9 +61,11 @@ Below is the list of defined data tables in this version of HDS:
- **frequency**: List of frequencies that were actually used for the holography observation. This list can contain fewer elements than the **Specified frequency** list. For a given frequency, its index in this list serves as one of the three indices of **data**. *Unit: [Hz]*

Additionally, a table containing the cross-correlations and the right ascension and declination of each beam is stored in a group structure that is described in the next subsection.

### Groups in the HDS
The right ascension, declination and the cross-correlation are stored in the HDS in a hierarchical group structure. This makes it possible to access a specific set of samples given the reference station name, the frequency of the samples and the beamlet number.

The notation ```%(station_name)``` is used to refer to a string containing a station name. The same notation is also used to refer to the string representation of the frequency in Hz (```%(frequency)```) and the beamlet number (```%(beamlet)```).

The structure of the groups is the following:
...@@ -75,11 +78,12 @@ The structure of the groups is the following:
/RA_DEC
    /%(frequency)
        %(beamlet) This data table contains the (right ascension, declination, epoch) of the target station beam pointings. This list makes it possible to translate between (frequency, beamlet) and (RA, DEC). *Unit: [rad, rad, n.a.]*
```
## Cross-correlation data in HDS
Values in the cross-correlation tables are stored in a pseudo-structure. A simple representation of a single element of a cross-correlation table can be given in Python:

```
data_element = dict(
    "XX" = c_XX,
...@@ -93,8 +97,7 @@ Values in the cross-correlation tables are stored in a pseudo-structure. A simp
```
Here the keys "XX", "XY", "YX" and "YY" give access to the cross-correlation values, "t" to the MJD timestamp of the sample, "l" and "m" to the l and m coordinates, and "flag" to the flag value.

As described before, each cross-correlation table can be accessed by indices, meaning that the observation samples at a given frequency `%(frequency)`, for a reference station `%(station_name)` and a beamlet `%(beamlet)`, can be directly accessed as `/%(station_name)/%(frequency)/%(beamlet)`. Example: `/RS409C/114843750.0/0`.
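A minimal sketch of such a direct access with h5py is given below; the cross-correlation tables live under the `/CROSSCORRELATION` group of the HDF5 layout described in the next section, and the file name is hypothetical:
```
import h5py

# A minimal sketch; the file name is hypothetical.
with h5py.File('CS001HBA0.hdf5', 'r') as hds:
    samples = hds['CROSSCORRELATION']['RS409C']['114843750.0']['0']
    first = samples[0]
    print(first['XX'], first['t'], first['flag'])
```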
## Layout of the HDS in HDF5 files
...@@ -114,7 +117,7 @@ as `/%(station_name)/%(frequency)/%(beamlet)` (Ex. `/RS409C/114843750.0/0`).
/Source_position (compound(double, double, string)), RA, DEC and Epoch of the source in the sky
/Target_station_name (string), name of the target station
/Target_station_position (double[3]), position of the centre of the station in cartesian coordinates
/Central_beamlets (1-d array, int), beamlet number of the central beamlet, index is the frequency
/Specified_reference_station (1-d array, string), entries are reference station names [optional]
/Specified_frequency (1-d array, double), entries are the frequencies of all observations that were specified [optional]
/Specified_RA_DEC (2-d array, compound[double, double, string[10]]), indices are frequency index, beamlet# [optional]
...@@ -123,14 +126,14 @@ as `/%(station_name)/%(frequency)/%(beamlet)` (Ex. `/RS409C/114843750.0/0`).
/Reference_station (1-d array, string[8]), entries are reference station names
/Frequency (1-d array, double), entries are the frequencies of all observations
/RA_DEC
    /[One entry for each of the frequencies] Index is the frequency in Hz as string.
        /[One entry for each of the beamlets] Index is the beamlet number as string.
            /(1-d array, compound[double, double, string[10]]), entries contain a struct of {RA, DEC, Epoch}
/CROSSCORRELATION
    /[One entry for each of the reference stations] Index is the name of the reference station.
        /[One entry for each of the frequencies] Index is the frequency in Hz as string.
            /[One entry for each of the beamlets] Index is the beamlet number as string.
                /data (1-d array, vararray[compound[compound[4][double, double], double[3], boolean]]), entries contain a struct of {XX, XY, YX, YY, l, m, t, flag}.
```
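The layout can be explored with h5py's `visititems`; a short sketch (file name hypothetical):
```
import h5py

def print_entry(name, item):
    # Print every group and dataset address found in the file.
    print(name, type(item).__name__)

with h5py.File('CS001HBA0.hdf5', 'r') as hds:
    hds.visititems(print_entry)
```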
...@@ -140,7 +143,5 @@ Test data for two frequencies can be found on LOFAR's CEP3 in the directories
/data014/scratch/veen/holography/20180718/obs3/L661022 and
/data014/scratch/veen/holography/20180718/obs3/L661024.

The MS to HDS conversion utility will create an HDF5 file containing an HDS from a MS. This file can then be used as input for some of the tests of the conversion utility that verify a correct conversion.
import logging
import h5py
from lofar.calibration.common.datacontainers.holography_observation import HolographyObservation
...@@ -33,36 +32,57 @@ class HolographyDataset():
        # list of 3 floats
        self.target_station_position = None
        # name of the source (str)
        self.source_name = None
        # position of the source array[(RA, numpy.float64), (DEC, numpy.float64), (EPOCH, 'S10')]
        self.source_position = None
        # date time when the observation started in MJD in seconds (float)
        self.start_time = None
        # date time when the observation ended in MJD in seconds (float)
        self.end_time = None
        # array(3,3), rotation matrix for
        # (RA, DEC) <-> (l, m) conversion
        self.rotation_matrix = None
        # list of beamlet numbers
        self.beamlets = list()
        # the central beamlet for each frequency
        self.central_beamlets = dict()
        # coordinates of the antenna positions in the target station
        self.antenna_field_position = []
        # list of reference station names
        self.reference_stations = list()
        # list of frequencies
        self.frequencies = list()
        # dict(frequency:
        #      dict(beamlet_number:
        #           array that contains the ra_dec at which the beam points
        #           at the given frequency and beamlet number
        #           numpy.dtype([('RA', numpy.float64),
        #                        ('DEC', numpy.float64),
        #                        ('EPOCH', 'S10')])))
        self.ra_dec = dict()
        # dict(reference_station_name:
        #      dict(frequency:
        #           dict(beamlet_number:
        #                array that contains the crosscorrelations for the
        #                4 polarizations, the l and m coordinates, the timestamp
        #                in mjd of the sample, and whether or not the data
        #                has been flagged
        #                numpy.dtype([('XX', numpy.float64),
        #                             ('YY', numpy.float64),
        #                             ('XY', numpy.float64),
        #                             ('YX', numpy.float64),
        #                             ('l', numpy.float64),
        #                             ('m', numpy.float64),
        #                             ('t', numpy.float64),
        #                             ('flag', numpy.bool)])))
        self.data = dict()
        # a dict of dicts holding str, ndarray or values that can be converted
        # into an ndarray by calling numpy.array()
        self.derived_data = None

    @staticmethod
    def compare_dicts(dict1, dict2):
...@@ -118,17 +138,92 @@ class HolographyDataset():
                                                                  other_value)
                elif attribute_value != other_value:
                    this_equality = False
                    logger.error("values for field \"%s\" differ", attribute_name)
            except Exception as e:
                logger.warn("Cannot compare HDS values for field \"%s\" (%s): %s",
                            attribute_name, type(attribute_value), e)
                return False

            try:
                equality = equality and this_equality
            except Exception as e:
                logger.warn(
                    "Comparison of two values for field \"%s\" (%s, %s) resulted in something that is neither True nor False!",
                    attribute_name, type(attribute_value), type(other_value))
                return False
        return equality
    def find_central_beamlets(self, source, ra_dec, frequencies, beamlets):
        '''
        This function finds the central beamlet of a target station for every
        frequency.
        @param source: A dict containing right ascension (source["RA"]) and
        declination (source["DEC"]) of the source.
        @param ra_dec: A dict[frequency][beamlet] that contains the right
        ascension (ra_dec[frequency][beamlet][0]) and declination
        (ra_dec[frequency][beamlet][1]) for every beamlet.
        @param frequencies: A list that contains the frequencies at which the
        holography observation was performed.
        @param beamlets: A list of beamlet numbers.
        '''
        logger.debug("Finding the central beamlets...")
        source_ra = source["RA"]
        source_dec = source["DEC"]
        # Store each pseudo distance in a dict of dicts: pd[frequency][beamlet]
        pseudo_distance = dict()
        # Iterate over all frequencies...
        for frequency in frequencies:
            frequency_string = str(frequency)
            pseudo_distance[frequency_string] = dict()
            # and iterate over the beamlets.
            for beamlet in beamlets:
                beamlet_string = str(beamlet)
                ra = ra_dec[frequency_string][beamlet_string]['RA']
                dec = ra_dec[frequency_string][beamlet_string]['DEC']
                # Calculate a pseudo distance that indicates whether this
                # beamlet is closer to the source than the ones before.
                # This is not the spherical distance. Like its cousin the
                # geometrical distance it is not exact, since RA and DEC are
                # coordinates of a spherical coordinate system. But here the
                # pseudo distance is good enough.
                pseudo_distance[frequency_string][beamlet_string] = \
                    abs(ra - source_ra) + abs(dec - source_dec)

        # Done with all the iteration business. Now check if across all
        # frequencies the same beamlet is the central one. The central
        # beamlets do not have to be the same for every frequency, but the
        # astronomers sleep better if they are. Also check that there is
        # actually only one central beamlet per frequency.
        central_beamlet = dict()
        for (frequency_string, distances) in pseudo_distance.items():
            values = list(distances.values())
            keys = list(distances.keys())
            minimal_distance = min(values)
            # If a second beamlet is 'closest' to the source for the same
            # frequency then that is a problem. The premise is that there is
            # only a single central beamlet and not a bunch of 'em.
            if values.count(minimal_distance) == 1:
                # All is good. Only one central beamlet.
                central_beamlet[frequency_string] = keys[values.index(minimal_distance)]
            else:
                text = "Found %d beamlets that have the same distance from the source position. Therefore a unique central beamlet does not exist." % (values.count(minimal_distance))
                logger.error(text)
                raise ValueError(text)

        # Now check if the central beamlet is the same across all frequencies.
        # This is done by creating a set from the central beamlets of all
        # frequencies. If there is only one central beamlet for all
        # frequencies, the size of the set is 1.
        central_beamlet_set = set(central_beamlet.values())
        if len(central_beamlet_set) == 1:
            logger.debug("All is good, unicorns everywhere, there is only one central beamlet \"%s\" for all frequencies.", central_beamlet_set)
        else:
            logger.warn("Multiple central beamlets have been identified: %s", central_beamlet)
        return central_beamlet
    def load_from_beam_specification_and_ms(self, station_name, list_of_hbs_ms_tuples):
        """
        Loads the dataset from the specification files and the measurements for the given station
...@@ -138,9 +233,14 @@ class HolographyDataset():
        """
        logger.info("Creating a holography data set for station \"%s\"...", station_name)
        try:
            self.__collect_preliminary_information(station_name, list_of_hbs_ms_tuples)
            self.__read_data(station_name, list_of_hbs_ms_tuples)
            self.central_beamlets = self.find_central_beamlets(self.source_position, self.ra_dec,
                                                               self.frequencies, self.beamlets)
            logger.info("Creation of a holography data set for station \"%s\" done.", station_name)
        except Exception as e:
            logger.exception("Error creating dataset for station \"%s\": %s", station_name, e)
            raise e
    def __read_data(self, station_name, list_of_hbs_ms_tuples):
        """
...@@ -196,6 +296,7 @@ class HolographyDataset():
        target_stations = set()
        reference_stations = set()
        beamlets = set()
        central_beamlet = None
        virtual_pointing = dict()
        frequencies = set()
        sas_ids = set()
...@@ -284,21 +385,21 @@ class HolographyDataset():
            raise Exception('Station %s was not involved in the observation.'
                            % station_name, )

        if len(mode) == 1:
            self.mode = mode.pop()
        else:
            raise ValueError('Multiple RCU modes are not supported')

        if len(source_position) == 1:
            self.source_position = numpy.array(source_position.pop(), dtype=HDS_coordinate_type)
        else:
            logger.error('Multiple source positions are not supported: %s', source_position)
            raise ValueError('Multiple source positions are not supported')

        if len(source_name) == 1:
            self.source_name = source_name.pop()
        else:
            raise ValueError('Multiple source names are not supported')
    @staticmethod
    def print_info(hds=None, text=None):
...@@ -319,6 +420,7 @@ class HolographyDataset():
        logger.info("Target station position = %s", hds.target_station_position)
        logger.info("Source name = %s", hds.source_name)
        logger.info("Source position = %s", hds.source_position)
        logger.info("Central beamlets = %s", hds.central_beamlets)
        logger.info("Start time = %s", hds.start_time)
        logger.info("End time = %s", hds.end_time)
        logger.info("Rotation matrix = %s", hds.rotation_matrix)
...@@ -329,7 +431,83 @@ class HolographyDataset():
            logger.info("RA DEC = %s", hds.ra_dec)
            logger.info("Data = %s", hds.data)
        else:
            logger.warn(
                "The object passed is not a HolographyDataset instance. Cannot print any data.")
    @staticmethod
    def _read_grouped_data(h5file, uri):
        """
        Read the data in a nested hierarchy starting from the address uri
        into a python dict
        :param h5file: input HDF5 file
        :type h5file: h5py.File
        :param uri: starting point address
        :type uri: str
        :return: a dict encoding the structure
        """
        starting_leaf = h5file[uri]
        result = dict()
        # Read the attributes in the hdf5 dataset
        for attribute_key, attribute_value in starting_leaf.attrs.items():
            if 'ATTRIBUTES' not in result:
                result['ATTRIBUTES'] = dict()
            result['ATTRIBUTES'][attribute_key] = attribute_value

        for key, value in starting_leaf.items():
            if isinstance(value, h5py.Group):
                result[key] = HolographyDataset._read_grouped_data(h5file, value.name)
            else:
                try:
                    if numpy.issubdtype(value.dtype, numpy.string_):
                        result[key] = str(numpy.array(value))
                    else:
                        result[key] = numpy.array(value)
                except ValueError as e:
                    logger.exception('Cannot interpret %s as a numpy array: %s', type(value), e)
                    raise e
        return result
    @staticmethod
    def _store_grouped_data(h5file, uri, data_to_store):
        """
        Store the data in a nested hierarchy starting from the address uri
        into the HDS
        :param h5file: input HDF5 file
        :type h5file: h5py.File
        :param uri: starting point address
        :type uri: str
        :param data_to_store: dict that contains the data to store
        :type data_to_store: dict
        """
        if uri not in h5file:
            starting_leaf = h5file.create_group(uri)
        else:
            starting_leaf = h5file[uri]

        # Store the attributes in the hdf5 dataset
        attributes = data_to_store.get('ATTRIBUTES', dict())
        for attribute_key, attribute_value in attributes.items():
            starting_leaf.attrs[attribute_key] = attribute_value

        for key, value in data_to_store.items():
            if key == 'ATTRIBUTES':
                continue
            if isinstance(value, dict):
                HolographyDataset._store_grouped_data(h5file, '/'.join([uri, key]), value)
            else:
                try:
                    if isinstance(value, str):
                        starting_leaf[key] = numpy.string_(value)
                    else:
                        starting_leaf[key] = value
                except ValueError as e:
                    logger.exception('Cannot store %s as a numpy array: %s', type(value), e)
                    raise e
    @staticmethod
    def load_from_file(path):
...@@ -351,13 +529,15 @@ class HolographyDataset():
                result.target_station_name = f.attrs[HDS_TARGET_STATION_NAME]
                result.target_station_position = list(f.attrs[HDS_TARGET_STATION_POSITION])
                result.source_name = f.attrs[HDS_SOURCE_NAME]
                result.source_position = numpy.array(f.attrs[HDS_SOURCE_POSITION])
                (result.start_time, result.end_time) = f.attrs[HDS_OBSERVATION_TIME]
                result.rotation_matrix = f.attrs[HDS_ROTATION_MATRIX]
                result.beamlets = list(f.attrs[HDS_BEAMLETS])
                result.antenna_field_position = f.attrs[HDS_ANTENNA_FIELD_POSITION].tolist()
                result.reference_stations = list(f[HDS_REFERENCE_STATION])
                result.frequencies = list(f[HDS_FREQUENCY])

                result.ra_dec = dict()
                for frequency in f["RA_DEC"].keys():
                    for beamlet in f["RA_DEC"][frequency].keys():
...@@ -380,7 +560,10 @@ class HolographyDataset():
                        result.data[reference_station][frequency][beamlet] = numpy.array(
                            f["CROSSCORRELATION"][reference_station][frequency][beamlet])

                result.central_beamlets = HolographyDataset._read_grouped_data(f, HDS_CENTRAL_BEAMLETS)
                if '/DERIVED_DATA' in f:
                    result.derived_data = HolographyDataset._read_grouped_data(f, '/DERIVED_DATA')
        except Exception as e:
            logger.exception(
                "Cannot read the Holography Data Set data from the HDF5 file \"%s\". This is the exception that was thrown: %s",
...@@ -419,12 +602,11 @@ class HolographyDataset():
            f.attrs[HDS_TARGET_STATION_POSITION] = self.target_station_position
            f.attrs[HDS_SOURCE_NAME] = self.source_name
            f.attrs[HDS_SOURCE_POSITION] = self.source_position
            f.attrs[HDS_OBSERVATION_TIME] = numpy.array([self.start_time, self.end_time])
            f.attrs[HDS_ROTATION_MATRIX] = self.rotation_matrix
            f.attrs[HDS_ANTENNA_FIELD_POSITION] = self.antenna_field_position
            f.attrs[HDS_BEAMLETS] = self.beamlets

            # Store the list of reference stations and frequencies. We just
            # want to keep 'em around for quick reference.
            f[HDS_REFERENCE_STATION] = self.reference_stations
...@@ -450,6 +632,15 @@ class HolographyDataset():
                    for beamlet in self.data[reference_station][frequency].keys():
                        f["CROSSCORRELATION"][reference_station][frequency][beamlet] = \
                            self.data[reference_station][frequency][beamlet]

            HolographyDataset._store_grouped_data(h5file=f,
                                                  uri=HDS_CENTRAL_BEAMLETS,
                                                  data_to_store=self.central_beamlets)
            if self.derived_data:
                self._store_grouped_data(h5file=f,
                                         uri='/DERIVED_DATA',
                                         data_to_store=self.derived_data)
        except Exception as e:
            logger.exception(
                "Cannot write the Holography Data Set data to the HDF5 file \"%s\". This is the exception that was thrown: %s",
...
...@@ -18,6 +18,8 @@ HDS_ROTATION_MATRIX = "Rotation_matrix"
HDS_ANTENNA_FIELD_POSITION = "Antenna_field_position"
HDS_SPECIFIED_REFERENCE_STATION = "Specified_Reference_station"
HDS_SPECIFIED_FREQUENCY = "Specified_frequency"
HDS_BEAMLETS = "Beamlets"
HDS_CENTRAL_BEAMLETS = "Central_beamlets"

# GROUP "RA DEC"
HDS_SPECIFIED_RA_DEC = "Specified_RA_DEC"
...
...@@ -19,13 +19,13 @@ class HolographyMeasurementSet(object):
    def __init__(self, ms_name, ms_path):
        self.path = os.path.join(ms_path, ms_name)

        if HolographyMeasurementSet.is_a_valid_ms_name(ms_name):
            self.name = ms_name
            self.sas_id, self.beamlet = \
                HolographyMeasurementSet.parse_sas_id_and_sub_band_from_ms_name(self.name)
        else:
            raise ValueError('The measurement set located in %s does not have a valid name' % self.path, )

    def get_frequency(self):
        observation_table = self.get_spectral_window_table()
        try:
...@@ -94,10 +94,10 @@ class HolographyMeasurementSet(object):
        try:
            unique_names = {name for name in pointing_table.getcol('NAME')}

            if len(unique_names) == 1:
                source_name = unique_names.pop()
            else:
                raise ValueError('Expected only one source as a target for the observation')
        finally:
            pointing_table.close()
...@@ -255,23 +255,23 @@ class HolographyMeasurementSet(object):
    def _compute_lm_from_ra_dec_station_position_rotation_matrix_and_time(ra_dec_epoch,
                                                                          rotation_matrix,
                                                                          mjd_times):
        if isinstance(ra_dec_epoch, numpy.ndarray):
            ra, dec, epoch = ra_dec_epoch.tolist()

            astropy_times = [HolographyMeasurementSet.__mjd_to_astropy_time(mjd_time)
                             for mjd_time in mjd_times]
            n_samples = len(astropy_times)
            return_value_dtype = [('l', numpy.float64),
                                  ('m', numpy.float64)]

            return_value = numpy.empty(n_samples, dtype=return_value_dtype)
            l_m_arrays = pqr_from_icrs(numpy.array((ra, dec)), astropy_times, rotation_matrix)

            return_value['l'][:] = l_m_arrays[:, 0]
            return_value['m'][:] = l_m_arrays[:, 1]
        else:
            raise TypeError('Expected a structured numpy array for ra_dec, obtained {}'.
                            format(ra_dec_epoch))

        return return_value
...@@ -283,9 +283,10 @@ class HolographyMeasurementSet(object):
    @staticmethod
    def parse_sas_id_and_sub_band_from_ms_name(ms_name):
        if HolographyMeasurementSet.is_a_valid_ms_name(ms_name):
            match = re.match(HolographyMeasurementSet.ms_name_pattern, ms_name)
        else:
            raise ValueError('The measurement set %s does not have a valid name' % ms_name, )
        return str(match.group('sas_id')), int(match.group('sub_band_id'))

    @staticmethod
...
...@@ -117,31 +117,29 @@ class HolographyObservation():
                extract_unique_reference_frequencies_from_ms_list(
                    ms_indexed_per_beamlet_number.values())

            if len(unique_frequencies) == 1:
                frequency = unique_frequencies.pop()
            else:
                raise ValueError(
                    'Multiple reference frequencies per observation are not supported')

            unique_source_names = HolographyObservation.\
                extract_unique_source_names_from_ms_list(ms_indexed_per_beamlet_number.values())

            if len(unique_source_names) == 1:
                source_name = unique_source_names.pop()
            else:
                raise ValueError(
                    'Multiple source targets per observation are not supported')

            unique_subband = HolographyObservation.\
                extract_unique_subband_from_ms_list(ms_indexed_per_beamlet_number.values())

            if len(unique_subband) == 1:
                sub_band = unique_subband.pop()
            else:
                raise ValueError(
                    'Multiple subbands per observation are not supported')

            observations_list.append(
                HolographyObservation(path, sas_id, ms_indexed_per_beamlet_number,
...
...@@ -14,12 +14,12 @@ def is_observation_in_range(start, end, from_datetime, to_datetime):
    :return: true if the observation is contained in the range, false otherwise
    :raises: ValueError if start > end
    """
    if start <= end:
        start_in_range = from_datetime < start < to_datetime
        end_in_range = from_datetime < end < to_datetime
    else:
        raise ValueError('start datetime is greater than end datetime')

    return start_in_range and end_in_range
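# A minimal usage sketch (the datetimes are hypothetical), kept as illustration:
#
#   from datetime import datetime
#   start = datetime(2018, 7, 18, 12, 0)
#   end = datetime(2018, 7, 18, 13, 0)
#   is_observation_in_range(start, end,
#                           datetime(2018, 7, 18), datetime(2018, 7, 19))  # -> True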
...
import logging
import unittest
import tempfile
import h5py
import numpy
import os
from lofar.calibration.common.datacontainers import HolographyDataset, HolographySpecification
from lofar.calibration.common.datacontainers import HolographyObservation
...@@ -10,7 +12,7 @@ logger = logging.getLogger('t_holography_dataset_class')
# READ doc/Holography_Data_Set.md! It contains the location from which the
# test data must be downloaded.
path_to_test_data = '/var/tmp/holography'
path_to_test_dataset = path_to_test_data + '/CS001HBA0.hdf5'

class TestHolographyDatasetClass(unittest.TestCase):
    def test_create_hds(self):
...@@ -32,6 +34,20 @@ class TestHolographyDatasetClass(unittest.TestCase):
        # Make sure the data in memory is OK.
        self.assertEqual(holography_dataset.source_name, '3C 147')

    def test_store_and_read_from_hdf(self):
        test_dict = dict(CS001=dict(BEAM0=1, BEAM1=2, BEAM3=dict(FR1='str2'),
                                    ATTRIBUTES=dict(a=1)),
                         CS003='str')
        with tempfile.NamedTemporaryFile(suffix='.hdf5') as tfile:
            tfile.close()
            h5file = h5py.File(tfile.name)
            HolographyDataset._store_grouped_data(h5file, '/test', test_dict)
            h5file.close()

            h5file = h5py.File(tfile.name)
            read_dict = HolographyDataset._read_grouped_data(h5file, '/test')
            self.assertDictEqual(test_dict, read_dict)

if __name__ == '__main__':
    logging.basicConfig(format='%(name)s : %(message)s')
...
...@@ -3,7 +3,7 @@ import unittest
from lofar.calibration.common.datacontainers import HolographyObservation

logger = logging.getLogger('t_holography_dataset_class')
path_to_test_data = '/var/tmp/holography'

class TestHolographyDatasetClass(unittest.TestCase):
...@@ -11,9 +11,8 @@ class TestHolographyDatasetClass(unittest.TestCase):
        list = HolographyObservation.list_observations_in_path(path_to_test_data)

        ## Assert correct reading of the files
        HolographyObservation_freq_one = list.pop()
        self.assertEqual(HolographyObservation_freq_one.sas_id, "661022")
        self.assertEqual(HolographyObservation_freq_one.source_name, '3C 147')
        self.assertEqual(HolographyObservation_freq_one.sub_band, 76)
        self.assertEqual(HolographyObservation_freq_one.start_mjd, 5038710120.0)
...
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id$
lofar_package(CalibrationProcessing 1.0 DEPENDS PyCommon CalibrationCommon)
include(PythonInstall)
add_subdirectory(lib)
add_subdirectory(bin)
add_subdirectory(test)
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id$
python_install(
__init__.py
processing/averaging.py
processing/matrices.py
processing/__init__.py
DESTINATION lofar/calibration)
lofar_add_bin_script(process_holography_dataset.py RENAME process_holography_dataset)
#!/usr/bin/env python
import argparse
import logging
from lofar.calibration.common.utils import *
import lofar.calibration.processing as processing
import sys
DEFAULT_SLEEP_TIME = 1
logger = logging.getLogger('mshologextract')
def main():
    cla_parser = specify_command_line_arguments()
    arguments = parse_command_line_arguments_and_set_verbose_logging(cla_parser)
    preaverage_holography_dataset(input_path=arguments.input_path,
                                  output_path=arguments.output_path,
                                  average_samples=arguments.average_samples,
                                  time_average_step=arguments.time_average_step)
def parse_command_line_arguments_and_set_verbose_logging(parser):
    """
    Given a parser, parse the command line arguments and return the result of the parsing.
    :param parser: command line parser
    :type parser: argparse.ArgumentParser
    :return: the parsed arguments
    :rtype: argparse.Namespace
    """
    arguments = parser.parse_args()
    if arguments.v:
        logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG)
    else:
        logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    logger.info('The selected arguments are %s', arguments)

    return arguments
def specify_command_line_arguments():
    """
    Set up the command line argument parser and return it
    :return: the parser for the command line arguments
    :rtype: argparse.ArgumentParser
    """
    parser = argparse.ArgumentParser(description='This program is meant to convert the holography'
                                                 ' observation\'s data into a holography dataset')
    parser.add_argument('input_path', help='path to the holography observation data')
    parser.add_argument('output_path', help='path where the holography dataset will be stored')
    parser.add_argument('--time_average_step',
                        help='average window for the crosscorrelation in seconds', default=None,
                        type=float)
    parser.add_argument('--average_samples',
                        help='number of samples to perform one average step of the'
                             ' cross correlations', default=None, type=int)
    parser.add_argument('--num_proc', help='number of processes used to convert the holography'
                                           ' observation', type=int)
    parser.add_argument('-v', help='verbose logging', action='store_true', default=False)

    return parser
def preaverage_holography_dataset(input_path, output_path, average_samples, time_average_step):
    """
    Average the given holography dataset either by samples or by time and store the result.
    :param input_path: path to the input file
    :param output_path: path to the output file
    :param average_samples: averaging sample step
    :param time_average_step: averaging time window
    """
    if average_samples is not None and time_average_step is not None:
        logger.error('Please specify either the average window in samples'
                     ' or the time average step in seconds')
        raise ValueError('Both average_samples and time_average_step have been specified')

    if average_samples is None and time_average_step is None:
        logger.error('Neither average_samples nor time_average_step has been specified')
        raise ValueError('Neither average_samples nor time_average_step has been specified')

    if average_samples is not None:
        logger.info('Averaging %s with a sample window of %s', input_path, average_samples)
        output_hds = processing.average_dataset_by_sample(input_path, average_samples)
        logger.info('Averaged %s with a sample window of %s', input_path, average_samples)

        logger.info('Storing processed file %s in %s', input_path, output_path)
        output_hds.store_to_file(output_path)
        logger.info('Stored processed file %s in %s', input_path, output_path)

    if time_average_step is not None:
        logger.info('Averaging %s with a time window of %s s', input_path, time_average_step)
        output_hds = processing.average_dataset_by_time(input_path, time_average_step)
        logger.info('Averaged %s with a time window of %s s', input_path, time_average_step)

        logger.info('Storing processed file %s in %s', input_path, output_path)
        output_hds.store_to_file(output_path)
        logger.info('Stored processed file %s in %s', input_path, output_path)


if __name__ == '__main__':
    main()
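# A minimal sketch of the equivalent programmatic use, assuming an existing
# HDS file; the paths and the window size are hypothetical:
#
#   from lofar.calibration.processing import average_dataset_by_sample
#   averaged = average_dataset_by_sample('/var/tmp/holography/CS001HBA0.hdf5', 10)
#   averaged.store_to_file('/var/tmp/holography/CS001HBA0_averaged.hdf5')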
from .averaging import *
from .matrices import *
import logging
import numpy
from lofar.calibration.common.datacontainers import HolographyDataset
logger = logging.getLogger(__name__)
def average_values_by_sample(data_array, window_size, field_name=None):
    """
    Average the values present in the data_array with a given sample width
    :param data_array: input array
    :type data_array: numpy.ndarray
    :param window_size: window width in samples
    :type window_size: int
    :param field_name: name of the field to select
    :type field_name: str
    :return: an array containing the averaged values and the standard deviation
    :rtype: numpy.ndarray
    """
    new_array_size = _compute_size_of_new_array(len(data_array), window_size)

    # select the right field
    if field_name is not None:
        data_array = data_array[field_name]

    field_dtype = data_array.dtype
    if numpy.issubdtype(field_dtype, numpy.complex):
        result = numpy.zeros(new_array_size, dtype=[('mean', field_dtype),
                                                    ('std_real', numpy.float),
                                                    ('std_imag', numpy.float),
                                                    ('averaged_samples', numpy.int)])
    else:
        result = numpy.zeros(new_array_size, dtype=[('mean', field_dtype),
                                                    ('std', field_dtype),
                                                    ('averaged_samples', numpy.int)])

    # compute the average and the std of all the steps except for the last one
    for i in range(new_array_size - 1):
        data_array_view = numpy.array(data_array[i * window_size: (i + 1) * window_size])
        result['mean'][i] = numpy.nanmean(data_array_view)
        if numpy.issubdtype(field_dtype, numpy.complex):
            result['std_real'][i] = numpy.std(numpy.real(data_array_view))
            result['std_imag'][i] = numpy.std(numpy.imag(data_array_view))
        else:
            result['std'][i] = numpy.std(data_array_view)
        # Count the values that are not nan, i.e. the ones used to compute the mean
        result['averaged_samples'][i] = numpy.count_nonzero(~numpy.isnan(data_array_view))

    # the last window may contain fewer samples than the window size
    data_array_view = data_array[(new_array_size - 1) * window_size:]
    result['mean'][-1] = numpy.nanmean(data_array_view)
    if numpy.issubdtype(field_dtype, numpy.complex):
        result['std_real'][-1] = numpy.std(numpy.real(data_array_view))
        result['std_imag'][-1] = numpy.std(numpy.imag(data_array_view))
    else:
        result['std'][-1] = numpy.std(data_array_view)
    # Count the values that are not nan, i.e. the ones used to compute the mean
    result['averaged_samples'][-1] = numpy.count_nonzero(~numpy.isnan(data_array_view))

    return result
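# A minimal usage sketch of average_values_by_sample on synthetic data
# (the array contents are hypothetical):
#
#   samples = numpy.array([1.0, 2.0, 3.0, 4.0, 5.0])
#   averaged = average_values_by_sample(samples, window_size=2)
#   averaged['mean']              # -> [1.5, 3.5, 5.0]; the last window is shorter
#   averaged['averaged_samples']  # -> [2, 2, 1], non-NaN samples per window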
def _average_datatable_with_function(data_table_in, parameter, expected_array_size, function):
    field_names_mean = []
    formats_mean = []
    field_names_std = []
    formats_std = []
    for field_name in data_table_in.dtype.names:
        field_names_mean.append(field_name)
        field_dtype = data_table_in.dtype[field_name]
        formats_mean.append(field_dtype)
        # if the datatype is complex the standard deviation is computed only on
        # the real (imaginary) part
        if numpy.issubdtype(field_dtype, numpy.complex):
            field_names_std.append(field_name + '_imag')
            formats_std.append(numpy.float)
            field_names_std.append(field_name + '_real')
            formats_std.append(numpy.float)
        else:
            field_names_std.append(field_name)
            formats_std.append(field_dtype)

    field_names_mean += ['averaged_samples']
    formats_mean += [numpy.int]
    field_names_std += ['averaged_samples']
    formats_std += [numpy.int]

    mean_output_data_table_dtype = numpy.dtype(dict(names=field_names_mean, formats=formats_mean))
    std_output_data_table_dtype = numpy.dtype(dict(names=field_names_std, formats=formats_std))
    output_array_average = numpy.zeros((expected_array_size,), dtype=mean_output_data_table_dtype)
    output_array_std = numpy.zeros((expected_array_size,), dtype=std_output_data_table_dtype)

    fields_to_average = list(data_table_in.dtype.names)
    fields_to_average.remove('flag')
    for field_name in fields_to_average:
        datatable_values = numpy.array(data_table_in)
        # set flagged values to nan
        if field_name != 't':
            datatable_values[field_name][data_table_in['flag']] = numpy.nan

        averaged_content = function(datatable_values, parameter, field_name=field_name)
        output_array_average[field_name] = averaged_content['mean']

        field_dtype = data_table_in.dtype[field_name]
        if numpy.issubdtype(field_dtype, numpy.complex):
            output_array_std[field_name + '_real'] = averaged_content['std' + '_real']
            output_array_std[field_name + '_imag'] = averaged_content['std' + '_imag']
        else:
            output_array_std[field_name] = averaged_content['std']

        # flag the sample if the samples used are less than the window size
        output_array_average['averaged_samples'] = averaged_content['averaged_samples']
        output_array_std['averaged_samples'] = averaged_content['averaged_samples']

    return dict(mean=output_array_average, std=output_array_std)
def average_datatable_by_sample(data_table_in, window_size):
    """
    Average the datatable with a given sample window width
    :param data_table_in: the datatable to be averaged
    :type data_table_in: numpy.ndarray
    :param window_size: window width in samples
    :return: the array with the averaged values and the array with the standard deviations
    :rtype: dict(str:numpy.ndarray)
    """
    new_array_size = _compute_size_of_new_array(len(data_table_in), window_size)

    result = _average_datatable_with_function(data_table_in,
                                              window_size,
                                              new_array_size,
                                              average_values_by_sample)
    result['averaged_samples'] = window_size
    return result
def average_datatable_by_time_interval(data_table_in, time_interval):
    """
    Average the datatable with a given sample time interval window
    :param data_table_in: the datatable to be averaged
    :type data_table_in: numpy.ndarray
    :param time_interval: time interval window
    :type time_interval: numpy.float
    :return: the array with the averaged values and the array with the standard deviations
    :rtype: dict(str:numpy.ndarray)
    """
    assert (time_interval > 0)
    max_t, min_t = numpy.max(data_table_in['t']), numpy.min(data_table_in['t'])
    assert (max_t > min_t)
    samples = len(data_table_in['t'])
    assert (samples > 0)

    average_samples_dt = (max_t - min_t) / float(samples)
    window_size = int(time_interval / average_samples_dt)
    if window_size == 0:
        logger.warning('The time window %s is smaller than the sample interval %s', time_interval,
                       average_samples_dt)
        window_size = 1

    logger.debug('averaging with a sample size of %s', window_size)
    new_array_size = _compute_size_of_new_array(len(data_table_in), window_size)

    result = _average_datatable_with_function(data_table_in,
                                              window_size,
                                              new_array_size,
                                              average_values_by_sample)
    result['averaged_samples'] = window_size
    return result
def _average_dataset_by_function(dataset, function, *parameters):
    out_data = {}
    for reference_station, data_per_reference_station in dataset.data.items():
        if reference_station not in out_data:
            out_data[reference_station] = dict()
        out_data_per_reference_station = out_data[reference_station]

        for frequency_string, data_per_frequency in data_per_reference_station.items():
            if frequency_string not in out_data_per_reference_station:
                out_data_per_reference_station[frequency_string] = dict()
            outdata_per_beamlet = out_data_per_reference_station[frequency_string]

            for beamlet, data_per_beamlet in data_per_frequency.items():
                logger.debug('processing reference station=%s, frequency=%s, beamlet=%s with %s',
                             reference_station,
                             frequency_string,
                             beamlet,
                             function.__name__)
                outdata_per_beamlet[beamlet] = dict()
                averaged_datatable = function(data_per_beamlet, *parameters)

                outdata_per_beamlet[beamlet]['mean'] = averaged_datatable['mean']
                outdata_per_beamlet[beamlet]['std'] = averaged_datatable['std']
                outdata_per_beamlet[beamlet]['ATTRIBUTES'] = \
                    {'AVERAGED_SAMPLES': averaged_datatable['averaged_samples']}
    return out_data
def average_dataset_by_sample(input_data_set, window_size):
"""
Averaged the dataset with a given sample window width
:param input_data_set: holography dataset, either in an HDF5 file or as an
HDS in memory
:type input_data_set: str (file name) or HolographyDataset
:param window_size: window width in samples
:type window_size: int (enforced!)
    :return: the input dataset with the averaged data attached to
        derived_data['PRE_NORMALIZATION_AVERAGE_TIME'], structured as a dict
        of dicts with keys
        [the reference station name]
        [the frequency]
        [the beamlet]
        mean : -> containing the averaged results
        std : -> containing the standard deviation of the averaged results
:rtype: HolographyDataset
"""
if input_data_set is not None:
        if isinstance(input_data_set, HolographyDataset):
            dataset = input_data_set
        elif isinstance(input_data_set, str):
            dataset = HolographyDataset.load_from_file(input_data_set)
        else:
            text = "Cannot average data by samples! The input data is neither an HDS nor a string."
logger.error(text)
raise ValueError(text)
else:
text = "Cannot average data by samples! The input data is of the invalid type %s." % (type(input_data_set))
logger.error(text)
raise ValueError(text)
    if not isinstance(window_size, int) or window_size < 1:
        text = "Cannot average data by samples! The window size must be an integer value greater than 0."
logger.error(text)
raise ValueError(text)
out_data = _average_dataset_by_function(dataset, average_datatable_by_sample, window_size)
if dataset.derived_data is None:
dataset.derived_data = dict()
dataset.derived_data['PRE_NORMALIZATION_AVERAGE_TIME'] = out_data
dataset.derived_data['PRE_NORMALIZATION_AVERAGE_TIME']['ATTRIBUTES'] = \
{"AVERAGING_WINDOW_SAMPLES": numpy.array(window_size, dtype=int)}
return dataset
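# A minimal call sketch (the file path is illustrative; the test data location
# is documented in doc/Holography_Data_Set.md):
#
#   dataset = average_dataset_by_sample('/data/test/CS001HBA0.hdf5', 4)
#   averaged = dataset.derived_data['PRE_NORMALIZATION_AVERAGE_TIME']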
def average_dataset_by_time(input_data_set, time_interval):
"""
    Average the dataset with a given time interval in seconds
:param input_data_set: holography dataset, either in an HDF5 file or as an
HDS in memory
:type input_data_set: str (file name) or HolographyDataset
    :param time_interval: averaging window width in seconds
    :type time_interval: float
    :return: the input dataset with the averaged data attached to
        derived_data['PRE_NORMALIZATION_AVERAGE_TIME'], structured as a dict
        of dicts with keys
        [the reference station name]
        [the frequency]
        [the beamlet]
        mean : -> containing the averaged results
        std : -> containing the standard deviation of the averaged results
:rtype: HolographyDataset
"""
if input_data_set is not None:
        if isinstance(input_data_set, HolographyDataset):
            dataset = input_data_set
        elif isinstance(input_data_set, str):
            dataset = HolographyDataset.load_from_file(input_data_set)
        else:
            text = "Cannot average data by time! The input data is neither an HDS nor a string."
logger.error(text)
raise ValueError(text)
else:
text = "Cannot average data by time! The input data is of the invalid type %s." % (type(input_data_set))
logger.error(text)
raise ValueError(text)
logger.error("Here we need a verification that the time_window is an integer multiple of the sampling time.")
# if isinstance(int, time_interval) is False or time_interval < 1:
# text = "Cannot average data by samples! The window size must be an integer value bigger than 0."
# logger.error(text)
# raise ValueError(text)
out_data = _average_dataset_by_function(dataset, average_datatable_by_time_interval,
time_interval)
if dataset.derived_data is None:
dataset.derived_data = dict()
dataset.derived_data['PRE_NORMALIZATION_AVERAGE_TIME'] = out_data
dataset.derived_data['PRE_NORMALIZATION_AVERAGE_TIME']['ATTRIBUTES'] = \
{"AVERAGING_WINDOW_INTERVAL": time_interval}
return dataset
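# A possible implementation for the TODO above (a sketch only; in practice the
# check would run per datatable, and the tolerance value is an assumption):
#
#   dt = numpy.median(numpy.diff(data_table_in['t']))
#   n_windows = time_interval / dt
#   if abs(n_windows - round(n_windows)) > 1e-6:
#       raise ValueError('time_interval must be an integer multiple of '
#                        'the sampling time %s' % dt)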
def _compute_size_of_new_array(array_size, window_size):
"""
Round up the array size given a window size.
If array_size % window_size = 0 it returns array_size/window_size
otherwise it returns array_size/window_size + 1
:param array_size: size of the input array
:param window_size: window size for the averaging
:return: the new array size after averaging with the given window size
"""
# python does not enforce the sample_width to be an integer and a value between 0 and 1
# will create an oversampling of the array
assert window_size >= 1
new_array_size = int(array_size // window_size)
# add one if the len of the data_array is not divisible by the sample_width
new_array_size += 1 if array_size % window_size else 0
return new_array_size
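# Rounding behaviour, illustrated: 10 samples averaged with a window of 4
# give _compute_size_of_new_array(10, 4) == 3, i.e. two full windows plus
# one partial window holding the two leftover samples.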
import logging
import numpy
logger = logging.getLogger(__name__)
def extract_crosscorrelation_matrices_from_HDS_datatable(datatable):
    """
    Rearrange the XX, XY, YX and YY fields of an HDS datatable into an
    array of 2x2 crosscorrelation matrices of shape (2, 2, n_samples)
    :param datatable: datatable containing the crosscorrelation fields
    :return: the crosscorrelation matrices as a numpy.ndarray
    """
    new_shape = (2, 2, datatable.shape[0])
    new_array = numpy.zeros(shape=new_shape, dtype=numpy.complex128)
    new_array[0, 0, :] = datatable['XX']
    new_array[0, 1, :] = datatable['XY']
    new_array[1, 0, :] = datatable['YX']
    new_array[1, 1, :] = datatable['YY']
    return new_array
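# For example, a datatable whose fields hold the complex correlations
# XX = [a0, a1, ...], XY = [b0, ...], YX = [c0, ...], YY = [d0, ...]
# is rearranged so that new_array[:, :, i] is the 2x2 matrix
#   [[a_i, b_i],
#    [c_i, d_i]]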
def invert_crosscorrelation_matrices(cross_correlation_matrices):
    """
    Invert an array of 2x2 crosscorrelation matrices laid out as
    (2, 2, n_samples). numpy.linalg.inv inverts over the last two axes,
    so the sample axis is moved to the front before inversion and moved
    back afterwards
    :param cross_correlation_matrices: matrices to invert
    :type cross_correlation_matrices: numpy.ndarray
    :return: the inverted matrices, with the same shape as the input
    """
    assert cross_correlation_matrices.ndim >= 3
    stacked = numpy.moveaxis(cross_correlation_matrices, -1, 0)
    return numpy.moveaxis(numpy.linalg.inv(stacked), 0, -1)
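# The two functions above are meant to be chained (a sketch, assuming a
# datatable with XX, XY, YX and YY fields):
#
#   matrices = extract_crosscorrelation_matrices_from_HDS_datatable(datatable)
#   inverted = invert_crosscorrelation_matrices(matrices)
#   # inverted[:, :, i] is the inverse of matrices[:, :, i]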
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id$
include(LofarCTest)
lofar_add_test(t_processing)
#!/usr/bin/env python
import cProfile
import pstats
import StringIO
import lofar.calibration.processing as processing
from lofar.calibration.common.datacontainers import HolographyDataset
# READ doc/Holography_Data_Set.md! It describes where the test data must be
# downloaded from and how the sample dataset file is generated.
SAMPLE_DATASET = '/data/test/HolographyObservation/CS001HBA0.hdf5'
class ProfilingTest(object):
def __init__(self):
super(ProfilingTest, self).__init__()
def pre_setup(self):
self._profiler = cProfile.Profile()
self._profiler.enable()
def setup(self):
pass
def tear_down(self):
pass
def post_teardown(self):
self._profiler.disable()
self.string_results = StringIO.StringIO()
sortby = 'cumulative'
self.result = pstats.Stats(self._profiler, stream=self.string_results).sort_stats(sortby)
self.result.print_stats()
def print_results(self):
print(self.string_results.getvalue())
def process(self):
pass
def run(self):
self.pre_setup()
self.setup()
self.process()
self.tear_down()
self.post_teardown()
return self
class profile_holography_dataset_read(ProfilingTest):
def process(self):
HolographyDataset.load_from_file(SAMPLE_DATASET)
class profile_averaging_per_sample(ProfilingTest):
def process(self):
processing.average_dataset_by_sample(SAMPLE_DATASET, 2)
if __name__ == '__main__':
profile_holography_dataset_read().run().print_results()
profile_averaging_per_sample().run().print_results()