Skip to content
Snippets Groups Projects
Select Git revision
  • b260f98d738096d9ec9777691ba3fc8afb3722cd
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

tmss_test_data_rest.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    solver.py 9.33 KiB
    import logging
    
    import lofar.calibration.common.datacontainers as datacontainers
    import numpy
    from scipy.constants import c as LIGHT_SPEED
    
    logger = logging.getLogger(__name__)
    
    _FRQ_TO_PHASE = 2. * numpy.pi / LIGHT_SPEED
    
    
    def _rotate_antenna_coordinates(dataset):
        """
        Rotate the antenna coordinates to be aligned to the station orientation
        :param dataset: input dataset
        :type dataset: datacontainers.HolographyDataset
        :return: the rotated coordinate system
        :rtype: numpy.matrix
        """
        station_position = numpy.array(dataset.target_station_position)
        antenna_field_position = numpy.array(dataset.antenna_field_position)
    
        antenna_position_offset = numpy.array([antenna_position - station_position for
                                               antenna_position in antenna_field_position])
    
        station_rotation_matrix = dataset.rotation_matrix
        return numpy.dot(station_rotation_matrix, antenna_position_offset.T).T
    
    
    def _compute_expected_phase_delay(l, m, x, y, frequency):
        """
        Convert the antenna position (x,y) and the pointing cosines (l,m) into a phase delay for
        the given frequency
        :param l: l pointing cosine
        :param m: m pointing cosine
        :param x: x displacement of the antenna
        :param y: y displacement of the antenna
        :param frequency: frequency under consideration
        :return:
        """
        phase = (x * l + y * m) * _FRQ_TO_PHASE * frequency
        return phase
    
    
    def _compute_pointing_matrices_per_station_frequency_beam(dataset, datatable, frequency):
        """
        Compute the pointing matrix of a given station, at a given frequency and for a specific beam.
        :param dataset: datatable's dataset
        :type dataset: datacontainers.HolographyDataset
        :param datatable: input data
        :type datatable: dict(dict(numpy.ndarray))
        :param frequency: frequency at which to compute the pointing matrix
        :type frequency: float
        :return: the pointing matrix
        :rtype: numpy.matrix
        """
    
        rotated_coordinates = _rotate_antenna_coordinates(dataset)
        n_antennas = len(dataset.antenna_field_position)
        one_over_n_antennas = 1. / float(n_antennas)
        n_beams = len(datatable)
    
        pointing_matrix = numpy.matrix(numpy.zeros((n_beams, n_antennas), dtype=complex))
    
        for i in range(n_beams):
            for j in range(n_antennas):
                l = datatable[str(i)]['mean']['l'][0]
                m = datatable[str(i)]['mean']['m'][0]
                x, y = rotated_coordinates[j, 0:2]
    
                phase = _compute_expected_phase_delay(l, m, x, y, frequency)
                pointing_matrix[i, j] = numpy.exp(-1.j * phase) * one_over_n_antennas
    
        return pointing_matrix
    
    
    def _convert_pointing_matrix_to_real(matrix):
        """
        Convert a complex matrix in the equivalent real one.
        Ex.
        [re1 + j * im1  , re2 + j * im2 ;        [ re1, -im1,  re2, -im2]
                                            ~>    [ im1,  re1,  im2,  re2]
         re3 + j * im3  , re4 + j * im4 ]        [ re3, -im3,  re4, -im4]
                                                 [ re3,  re3,  re$, -im4]
    
        :param matrix: input matrix complex valued
        :type matrix: numpy.matrix
        :return: output matrix real valued
        :rtype: numpy.matrix
        """
        matrix_real = numpy.matrix(numpy.zeros((matrix.shape[0] * 2, matrix.shape[1] * 2)))
        matrix_real[0::2, 0::2] = matrix.real
        matrix_real[0::2, 1::2] = -matrix.imag
        matrix_real[1::2, 0::2] = matrix.real
        matrix_real[1::2, 1::2] = matrix.imag
        return matrix_real
    
    
    def _convert_visibilities_to_real(visibilities):
        """
        Convert a complex matrix in the equivalent real one.
        Ex.
        [re1 + j * im1  , re2 + j * im2]   ~>    [ re1, im1,  re2, im2]
    
        :param visibilities: array with the visibilities
        :type visibilities: numpy.matrix
        :return: the real valued matrix equivalent to the input one
        :rtype: numpy.matrix
        """
        visibilities_real = numpy.matrix(numpy.zeros(visibilities.shape[0] * 2)).T
        visibilities_real[0::2] = visibilities.real
        visibilities_real[1::2] = visibilities.imag
    
        return visibilities_real
    
    
    def _solve_gains_complex(visibilities, matrix, **kwargs):
        """
        To solve a complex linear system of equation it is possible to rewrite it in a equivalent
        real system.
        :param matrix:
        :param visibilities:
        :return:
        """
        matrix_real = _convert_pointing_matrix_to_real(matrix)
    
        visibilities_real = _convert_visibilities_to_real(visibilities)
    
        result = _solve_gains(visibilities_real, matrix_real, **kwargs)
        if result['flag'] is False:
            gains = result['gains']
            gains = gains[0::2] + 1.j * gains[1::2]
            result['gains'] = gains
    
            noise = result['relative_error']
            noise = numpy.sqrt(noise[0::2].A1**2. + noise[1::2].A1**2.)
            result["relative_error"] = noise
        return result
    
    
    def _invert_matrix_lstsqr(matrix, visibilities, rcond=None):
        try:
            gains, residual, _, _ = numpy.linalg.lstsq(matrix, visibilities, rcond=rcond)
        except ValueError as e:
            raise numpy.linalg.LinAlgError(e)
        return gains, residual
    
    
    def _invert_matrix_direct(matrix, visibilities, rcond=None):
        try:
    
            gains = (matrix.T * matrix).I * matrix.T * visibilities
            # IS the same as : gains = matrix.I * visibilities
            residual = 0
    
        except ValueError as e:
            raise numpy.linalg.LinAlgError(e)
        return gains, residual
    
    
    def _invert_matrix_mcmc(matrix, visibilities, **kwargs):
        def lnprob(parameters):
            gains, sigmas = parameters
            return -abs(matrix * gains - visibilities)**2/sigmas**2
    
        ndim, nwalkers = 3, 100
        #pos = [result["x"] + 1e-4 * np.random.randn(ndim) for i in range(nwalkers)]
        raise NotImplementedError()
    
    
    def _invert_matrix(matrix, visibilities, type='LSTSQR', **kwargs):
        """
        Invert the pointing to find the gains from the visibilities.
        It is possible to specify the type of solution method applied through the parameter type
        :param matrix: pointing matrix
        :param visibilities: visibilities for each pointing
        :param type: type of methodology used to invert the matrix
        :param kwargs: additional parameters for the solution method
        :return:
        """
        SOLUTION_METHODS = ['LSTSQR', 'MCMC', 'DIRECT']
        if type not in SOLUTION_METHODS:
            raise ValueError('wrong type of solution method specified. Alternatives are: %s'
                             % SOLUTION_METHODS )
    
        if type is 'LSTSQR':
            return _invert_matrix_lstsqr(matrix, visibilities, **kwargs)
        elif type is 'MCMC':
            return _invert_matrix_mcmc(matrix, visibilities, **kwargs)
        elif type is 'DIRECT':
            return _invert_matrix_direct(matrix, visibilities, **kwargs)
    
    def _solve_gains(visibilities, matrix, **kwargs):
        """
        SOLVE THE EQUATION M * G = V FOR G
        where M is the pointing matrix
              G are the gains per antenna
              V are the visibilities
        :param visibilities: the visibility computed for each pointing
        :type visibilities: numpy.matrix
        :param matrix: the pointing matrix containing the phase delay for each pointing and antenna
        :type matrix: numpy.matrix
        :return: the gains for each antenna
        """
        try:
            gains, residual = _invert_matrix(matrix, visibilities, **kwargs)
            noise = abs(matrix * gains - visibilities) / abs(visibilities)
    
            return dict(gains=gains, residual=residual, relative_error=noise,
                        flag=False)
        except numpy.linalg.LinAlgError as e:
            logger.warning('error solving for the gains: %s', e)
            __empty = dict(gains=numpy.array(numpy.nan),
                           residual=numpy.array(numpy.nan),
                           relative_error=numpy.array(numpy.nan),
                           flag=numpy.array(True))
            return __empty
    
    
    def _solve_gains_per_frequency(dataset, datatable, frequency, direct_complex=False, **kwargs):
        """
        SOLVE THE EQUATION M * G = V FOR G
        :param dataset:
        :type dataset: datacontainers.HolographyDataset
        :param datatable:
        :param frequency:
        :type frequency: float
        :return:
        """
        matrix = _compute_pointing_matrices_per_station_frequency_beam(dataset, datatable, frequency)
        n_beams = len(datatable)
    
        result = dict()
        for polarization in ['XX', 'XY', 'YX', 'YY']:
            visibilities = numpy.matrix([datatable[str(i)]['mean'][polarization] for i in range(n_beams)])
            if direct_complex is True:
                result[polarization] = _solve_gains(visibilities, matrix, **kwargs)
            else:
                result[polarization] = _solve_gains_complex(visibilities, matrix, **kwargs)
    
        return result
    
    
    def solve_gains_per_datatable(dataset, datatable, **kwargs):
        """
        Solve for the gains the given datatable
        :param dataset: dataset containing the specified datatable
        :type dataset: datacontainers.HolographyDataset
        :param datatable: table containing the data
        :type dataset: dict(dict(numpy.ndarray))
        :return: a nested dict containing the gain matrix per station, frequency, polarization
        :rtype: dict(dict(dict(numpy.ndarray)))
        """
        result = dict()
    
        for station in datatable:
            result_per_station = dict()
            result[station] = result_per_station
            data_per_station = datatable[station]
            for frequency in data_per_station:
                data_per_frequency = data_per_station[frequency]
                result_per_station[str(frequency)] = \
                    _solve_gains_per_frequency(dataset, data_per_frequency, float(frequency), **kwargs)
    
        return result