import logging

from lofar.calibration.common.datacontainers.holography_observation import HolographyObservation

from .holography_specification import HolographySpecification

logger = logging.getLogger(__name__)


class HolographyDataset():
    """
    Container for the metadata and the data of a holography observation
    as seen from a single target station.

    The attributes are filled in by load_from_beam_specification_and_ms from a
    list of (holography beam specification, holography observation) tuples.
    """

    def __init__(self):
        """
        Creates an empty dataset: collections start empty and every scalar
        attribute is None until the dataset is loaded.
        """
        self.rcu_list = set()  # set of ints
        self.mode = None  # integer
        self.sas_ids = set()  # set of strings
        self.target_station_name = None  # string
        self.target_station_position = None  # list of 3 floats
        self.source_name = None  # string
        # tuple (ra, dec, coordinate_system) taken from the station pointing
        self.source_position = None
        # date time when the observation started in MJD in seconds (float)
        self.start_time = None
        # date time when the observation ended in MJD in seconds (float)
        self.end_time = None
        # array(3,3), translation matrix for (RA, DEC) <-> (l, m) conversion
        self.rotation_matrix = None
        # coordinates of the antenna position in the target station
        self.antenna_field_position = []
        self.reference_stations = set()  # set of reference station names
        self.frequencies = set()  # set of frequencies
        # array(Nfrequency, Nbeamlets, 2) contains the ra_dec of which a beam
        # points at given a frequency and a beamlet number
        self.ra_dec = None
        # array(NreferenceStations, Nfrequencies, Nbeamlets) that contains the
        # 4 polarization crosscorrelation for the 4 polarizations, the l and m
        # coordinates, and the timestamp in mjd of the sample
        self.data = None

    def load_from_beam_specification_and_ms(self, station_name, list_of_hbs_ms_tuples):
        """
        Loads the dataset from the specification files and the measurements for the given
        station name

        :param station_name: target station name
        :param list_of_hbs_ms_tuples: a list containing one (holography beam specification,
         holography observation) tuple per frequency; it is forwarded unchanged to the
         preliminary information collection
        :raises Exception: if the station is not a target station of any specification
        :raises ValueError: if more than one RCU mode, source position or source name is found
        """
        self.__collect_preliminary_information(station_name, list_of_hbs_ms_tuples)

    def __collect_preliminary_information(self, station_name, list_of_hbs_ho_tuples):
        """
        This routine reads both the holography beam specification files and the holography
        observations to gather the list of rcus, the mode, the target station name and
        position, the source name and position, the start and the end time, the rotation
        matrix to convert from ra and dec to l and m, the antenna field positions, the list
        of the reference stations, the frequencies and the sas ids.
        All this data is essential to interpret the recorded holography beams
        cross correlations

        :param station_name: name of the target station to collect the information for
        :param list_of_hbs_ho_tuples: a list containing (hbs, ho) per frequency
        :type list_of_hbs_ho_tuples: list[(HolographySpecification, HolographyObservation)]
        :raises Exception: if station_name is not among the target stations of any of the
         given specifications (this includes an empty list)
        :raises ValueError: if more than one RCU mode, source position or source name is found
        """
        mode = set()
        source_name = set()
        source_position = set()
        target_stations = set()

        for hbs, ho in list_of_hbs_ho_tuples:
            target_stations.update(hbs.target_station_names)

            # specifications that do not target the requested station only
            # contribute to the list of known target stations
            if station_name not in hbs.target_station_names:
                continue

            beam_specifications = hbs.get_beam_specifications_per_station_name(station_name)
            for beam_specification in beam_specifications:
                self.rcu_list.update(beam_specification.rcus_involved)
                mode.add(beam_specification.rcus_mode)

                source_name.add(ho.source_name)
                source_position.add(
                    (beam_specification.station_pointing['ra'],
                     beam_specification.station_pointing['dec'],
                     beam_specification.station_pointing['coordinate_system']))

                # NOTE(review): start and end time are overwritten for every
                # matching observation, so the values of the last one win.
                # Confirm that all the observations share the same time span.
                self.start_time = ho.start_mjd
                self.end_time = ho.end_mjd
                self.frequencies.add(ho.frequency)
                self.sas_ids.add(ho.sas_id)

                self.target_station_name = station_name
                self.reference_stations.update(hbs.reference_station_names)

        # Validate the station before touching any measurement set, so that an
        # unknown station (or an empty input list) is reported with a meaningful
        # message instead of an unrelated error raised while reading the MS.
        if station_name not in target_stations:
            logger.error('Station %s was not involved in the observation.'
                         ' The target stations for this observation are %s',
                         station_name, target_stations)
            raise Exception('Station %s was not involved in the observation.'
                            % station_name)

        # reads the target station position and the coordinate of its axes
        # and does this only once since the coordinate will not change
        first_holography_observation = list_of_hbs_ho_tuples[0][1]
        # list(...) is needed because on Python 3 the result of values() is a
        # view that cannot be indexed
        first_ms = list(
            first_holography_observation.ms_for_a_given_beamlet_number.values())[0]
        station_position, tile_offset, axes_coordinates = first_ms.\
            get_station_position_tile_offsets_and_axes_coordinate_for_station_name(
                station_name)

        self.antenna_field_position = [list(station_position - antenna_offset)
                                       for antenna_offset in tile_offset]
        self.target_station_position = list(station_position)
        self.rotation_matrix = axes_coordinates

        if len(mode) > 1:
            raise ValueError('Multiple RCUs mode are not supported')
        else:
            self.mode = mode.pop()

        if len(source_position) > 1:
            logger.error('Multiple source positions are not supported: %s', source_position)
            raise ValueError('Multiple source positions are not supported')
        else:
            self.source_position = source_position.pop()

        if len(source_name) > 1:
            raise ValueError('Multiple source name are not supported')
        else:
            self.source_name = source_name.pop()

    @staticmethod
    def load_from_file(path):
        """
        It reads the dataset from file and returns a HolographyDataset class.
        Not implemented yet.

        :param path: path to file
        :return: the read dataset
        :raises NotImplementedError: always
        """
        raise NotImplementedError

    def store_to_file(self, path):
        """
        Stores the holography dataset at the given path.
        Not implemented yet.

        :param path: path to file
        :raises NotImplementedError: always
        """
        raise NotImplementedError