Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
holography_dataset.py 13.80 KiB
from .holography_specification import HolographySpecification
from lofar.calibration.common.datacontainers.holography_observation import HolographyObservation
import logging
import numpy
from collections import defaultdict
import h5py

logger = logging.getLogger(__name__)

class HolographyDataset():
    HOLOGRAPHY_DATA_SET_VERSION = 1.0

    def __init__(self):
        self.rcu_list = set()  # array of ints
        self.mode = None  # integer
        self.sas_ids = set()  # array of strings
        self.target_station_name = None  # string
        self.target_station_position = None  # list of 3 floats
        self.source_name = None  # string
        self.source_position = None  # list of 3 floats
        self.start_time = None  # date time when the observation started in MJD in seconds (float)
        self.end_time = None  # date time when the observation started in MJD in seconds (float)
        self.rotation_matrix = None  # array(3,3), translation matrix for
        # (RA, DEC) <-> (l, m) conversion

        self.beamlets = None # list of beamlet numbers
        self.antenna_field_position = []  # coordinates of the antenna position in the target
        # station
        self.reference_stations = set()  # set of reference station names
        self.frequencies = list()  # list of frequencies
        self.ra_dec = None # array(Nfrequency, Nbeamlets, 2) contains the ra_dec of which a beam
        # points at given a frequency and a beamlet number
        self.data = None # array(NreferenceStations, Nfrequencies, Nbeamlets) that contains the
        # 4 polarization crosscorrelation for the 4 polarizations, the l and m coordinates, and
        # the timestamp in mjd of the sample

    def load_from_beam_specification_and_ms(self, station_name, list_of_hbs_ms_tuples):
        """
        Loads the dataset from the specification files and the measurements for the given station
        name
        :param station_name: target station name
        :param hb_specifications: a list containing (hbs, ms) per frequency

        """
        self.__collect_preliminary_information(station_name, list_of_hbs_ms_tuples)
        self.__read_data(station_name, list_of_hbs_ms_tuples)


    def __read_data(self, station_name, list_of_hbs_ms_tuples):
        """

        :param station_name:
        :param list_of_hbs_ms_tuples:
        :type list_of_hbs_ms_tuples: list[(HolographySpecification, HolographyObservation)]
        :return:
        """

        n_frequencies = len(self.frequencies)
        n_beamlets = len(self.beamlets)
        n_reference_stations = len(self.reference_stations)

        data = defaultdict()
        for hbs, ho in list_of_hbs_ms_tuples:
            if station_name in hbs.target_station_names:
                frequency = ho.frequency



                for beamlet in self.beamlets:
                    cross_correlation, timestamp, flags = ho.ms_for_a_given_beamlet_number[beamlet].\
                        read_cross_correlation_time_flags_per_station_names(station_name,
                                                                 self.reference_stations)

                    #for reference_station in enumerate(self.reference_stations):
                    #data[(reference_station, )]

    def __collect_preliminary_information(self, station_name, list_of_hbs_ho_tuples):
        """
        This routines reads both the holography beam specifications files and the holography
        observation to gather the list of rcus, the mode, the target station name and position,
        the source name and position, the start and the end time, the rotation matrix to convert
        from ra and dec to l and m, the antenna field positions, the list of the reference
        stations, the frequencies, the ra and dec at which the beams point at.

        All this data is essential to interpret the recorded holography beams cross correlations
        :param list_of_hbs_ho_tuples: a list containing (hbs, ho) per frequency
        :type list_of_hbs_ho_tuples: list[(HolographySpecification, HolographyObservation)]
        """
        mode = set()
        source_name = set()
        source_position = set()
        target_stations = set()
        reference_stations = set()
        beamlets = set()
        virtual_pointing = dict()
        frequencies = set()
        start_mjd = None
        end_mjd = None
        for hbs, ho in list_of_hbs_ho_tuples:

            target_stations.update(hbs.target_station_names)

            if station_name in hbs.target_station_names:
                beam_specifications = hbs.get_beam_specifications_per_station_name(station_name)
                for beam_specification in beam_specifications:
                    self.rcu_list.update(beam_specification.rcus_involved)

                    mode.add(beam_specification.rcus_mode)

                    source_name.add(ho.source_name)
                    source_position.add(
                        (beam_specification.station_pointing['ra'],
                         beam_specification.station_pointing['dec'],
                         beam_specification.station_pointing['coordinate_system']
                        ))
                    if start_mjd is None or start_mjd > ho.start_mjd:
                        start_mjd = ho.start_mjd

                    if end_mjd is None or end_mjd < ho.end_mjd:
                        end_mjd = ho.end_mjd

                    frequencies.add(ho.frequency)

                    self.sas_ids.add(ho.sas_id)

                    self.target_station_name = station_name
                    reference_stations.update(hbs.reference_station_names)
                    try:
                        single_beamlet = int(beam_specification.beamlets)
                    except ValueError as e:
                        logger.exception('Target station specification incorrect')
                        raise e

                    beamlets.add(single_beamlet)

                    virtual_pointing [(ho.frequency, single_beamlet)] =\
                        (beam_specification.virtual_pointing['ra'],
                         beam_specification.virtual_pointing['dec'])
            else:
                continue
        self.frequencies = sorted(frequencies)
        self.beamlets = sorted(beamlets)
        self.start_time = start_mjd
        self.end_time = end_mjd

        self.reference_stations = list(reference_stations)

        n_frequencies = len(self.frequencies)
        n_beamlets = len(beamlets)
        ra_dec = numpy.zeros((n_frequencies, n_beamlets, 2))
        for frequency_index, frequency in enumerate(self.frequencies):
            for beamlet_index, beamlet in enumerate(self.beamlets):
                ra_dec[frequency_index, beamlet_index, :] = \
                    virtual_pointing[(frequency, beamlet)]

        # reads the target station position and the coordinate of its axes
        # and does this only once since the coordinate will not change
        first_holography_observation = list_of_hbs_ho_tuples[0][1]
        first_ms = first_holography_observation.ms_for_a_given_beamlet_number.values()[0]
        station_position, tile_offset, axes_coordinates  = first_ms.\
            get_station_position_tile_offsets_and_axes_coordinate_for_station_name(
            station_name)

        self.antenna_field_position = [list(station_position - antenna_offset)
                                       for antenna_offset in tile_offset]
        self.target_station_position = list(station_position)
        self.rotation_matrix = axes_coordinates

        if station_name not in target_stations:
            logger.error('Station %s was not involved in the observation.'
                         ' The target stations for this observation are %s',
                         station_name, target_stations)
            raise Exception('Station %s was not involved in the observation.'
                            % station_name,)

        if len(mode) > 1:
            raise ValueError('Multiple RCUs mode are not supported')
        else:
            self.mode = mode.pop()

        if len(source_position) > 1:
            logger.error('Multiple source positions are not supported: %s', source_position)
            raise ValueError('Multiple source positions are not supported')
        else:
            self.source_position = source_position.pop()

        if len(source_name) > 1:
            raise ValueError('Multiple source name are not supported')
        else:
            self.source_name = source_name.pop()

    @staticmethod
    def load_from_file(path):
        """
        It reads a holography dataset from an HDF5 file and returns a
        HolographyDataset class

        The beamlet numbers are not among the values read here, hence the beamlets field
        of the returned dataset keeps its initial value.

        :param path: path to file
        :return: the read dataset
        :rtype: HolographyDataset
        """
        result = HolographyDataset()

        # the context manager closes the file also when a key is missing
        with h5py.File(path, "r") as f:
            attributes = f.attrs

            result.version = attributes["Version"]
            result.mode = attributes["mode"]
            result.rcu_list = attributes["RCU list"]
            result.sas_ids = attributes["SAS id"]
            result.target_station_name = attributes["Target station name"]
            result.target_station_position = attributes["Target station position"]
            result.source_name = attributes["Source name"]
            result.source_position = attributes["Source position"]
            (result.start_time, result.end_time) = attributes["Observation time"]
            result.rotation_matrix = attributes["Rotation matrix"]
            result.antenna_field_position = attributes["Antenna field position"]

            # A dataset object is only usable while its file is open: [()] reads the
            # whole content into memory so that it outlives the file handle.
            result.reference_stations = list(f["Reference station"][()])
            result.frequencies = list(f["frequency"][()])
            result.ra_dec = f["RA DEC"][()]

            # indexed by position: (reference station, frequency, beamlet)
            result.data = f["data"][()]

        return result

    def store_to_file(self, path):
        """
        Stores the holography dataset at the given path as an HDF5 file: the descriptive
        fields are written as attributes of the file, the reference stations, the
        frequencies, the ra/dec table and the samples are written as data tables.

        :param path: path to file
        """
        string_type = h5py.special_dtype(vlen=str)
        sample_type = numpy.dtype([
            ('XX', numpy.complex64),
            ('XY', numpy.complex64),
            ('YX', numpy.complex64),
            ('YY', numpy.complex64),
            ('t', numpy.float64),
            ('l', numpy.float64),
            ('m', numpy.float64),
            # numpy.bool_ because the plain numpy.bool alias is gone from recent NumPy
            ('flag', numpy.bool_)])

        # the context manager closes the file also when writing fails half way
        with h5py.File(path, "w") as f:
            # Create the ATTRS
            # Start with the version information
            f.attrs["Version"] = self.HOLOGRAPHY_DATA_SET_VERSION

            # The RCU list is stored as an array of strings. The set is turned into a
            # sorted list first: numpy.array() applied to a set does not give a 1-D array.
            f.attrs["RCU list"] = numpy.array([str(rcu) for rcu in sorted(self.rcu_list)],
                                              dtype=string_type)

            # RCU mode
            f.attrs["mode"] = self.mode

            # Same treatment as the RCU list
            f.attrs["SAS id"] = numpy.array([str(sas_id) for sas_id in sorted(self.sas_ids)],
                                            dtype=string_type)

            f.attrs["Target station name"] = self.target_station_name
            f.attrs["Target station position"] = self.target_station_position
            f.attrs["Source name"] = self.source_name
            f.attrs["Source position"] = self.source_position
            f.attrs["Observation time"] = numpy.array([self.start_time, self.end_time])
            f.attrs["Rotation matrix"] = self.rotation_matrix
            f.attrs["Antenna field position"] = self.antenna_field_position

            # Data tables
            f["Reference station"] = numpy.array(self.reference_stations, dtype=string_type)
            f["frequency"] = self.frequencies
            f["RA DEC"] = self.ra_dec

            shape = (len(self.reference_stations), len(self.frequencies), len(self.beamlets))
            dataset = f.create_dataset("data", shape,
                                       dtype=h5py.special_dtype(vlen=sample_type))

            # NOTE(review): self.data is assumed to be indexed by position as
            # [reference station][frequency][beamlet], with every entry giving access to
            # the fields of sample_type by name -- confirm against the code that fills
            # self.data.
            for station_index in range(shape[0]):
                for frequency_index in range(shape[1]):
                    for beamlet_index in range(shape[2]):
                        samples = self.data[station_index][frequency_index][beamlet_index]
                        cell = numpy.empty(numpy.size(samples["t"]), dtype=sample_type)
                        for field in sample_type.names:
                            cell[field] = samples[field]
                        # One assignment per entry: chained indexing on the dataset
                        # would only modify a temporary copy and never reach the file.
                        dataset[station_index, frequency_index, beamlet_index] = cell