import logging import os import h5py from .calibration_table import CalibrationTable from .holography_dataset_definitions import * from .holography_observation import HolographyObservation from .holography_specification import HolographySpecification logger = logging.getLogger(__file__) def bytestring_list_to_string(list): return [item.decode('utf-8') for item in list] def bytestring_to_string(bstring): if isinstance(bstring, bytearray): return bstring.decode('utf-8') else: return bstring def to_numpy_array_string(string_list): string_array = numpy.empty(len(string_list), dtype="<S10") for i, string_item in enumerate(string_list): string_array[i] = string_item return string_array class HolographyDataset(): def __init__(self): ''' Initialise the member variables with sane defaults. Simple types are initialised with None, dicts, lists and other fancy things are initialised with an assignment to their respective constructors. ''' # float, HDS version self.version = HOLOGRAPHY_DATA_SET_VERSION # list of ints self.rcu_list = list() # integer self.mode = None # list of strings self.sas_ids = list() # string self.target_station_name = None # list of 3 floats self.target_station_position = None # name of the source (str) self.source_name = None # position of the source array[ (RA, numpy.float64), (DEC, numpy.float64), (EPOCH, 'S10')] self.source_position = None # date time when the observation started in MJD in seconds (float) self.start_time = None # date time when the observation started in MJD in seconds (float) self.end_time = None # array(3,3), translation matrix for # (RA, DEC) <-> (l, m) conversion self.rotation_matrix = None # list of beamlet numbers self.beamlets = list() # The central beam or beamlet for each frequency self.central_beamlets = dict() self.calibration_tables = dict() self.derived_calibration_tables = dict() # coordinates of the antenna position in the target self.antenna_field_position = [] # list of reference station names self.reference_stations = list() # list of _frequencies self.frequencies = list() # dict(frequency: # array that contains the ra_dec of which a beam # points at given a frequency and a beamlet number # numpy.dtype([('RA', numpy.float64), # ('DEC',numpy.float64), # ('EPOCH', 'S10')]) self.ra_dec = dict() # dict(reference_station_name: # dict(frequency: # dict(beamlet_number: # array that contains the 4 polarization crosscorrelation for # the 4 polarizations, the l and m coordinates, and the timestamp # in mjd of the sample, and whether or not the data has been flagged # numpy.dtype([('XX', numpy.float64), # ('YY', numpy.float64), # ('XY', numpy.float64), # ('YX', numpy.float64), # ('l', numpy.float64), # ('m', numpy.float64), # ('t', numpy.float64), # ('flag', numpy.bool)] # ) self.data = dict() # a dict of dicts and eventually str, ndarray or that can be converted in a ndarray calling # numpy.array() self.derived_data = None @staticmethod def compare_dicts(dict1, dict2): ''' This function compares the contents of two dicts. It recursively iterates over all keys and embedded dicts. @return: False if a key in dict1 is not a key in dict2 or if values for a key differ in the dicts, True if the contents of both dicts match. @param dict1: A dict @param dict2: Another dict ''' result = True for key in dict1.keys(): if key in dict2.keys(): if isinstance(dict1[key], dict) and isinstance(dict2[key], dict): result = result and HolographyDataset.compare_dicts(dict1[key], dict2[key]) else: if isinstance(dict1[key], numpy.ndarray) and isinstance(dict2[key], numpy.ndarray): # Compares element by element the two arrays return numpy.array_equal(dict1[key], dict2[key]) else: return dict1[key] == dict2[key] else: return False return result def __eq__(self, hds=None): ''' This comparison operator compares the values of the relevant members of this class instance and the ones in the parameter hds. This allows to compare a Holography Data Set in memory with one that has been loaded from an HDF5 file. It is also possible to compare two HDSs that were imported from Holography Measurement Sets. @param hds: An instance of the HolographyDataset class @return: True if the values of the members that are relevant for HDS are identical, False otherwise. ''' equality = False if hds is not None and isinstance(hds, HolographyDataset) is True: equality = True for attribute_name, attribute_value in self.__dict__.items(): other_value = getattr(hds, attribute_name) this_equality = True try: if isinstance(attribute_value, numpy.ndarray) is True and isinstance( other_value, numpy.ndarray) is True: this_equality = numpy.array_equal(attribute_value, other_value) elif isinstance(attribute_value, dict) is True and isinstance(other_value, dict) is True: this_equality = HolographyDataset.compare_dicts(attribute_value, other_value) elif attribute_value != other_value: this_equality = False logger.error("values for field \"%s\" differs", attribute_name) except Exception as e: logger.warning("Cannot compare HDS values!", attribute_name, type(attribute_value), e) return False try: equality = equality and this_equality except Exception as e: logger.warning( "Comparison of two values resulted in something that is neither True nor False!", attribute_name, type(attribute_value), type(other_value)) return False return equality def find_central_beamlets(self, source, ra_dec, frequencies, beamlets): ''' This function finds the central beamlet of a target station for every frequency. @param source: A dict containing right ascension (source["RA"]) and declination (source["DEC"]) of the source. @param ra_dec: A dict[frequency][beamlet] that contains the right ascension (ra_dec[frequency][beamlet][0]) and declination (ra_dec[frequency][beamlet][1]) for every beamlet. @param frequencies: A dict that contains the _frequencies at which the holography observation was performed. @param beamlets: A list of beamlet numbers. ''' logger.debug("Finding the central beamlets...") source_ra = source["RA"] source_dec = source["DEC"] # Store each pseudo distance in a dict of dicts: pd[frequency][beamlet] pseudo_distance = dict() # Iterate over all _frequencies... for frequency in frequencies: frequency_string = str(frequency) pseudo_distance[frequency_string] = dict() # and iterate over the beamlets. for beamlet in beamlets: beamlet_string = str(beamlet) ra = ra_dec[frequency_string][beamlet_string]['RA'] dec = ra_dec[frequency_string][beamlet_string]['DEC'] # calculate a pseudo distance that is an indicator if this # beamlet is closer to the source than the ones before. # Note that I am too lazy to calculate the spherical distance. # But keep in mind that this pseudo distance like its cousin # the geometrical distance are not really cutting it since RA # and DEC are coordinates of a spherical coordinate system. # But here the pseudo distance is good enough. pseudo_distance[frequency_string][beamlet_string] = abs(ra - source_ra) + abs( dec - source_dec) # OK. Done with all the iteration business. Now check if across all # _frequencies the same beamlet is the central one. It is allowed that # the central beamlets are not always the same but it would be better # and let the astronomers sleep better if it would always be the same. # And also check if there is actually only one central beamlet per # frequency. central_beamlet = dict() for (frequency_string, beamlets) in pseudo_distance.items(): values = list(beamlets.values()) keys = list(beamlets.keys()) minimal_distance = min(values) # If I find a second beamlet that is 'closest' to the source for the # same frequency then that is a problem. The outset was that there # is only a single central beamlet and not a bunch of 'em. if values.count(minimal_distance) == 1: # All is good. Only one central beamlet. central_beamlet[frequency_string] = keys[values.index(minimal_distance)] else: text = "Found %d beamlets that have the same distance from the source position. Therefore a unique central beamlet does not exist." % ( values.count(minimal_distance)) logger.error(text) raise ValueError(text) # Now check if the central beamlet is the same across all _frequencies. # I do that by creating a set from the central beamlets of all stations. # If there is only one central beamlet for all _frequencies, the size of # the set will be 1. central_beamlet_set = set(central_beamlet.values()) if len(central_beamlet_set) == 1: logger.debug( "All is good, unicorns everywhere, there is only one central beamlet \"%s\" for all _frequencies.", central_beamlet_set) else: logger.warning("Multiple central beamlets have been identified: ", central_beamlet) return central_beamlet def load_from_beam_specification_and_ms(self, station_name, list_of_hbs_ms_tuples): """ Loads the dataset from the specification files and the measurements for the given station name :param station_name: target station name :param hb_specifications: a list containing (hbs, ms) per frequency """ logger.info("Creating a holography data set for station \"%s\"...", station_name) try: logger.debug('collecting preliminary information') self.__collect_preliminary_information(station_name, list_of_hbs_ms_tuples) logger.debug('collected preliminary information') logger.debug('reading data') self.__read_data(station_name, list_of_hbs_ms_tuples) logger.debug('read data') self.central_beamlets = self.find_central_beamlets(self.source_position, self.ra_dec, self.frequencies, self.beamlets) logger.info("Creation of a holography data set for station \"%s\" done.", station_name) except Exception as e: logger.exception("Error creating dataset for station \"%s\": %s", station_name, e) raise e def __read_data(self, station_name, list_of_hbs_ms_tuples): """ :param station_name: :param list_of_hbs_ms_tuples: :type list_of_hbs_ms_tuples: list[(HolographySpecification, HolographyObservation)] :return: """ self.data = dict() for hbs, ho in list_of_hbs_ms_tuples: if station_name in hbs.target_station_names: frequency = ho.frequency frequency_string = str(frequency) for beamlet in self.beamlets: beamlet_string = str(beamlet) reference_station_names, cross_correlation = \ ho.ms_for_a_given_beamlet_number[ beamlet].read_cross_correlation_time_flags_lm_per_station_name( station_name, self.reference_stations, self.ra_dec[frequency_string][beamlet_string], self.rotation_matrix) for reference_station_index, reference_station in \ enumerate(reference_station_names): if reference_station not in self.data: self.data[reference_station] = dict() if frequency_string not in self.data[reference_station]: self.data[reference_station][frequency_string] = dict() self.data[reference_station][frequency_string][beamlet_string] = \ cross_correlation[reference_station_index, :] def __collect_preliminary_information(self, station_name, list_of_hbs_ho_tuples): """ This routines reads both the holography beam specifications files and the holography observation to gather the list of rcus, the mode, the target station name and position, the source name and position, the start and the end time, the rotation matrix to convert from ra and dec to l and m, the antenna field positions, the list of the reference stations, the _frequencies, the ra and dec at which the beams point at. All this data is essential to interpret the recorded holography beams cross correlations :param list_of_hbs_ho_tuples: a list containing (hbs, ho) per frequency :type list_of_hbs_ho_tuples: list[(HolographySpecification, HolographyObservation)] """ mode = set() source_name = set() source_position = set() target_stations = set() reference_stations = set() beamlets = set() virtual_pointing = dict() frequencies = set() sas_ids = set() rcu_list = set() start_mjd = None end_mjd = None used_antennas = set() for hbs, ho in list_of_hbs_ho_tuples: target_stations.update(hbs.target_station_names) if station_name in hbs.target_station_names: beam_specifications = hbs.get_beam_specifications_per_station_name(station_name) for beam_specification in beam_specifications: rcu_list.update(beam_specification.rcus_involved) used_antennas.update(beam_specification.antennas_used) mode.add(beam_specification.rcus_mode) source_name.add(ho.source_name) source_position.add( (beam_specification.station_pointing['ra'], beam_specification.station_pointing['dec'], beam_specification.station_pointing['coordinate_system'] )) if start_mjd is None or start_mjd > ho.start_mjd: start_mjd = ho.start_mjd if end_mjd is None or end_mjd < ho.end_mjd: end_mjd = ho.end_mjd frequencies.add(ho.frequency) sas_ids.add(ho.sas_id) self.target_station_name = station_name reference_stations.update(hbs.reference_station_names) try: single_beamlet = int(beam_specification.beamlets) except ValueError as e: logger.exception('Target station specification incorrect') raise e beamlets.add(single_beamlet) virtual_pointing[(ho.frequency, single_beamlet)] = \ (beam_specification.virtual_pointing['ra'], beam_specification.virtual_pointing['dec'], beam_specification.virtual_pointing['coordinate_system']) else: continue self.frequencies = sorted(frequencies) self.beamlets = sorted(beamlets) self.start_time = start_mjd self.end_time = end_mjd self.sas_ids = list(sas_ids) self.reference_stations = list(reference_stations) self.rcu_list = list(rcu_list) self.ra_dec = dict() for frequency in self.frequencies: frequency_string = str(frequency) if frequency not in self.ra_dec: self.ra_dec[frequency_string] = dict() for beamlet in self.beamlets: beamlet_string = str(beamlet) self.ra_dec[frequency_string][beamlet_string] = numpy.array( virtual_pointing[(frequency, beamlet)], dtype=HDS_coordinate_type) # reads the target station position and the coordinate of its axes # and does this only once since the coordinate will not change first_holography_observation = list_of_hbs_ho_tuples[0][1] first_ms = list(first_holography_observation.ms_for_a_given_beamlet_number.values())[0] station_position, tile_offset, axes_coordinates = first_ms. \ get_station_position_tile_offsets_and_axes_coordinate_for_station_name( station_name) logger.debug('selecting used antenna ids %s', used_antennas) self.antenna_field_position = [list(station_position - antenna_offset) for antenna_id, antenna_offset in enumerate(tile_offset) if antenna_id in used_antennas] self.target_station_position = list(station_position) self.rotation_matrix = axes_coordinates if station_name not in target_stations: logger.error('Station %s was not involved in the observation.' ' The target stations for this observation are %s', station_name, target_stations) raise Exception('Station %s was not involved in the observation.' % station_name, ) if len(mode) == 1: self.mode = mode.pop() else: raise ValueError('Multiple RCUs mode are not supported') if len(source_position) == 1: self.source_position = numpy.array(source_position.pop(), dtype=HDS_coordinate_type) else: logger.error('Multiple source positions are not supported: %s', source_position) raise ValueError('Multiple source positions are not supported') if len(source_name) == 1: self.source_name = source_name.pop() else: raise ValueError('Multiple source name are not supported') @staticmethod def print_info(hds=None, text=None): ''' Print out the values of an HDS. @param hds: The HDS of which values shall be printed. @param text: An additional text string that will be printed alongside the values in the HDS. ''' if text is not None and isinstance(text, str): logger.info("%s", text) if hds is not None and isinstance(hds, HolographyDataset) is True: logger.info("Version = %s", hds.version) logger.info("Mode = %s", hds.mode) logger.info("RCU list = ", hds.rcu_list) logger.info("SAS IDs = %s", hds.sas_ids) logger.info("Target station name = %s", hds.target_station_name) logger.info("Target station position = %s", hds.target_station_position) logger.info("Source name = %s", hds.source_name) logger.info("Source position = %s", hds.source_position) logger.info("Central beamlets = %s", hds.central_beamlets) logger.info("Start time = %s", hds.start_time) logger.info("End time = %s", hds.end_time) logger.info("Rotation matrix = %s", hds.rotation_matrix) logger.info("Antenna field position = %s", hds.antenna_field_position) logger.info("Reference stations = %s", hds.reference_stations) logger.info("Frequencies = %s", hds.frequencies) logger.info("Beamlets = %s", hds.beamlets) logger.info("RA DEC = %s", hds.ra_dec) logger.info("Data = %s", hds.data) else: logger.warning( "The object passed is not a HolographyDataset instance. Cannot print any data.") @staticmethod def _read_grouped_data(h5file, uri): """ Read the data in a nested hierarchy starting from the address uri into a python dict :param h5file: input HDF5 file :type h5file: h5py.File :param uri: starting point address :type uri: str :return: a dict encoding the structure """ starting_leaf = h5file[uri] result = dict() # Read the attributes in the hdf5 dataset for attribute_key, attribute_value in starting_leaf.attrs.items(): if 'ATTRIBUTES' not in result: result['ATTRIBUTES'] = dict() result['ATTRIBUTES'][attribute_key] = attribute_value for key, value in starting_leaf.items(): if isinstance(value, h5py.Group) is True: result[key] = HolographyDataset._read_grouped_data(h5file, value.name) else: try: if numpy.issubdtype(value.dtype, numpy.string_): result[key] = str(numpy.array(value).astype(str)) else: result[key] = numpy.array(value) except ValueError as e: logger.exception('Cannot interpret %s a a numpy array: %s', type(value), e) raise e return result @staticmethod def _store_grouped_data(h5file, uri, data_to_store): """ Store the data in a nested hierarchy starting from the address uri into the HDS :param h5file: input HDF5 file :type h5file: h5py.File :param uri: starting point address :type uri: str :param data_to_store: dict that contains the data to store :type data_to_store: dict :return: a dict encoding the structure """ if uri not in h5file: starting_leaf = h5file.create_group(uri) else: starting_leaf = h5file[uri] # Store the attributes in the hdf5 dataset attributes = data_to_store.get('ATTRIBUTES', dict()) for attribute_key, attribute_value in attributes.items(): starting_leaf.attrs[attribute_key] = attribute_value for key, value in data_to_store.items(): if key is 'ATTRIBUTES': continue if isinstance(value, dict) is True: HolographyDataset._store_grouped_data(h5file, '/'.join([uri, key]), value) else: try: if isinstance(value, str): starting_leaf[key] = numpy.string_(value) else: starting_leaf[key] = value except ValueError as e: logger.exception('Cannot interpret %s a a numpy array: %s', type(value), e) raise e @staticmethod def load_from_file(path): """ Read a holography dataset from an HDF5 file and returns a HolographyDataset class :param path: path to file :return: the read dataset """ f = None try: result, f = HolographyDataset.open_holography_file(path) result.data = dict() for reference_station in f["CROSSCORRELATION"].keys(): for frequency in f["CROSSCORRELATION"][reference_station].keys(): for beamlet in f["CROSSCORRELATION"][reference_station][frequency].keys(): if reference_station not in result.data: result.data[reference_station] = dict() if frequency not in result.data[reference_station]: result.data[reference_station][frequency] = dict() result.data[reference_station][frequency][beamlet] = numpy.array( f["CROSSCORRELATION"][reference_station][frequency][beamlet]) if '/DERIVED_DATA' in f: result.derived_data = HolographyDataset._read_grouped_data(f, '/DERIVED_DATA') except Exception as e: logger.exception( "Cannot read the Holography Data Set data from the HDF5 file \"%s\"." " This is the exception that was thrown: %s", path, e) raise e finally: if f is not None: f.close() return result @staticmethod def open_holography_file(path: str): """ Opens the Holography HDF file for access and returns the Holography Dataset and it's file descriptor :param path: path to file :return: the read dataset :rtype: List[HolographyDataset, h5py.File] """ f = None if not os.path.exists(path): raise FileNotFoundError(path) try: f = h5py.File(path, "r") result = HolographyDataset() result.version = f.attrs[HDS_VERSION] result.mode = f.attrs[HDS_MODE] result.rcu_list = list(f.attrs[HDS_RCU_LIST]) result.sas_ids = list(f.attrs[HDS_SAS_ID]) result.target_station_name = f.attrs[HDS_TARGET_STATION_NAME] result.target_station_position = list(f.attrs[HDS_TARGET_STATION_POSITION]) result.source_name = f.attrs[HDS_SOURCE_NAME] result.source_position = numpy.array(f.attrs[HDS_SOURCE_POSITION]) (result.start_time, result.end_time) = f.attrs[HDS_OBSERVATION_TIME] result.rotation_matrix = f.attrs[HDS_ROTATION_MATRIX] result.beamlets = list(f.attrs[HDS_BEAMLETS]) result.antenna_field_position = f.attrs[HDS_ANTENNA_FIELD_POSITION].tolist() result.reference_stations = bytestring_list_to_string(list(f[HDS_REFERENCE_STATION])) result.frequencies = list(f[HDS_FREQUENCY]) if HDS_CALIBRATION_TABLES in f: for mode in f[HDS_CALIBRATION_TABLES]: uri = '/%s/%s' % (HDS_CALIBRATION_TABLES, mode) result.calibration_tables[mode] = \ CalibrationTable.load_from_hdf(file_descriptor=f, uri=uri) if HDS_DERIVED_CAL_TABLES in f: for mode in f[HDS_DERIVED_CAL_TABLES]: uri = '/%s/%s' % (HDS_DERIVED_CAL_TABLES, mode) result.calibration_tables[mode] = \ CalibrationTable.load_from_hdf(file_descriptor=f, uri=uri) result.ra_dec = dict() for frequency in f["RA_DEC"].keys(): for beamlet in f["RA_DEC"][frequency].keys(): if frequency not in result.ra_dec: result.ra_dec[frequency] = dict() result.ra_dec[frequency][beamlet] = numpy.array(f["RA_DEC"][frequency][beamlet]) beamlets = set() for reference_station in f["CROSSCORRELATION"].keys(): for frequency in f["CROSSCORRELATION"][reference_station].keys(): for beamlet in f["CROSSCORRELATION"][reference_station][frequency].keys(): beamlets.add(int(beamlet)) result.data = f['CROSSCORRELATION'] result.central_beamlets = HolographyDataset._read_grouped_data(f, HDS_CENTRAL_BEAMLETS) if '/DERIVED_DATA' in f: result.derived_data = f['/DERIVED_DATA'] except Exception as e: logger.exception( "Cannot read the Holography Data Set data from the HDF5 file \"%s\"." " This is the exception that was thrown: %s", path, e) raise e return result, f def insert_calibration_table(self, caltable: CalibrationTable): mode = caltable.observation_mode self.calibration_tables[mode] = caltable def store_to_file(self, path): """ Stores the holography dataset at the given path :param path: path to file """ f = None # Prepare the HDF5 data structs. try: f = h5py.File(path, "w") # Create the ATTRS # Start with the version information f.attrs[HDS_VERSION] = HOLOGRAPHY_DATA_SET_VERSION # RCU list f.attrs[HDS_RCU_LIST] = numpy.array(self.rcu_list, dtype=int) # RCU mode f.attrs[HDS_MODE] = self.mode # Moan... Again this needs to be stored like that. f.attrs[HDS_SAS_ID] = numpy.array(self.sas_ids, dtype=h5py.special_dtype(vlen=str)) f.attrs[HDS_TARGET_STATION_NAME] = self.target_station_name f.attrs[HDS_TARGET_STATION_POSITION] = self.target_station_position f.attrs[HDS_SOURCE_NAME] = self.source_name f.attrs[HDS_SOURCE_POSITION] = self.source_position f.attrs[HDS_OBSERVATION_TIME] = numpy.array([self.start_time, self.end_time]) f.attrs[HDS_ROTATION_MATRIX] = self.rotation_matrix f.attrs[HDS_ANTENNA_FIELD_POSITION] = self.antenna_field_position f.attrs[HDS_BEAMLETS] = self.beamlets # Store the list of reference stations and _frequencies. We just # want to keep 'em around for quick reference. f[HDS_REFERENCE_STATION] = to_numpy_array_string(self.reference_stations) f[HDS_FREQUENCY] = self.frequencies f.create_group(HDS_CALIBRATION_TABLES) for mode in self.calibration_tables: self.calibration_tables[mode].store_to_hdf(f, '/{}/{}'.format( HDS_CALIBRATION_TABLES, mode)) for mode in self.derived_calibration_tables: self.derived_calibration_tables[mode].store_to_hdf(f, '/{}/{}'.format( HDS_DERIVED_CAL_TABLES, mode)) HolographyDataset._store_grouped_data(h5file=f, uri='/RA_DEC', data_to_store=self.ra_dec) # We create groups for the reference stations and the _frequencies. # Then we store the data samples [XX, YY, XY, YX, t, l, m, flag] # in an array. The reference station name, the frequency and the # beamlet number (index of the data sample array) allow random # access of the data. HolographyDataset._store_grouped_data(h5file=f, uri='/CROSSCORRELATION', data_to_store=self.data) HolographyDataset._store_grouped_data(h5file=f, uri=HDS_CENTRAL_BEAMLETS, data_to_store=self.central_beamlets) if self.derived_data: self._store_grouped_data(h5file=f, uri='/DERIVED_DATA', data_to_store=self.derived_data) except Exception as e: logger.exception( "Cannot write the Holography Data Set data to the HDF5 file \"%s\"." " This is the exception that was thrown: %s", path, e) raise e finally: if f is not None: f.close()