diff --git a/CAL/CalibrationCommon/lib/coordinates.py b/CAL/CalibrationCommon/lib/coordinates.py index 58fe75967a624c6bee5c4c284e2d8bdb470a36c8..bd3c452a19389ecb42357962fc9ea5457924f36d 100644 --- a/CAL/CalibrationCommon/lib/coordinates.py +++ b/CAL/CalibrationCommon/lib/coordinates.py @@ -6,6 +6,7 @@ import astropy.units as u # Always try to download the most recent IERS tables. from astropy.utils.data import download_file from astropy.utils import iers + iers.IERS.iers_table = iers.IERS_A.open(download_file(iers.IERS_A_URL, cache=True)) @@ -54,21 +55,18 @@ def lm_from_radec(ra, dec, ra0, dec0): ... ra0=Angle(2.0, u.rad), dec0=Angle(1.0, u.rad)) (-0.1234948364118721, -0.089406906258670149) ''' - cos_dec = cos(dec.rad) - sin_dec = sin(dec.rad) + cos_dec = cos(dec.rad) + sin_dec = sin(dec.rad) cos_dec0 = cos(dec0.rad) sin_dec0 = sin(dec0.rad) - sin_dra = sin(float(ra.rad - ra0.rad)) - cos_dra = cos(float(ra.rad - ra0.rad)) + sin_dra = sin(float(ra.rad - ra0.rad)) + cos_dra = cos(float(ra.rad - ra0.rad)) - l_rad = cos_dec*sin_dra - m_rad = sin_dec*cos_dec0 - cos_dec*sin_dec0*cos_dra + l_rad = cos_dec * sin_dra + m_rad = sin_dec * cos_dec0 - cos_dec * sin_dec0 * cos_dra return (l_rad, m_rad) - - - def radec_from_lm(l_rad, m_rad, ra0, dec0): r''' Calculate right ascension and declination given direction cosines @@ -111,17 +109,14 @@ def radec_from_lm(l_rad, m_rad, ra0, dec0): (<Angle 1.8 rad>, <Angle 0.9 rad>) ''' - n_rad = sqrt(1.0 - l_rad*l_rad - m_rad*m_rad) + n_rad = sqrt(1.0 - l_rad * l_rad - m_rad * m_rad) cos_dec0 = cos(dec0.rad) sin_dec0 = sin(dec0.rad) - ra_rad = ra0.rad + arctan2(l_rad, cos_dec0*n_rad - m_rad*sin_dec0) - dec_rad = arcsin(m_rad*cos_dec0 + sin_dec0*n_rad) + ra_rad = ra0.rad + arctan2(l_rad, cos_dec0 * n_rad - m_rad * sin_dec0) + dec_rad = arcsin(m_rad * cos_dec0 + sin_dec0 * n_rad) return (Angle(ra_rad, u.rad), Angle(dec_rad, u.rad)) - - - def icrs_from_itrs(unit_vector_itrs, obstime): r''' Convert a geocentric cartesian unit vector in the ITRS system into @@ -161,7 +156,7 @@ def icrs_from_itrs(unit_vector_itrs, obstime): (358.7928571, 89.91033405)> ''' x, y, z = array(unit_vector_itrs).T - c_itrs = SkyCoord(x, y, z, representation='cartesian',frame='itrs', + c_itrs = SkyCoord(x, y, z, representation='cartesian', frame='itrs', obstime=obstime, equinox='J2000') return c_itrs.icrs @@ -200,15 +195,13 @@ def itrs_from_icrs(icrs_position_rad, obstime): [ 1.70157684e-01, -4.68937638e-01, 8.66685557e-01]]) ''' - ra, dec = array(icrs_position_rad, dtype='float64').T*u.rad - icrs = SkyCoord(ra, dec,frame='icrs', + ra, dec = array(icrs_position_rad, dtype='float64').T * u.rad + icrs = SkyCoord(ra, dec, frame='icrs', obstime=obstime, equinox='J2000') itrs = icrs.itrs return array([itrs.x, itrs.y, itrs.z], dtype=float64).T - - def pqr_from_icrs(icrs_rad, obstime, pqr_to_itrs_matrix): r''' Compute geocentric station-local PQR coordinates of a certain ICRS @@ -261,8 +254,6 @@ def pqr_from_icrs(icrs_rad, obstime, pqr_to_itrs_matrix): return dot(pqr_to_itrs_matrix.T, itrs_from_icrs(icrs_rad, obstime).T).T.squeeze() - - def icrs_from_pqr(pqr, obstime, pqr_to_itrs_matrix): r''' Convert directions from the station-local PQR system into @@ -337,4 +328,3 @@ def icrs_from_pqr(pqr, obstime, pqr_to_itrs_matrix): level at which we see the difference. ''' return icrs_from_itrs(dot(pqr_to_itrs_matrix, pqr.T).T, obstime=obstime) - diff --git a/CAL/CalibrationCommon/lib/datacontainers/holography_dataset.py b/CAL/CalibrationCommon/lib/datacontainers/holography_dataset.py index 9881f1d0dd0b9105ff3674d2ca57285184f9333d..709a6ae34caa785feade27eea338c633a0889d2f 100644 --- a/CAL/CalibrationCommon/lib/datacontainers/holography_dataset.py +++ b/CAL/CalibrationCommon/lib/datacontainers/holography_dataset.py @@ -1,10 +1,13 @@ import logging -import h5py +import os +import h5py from lofar.calibration.common.datacontainers.holography_observation import HolographyObservation + from .holography_dataset_definitions import * from .holography_specification import HolographySpecification + logger = logging.getLogger(__file__) @@ -207,7 +210,8 @@ class HolographyDataset(): # the geometrical distance are not really cutting it since RA # and DEC are coordinates of a spherical coordinate system. # But here the pseudo distance is good enough. - pseudo_distance[frequency_string][beamlet_string] = abs(ra - source_ra) + abs(dec - source_dec) + pseudo_distance[frequency_string][beamlet_string] = abs(ra - source_ra) + abs( + dec - source_dec) # OK. Done with all the iteration business. Now check if across all # _frequencies the same beamlet is the central one. It is allowed that @@ -227,7 +231,8 @@ class HolographyDataset(): # All is good. Only one central beamlet. central_beamlet[frequency_string] = keys[values.index(minimal_distance)] else: - text = "Found %d beamlets that have the same distance from the source position. Therefore a unique central beamlet does not exist." % (values.count(minimal_distance)) + text = "Found %d beamlets that have the same distance from the source position. Therefore a unique central beamlet does not exist." % ( + values.count(minimal_distance)) logger.error(text) raise ValueError(text) @@ -237,7 +242,9 @@ class HolographyDataset(): # the set will be 1. central_beamlet_set = set(central_beamlet.values()) if len(central_beamlet_set) == 1: - logger.debug("All is good, unicorns everywhere, there is only one central beamlet \"%s\" for all _frequencies.", central_beamlet_set) + logger.debug( + "All is good, unicorns everywhere, there is only one central beamlet \"%s\" for all _frequencies.", + central_beamlet_set) else: logger.warning("Multiple central beamlets have been identified: ", central_beamlet) return central_beamlet @@ -252,13 +259,17 @@ class HolographyDataset(): """ logger.info("Creating a holography data set for station \"%s\"...", station_name) try: + logger.debug('collecting preliminary information') self.__collect_preliminary_information(station_name, list_of_hbs_ms_tuples) + logger.debug('collected preliminary information') + logger.debug('reading data') self.__read_data(station_name, list_of_hbs_ms_tuples) + logger.debug('read data') self.central_beamlets = self.find_central_beamlets(self.source_position, self.ra_dec, self.frequencies, self.beamlets) logger.info("Creation of a holography data set for station \"%s\" done.", station_name) except Exception as e: - logger.exception("Errore creating dataset for station \"%s\": %s", station_name, e) + logger.exception("Error creating dataset for station \"%s\": %s", station_name, e) raise e def __read_data(self, station_name, list_of_hbs_ms_tuples): @@ -315,7 +326,6 @@ class HolographyDataset(): target_stations = set() reference_stations = set() beamlets = set() - central_beamlet = None virtual_pointing = dict() frequencies = set() sas_ids = set() @@ -395,7 +405,7 @@ class HolographyDataset(): self.antenna_field_position = [list(station_position - antenna_offset) for antenna_offset in tile_offset] self.target_station_position = list(station_position) - self.rotation_matrix = axes_coordinates + self.rotation_matrix = axes_coordinates.T if station_name not in target_stations: logger.error('Station %s was not involved in the observation.' @@ -537,6 +547,10 @@ class HolographyDataset(): :return: the read dataset """ f = None + + if not os.path.exists(path): + raise FileNotFoundError(path) + try: f = h5py.File(path, "r") @@ -557,7 +571,6 @@ class HolographyDataset(): result.frequencies = list(f[HDS_FREQUENCY]) - result.ra_dec = dict() for frequency in f["RA_DEC"].keys(): for beamlet in f["RA_DEC"][frequency].keys(): diff --git a/CAL/CalibrationCommon/lib/datacontainers/holography_datatable.py b/CAL/CalibrationCommon/lib/datacontainers/holography_datatable.py index 0278ff0f7f3048b13eb2be2d203b6268a643d5e7..ca413462960b02346e6a31aee0ba3c47160b6028 100644 --- a/CAL/CalibrationCommon/lib/datacontainers/holography_datatable.py +++ b/CAL/CalibrationCommon/lib/datacontainers/holography_datatable.py @@ -58,7 +58,7 @@ class LazyH5Table(): :return: """ data_item = self.__get_data_set(key) - value = numpy.array(data_item) + value = data_item return value def __setitem__(self, key, value): diff --git a/CAL/CalibrationCommon/lib/datacontainers/holography_measurementset.py b/CAL/CalibrationCommon/lib/datacontainers/holography_measurementset.py index cd31cb991832e1609ea501899982543900488828..b390b8a56c16830b2f4c2e427be7fe0f8f2bf6f1 100644 --- a/CAL/CalibrationCommon/lib/datacontainers/holography_measurementset.py +++ b/CAL/CalibrationCommon/lib/datacontainers/holography_measurementset.py @@ -1,133 +1,194 @@ import os import re +from enum import IntEnum import numpy from astropy.time import Time from casacore.tables import table as MS_Table - from lofar.calibration.common.coordinates import pqr_from_icrs + from .holography_dataset_definitions import * -class HolographyMeasurementSet(object): - ms_name_pattern = r'L(?P<sas_id>\d{6})_SB(?P<sub_band_id>\d{3})_uv\.MS' - CASA_XX_INDEX = 0 - CASA_XY_INDEX = 1 - CASA_YX_INDEX = 2 - CASA_YY_INDEX = 3 +class CASA_POLARIZATION_INDEX(IntEnum): + XX = 0 + XY = 1 + YX = 2 + YY = 3 + X = 0 + Y = 1 + + +ms_name_pattern = r'L(?P<sas_id>\d{6})_SB(?P<sub_band_id>\d{3})_uv\.MS' + + +def __mjd_to_astropy_time(mjd_time_seconds): + """ + Convert the modified julian date in seconds in a datetime object + :param mjd_time_seconds: modified julian data in seconds + :return: the date time of the given julian date + :rtype: datetime + """ + hour_in_seconds = 60 * 60 + day_in_seconds = hour_in_seconds * 24 + + return Time(mjd_time_seconds / day_in_seconds, format='mjd', scale='utc') + + +def _compute_lm_from_ra_dec_station_position_rotation_matrix_and_time(ra_dec_epoch, + rotation_matrix, + mjd_times): + if isinstance(ra_dec_epoch, numpy.ndarray): + ra, dec, epoch = ra_dec_epoch.tolist() + + astropy_times = [__mjd_to_astropy_time(mjd_time) + for mjd_time in mjd_times] + n_samples = len(astropy_times) + return_value_dtype = [('l', numpy.float64), + ('m', numpy.float64)] + + return_value = numpy.empty(n_samples, dtype=return_value_dtype) + l_m_arrays = pqr_from_icrs(numpy.array((ra, dec)), astropy_times, rotation_matrix) + + return_value['l'][:] = l_m_arrays[:, 0] + return_value['m'][:] = l_m_arrays[:, 1] + else: + raise TypeError('Expected a structured numpy array for ra_dec obtained {}'. + format(ra_dec_epoch)) - CASA_X_INDEX = 0 - CASA_Y_INDEX = 1 + return return_value + + +def parse_sas_id_and_sub_band_from_ms_name(ms_name): + if is_a_valid_ms_name(ms_name): + match = re.match(ms_name_pattern, ms_name) + else: + raise ValueError('The measurement set %s has not a valid name' % ms_name, ) + return str(match.group('sas_id')), int(match.group('sub_band_id')) + + +def is_a_valid_ms_name(ms_name): + pattern = ms_name_pattern + return re.match(pattern, ms_name.strip()) # is not None + + +def filter_valid_ms_names(list_of_ms_names): + return list(filter(is_a_valid_ms_name, list_of_ms_names)) + + +class HolographyMeasurementSet(object): def __init__(self, ms_name, ms_path): self.path = os.path.join(ms_path, ms_name) - if HolographyMeasurementSet.is_a_valid_ms_name(ms_name): + if is_a_valid_ms_name(ms_name): self.name = ms_name - self.sas_id, self.beamlet = \ - HolographyMeasurementSet.parse_sas_id_and_sub_band_from_ms_name(self.name) + self.sas_id, self.beamlet = parse_sas_id_and_sub_band_from_ms_name(self.name) else: raise ValueError('The measurement set located in %s has not a valid name' % self.path, ) - def get_frequency(self): - observation_table = self.get_spectral_window_table() - try: - reference_frequency = observation_table.getcol('REF_FREQUENCY')[0] - finally: - observation_table.close() - - return reference_frequency - - def get_subband(self): - """ - Return the sub band associated to this measurement set - :return: sub band number - :rtype: int + def get_data_table(self): """ - observation_table = self.get_observation_table() - try: - clock = observation_table.getcol('LOFAR_CLOCK_FREQUENCY')[0] - central_frequency = observation_table.getcol('LOFAR_OBSERVATION_FREQUENCY_CENTER')[0] - bit_sampling = 1024 - subband = int(round(central_frequency / clock * bit_sampling) % 512) - - finally: - observation_table.close() - return subband - - def get_data_table(self): + :return: + :rtype: MS_Table + """ data_table = MS_Table(self.path, ack=False, readonly=True) return data_table def get_pointing_table(self): + """ + + :return: + :rtype: MS_Table + """ pointing_table = MS_Table(self.path + '/POINTING', ack=False, readonly=True) return pointing_table def get_antenna_table(self): + """ + + :return: + :rtype: MS_Table + """ antenna_table = MS_Table(self.path + '/ANTENNA', ack=False, readonly=True) return antenna_table def get_spectral_window_table(self): - antenna_table = MS_Table(self.path + '/SPECTRAL_WINDOW', ack=False, readonly=True) - return antenna_table + """ + + :return: + :rtype: MS_Table + """ + spectral_window_table = MS_Table(self.path + '/SPECTRAL_WINDOW', ack=False, readonly=True) + return spectral_window_table def get_lofar_antenna_field_table(self): + """ + + :return: + :rtype: MS_Table + """ antenna_field_table = MS_Table(self.path + '/LOFAR_ANTENNA_FIELD', ack=False, readonly=True) return antenna_field_table + def get_observation_table(self): + observation_table = MS_Table(self.path + '/OBSERVATION', ack=False) + return observation_table + + def get_frequency(self): + with self.get_spectral_window_table() as observation_table: + reference_frequency = observation_table.getcol('REF_FREQUENCY')[0] + return reference_frequency + + def get_subband(self): + """ + Return the sub band associated to this measurement set + :return: sub band number + :rtype: int + """ + with self.get_observation_table() as observation_table: + clock = observation_table.getcol('LOFAR_CLOCK_FREQUENCY')[0] + central_frequency = observation_table.getcol('LOFAR_OBSERVATION_FREQUENCY_CENTER')[0] + + bit_sampling = 1024 + subband = int(round(central_frequency / clock * bit_sampling) % 512) + return subband + def get_station_position_tile_offsets_and_axes_coordinate_for_station_name(self, station_name): - antenna_table = self.get_antenna_table() - antenna_field_table = self.get_lofar_antenna_field_table() - try: - station_name_index = antenna_table.index('NAME').rownr(station_name) + with self.get_antenna_table() as antenna_table: + with self.get_lofar_antenna_field_table() as antenna_field_table: + station_name_index = antenna_table.index('NAME').rownr(station_name) - station_position = antenna_field_table.getcell('POSITION', station_name_index) - tile_offsets = antenna_field_table.getcell('ELEMENT_OFFSET', station_name_index) - tiles_not_used = antenna_field_table.getcell('ELEMENT_FLAG', station_name_index) - index_tiles_used = numpy.where( - (tiles_not_used[:, HolographyMeasurementSet.CASA_X_INDEX] == False) & - (tiles_not_used[:, HolographyMeasurementSet.CASA_Y_INDEX] == False))[0] - tile_offsets = tile_offsets[index_tiles_used, :] + station_position = antenna_field_table.getcell('POSITION', station_name_index) + tile_offsets = antenna_field_table.getcell('ELEMENT_OFFSET', station_name_index) + tiles_not_used = antenna_field_table.getcell('ELEMENT_FLAG', station_name_index) + index_tiles_used = numpy.where( + (tiles_not_used[:, CASA_POLARIZATION_INDEX.X] == False) & + (tiles_not_used[:, CASA_POLARIZATION_INDEX.Y] == False))[0] + tile_offsets = tile_offsets[index_tiles_used, :] - axes_coordinate = antenna_field_table.getcell('COORDINATE_AXES', station_name_index) + axes_coordinate = antenna_field_table.getcell('COORDINATE_AXES', station_name_index) - finally: - antenna_table.close() - antenna_field_table.close() - return station_position, tile_offsets, axes_coordinate + return station_position, tile_offsets, axes_coordinate def __extract_source_name_from_pointing(self): - pointing_table = self.get_pointing_table() - try: + with self.get_pointing_table() as pointing_table: unique_names = {name for name in pointing_table.getcol('NAME')} if len(unique_names) == 1: source_name = unique_names.pop() else: raise ValueError('Expected only a source as a target for the observation') - finally: - pointing_table.close() - - return source_name - def get_observation_table(self): - observation_table = MS_Table(self.path + '/OBSERVATION', ack=False) - return observation_table + return source_name def get_start_end_observation(self): - observation_table = self.get_observation_table() - - try: + with self.get_observation_table() as observation_table: time_range = observation_table.getcol('TIME_RANGE')[0] - start_time, end_time = time_range - - finally: - observation_table.close() - - return start_time, end_time + return start_time, end_time def get_source_name(self): return self.__extract_source_name_from_pointing() @@ -193,16 +254,16 @@ class HolographyMeasurementSet(object): beams_crosscorrelations_array[reference_station_index, :]['XX'] = \ crosscorrelations[ - reference_station_index, :, HolographyMeasurementSet.CASA_XX_INDEX] + reference_station_index, :, CASA_POLARIZATION_INDEX.XX] beams_crosscorrelations_array[reference_station_index, :]['XY'] = \ crosscorrelations[ - reference_station_index, :, HolographyMeasurementSet.CASA_XY_INDEX] + reference_station_index, :, CASA_POLARIZATION_INDEX.XY] beams_crosscorrelations_array[reference_station_index, :]['YX'] = \ crosscorrelations[ - reference_station_index, :, HolographyMeasurementSet.CASA_YX_INDEX] + reference_station_index, :, CASA_POLARIZATION_INDEX.YX] beams_crosscorrelations_array[reference_station_index, :]['YY'] = \ crosscorrelations[ - reference_station_index, :, HolographyMeasurementSet.CASA_YY_INDEX] + reference_station_index, :, CASA_POLARIZATION_INDEX.YY] beams_crosscorrelations_array[reference_station_index, :]['flag'] = \ flags[reference_station_index, :] @@ -237,7 +298,7 @@ class HolographyMeasurementSet(object): reference_stations) timestamps = beam_crosscorrelations_array[0, :]['t'] - lm_for_target_station = HolographyMeasurementSet._compute_lm_from_ra_dec_station_position_rotation_matrix_and_time( + lm_for_target_station = _compute_lm_from_ra_dec_station_position_rotation_matrix_and_time( pointing, rotation_matrix, timestamps) for reference_stations_index in range(beam_crosscorrelations_array.shape[0]): beam_crosscorrelations_array['l'][reference_stations_index, :] = \ @@ -247,62 +308,8 @@ class HolographyMeasurementSet(object): return reference_station_names, beam_crosscorrelations_array - @staticmethod - def __mjd_to_astropy_time(mjd_time_seconds): - """ - Convert the modified julian date in seconds in a datetime object - :param mjd_time_seconds: modified julian data in seconds - :return: the date time of the given julian date - :rtype: datetime - """ - hour_in_seconds = 60 * 60 - day_in_seconds = hour_in_seconds * 24 - - return Time(mjd_time_seconds / day_in_seconds, format='mjd', scale='utc') - - @staticmethod - def _compute_lm_from_ra_dec_station_position_rotation_matrix_and_time(ra_dec_epoch, - rotation_matrix, - mjd_times): - if isinstance(ra_dec_epoch, numpy.ndarray): - ra, dec, epoch = ra_dec_epoch.tolist() - - astropy_times = [HolographyMeasurementSet.__mjd_to_astropy_time(mjd_time) - for mjd_time in mjd_times] - n_samples = len(astropy_times) - return_value_dtype = [('l', numpy.float64), - ('m', numpy.float64)] - - return_value = numpy.empty(n_samples, dtype=return_value_dtype) - l_m_arrays = pqr_from_icrs(numpy.array((ra, dec)), astropy_times, rotation_matrix) - - return_value['l'][:] = l_m_arrays[:, 0] - return_value['m'][:] = l_m_arrays[:, 1] - else: - raise TypeError('Expected a structured numpy array for ra_dec obtained {}'. - format(ra_dec_epoch)) - - return return_value - def __repr__(self): return 'MeasurementSet(%d) located in %s for sas_id %s and sub_band_id %d' % (id(self), self.name, self.sas_id, self.beamlet) - - @staticmethod - def parse_sas_id_and_sub_band_from_ms_name(ms_name): - if HolographyMeasurementSet.is_a_valid_ms_name(ms_name): - match = re.match(HolographyMeasurementSet.ms_name_pattern, ms_name) - else: - raise ValueError('The measurement set %s has not a valid name' % ms_name, ) - return str(match.group('sas_id')), int(match.group('sub_band_id')) - - @staticmethod - def is_a_valid_ms_name(ms_name): - pattern = HolographyMeasurementSet.ms_name_pattern - return re.match(pattern, ms_name.strip()) # is not None - - @staticmethod - def filter_valid_ms_names(list_of_ms_names): - return list(filter(HolographyMeasurementSet.is_a_valid_ms_name, list_of_ms_names)) diff --git a/CAL/CalibrationCommon/lib/datacontainers/holography_observation.py b/CAL/CalibrationCommon/lib/datacontainers/holography_observation.py index 81a2d333efc2d49b4e3471dcb15b5a126de94758..dcf074025d7b77c005a3b2169e958ec5b821724d 100644 --- a/CAL/CalibrationCommon/lib/datacontainers/holography_observation.py +++ b/CAL/CalibrationCommon/lib/datacontainers/holography_observation.py @@ -1,12 +1,61 @@ -import re +import logging import os -import astropy.time as astrotime +import re from datetime import datetime -from .holography_measurementset import HolographyMeasurementSet -import logging + +import astropy.time as astrotime + +from .holography_measurementset import HolographyMeasurementSet, filter_valid_ms_names logger = logging.getLogger(__file__) + +def _mjd_to_datetime(mjd_time_seconds): + """ + Convert the modified julian date in seconds in a datetime object + :param mjd_time_seconds: modified julian data in seconds + :return: the date time of the given julian date + :rtype: datetime + """ + hour_in_seconds = 60 * 60 + day_in_seconds = hour_in_seconds * 24 + return astrotime.Time(mjd_time_seconds / day_in_seconds, format='mjd', + scale='utc').to_datetime() + + +def _compute_time_range_from_ms_list(ms_list): + observation_start, observation_end = ms_list[0].get_start_end_observation() + for ms in ms_list: + ms_start_time, ms_end_time = ms.get_start_end_observation() + if observation_start > ms_start_time: + observation_start = ms_start_time + if observation_end < ms_end_time: + observation_end = ms_end_time + return observation_start, observation_end + + +def extract_unique_source_names_from_ms_list(ms_list): + """ + Returns a set of unique source names given a list of measurement sets + :param ms_list: a list of measurement set where to extract the reference _frequencies + :type ms_list: list[HolographyMeasurementSet] + :return: a set of _frequencies + :rtype: set[str] + """ + return {ms.get_source_name() for ms in ms_list} + + +def extract_unique_subband_from_ms_list(ms_list): + """ + Returns a set of unique rcu modes given a list of measurement sets + :param ms_list: a list of measurement set where to extract the reference _frequencies + :type ms_list: list[HolographyMeasurementSet] + :return: a set of rcu modes + :rtype: set[int] + """ + return {ms.get_subband() for ms in ms_list} + + class HolographyObservation(): def __init__(self, path, sas_id, ms_for_a_given_beamlet_number, start_mjd_in_seconds, end_mjd_in_seconds, sub_band, frequency, source_name): @@ -32,149 +81,98 @@ class HolographyObservation(): self.path = path self.sas_id = sas_id self.ms_for_a_given_beamlet_number = ms_for_a_given_beamlet_number - self.start_datetime = HolographyObservation.__mjd_to_datetime(start_mjd_in_seconds) - self.end_datetime = HolographyObservation.__mjd_to_datetime(end_mjd_in_seconds) + self.start_datetime = _mjd_to_datetime(start_mjd_in_seconds) + self.end_datetime = _mjd_to_datetime(end_mjd_in_seconds) self.start_mjd = start_mjd_in_seconds self.end_mjd = end_mjd_in_seconds self.sub_band = sub_band self.frequency = frequency self.source_name = source_name - @staticmethod - def __mjd_to_datetime(mjd_time_seconds): - """ - Convert the modified julian date in seconds in a datetime object - :param mjd_time_seconds: modified julian data in seconds - :return: the date time of the given julian date - :rtype: datetime - """ - hour_in_seconds = 60 * 60 - day_in_seconds = hour_in_seconds * 24 - return astrotime.Time(mjd_time_seconds / day_in_seconds, format='mjd', - scale='utc').to_datetime() - - @staticmethod - def __compute_time_range_from_ms_list(ms_list): - observation_start, observation_end = ms_list[0].get_start_end_observation() - for ms in ms_list: - ms_start_time, ms_end_time = ms.get_start_end_observation() - if observation_start > ms_start_time: - observation_start = ms_start_time - if observation_end < ms_end_time: - observation_end = ms_end_time - return observation_start, observation_end - - @staticmethod - def extract_unique_source_names_from_ms_list(ms_list): - """ - Returns a set of unique source names given a list of measurement sets - :param ms_list: a list of measurement set where to extract the reference _frequencies - :type ms_list: list[HolographyMeasurementSet] - :return: a set of _frequencies - :rtype: set[str] - """ - return {ms.get_source_name() for ms in ms_list} - - @staticmethod - def extract_unique_subband_from_ms_list(ms_list): - """ - Returns a set of unique rcu modes given a list of measurement sets - :param ms_list: a list of measurement set where to extract the reference _frequencies - :type ms_list: list[HolographyMeasurementSet] - :return: a set of rcu modes - :rtype: set[int] - """ - return {ms.get_subband() for ms in ms_list} - - @staticmethod - def list_observations_in_path(path): - """ - List all the observations in the given path and return a list of HolographyObservation - - :param path: path to the directory where the holography observation is stored\ - :type path: str - :return: a list of HolographyObservation - :rtype: list[HolographyObservation] - """ - logger.info("Loading holography observations from \"%s\"...", path) - ms_dir_name_pattern = 'L(?P<sas_id>\d{6})' - ms_dirs_path_pattern = '^' + os.path.join(path, ms_dir_name_pattern, 'uv$') - observations_list = [] - for root, dirnames, filenames in os.walk(path): - match = re.match(ms_dirs_path_pattern, root) - if match: - sas_id = match.group('sas_id') - - ms_indexed_per_beamlet_number = HolographyObservation. \ - create_ms_dict_from_ms_name_list_and_path(dirnames, root) - - - start_mjd_in_seconds, end_mjd_in_seconds = HolographyObservation.\ - __compute_time_range_from_ms_list( - list(ms_indexed_per_beamlet_number.values())) - - unique_frequencies = HolographyObservation. \ - extract_unique_reference_frequencies_from_ms_list( - list(ms_indexed_per_beamlet_number.values())) - - if len(unique_frequencies) == 1: - frequency = unique_frequencies.pop() - else: - raise ValueError( - 'Multiple reference _frequencies per observation are not supported') - - unique_source_names = HolographyObservation.\ - extract_unique_source_names_from_ms_list( - list(ms_indexed_per_beamlet_number.values())) - - if len(unique_source_names) == 1: - source_name = unique_source_names.pop() - else: - raise ValueError( - 'Multiple source target per observation are not supported') - - unique_subband = HolographyObservation.\ - extract_unique_subband_from_ms_list( - list(ms_indexed_per_beamlet_number.values())) - - if len(unique_subband) == 1: - sub_band = unique_subband.pop() - else: - raise ValueError( - 'Multiple subband per observation are not supported') - - observations_list.append( - HolographyObservation(path, sas_id, ms_indexed_per_beamlet_number, - start_mjd_in_seconds, end_mjd_in_seconds, sub_band, - frequency, - source_name)) - logger.info("Holography observations were successfully loaded from \"%s\".", path) - return observations_list - - @staticmethod - def extract_unique_reference_frequencies_from_ms_list(ms_list): - """ - Returns a set of reference _frequencies given a list of measurement setss - :param ms_list: a list of measurement set where to extract the reference _frequencies - :type ms_list: list[HolographyMeasurementSet] - :return: returns a set of _frequencies - :rtype: set[float] - """ - return {ms.get_frequency() for ms in ms_list} - - @staticmethod - def create_ms_dict_from_ms_name_list_and_path(list_of_ms_names, path): - """ - Creates a dict measurement sets indexed by beamlet id - :param list_of_ms_names: a list of the ms to process - :param path: a path were the ms are stored - :return: a dict containing the map of the ms indexed by their beamlet number - ex. { 0 : ms_beam0 ....} - :rtype: dict[int, HolographyMeasurementSet] - """ - filtered_list_of_ms_names = HolographyMeasurementSet.filter_valid_ms_names(list_of_ms_names) - ms_list = [HolographyMeasurementSet(ms_name, path) for ms_name in filtered_list_of_ms_names] - - beamlet_ms_map = {ms.beamlet:ms for ms in ms_list} # - return beamlet_ms_map +def list_observations_in_path(path): + """ + List all the observations in the given path and return a list of HolographyObservation + + :param path: path to the directory where the holography observation is stored\ + :type path: str + :return: a list of HolographyObservation + :rtype: list[HolographyObservation] + """ + logger.info("Loading holography observations from \"%s\"...", path) + ms_dir_name_pattern = 'L(?P<sas_id>\d{6})' + ms_dirs_path_pattern = '^' + os.path.join(path, ms_dir_name_pattern, 'uv$') + observations_list = [] + for root, dirnames, filenames in os.walk(path): + match = re.match(ms_dirs_path_pattern, root) + if match: + sas_id = match.group('sas_id') + + ms_indexed_per_beamlet_number = create_ms_dict_from_ms_name_list_and_path(dirnames, + root) + + start_mjd_in_seconds, end_mjd_in_seconds = _compute_time_range_from_ms_list( + list(ms_indexed_per_beamlet_number.values())) + + unique_frequencies = extract_unique_reference_frequencies_from_ms_list( + list(ms_indexed_per_beamlet_number.values())) + + if len(unique_frequencies) == 1: + frequency = unique_frequencies.pop() + else: + raise ValueError( + 'Multiple reference _frequencies per observation are not supported') + + unique_source_names = extract_unique_source_names_from_ms_list( + list(ms_indexed_per_beamlet_number.values())) + + if len(unique_source_names) == 1: + source_name = unique_source_names.pop() + else: + raise ValueError( + 'Multiple source target per observation are not supported') + + unique_subband = extract_unique_subband_from_ms_list( + list(ms_indexed_per_beamlet_number.values())) + + if len(unique_subband) == 1: + sub_band = unique_subband.pop() + else: + raise ValueError( + 'Multiple subband per observation are not supported') + + observations_list.append( + HolographyObservation(path, sas_id, ms_indexed_per_beamlet_number, + start_mjd_in_seconds, end_mjd_in_seconds, sub_band, + frequency, + source_name)) + logger.info("Holography observations were successfully loaded from \"%s\".", path) + return observations_list + + +def extract_unique_reference_frequencies_from_ms_list(ms_list): + """ + Returns a set of reference _frequencies given a list of measurement setss + :param ms_list: a list of measurement set where to extract the reference _frequencies + :type ms_list: list[HolographyMeasurementSet] + :return: returns a set of _frequencies + :rtype: set[float] + """ + return {ms.get_frequency() for ms in ms_list} + + +def create_ms_dict_from_ms_name_list_and_path(list_of_ms_names, path): + """ + Creates a dict measurement sets indexed by beamlet id + :param list_of_ms_names: a list of the ms to process + :param path: a path were the ms are stored + :return: a dict containing the map of the ms indexed by their beamlet number + ex. { 0 : ms_beam0 ....} + :rtype: dict[int, HolographyMeasurementSet] + """ + filtered_list_of_ms_names = filter_valid_ms_names(list_of_ms_names) + ms_list = [HolographyMeasurementSet(ms_name, path) for ms_name in filtered_list_of_ms_names] + + beamlet_ms_map = {ms.beamlet: ms for ms in ms_list} # + + return beamlet_ms_map diff --git a/CAL/CalibrationCommon/lib/datacontainers/holography_specification.py b/CAL/CalibrationCommon/lib/datacontainers/holography_specification.py index b43f124e1cfd98ebf5f75c03769bbdb058877256..62d142740afb66e4d4b69e601c2013239e43bfc7 100644 --- a/CAL/CalibrationCommon/lib/datacontainers/holography_specification.py +++ b/CAL/CalibrationCommon/lib/datacontainers/holography_specification.py @@ -1,12 +1,13 @@ import datetime +import logging import os import re from collections import defaultdict from glob import glob -import logging logger = logging.getLogger(__file__) + class HolographyStationBeamSpecification(object): """ Contains the data regarding the beam specification for a single station. @@ -22,6 +23,158 @@ class HolographyStationBeamSpecification(object): station_type = () +def _parse_line(split_line): + """ + Parses a line in the holography specification file + :param split_line: dict containing the row indexed by the column name + :type split_line: dict[str, str] + """ + beam_specification = HolographyStationBeamSpecification() + + beam_specification.rcus_mode = int(split_line['rcus_mode']) + beam_specification.sub_band_ids = [int(sub_band) for sub_band in + split_line['sub_band'].split(',')] + beam_specification.mode_description = split_line['mode_description'] + beam_specification.rcus_involved = _parse_integer_range_or_list( + split_line['rcus']) + beam_specification.beamlets = split_line['beamlets'] + + beam_specification.station_pointing = _parse_pointing( + split_line['station_pointing']) + beam_specification.virtual_pointing = _parse_pointing( + split_line['virtual_pointing']) + if len(beam_specification.sub_band_ids) == 1: + beam_specification.station_type = 'target' + else: + beam_specification.station_type = 'reference' + beam_specification.station_name = split_line['station_name'] + + return beam_specification + + +def list_bsf_files_in_path(path): + """ + List all the Holography beam specification files in the given path + :param path: path to the beam specification files + :type path: str + :return: the list of Holography Specifications + :rtype: list[HolographySpecification] + """ + logger.info("Loading holography beam specifications from \"%s\"...", path) + bsf_files_name_pattern = 'Holog-*.txt' + bsf_files_path_pattern = os.path.join(path, bsf_files_name_pattern) + matched_path_list = glob(bsf_files_path_pattern) + matched_file_path_list = list( + filter(lambda path_i: os.path.isfile(path_i), matched_path_list)) + matched_file_name_list = list(map(lambda path_i: os.path.basename(path_i), + matched_file_path_list)) + holography_beam_specification = create_hs_list_from_name_list_and_path( + matched_file_name_list, path) + logger.info("Loading holography beam specifications were successfully loaded from \"%s\".", + path) + return holography_beam_specification + + +def create_hs_list_from_name_list_and_path(name_list, path): + return [HolographySpecification(name, path) for name in name_list] + + +def is_holography_specification_file_name(name): + return re.match(HolographySpecification.hs_name_pattern, name) is not None + + +def extract_id_date_comment_from_name(name): + match = re.match(HolographySpecification.hs_name_pattern, name) + date = match.group('date') + hs_id = int(match.group('id')) + comment = match.group('comment') + date = datetime.datetime.strptime(date, '%Y%m%d') + return hs_id, date, comment + + +def _split_header(line): + """ + Split the header of holography specification file + :param line: line containing the header + :return: a dict containing the start_date, + the end_date, the rcu_mode, and the beam_switch_delay + :rtype: dict + """ + date_regex = '\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}' + format = r'(?P<start_date>{date_regex})\s*' \ + '(?P<end_date>{date_regex})\s*' \ + '(?P<rcu_mode>\d*)\s*' \ + '(?P<beam_switch_delay>\d*.\d*)'.format(date_regex=date_regex) + match = re.match(format, line) + return match.groupdict() + + +def _split_line(line): + """ + Split the header of holography specification file + :param line: line containing the header + :return: a dict containing the start_date, + the end_date, the rcu_mode, and the beam_switch_delay + :rtype: dict + """ + range_regex = '(\d*\:\d*)|(\d*)' + ra_dec_regex = '\d*\.\d*,-?\d*\.\d*,\w*' + regex = r'^(?P<station_name>\w*)\s*' \ + r'(?P<mode_description>\w*)\s*' \ + r'(?P<sub_band>[\d,]*)\s*' \ + r'(?P<beamlets>{range_regex})\s*' \ + r'(?P<rcus>{range_regex})\s*' \ + r'(?P<rcus_mode>(\d*))\s*' \ + r'(?P<virtual_pointing>{ra_dec_regex})\s*' \ + r'(?P<station_pointing>{ra_dec_regex})'.format(range_regex=range_regex, + ra_dec_regex=ra_dec_regex) + match = re.match(regex, line) + if match is None: + raise ValueError('Cannot parse line {}'.format(line)) + return match.groupdict() + + +def _split_lines(lines): + return [_split_line(line) + for line in lines] + + +def _parse_pointing(pointing_string): + ra, dec, coordinate_system = pointing_string.split(',') + ra = float(ra) + dec = float(dec) + return dict(ra=ra, dec=dec, coordinate_system=coordinate_system) + + +def _parse_integer_range_or_list(string_to_be_parsed): + """ + Parses a string containing a list of int or an inclusive range and returns a list of int + + ex "1,2,3,4,5" -> [1, 2, 3, 4, 5] + "1:4" -> [1, 2, 3, 4] + :param string_to_be_parsed: the string representing a list of int or a range + :return: a list of int + :rtype: list(int) + """ + if ':' in string_to_be_parsed: + try: + start, end = map(int, string_to_be_parsed.split(':')) + except ValueError as e: + raise ValueError('Cannot parse string %s expected [start]:[end] -> %s' % + (string_to_be_parsed, e)) + return_value = [x for x in range(start, end)] + elif ',' in string_to_be_parsed: + try: + return_value = list(map(int, string_to_be_parsed.split(','))) + except ValueError as e: + raise ValueError('Cannot parse string %s expected [int],[int],... -> %s' % + (string_to_be_parsed, e)) + else: + raise ValueError('Cannot parse string %s expected [int],[int],... or [start]:[end] %s' % + string_to_be_parsed) + return return_value + + class HolographySpecification(object): """ The HolographySpecification represents the set of input used to specify an holography @@ -40,8 +193,7 @@ class HolographySpecification(object): """ self.path = os.path.join(path, name) self.name = name - self.id, self.date, self.comment = HolographySpecification. \ - extract_id_date_comment_from_name(name) + self.id, self.date, self.comment = extract_id_date_comment_from_name(name) self.reference_station_names = None self.target_station_names = None self.station_specification_map = defaultdict(list) @@ -59,28 +211,6 @@ class HolographySpecification(object): self.path, ) - @staticmethod - def list_bsf_files_in_path(path): - """ - List all the Holography beam specification files in the given path - :param path: path to the beam specification files - :type path: str - :return: the list of Holography Specifications - :rtype: list[HolographySpecification] - """ - logger.info("Loading holography beam specifications from \"%s\"...", path) - bsf_files_name_pattern = 'Holog-*.txt' - bsf_files_path_pattern = os.path.join(path, bsf_files_name_pattern) - matched_path_list = glob(bsf_files_path_pattern) - matched_file_path_list = list( - filter(lambda path_i: os.path.isfile(path_i), matched_path_list)) - matched_file_name_list = list(map(lambda path_i: os.path.basename(path_i), - matched_file_path_list)) - holography_beam_specification = HolographySpecification.create_hs_list_from_name_list_and_path( - matched_file_name_list, path) - logger.info("Loading holography beam specifications were successfully loaded from \"%s\".", path) - return holography_beam_specification - def get_beam_specifications_per_station_name(self, station_name): """ Returns a list of beam specifications for the given station name @@ -99,44 +229,10 @@ class HolographySpecification(object): self._update_class_attributes() logger.debug("Reading holography file \"%s\" done.", self.path) - @staticmethod - def create_hs_list_from_name_list_and_path(name_list, path): - return [HolographySpecification(name, path) for name in name_list] - - @staticmethod - def is_holography_specification_file_name(name): - return re.match(HolographySpecification.hs_name_pattern, name) is not None - - @staticmethod - def extract_id_date_comment_from_name(name): - match = re.match(HolographySpecification.hs_name_pattern, name) - date = match.group('date') - hs_id = int(match.group('id')) - comment = match.group('comment') - date = datetime.datetime.strptime(date, '%Y%m%d') - return hs_id, date, comment - def _read_lines(self): with open(self.path, 'r') as fstream_in: return fstream_in.readlines() - @staticmethod - def _split_header(line): - """ - Split the header of holography specification file - :param line: line containing the header - :return: a dict containing the start_date, - the end_date, the rcu_mode, and the beam_switch_delay - :rtype: dict - """ - date_regex = '\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}' - format = r'(?P<start_date>{date_regex})\s*' \ - '(?P<end_date>{date_regex})\s*' \ - '(?P<rcu_mode>\d*)\s*' \ - '(?P<beam_switch_delay>\d*.\d*)'.format(date_regex=date_regex) - match = re.match(format, line) - return match.groupdict() - def _parse_header(self, header): """ Parse the header @@ -144,7 +240,7 @@ class HolographySpecification(object): """ date_time_format = '%Y-%m-%d %H:%M:%S' - split_header = HolographySpecification._split_header(header) + split_header = _split_header(header) self.start_datetime = datetime.datetime.strptime(split_header['start_date'], date_time_format) @@ -153,111 +249,16 @@ class HolographySpecification(object): self.rcu_mode = split_header['rcu_mode'] self.beam_set_interval = split_header['beam_switch_delay'] - @staticmethod - def _split_line(line): - """ - Split the header of holography specification file - :param line: line containing the header - :return: a dict containing the start_date, - the end_date, the rcu_mode, and the beam_switch_delay - :rtype: dict - """ - range_regex = '(\d*\:\d*)|(\d*)' - ra_dec_regex = '\d*\.\d*,-?\d*\.\d*,\w*' - regex = r'^(?P<station_name>\w*)\s*' \ - r'(?P<mode_description>\w*)\s*' \ - r'(?P<sub_band>[\d,]*)\s*' \ - r'(?P<beamlets>{range_regex})\s*' \ - r'(?P<rcus>{range_regex})\s*' \ - r'(?P<rcus_mode>(\d*))\s*' \ - r'(?P<virtual_pointing>{ra_dec_regex})\s*' \ - r'(?P<station_pointing>{ra_dec_regex})'.format(range_regex=range_regex, - ra_dec_regex=ra_dec_regex) - match = re.match(regex, line) - if match is None: - raise ValueError('Cannot parse line {}'.format(line)) - return match.groupdict() - - @staticmethod - def _split_lines(lines): - return [HolographySpecification._split_line(line) - for line in lines] - - @staticmethod - def _parse_pointing(pointing_string): - ra, dec, coordinate_system = pointing_string.split(',') - ra = float(ra) - dec = float(dec) - return dict(ra=ra, dec=dec, coordinate_system=coordinate_system) - - @staticmethod - def _parse_integer_range_or_list(string_to_be_parsed): - """ - Parses a string containing a list of int or an inclusive range and returns a list of int - - ex "1,2,3,4,5" -> [1, 2, 3, 4, 5] - "1:4" -> [1, 2, 3, 4] - :param string_to_be_parsed: the string representing a list of int or a range - :return: a list of int - :rtype: list(int) - """ - if ':' in string_to_be_parsed: - try: - start, end = map(int, string_to_be_parsed.split(':')) - except ValueError as e: - raise ValueError('Cannot parse string %s expected [start]:[end] -> %s' % - (string_to_be_parsed, e)) - return_value = [x for x in range(start, end)] - elif ',' in string_to_be_parsed: - try: - return_value = list(map(int, string_to_be_parsed.split(','))) - except ValueError as e: - raise ValueError('Cannot parse string %s expected [int],[int],... -> %s' % - (string_to_be_parsed, e)) - else: - raise ValueError('Cannot parse string %s expected [int],[int],... or [start]:[end] %s' % - string_to_be_parsed) - return return_value - - @staticmethod - def _parse_line(split_line): - """ - Parses a line in the holography specification file - :param split_line: dict containing the row indexed by the column name - :type split_line: dict[str, str] - """ - beam_specification = HolographyStationBeamSpecification() - - beam_specification.rcus_mode = int(split_line['rcus_mode']) - beam_specification.sub_band_ids = [int(sub_band) for sub_band in - split_line['sub_band'].split(',')] - beam_specification.mode_description = split_line['mode_description'] - beam_specification.rcus_involved = HolographySpecification._parse_integer_range_or_list( - split_line['rcus']) - beam_specification.beamlets = split_line['beamlets'] - - beam_specification.station_pointing = HolographySpecification._parse_pointing( - split_line['station_pointing']) - beam_specification.virtual_pointing = HolographySpecification._parse_pointing( - split_line['virtual_pointing']) - if len(beam_specification.sub_band_ids) == 1: - beam_specification.station_type = 'target' - else: - beam_specification.station_type = 'reference' - beam_specification.station_name = split_line['station_name'] - - return beam_specification - def _parse_lines(self, lines): - split_lines = HolographySpecification._split_lines(lines) + split_lines = _split_lines(lines) for line in split_lines: - parsed_line = HolographySpecification._parse_line(line) + parsed_line = _parse_line(line) self.station_specification_map[parsed_line.station_name] += [parsed_line] def _update_class_attributes(self): - self.station_names = self.station_specification_map.keys() + self.station_names = list(self.station_specification_map.keys()) self.reference_station_names = [station_name for station_name in self.station_specification_map if len(self.station_specification_map[station_name]) == 1] diff --git a/CAL/CalibrationCommon/lib/mshologextract.py b/CAL/CalibrationCommon/lib/mshologextract.py index 5ba8ffea6eb8bb70385f535ffc5ba94c08c52319..921cfa03603a9325d14ecca8c7b8607002177b4e 100644 --- a/CAL/CalibrationCommon/lib/mshologextract.py +++ b/CAL/CalibrationCommon/lib/mshologextract.py @@ -9,6 +9,7 @@ from multiprocessing import Pool as ThreadPool import os from lofar.calibration.common.utils import * +from lofar.calibration.common.datacontainers.holography_dataset import HolographyDataset DEFAULT_SLEEP_TIME = 1 @@ -80,7 +81,7 @@ def read_and_store_single_target_station(target_station_name, matched_bsf_ms_pai logger.info('Loading data for station %s ', target_station_name) holography_dataset.load_from_beam_specification_and_ms(target_station_name, matched_bsf_ms_pair) - filename = '%s.hdf5' % (target_station_name) + filename = '%s.hdf5' % target_station_name outpath = os.path.join(store_path, filename) logger.info('Storing data for station %s in %s', target_station_name, outpath) @@ -150,9 +151,11 @@ def read_holography_datasets_and_store(holography_observation_path, raise NotImplementedError() target_station_names = list_all_target_stations(holography_observation_path) + logger.debug('target station names %s', target_station_names) matched_bsf_ms_pair =\ match_holography_beam_specification_file_with_observation(holography_observation_path) - + logger.debug('matched beam specification files and measurement sets: %s', + matched_bsf_ms_pair) try: if not os.path.exists(store_path): os.makedirs(store_path) diff --git a/CAL/CalibrationCommon/lib/utils.py b/CAL/CalibrationCommon/lib/utils.py index 807f1c706a67029db3563bf729fa9712b1a38e27..587e3fbae6755b74d4bbd585ec6c39b94085ef6a 100644 --- a/CAL/CalibrationCommon/lib/utils.py +++ b/CAL/CalibrationCommon/lib/utils.py @@ -1,5 +1,9 @@ -from lofar.calibration.common.datacontainers import * +from lofar.calibration.common.datacontainers.holography_specification import list_bsf_files_in_path +from lofar.calibration.common.datacontainers.holography_observation import list_observations_in_path +import logging + +logger = logging.getLogger(__name__) def is_observation_in_range(start, end, from_datetime, to_datetime): """ @@ -21,8 +25,10 @@ def is_observation_in_range(start, end, from_datetime, to_datetime): def match_holography_beam_specification_file_with_observation(path): - bsf_files = HolographySpecification.list_bsf_files_in_path(path) - observation_list = HolographyObservation.list_observations_in_path(path) + bsf_files = list_bsf_files_in_path(path) + observation_list = list_observations_in_path(path) + logger.debug('specification files found %s', bsf_files) + logger.debug('observation files found %s', observation_list) matched_observation_bsf_pair = [] for bsf_file in bsf_files: bsf_file.read_file() @@ -38,7 +44,7 @@ def match_holography_beam_specification_file_with_observation(path): def list_all_target_stations(observation_path): - bsf_files = HolographySpecification.list_bsf_files_in_path(observation_path) + bsf_files = list_bsf_files_in_path(observation_path) target_station_name_set = set() for bsf_file in bsf_files: bsf_file.read_file() diff --git a/CAL/CalibrationCommon/test/t_holography_dataset_class.py b/CAL/CalibrationCommon/test/t_holography_dataset_class.py index c7840898ccd35d59579d19d9081b8e0a96410d8a..2ce17f2aecdefbbeccc6cf8b59e7b17bda667314 100755 --- a/CAL/CalibrationCommon/test/t_holography_dataset_class.py +++ b/CAL/CalibrationCommon/test/t_holography_dataset_class.py @@ -4,9 +4,9 @@ import tempfile import h5py import numpy import os -from lofar.calibration.common.datacontainers import HolographyDataset, HolographySpecification -from lofar.calibration.common.datacontainers import HolographyObservation - +from lofar.calibration.common.datacontainers import HolographyDataset +from lofar.calibration.common.datacontainers.holography_observation import list_observations_in_path +from lofar.calibration.common.datacontainers.holography_specification import list_bsf_files_in_path logger = logging.getLogger('t_holography_dataset_class') # READ doc/Holography_Data_Set.md! It contains the location from which the @@ -20,8 +20,8 @@ class TestHolographyDatasetClass(unittest.TestCase): Reads a Measurement Set from a file and converts it to an HDS. ''' # Read the MS into memory. - holist = HolographyObservation.list_observations_in_path(path_to_test_data) - hbsflist = HolographySpecification.list_bsf_files_in_path(path_to_test_data) + holist = list_observations_in_path(path_to_test_data) + hbsflist = list_bsf_files_in_path(path_to_test_data) for hbsf in hbsflist: hbsf.read_file() ho_per_ms = [(hbsf, ho) for hbsf, ho in zip(hbsflist, holist)] diff --git a/CAL/CalibrationCommon/test/t_holography_dataset_class2.py b/CAL/CalibrationCommon/test/t_holography_dataset_class2.py index 9dfd40ad671bd358186f9af1bef7ffaa88e3a79b..064f1b36858c270122a71422827e26e40ba8691c 100755 --- a/CAL/CalibrationCommon/test/t_holography_dataset_class2.py +++ b/CAL/CalibrationCommon/test/t_holography_dataset_class2.py @@ -2,8 +2,9 @@ import logging import unittest import tempfile import os -from lofar.calibration.common.datacontainers import HolographyDataset, HolographySpecification -from lofar.calibration.common.datacontainers import HolographyObservation +from lofar.calibration.common.datacontainers import HolographyDataset +from lofar.calibration.common.datacontainers.holography_specification import list_bsf_files_in_path +from lofar.calibration.common.datacontainers.holography_observation import list_observations_in_path logger = logging.getLogger('t_holography_dataset_class') @@ -21,8 +22,8 @@ class TestHolographyDatasetClass(unittest.TestCase): the file is compared with the HDS in memory. ''' # Read the MS into memory. - holist = HolographyObservation.list_observations_in_path(path_to_test_data) - hbsflist = HolographySpecification.list_bsf_files_in_path(path_to_test_data) + holist = list_observations_in_path(path_to_test_data) + hbsflist = list_bsf_files_in_path(path_to_test_data) for hbsf in hbsflist: hbsf.read_file() ho_per_ms = [(hbsf, ho) for hbsf, ho in zip(hbsflist, holist)] diff --git a/CAL/CalibrationCommon/test/t_holography_ms_class.py b/CAL/CalibrationCommon/test/t_holography_ms_class.py index 3765b1a40ed7531842b512ee088123fcceb10e57..36ffc1d4dc2b44063cf166d06bedded4093a514f 100755 --- a/CAL/CalibrationCommon/test/t_holography_ms_class.py +++ b/CAL/CalibrationCommon/test/t_holography_ms_class.py @@ -5,7 +5,8 @@ import numpy from astropy.coordinates import SkyCoord from astropy.time import Time -from lofar.calibration.common.datacontainers import HolographyMeasurementSet +from lofar.calibration.common.datacontainers.holography_measurementset import \ + _compute_lm_from_ra_dec_station_position_rotation_matrix_and_time logger = logging.getLogger('t_holography_ms_class') @@ -27,8 +28,7 @@ class TestHolographyDatasetClass(unittest.TestCase): [9.92823000e-01, -9.54190000e-02, 7.20990000e-02], [3.30000000e-05, 6.03078000e-01, 7.97682000e-01]], dtype=numpy.float64) - result = HolographyMeasurementSet.\ - _compute_lm_from_ra_dec_station_position_rotation_matrix_and_time( + result = _compute_lm_from_ra_dec_station_position_rotation_matrix_and_time( target_icrs_rad, core_pqr_itrs_mat, [sample_time, sample_time]) return_value_dtype = [('l', numpy.float64), diff --git a/CAL/CalibrationCommon/test/t_holography_observation.py b/CAL/CalibrationCommon/test/t_holography_observation.py index 1e764c67712dda046333b9b923415a292536d399..cb42013f387c3cd5fe8d910f88449378fb5adfa3 100755 --- a/CAL/CalibrationCommon/test/t_holography_observation.py +++ b/CAL/CalibrationCommon/test/t_holography_observation.py @@ -1,6 +1,6 @@ import logging import unittest -from lofar.calibration.common.datacontainers import HolographyObservation +from lofar.calibration.common.datacontainers.holography_observation import list_observations_in_path logger = logging.getLogger('t_holography_dataset_class') path_to_test_data = '/var/tmp/holography' @@ -8,7 +8,7 @@ path_to_test_data = '/var/tmp/holography' class TestHolographyDatasetClass(unittest.TestCase): def test_holography_observation_path_listing(self): - list = HolographyObservation.list_observations_in_path(path_to_test_data) + list = list_observations_in_path(path_to_test_data) ## Assert correct reading of the files HolographyObservation_freq_one = list.pop() diff --git a/CAL/CalibrationProcessing/bin/holography_process.py b/CAL/CalibrationProcessing/bin/holography_process.py index 6722662418b5d7a86d17bd7a9b974ea1b9dc315a..224d1b234b472d8b8bc3f21aea90525697aa770a 100644 --- a/CAL/CalibrationProcessing/bin/holography_process.py +++ b/CAL/CalibrationProcessing/bin/holography_process.py @@ -3,12 +3,36 @@ import argparse import sys import os import lofar.calibration.common.datacontainers as datacontainers -import lofar.calibration.processing as processing +from lofar.calibration.processing.solver import solve_gains_per_datatable +from lofar.calibration.processing.averaging import average_data,\ + weighted_average_dataset_per_station +from lofar.calibration.processing.normalize import normalize_beams_by_central_one import logging +import functools + logger = logging.getLogger('holography_process') +def log_step_execution(step_description): + def log_step_decorator(function): + @functools.wraps(function) + def wrapper(*args, **kwargs): + logger.info('Performing step %s', step_description) + try: + result = function(*args, **kwargs) + logger.info('Step %s performed', step_description) + return result + + except Exception as e: + logger.exception('exception occurred performing step %s: %s', + step_description, e) + raise e + + return wrapper + return log_step_decorator + + def setup_argument_parser(): """ @@ -33,6 +57,7 @@ def parse_command_arguments(arguments): return parser.parse_args(arguments) +@log_step_execution('load') def loading(path): """ @@ -41,16 +66,11 @@ def loading(path): :return: :rtype: datacontainers.HolographyDataset """ - - logger.info('loading dataset from %s', path) - if not os.path.exists(path): - raise Exception('Path not found') - dataset = datacontainers.HolographyDataset.load_from_file(path) - logger.info('dataset from path %s loaded', path) return dataset +@log_step_execution('normalize') def normalize_step(dataset, input_datatable): """ @@ -60,17 +80,14 @@ def normalize_step(dataset, input_datatable): :return: :rtype: dict(dict(dict(numpy.ndarray))) """ - logger.info('normalizing dataset') - output_datatable = \ - processing.normalize_beams_by_central_one(dataset, input_datatable, dataset.central_beamlets) - if dataset.derived_data is None: - dataset.derived_data = dict() - + output_datatable = normalize_beams_by_central_one(dataset, + input_datatable, + dataset.central_beamlets) dataset.derived_data['NORMALIZED'] = output_datatable - logger.info('dataset normalized') return output_datatable +@log_step_execution('time average') def time_averaging_step(dataset, input_datatable): """ @@ -80,17 +97,12 @@ def time_averaging_step(dataset, input_datatable): :return: :rtype: dict(dict(dict(numpy.ndarray))) """ - logger.info('time averaging dataset') - output_datable = \ - processing.averaging.average_data(input_datatable) - if dataset.derived_data is None: - dataset.derived_data = dict() - logger.info('dataset time averaged') - + output_datable = average_data(input_datatable) dataset.derived_data['TIME_AVERAGED'] = output_datable return output_datable +@log_step_execution('station average') def station_averaging_step(dataset, input_datatable): """ @@ -100,16 +112,14 @@ def station_averaging_step(dataset, input_datatable): :return: :rtype: dict(dict(dict(numpy.ndarray))) """ - logger.info('averaging dataset per reference station') - output_datable = \ - processing.averaging.weighted_average_dataset_per_station(dataset, input_datatable) + output_datable = weighted_average_dataset_per_station(dataset, input_datatable) if dataset.derived_data is None: dataset.derived_data = dict() - logger.info('dataset averaged for reference station') dataset.derived_data['STATION_AVERAGED'] = output_datable return output_datable +@log_step_execution('solving gains') def compute_gains_step(dataset, input_datatable, direct_complex=True): """ @@ -119,32 +129,36 @@ def compute_gains_step(dataset, input_datatable, direct_complex=True): :return: :rtype: dict(dict(dict(numpy.ndarray))) """ - logger.info('computing gains per dataset') - output_datable = \ - processing.solver.solve_gains_per_datatable(dataset, input_datatable, direct_complex=direct_complex) + output_datable = solve_gains_per_datatable(dataset, input_datatable, direct_complex=direct_complex) if dataset.derived_data is None: dataset.derived_data = dict() - logger.info('gains per dataset computed') dataset.derived_data['GAINS'] = output_datable return output_datable +@log_step_execution('save') def store_step(dataset, filepath): absolute_filepath = os.path.abspath(filepath) - logger.info('storing dataset in path %s', filepath) dataset.store_to_file(absolute_filepath) - logger.info('stored dataset in path %s', filepath) +def prepare_dataset_for_processing(dataset: datacontainers.HolographyDataset): + """ + :param dataset: + :return: None + """ + if dataset.derived_data is None: + dataset.derived_data = dict() + def execute_processing(arguments): dataset = loading(arguments.input_path) - + prepare_dataset_for_processing(dataset) normalized_data = normalize_step(dataset, dataset.data) averaged_data = time_averaging_step(dataset, normalized_data) station_averaged_data = station_averaging_step(dataset, averaged_data) - logger.info('storing datatafile in %s', os.path.abspath(arguments.output_path)) + store_step(dataset, arguments.output_path) - gains = compute_gains_step(dataset, station_averaged_data) + _ = compute_gains_step(dataset, station_averaged_data) store_step(dataset, arguments.output_path) diff --git a/CAL/CalibrationProcessing/lib/processing/averaging.py b/CAL/CalibrationProcessing/lib/processing/averaging.py index 3ac3b1670e85497e588ce5bc7f3ab9b2ec9657df..1485649e58950378e85b6f0339b793996b608c06 100644 --- a/CAL/CalibrationProcessing/lib/processing/averaging.py +++ b/CAL/CalibrationProcessing/lib/processing/averaging.py @@ -65,6 +65,17 @@ def average_values_by_sample(data_array, window_size, field_name=None): return result +def is_datatable_completely_flagged(data_table: numpy.ndarray): + """ + Checks if the datatable is entirely flagged + :param data_table: input data + :return: if the data is completely flagged + :rtype: bool + """ + flags = numpy.array(data_table)['flag'] + return numpy.product(flags) + + def _average_datatable_with_function(data_table_in, parameter, expected_array_size, function): field_names_mean = [] formats_mean = [] @@ -100,8 +111,7 @@ def _average_datatable_with_function(data_table_in, parameter, expected_array_si fields_to_average = list(data_table_in.dtype.names) fields_to_average.remove('flag') - flags = numpy.array(data_table_in)['flag'] - is_flagged = numpy.product(flags) == True + is_flagged = is_datatable_completely_flagged(data_table_in) for field_name in fields_to_average: datatable_values = numpy.array(data_table_in) @@ -166,7 +176,6 @@ def average_datatable(data_table_in): return result - def average_datatable_by_time_interval(data_table_in, time_interval): """ Average the datatable with a given sample time interval window @@ -253,7 +262,7 @@ def average_dataset_by_sample(input_data_set, window_size): if input_data_set is not None: if isinstance(input_data_set, HolographyDataset) is True: dataset = input_data_set - elif isinstance(str, input_data_set): + elif isinstance(input_data_set, str): dataset = HolographyDataset.load_from_file(input_data_set) else: text = "Cannot average data by samples! The input data is neither an HDS not a string." @@ -265,7 +274,7 @@ def average_dataset_by_sample(input_data_set, window_size): logger.error(text) raise ValueError(text) - if isinstance(int, window_size) is False or window_size < 1: + if isinstance(window_size, int) is False or window_size < 1: text = "Cannot average data by samples! The window size must be an integer value bigger than 0." logger.error(text) raise ValueError(text) @@ -344,7 +353,6 @@ def average_data(input_data_table): return out_data - def _compute_size_of_new_array(array_size, window_size): """ Round up the array size given a window size. @@ -364,7 +372,7 @@ def _compute_size_of_new_array(array_size, window_size): return new_array_size -def _weigthed_average_data_per_polarization(input_dataset, output_dataset, +def _weighted_average_data_per_polarization(input_dataset, output_dataset, real_weights, imag_weights, polarization): mask = input_dataset['flag'] == False try: @@ -379,6 +387,60 @@ def _weigthed_average_data_per_polarization(input_dataset, output_dataset, output_dataset['flag'] = True +def __prepare_input_to_weight_dataset(stations, frequency, beam, + input_data_table): + """ + Prepare the input data for the weighted average + :param stations: list of stations to iterate on + :type stations: (str) + :param frequency: frequency ID + :type frequency: str + :param beam: beam ID + :type beam: str + :param input_data_table: data to iterate over + :return: the average values, + the standard deviation of the real part, + the standard deviation of the imaginary part + :rtype: list(numpy.ndarray, numpy.ndarray, numpy.ndarray) + """ + + n_stations = len(stations) + + average_per_beam_station = numpy.zeros(n_stations, dtype=HDS_data_sample_type) + std_real_per_beam_station = numpy.zeros(n_stations, dtype=HDS_data_sample_type) + std_imag_per_beam_station = numpy.zeros(n_stations, dtype=HDS_data_sample_type) + + for i, station in enumerate(stations): + data_per_station_frequency_beam = \ + input_data_table[station][str(frequency)][beam] + average_per_beam_station['XX'][i] = data_per_station_frequency_beam['mean']['XX'] + average_per_beam_station['XY'][i] = data_per_station_frequency_beam['mean']['XY'] + average_per_beam_station['YX'][i] = data_per_station_frequency_beam['mean']['YX'] + average_per_beam_station['YY'][i] = data_per_station_frequency_beam['mean']['YY'] + + average_per_beam_station['l'][i] = data_per_station_frequency_beam['mean']['l'] + average_per_beam_station['m'][i] = data_per_station_frequency_beam['mean']['m'] + + std_real_per_beam_station['XX'][i] = \ + data_per_station_frequency_beam['std']['XX_real'] + std_real_per_beam_station['XY'][i] = \ + data_per_station_frequency_beam['std']['XY_real'] + std_real_per_beam_station['YX'][i] = \ + data_per_station_frequency_beam['std']['YX_real'] + std_real_per_beam_station['YY'][i] = \ + data_per_station_frequency_beam['std']['YY_real'] + + std_imag_per_beam_station['XX'][i] = \ + data_per_station_frequency_beam['std']['XX_imag'] + std_imag_per_beam_station['XY'][i] = \ + data_per_station_frequency_beam['std']['XY_imag'] + std_imag_per_beam_station['YX'][i] = \ + data_per_station_frequency_beam['std']['YX_imag'] + std_imag_per_beam_station['YY'][i] = \ + data_per_station_frequency_beam['std']['YY_imag'] + return average_per_beam_station, std_real_per_beam_station, std_imag_per_beam_station + + def weighted_average_dataset_per_station(dataset, input_data_table): """ Perform a weighted average of the dataset using the std deviations in the input data @@ -389,7 +451,7 @@ def weighted_average_dataset_per_station(dataset, input_data_table): :return: """ stations = dataset.reference_stations - n_stations = len(stations) + frequencies = dataset.frequencies beams = dataset.beamlets @@ -403,41 +465,11 @@ def weighted_average_dataset_per_station(dataset, input_data_table): beam_str = str(beam) result_per_beam = numpy.zeros(1, dtype=HDS_data_sample_type) - average_per_beam_station = numpy.zeros(n_stations, dtype=HDS_data_sample_type) - std_real_per_beam_station = numpy.zeros(n_stations, dtype=HDS_data_sample_type) - std_imag_per_beam_station = numpy.zeros(n_stations, dtype=HDS_data_sample_type) - - for i, station in enumerate(stations): - data_per_station_frequency_beam = \ - input_data_table[station][str(frequency)][beam_str] - average_per_beam_station['XX'][i] = data_per_station_frequency_beam['mean']['XX'] - average_per_beam_station['XY'][i] = data_per_station_frequency_beam['mean']['XY'] - average_per_beam_station['YX'][i] = data_per_station_frequency_beam['mean']['YX'] - average_per_beam_station['YY'][i] = data_per_station_frequency_beam['mean']['YY'] - - average_per_beam_station['l'][i] = data_per_station_frequency_beam['mean']['l'] - average_per_beam_station['m'][i] = data_per_station_frequency_beam['mean']['m'] - - std_real_per_beam_station['XX'][i] = \ - data_per_station_frequency_beam['std']['XX_real'] - std_real_per_beam_station['XY'][i] = \ - data_per_station_frequency_beam['std']['XY_real'] - std_real_per_beam_station['YX'][i] = \ - data_per_station_frequency_beam['std']['YX_real'] - std_real_per_beam_station['YY'][i] = \ - data_per_station_frequency_beam['std']['YY_real'] - - std_imag_per_beam_station['XX'][i] = \ - data_per_station_frequency_beam['std']['XX_imag'] - std_imag_per_beam_station['XY'][i] = \ - data_per_station_frequency_beam['std']['XY_imag'] - std_imag_per_beam_station['YX'][i] = \ - data_per_station_frequency_beam['std']['YX_imag'] - std_imag_per_beam_station['YY'][i] = \ - data_per_station_frequency_beam['std']['YY_imag'] + average_per_beam_station, std_real_per_beam_station, std_imag_per_beam_station = \ + __prepare_input_to_weight_dataset(stations, frequency, beam_str, input_data_table) for polarization in ['XX', 'XY', 'YX', 'YY']: - _weigthed_average_data_per_polarization(average_per_beam_station, + _weighted_average_data_per_polarization(average_per_beam_station, result_per_beam, std_real_per_beam_station, std_imag_per_beam_station, diff --git a/CAL/CalibrationProcessing/lib/processing/interpolate.py b/CAL/CalibrationProcessing/lib/processing/interpolate.py index 1ce613f4e981e24774c9fd82db5c8d57b6991707..d6be9ab92fc1f26edc8192895d3e12eef31eced5 100644 --- a/CAL/CalibrationProcessing/lib/processing/interpolate.py +++ b/CAL/CalibrationProcessing/lib/processing/interpolate.py @@ -1,13 +1,20 @@ from lofar.calibration.common.datacontainers import HolographyDataset from typing import Generator, Dict, Tuple, Union import numpy +import emcee __POSSIBLE_POLARIZATIONS = ('XX', 'XY', 'YX', 'YY') -def __iterate_gains_over_station_polarization_frequency(dataset: Dict[str, Dict[str,Dict[str, numpy.ndarray]]]) -> Generator[ - Tuple[ - Tuple[str, str], Tuple[float], numpy.ndarray], None, None]: +def __iterate_gains_over_station_polarization_frequency(dataset: Dict[str, + Dict[str, + Dict[str, + numpy.ndarray]]]) -> \ + Generator[ + Tuple[ + Tuple[str, str], + Tuple[float], + numpy.ndarray], None, None]: for station_name, dataset_per_station_name in dataset.items(): sorted_frequencies = sorted(map(float, dataset_per_station_name.keys())) @@ -25,13 +32,77 @@ def __iterate_gains_over_station_polarization_frequency(dataset: Dict[str, Dict[ yield (station_name, polarization), tuple(sorted_frequencies), data_per_polarization -def __reshape_into_gains_per_antenna(dataset: Dict[str, Dict[str,Dict[str, numpy.ndarray]]]): +def __interpolate_gains_per_antenna(dataset: Dict[str, Dict[str, Dict[str, numpy.ndarray]]]): for (station_name, polarization), frequencies, dataset in __iterate_gains_over_station_polarization_frequency(dataset): - print(station_name, polarization, frequencies) + for antenna_id, data_per_antenna in enumerate(dataset): print(antenna_id, data_per_antenna.shape) +def __ln_likelihood(parameters, x, y): + m, b, sigma = parameters + y_model = m * x + b + inv_variance = sigma**-2. + residual = numpy.square(y - y_model) * inv_variance + + ln_likelihood = .5 * numpy.sum(residual) - numpy.log(inv_variance) + return ln_likelihood + + +def __ln_prior(parameters): + m, b, sigma = parameters + if sigma > 0 and (-2*numpy.pi <= b <= 2*numpy.pi): + return 0.0 + return -numpy.inf + + +def __ln_probability(parameters, x, y): + priors = __ln_prior(parameters) + if not numpy.isfinite(priors): + return priors + __ln_likelihood(parameters, x, y) + return -numpy.inf + + +def _first_guess(x, y): + result = numpy.polyfit(x, y, 1) + m = result[0][0] + numpy.random.uniform(-1, 1) * result[0][0] + q = result[1][0] + numpy.random.uniform(-1, 1) * result[1][0] + + residual = y - m * x - q + + sigma = numpy.std(residual) + return numpy.array((m, q, sigma)) + + +def _interpolate_mc_mc(x, y): + dims = 3 + n_walkers = 1000 + relaxing_steps = 500 + + start_position = [_first_guess(x, y) for _ in range(n_walkers)] + + sampler = emcee.EnsembleSampler(n_walkers, dims, __ln_probability, args=(x, y)) + sampler.run_mcmc(start_position, relaxing_steps) + + samples = sampler.chain + + m, q, sigma = numpy.mean(samples[:, :, 0]),\ + numpy.mean(samples[:, :, 1]),\ + numpy.mean(samples[:, :, 2]) + + return m, q, sigma + + +def _interpolate_gains(x, y): + amplitude = numpy.abs(y) + phase = numpy.angle(y) + + amplitude_parameters = _interpolate_mc_mc(x, amplitude) + phase_parameters = _interpolate_mc_mc(x, phase) + + return amplitude_parameters, phase_parameters + + def derive_interpolation_parameters(dataset: Dict[str, Dict[str,Dict[str, numpy.ndarray]]]) -> Dict[str, Dict[str, Dict[str, numpy.ndarray]]]: - __reshape_into_gains_per_antenna(dataset) - return None \ No newline at end of file + __interpolate_gains_per_antenna(dataset) + return None diff --git a/CAL/CalibrationProcessing/lib/processing/normalize.py b/CAL/CalibrationProcessing/lib/processing/normalize.py index d7f39206f8d926a31d2b428ece371f3812715670..3dc6a021f3da4d9fca6f8ee14181f7bfa1ffc93a 100644 --- a/CAL/CalibrationProcessing/lib/processing/normalize.py +++ b/CAL/CalibrationProcessing/lib/processing/normalize.py @@ -1,14 +1,15 @@ import logging -from lofar.calibration.common.datacontainers import HolographyDataset + import numpy +from lofar.calibration.common.datacontainers import HolographyDataset logger = logging.getLogger(__name__) __POLARIZATION_TO_INDEX = { - 'XX': (0,0), - 'XY': (0,1), - 'YX': (1,0), - 'YY': (1,1) + 'XX': (0, 0), + 'XY': (0, 1), + 'YX': (1, 0), + 'YY': (1, 1) } @@ -34,78 +35,107 @@ def normalize_beams_by_central_one(dataset, input_data, central_beamlet_number): data_per_station_per_frequency = data_per_station[frequency_str] normalized_data_per_station_frequency = \ - normalize_crosscorrelation_beams(data_per_station_per_frequency, - central_beamlet_number[frequency_str]) + normalize_crosscorrelation_beams(data_per_station_per_frequency, + central_beamlet_number[frequency_str]) normalized_data_per_station[frequency_str] = normalized_data_per_station_frequency return normalized_data -def normalize_crosscorrelation_beams(data_per_station_frequency, - central_beamlet): +def __invert_central_beam(data_per_station_frequency, central_beamlet): """ - It normalize the crosscorrelation per each beam given the normalization array computed - from the central beam - :param data_per_station_frequency: - :param normalization_array: - :return: + Invert the crosscorrelation of the central beamlet + :param data_per_station_frequency: input data array + :param central_beamlet: index of the central beamlet + :return: if the normalization was successfull and the result of the inversion + :rtype: (bool, numpy.ndarray) """ + is_inversion_matrix_valid = True + normalization_array = None + try: + normalization_array = invert_central_beam_crosscorrelations(data_per_station_frequency, + central_beamlet) + except numpy.linalg.LinAlgError as e: + logger.warning('error inverting central beamlet crosscorrelation for beamlet %s: %s', + central_beamlet, + e) + print('error inverting central beamlet crosscorrelation for beamlet %s: %s' % ( + central_beamlet, + e)) + is_inversion_matrix_valid = False + return is_inversion_matrix_valid, normalization_array + + +def __normalize_data_by_central_beam(normalization_array, data_per_beam, flag_data): + """ + Normalizes the data by the central beam + :param normalization_array: the used to normalize the data + :param data_per_beam: data per beam + :param flag_data: flag the data + :return: the normalized data per beam + """ """ TO avoid copying the data in a different array and then copy the data back I compute directly the product of the two array of matrices - + the resulting cross polarization as a function of time are: XX = XX_b * XX_n + XY_b * YX_n XY = XX_b * XY_n + XY_b * YY_n YX = YX_b * XX_n + YY_b * YX_n YY = YX_b * XY_n + YY_b * YY_n - + where b is the beam and n is the normalization array """ - is_invertion_matrix_valid = True - normalization_array = None - try: - normalization_array = invert_central_beam_crosscorrelations(data_per_station_frequency, - central_beamlet) - except numpy.linalg.LinAlgError as e: - logger.warning('error inverting central beamlet crosscorrelation for beamlet %s: %s', - central_beamlet, - e) - print('error inverting central beamlet crosscorrelation for beamlet %s: %s' % ( - central_beamlet, - e)) - is_invertion_matrix_valid = False + normalized_data = numpy.array(data_per_beam) + + if flag_data: + normalized_data['flag'] = True + return normalized_data + + normalized_data['XX'] = \ + data_per_beam['XX'] * normalization_array[__POLARIZATION_TO_INDEX['XX']] + \ + data_per_beam['XY'] * normalization_array[__POLARIZATION_TO_INDEX['YX']] + normalized_data['XY'] = \ + data_per_beam['XX'] * normalization_array[__POLARIZATION_TO_INDEX['XY']] + \ + data_per_beam['XY'] * normalization_array[__POLARIZATION_TO_INDEX['YY']] + normalized_data['YX'] = \ + data_per_beam['YX'] * normalization_array[__POLARIZATION_TO_INDEX['XX']] + \ + data_per_beam['YY'] * normalization_array[__POLARIZATION_TO_INDEX['YX']] + normalized_data['YY'] = \ + data_per_beam['YX'] * normalization_array[__POLARIZATION_TO_INDEX['XY']] + \ + data_per_beam['YY'] * normalization_array[__POLARIZATION_TO_INDEX['YY']] + return normalized_data + + +def normalize_crosscorrelation_beams(data_per_station_frequency, + central_beamlet): + """ + Normalize the crosscorrelation per each beam given the normalization array computed + from the central beam + :param data_per_station_frequency: input data array + :param central_beamlet: index of the central beamlet + :return: + """ + + is_invertion_matrix_valid, normalization_array = __invert_central_beam( + data_per_station_frequency, + central_beamlet) normalized_data_per_beam = dict() for beam in data_per_station_frequency: - data_per_beam = data_per_station_frequency[beam] - normalized_data = numpy.array(data_per_beam) - normalized_data_per_beam[beam] = normalized_data - - if is_invertion_matrix_valid == False: - normalized_data['flag'] = True - continue - - normalized_data['XX'] = \ - data_per_beam['XX'] * normalization_array[__POLARIZATION_TO_INDEX['XX']] + \ - data_per_beam['XY'] * normalization_array[__POLARIZATION_TO_INDEX['YX']] - normalized_data['XY'] = \ - data_per_beam['XX'] * normalization_array[__POLARIZATION_TO_INDEX['XY']] + \ - data_per_beam['XY'] * normalization_array[__POLARIZATION_TO_INDEX['YY']] - normalized_data['YX'] = \ - data_per_beam['YX'] * normalization_array[__POLARIZATION_TO_INDEX['XX']] + \ - data_per_beam['YY'] * normalization_array[__POLARIZATION_TO_INDEX['YX']] - normalized_data['YY'] = \ - data_per_beam['YX'] * normalization_array[__POLARIZATION_TO_INDEX['XY']] + \ - data_per_beam['YY'] * normalization_array[__POLARIZATION_TO_INDEX['YY']] + normalized_data_per_beam[beam] = \ + __normalize_data_by_central_beam(normalization_array, + data_per_beam, + not is_invertion_matrix_valid) + return normalized_data_per_beam def invert_central_beam_crosscorrelations(data_per_station_per_frequency, central_beamlet_number): data_per_central_beamlet = __extract_crosscorrelation_matrices_from_data( - data_per_station_per_frequency[central_beamlet_number]) + data_per_station_per_frequency[central_beamlet_number]) return __invert_crosscorrelation_matrices(data_per_central_beamlet) @@ -134,4 +164,3 @@ def __invert_crosscorrelation_matrices(cross_correlation_matrices): result_array = numpy.rollaxis(numpy.linalg.inv(cross_correlation_matrices), 0, 3) return result_array - diff --git a/CAL/CalibrationProcessing/lib/processing/solver.py b/CAL/CalibrationProcessing/lib/processing/solver.py index f1c8258ddc8a1dbf02d3c8f95c3b04f0d0e0d6a9..23afbd020cd504d4e4f0c3bd19622dc66e6752ab 100644 --- a/CAL/CalibrationProcessing/lib/processing/solver.py +++ b/CAL/CalibrationProcessing/lib/processing/solver.py @@ -114,6 +114,24 @@ def _convert_visibilities_to_real(visibilities): return visibilities_real +def __convert_real_gains_to_complex(result): + """ + Convert the real values of the gains back into the complex and computes + the relative error + :param result: gain computed with the complex to real transformation + :return: the complex array with the gains + """ + if not result['flag']: + gains = result['gains'] + gains = gains[0::2] + 1.j * gains[1::2] + result['gains'] = gains + + noise = result['relative_error'] + noise = numpy.sqrt(noise[0::2].A1 ** 2. + noise[1::2].A1 ** 2.) + result["relative_error"] = noise + return result + + def _solve_gains_complex(visibilities, matrix, **kwargs): """ To solve a complex linear system of equation it is possible to rewrite it in a equivalent @@ -126,15 +144,8 @@ def _solve_gains_complex(visibilities, matrix, **kwargs): visibilities_real = _convert_visibilities_to_real(visibilities) - result = _solve_gains(visibilities_real, matrix_real, **kwargs) - if result['flag'] is False: - gains = result['gains'] - gains = gains[0::2] + 1.j * gains[1::2] - result['gains'] = gains - - noise = result['relative_error'] - noise = numpy.sqrt(noise[0::2].A1**2. + noise[1::2].A1**2.) - result["relative_error"] = noise + result_real = _solve_gains(visibilities_real, matrix_real, **kwargs) + result = __convert_real_gains_to_complex(result_real) return result @@ -161,10 +172,10 @@ def _invert_matrix_direct(matrix, visibilities, rcond=None): def _invert_matrix_mcmc(matrix, visibilities, **kwargs): def lnprob(parameters): gains, sigmas = parameters - return -abs(matrix * gains - visibilities)**2/sigmas**2 + return -abs(matrix * gains - visibilities) ** 2 / sigmas ** 2 ndim, nwalkers = 3, 100 - #pos = [result["x"] + 1e-4 * np.random.randn(ndim) for i in range(nwalkers)] + # pos = [result["x"] + 1e-4 * np.random.randn(ndim) for i in range(nwalkers)] raise NotImplementedError() @@ -181,7 +192,7 @@ def _invert_matrix(matrix, visibilities, type='LSTSQR', **kwargs): SOLUTION_METHODS = ['LSTSQR', 'MCMC', 'DIRECT'] if type not in SOLUTION_METHODS: raise ValueError('wrong type of solution method specified. Alternatives are: %s' - % SOLUTION_METHODS ) + % SOLUTION_METHODS) if type is 'LSTSQR': return _invert_matrix_lstsqr(matrix, visibilities, **kwargs) @@ -190,6 +201,7 @@ def _invert_matrix(matrix, visibilities, type='LSTSQR', **kwargs): elif type is 'DIRECT': return _invert_matrix_direct(matrix, visibilities, **kwargs) + def _solve_gains(visibilities, matrix, **kwargs): """ SOLVE THE EQUATION M * G = V FOR G @@ -207,7 +219,7 @@ def _solve_gains(visibilities, matrix, **kwargs): noise = abs(matrix * gains - visibilities) / abs(visibilities) return dict(gains=gains, residual=residual, relative_error=noise, - flag=False) + flag=numpy.array(False)) except numpy.linalg.LinAlgError as e: logger.warning('error solving for the gains: %s', e) __empty = dict(gains=numpy.array(numpy.nan), @@ -232,7 +244,8 @@ def _solve_gains_per_frequency(dataset, datatable, frequency, direct_complex=Fal result = dict() for polarization in ['XX', 'XY', 'YX', 'YY']: - visibilities = numpy.matrix([datatable[str(i)]['mean'][polarization] for i in range(n_beams)]) + visibilities = numpy.matrix( + [datatable[str(i)]['mean'][polarization] for i in range(n_beams)]) if direct_complex is True: result[polarization] = _solve_gains(visibilities, matrix, **kwargs) else: diff --git a/CAL/CalibrationProcessing/test/processing/t_interpolate.py b/CAL/CalibrationProcessing/test/processing/t_interpolate.py index bd8139a7ee914b1d011d89714608b8592731cb96..cae0b251ea4aba9814f59823157610d4bb378749 100644 --- a/CAL/CalibrationProcessing/test/processing/t_interpolate.py +++ b/CAL/CalibrationProcessing/test/processing/t_interpolate.py @@ -3,10 +3,30 @@ import unittest import lofar.calibration.processing.interpolate as interpolate import lofar.calibration.testing.utils as utils +import corner + import numpy from typing import Dict +def generate_signal(nu, slope, intercept, noise): + signal_amplitude_slope = numpy.abs(slope) + signal_phase_slope = numpy.angle(slope) + + noise_amplitude = numpy.abs(noise) + noise_phase = numpy.angle(noise) + + signal = (signal_amplitude_slope * nu + + abs(intercept) + + numpy.random.uniform(-noise_amplitude, noise_amplitude, 1)) * numpy.exp(( + signal_phase_slope * nu + + numpy.angle(intercept) + + numpy.random.uniform(-noise_phase, noise_phase, 1) + )*1.j) + + return signal + + def generate_test_data(slope: complex, intercept: complex, noise_amplitude: complex) -> Dict[str, Dict[str, Dict[str, numpy.ndarray]]]: @@ -28,7 +48,7 @@ def generate_test_data(slope: complex, n_antennas = 40 data = lambda nu: numpy.zeros(n_antennas, dtype=numpy.complexfloating) + \ - slope * nu + noise_amplitude * numpy.std(1) + intercept + generate_signal(nu, slope, intercept, noise_amplitude) return { station: { str(frequency): { @@ -42,13 +62,40 @@ def generate_test_data(slope: complex, class TestInterpolateStep(unittest.TestCase): + @unittest.skip def test_interpolation(self): - data_table = generate_test_data(slope=1.0 + 2.j, - intercept= 4.0 + 5.j, - noise_amplitude=1.0 + 3.j + data_table = generate_test_data(slope=1.0 + .01j, + intercept= 4.0 + 2.j, + noise_amplitude=1.0 + .1j ) interpolate.derive_interpolation_parameters(data_table) + def test_mc_interpolation(self): + numpy.random.seed(1) + frequencies = numpy.linspace(0, 10, 15) + slope = 10 * numpy.exp(+.05j) + intercept = 1. * numpy.exp(-.4j) + noise_amplitude = 3.*numpy.exp(.001j) + + signal = [generate_signal(frequency, slope, intercept, noise_amplitude) for frequency in frequencies] + + amplitude_par, phase_par = interpolate._interpolate_gains(frequencies, signal) + + amplitude = numpy.abs(signal) + phase = numpy.angle(signal) + + import matplotlib.pyplot as plt + plt.figure('amplitude') + plt.plot(frequencies, amplitude, 'o') + plt.plot(frequencies, amplitude_par[0] * frequencies + amplitude_par[1], '-') + + plt.figure('phase') + plt.plot(frequencies, phase, 'x') + plt.plot(frequencies, phase_par[0] * frequencies + phase_par[1], 'b-') + parameters = numpy.polyfit(frequencies, phase, 1) + plt.plot(frequencies, parameters[0] * frequencies + parameters[1], 'g-') + + plt.show() if __name__ == '__main__': logging.basicConfig(format='%(name)s : %(message)s') diff --git a/CAL/CalibrationProcessing/test/processing/t_interpolate.run b/CAL/CalibrationProcessing/test/processing/t_interpolate.run index f0d4c8fd7fafb8017877972a0e41bcef4c6d5bf0..574f0f8986acb6a05ebfb7e30dd5de65041644a8 100755 --- a/CAL/CalibrationProcessing/test/processing/t_interpolate.run +++ b/CAL/CalibrationProcessing/test/processing/t_interpolate.run @@ -19,4 +19,4 @@ # Run the unit test source python-coverage.sh -python_coverage_test "*interpolate*" t_interpolate.py +python_coverage_test "*processing/interpolate*" t_interpolate.py diff --git a/CAL/CalibrationProcessing/test/processing/t_processing.py b/CAL/CalibrationProcessing/test/processing/t_processing.py index 128dd2dec570dfd5df38d40d000e22d59440212d..0e0453994e0ef3eb36558ab10565de9df9bc3bd7 100755 --- a/CAL/CalibrationProcessing/test/processing/t_processing.py +++ b/CAL/CalibrationProcessing/test/processing/t_processing.py @@ -23,7 +23,7 @@ def is_sample_dataset_available(): class TestSignalGenerator(unittest.TestCase): - def test_gaussian_noise_generation(self): + def test_gaussian_noise_generation_produce_expected_results(self): test_average_value = 5. + 12j test_real_std = 12 test_imag_std = 3 @@ -37,7 +37,7 @@ class TestSignalGenerator(unittest.TestCase): self.assertGreater(abs(numpy.average(sample_noise).real / test_average_value.real), .9) self.assertGreater(abs(numpy.average(sample_noise).imag / test_average_value.imag), .9) - def test_signal_generator(self): + def test_signal_generator_produce_expected_linear_trend_with_expected_noise(self): test_imag_coeff = 3.j test_real_coeff = 2. expected_dependencies_imag_real = test_imag_coeff / test_real_coeff