Skip to content
Snippets Groups Projects
Select Git revision
  • a725df0a4705b34d3b4ff017ad44c6e472a2afab
  • MCCS-163 default
  • main
  • sar-277-update-docs-with-examples-for-lrc
  • st-946-automate
  • sar_302-log-fix
  • sar-287_subarray_commands_to_lrc
  • sar_302-POC_await_sub_device_state
  • sat_302_fix_pipelines
  • sar-286_lrc_one_subarry_command
  • sar-286_lrc_improvements
  • sar-288-async-controller
  • sar-276-combine-tango-queue
  • sar-255_remove_nexus_reference
  • sar-275-add-LRC
  • sar-273-add-lrc-attributes
  • sar-272
  • sp-1106-marvin-1230525148-ska-tango-base
  • sp-1106-marvin-813091765-ska-tango-base
  • sar-255/Publish-package-to-CAR
  • mccs-661-device-under-test-fixture
  • mccs-659-pep257-docstring-linting
  • 0.11.3
  • 0.11.2
  • 0.11.1
  • 0.11.0
  • 0.10.1
  • 0.10.0
  • 0.9.1
  • 0.9.0
  • 0.8.1
  • 0.8.0
  • 0.7.2
  • 0.7.1
  • 0.7.0
  • 0.6.6
  • 0.6.5
  • 0.6.4
  • 0.6.3
  • 0.6.2
  • 0.6.1
  • 0.6.0
42 results

test_task_queue_manager.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    solver.py 9.33 KiB
    import logging
    
    import lofar.calibration.common.datacontainers as datacontainers
    import numpy
    from scipy.constants import c as LIGHT_SPEED
    
    logger = logging.getLogger(__name__)
    
    _FRQ_TO_PHASE = 2. * numpy.pi / LIGHT_SPEED
    
    
    def _rotate_antenna_coordinates(dataset):
        """
        Rotate the antenna coordinates to be aligned to the station orientation
        :param dataset: input dataset
        :type dataset: datacontainers.HolographyDataset
        :return: the rotated coordinate system
        :rtype: numpy.matrix
        """
        station_position = numpy.array(dataset.target_station_position)
        antenna_field_position = numpy.array(dataset.antenna_field_position)
    
        antenna_position_offset = numpy.array([antenna_position - station_position for
                                               antenna_position in antenna_field_position])
    
        station_rotation_matrix = dataset.rotation_matrix
        return numpy.dot(station_rotation_matrix, antenna_position_offset.T).T
    
    
    def _compute_expected_phase_delay(l, m, x, y, frequency):
        """
        Convert the antenna position (x,y) and the pointing cosines (l,m) into a phase delay for
        the given frequency
        :param l: l pointing cosine
        :param m: m pointing cosine
        :param x: x displacement of the antenna
        :param y: y displacement of the antenna
        :param frequency: frequency under consideration
        :return:
        """
        phase = (x * l + y * m) * _FRQ_TO_PHASE * frequency
        return phase
    
    
    def _compute_pointing_matrices_per_station_frequency_beam(dataset, datatable, frequency):
        """
        Compute the pointing matrix of a given station, at a given frequency and for a specific beam.
        :param dataset: datatable's dataset
        :type dataset: datacontainers.HolographyDataset
        :param datatable: input data
        :type datatable: dict(dict(numpy.ndarray))
        :param frequency: frequency at which to compute the pointing matrix
        :type frequency: float
        :return: the pointing matrix
        :rtype: numpy.matrix
        """
    
        rotated_coordinates = _rotate_antenna_coordinates(dataset)
        n_antennas = len(dataset.antenna_field_position)
        one_over_n_antennas = 1. / float(n_antennas)
        n_beams = len(datatable)
    
        pointing_matrix = numpy.matrix(numpy.zeros((n_beams, n_antennas), dtype=complex))
    
        for i in range(n_beams):
            for j in range(n_antennas):
                l = datatable[str(i)]['mean']['l'][0]
                m = datatable[str(i)]['mean']['m'][0]
                x, y = rotated_coordinates[j, 0:2]
    
                phase = _compute_expected_phase_delay(l, m, x, y, frequency)
                pointing_matrix[i, j] = numpy.exp(-1.j * phase) * one_over_n_antennas
    
        return pointing_matrix
    
    
    def _convert_pointing_matrix_to_real(matrix):
        """
        Convert a complex matrix in the equivalent real one.
        Ex.
        [re1 + j * im1  , re2 + j * im2 ;        [ re1, -im1,  re2, -im2]
                                            ~>    [ im1,  re1,  im2,  re2]
         re3 + j * im3  , re4 + j * im4 ]        [ re3, -im3,  re4, -im4]
                                                 [ re3,  re3,  re$, -im4]
    
        :param matrix: input matrix complex valued
        :type matrix: numpy.matrix
        :return: output matrix real valued
        :rtype: numpy.matrix
        """
        matrix_real = numpy.matrix(numpy.zeros((matrix.shape[0] * 2, matrix.shape[1] * 2)))
        matrix_real[0::2, 0::2] = matrix.real
        matrix_real[0::2, 1::2] = -matrix.imag
        matrix_real[1::2, 0::2] = matrix.real
        matrix_real[1::2, 1::2] = matrix.imag
        return matrix_real
    
    
    def _convert_visibilities_to_real(visibilities):
        """
        Convert a complex matrix in the equivalent real one.
        Ex.
        [re1 + j * im1  , re2 + j * im2]   ~>    [ re1, im1,  re2, im2]
    
        :param visibilities: array with the visibilities
        :type visibilities: numpy.matrix
        :return: the real valued matrix equivalent to the input one
        :rtype: numpy.matrix
        """
        visibilities_real = numpy.matrix(numpy.zeros(visibilities.shape[0] * 2)).T
        visibilities_real[0::2] = visibilities.real
        visibilities_real[1::2] = visibilities.imag
    
        return visibilities_real
    
    
    def _solve_gains_complex(visibilities, matrix, **kwargs):
        """
        To solve a complex linear system of equation it is possible to rewrite it in a equivalent
        real system.
        :param matrix:
        :param visibilities:
        :return:
        """
        matrix_real = _convert_pointing_matrix_to_real(matrix)
    
        visibilities_real = _convert_visibilities_to_real(visibilities)
    
        result = _solve_gains(visibilities_real, matrix_real, **kwargs)
        if result['flag'] is False:
            gains = result['gains']
            gains = gains[0::2] + 1.j * gains[1::2]
            result['gains'] = gains
    
            noise = result['relative_error']
            noise = numpy.sqrt(noise[0::2].A1**2. + noise[1::2].A1**2.)
            result["relative_error"] = noise
        return result
    
    
    def _invert_matrix_lstsqr(matrix, visibilities, rcond=None):
        try:
            gains, residual, _, _ = numpy.linalg.lstsq(matrix, visibilities, rcond=rcond)
        except ValueError as e:
            raise numpy.linalg.LinAlgError(e)
        return gains, residual
    
    
    def _invert_matrix_direct(matrix, visibilities, rcond=None):
        try:
    
            gains = (matrix.T * matrix).I * matrix.T * visibilities
            # IS the same as : gains = matrix.I * visibilities
            residual = 0
    
        except ValueError as e:
            raise numpy.linalg.LinAlgError(e)
        return gains, residual
    
    
    def _invert_matrix_mcmc(matrix, visibilities, **kwargs):
        def lnprob(parameters):
            gains, sigmas = parameters
            return -abs(matrix * gains - visibilities)**2/sigmas**2
    
        ndim, nwalkers = 3, 100
        #pos = [result["x"] + 1e-4 * np.random.randn(ndim) for i in range(nwalkers)]
        raise NotImplementedError()
    
    
    def _invert_matrix(matrix, visibilities, type='LSTSQR', **kwargs):
        """
        Invert the pointing to find the gains from the visibilities.
        It is possible to specify the type of solution method applied through the parameter type
        :param matrix: pointing matrix
        :param visibilities: visibilities for each pointing
        :param type: type of methodology used to invert the matrix
        :param kwargs: additional parameters for the solution method
        :return:
        """
        SOLUTION_METHODS = ['LSTSQR', 'MCMC', 'DIRECT']
        if type not in SOLUTION_METHODS:
            raise ValueError('wrong type of solution method specified. Alternatives are: %s'
                             % SOLUTION_METHODS )
    
        if type is 'LSTSQR':
            return _invert_matrix_lstsqr(matrix, visibilities, **kwargs)
        elif type is 'MCMC':
            return _invert_matrix_mcmc(matrix, visibilities, **kwargs)
        elif type is 'DIRECT':
            return _invert_matrix_direct(matrix, visibilities, **kwargs)
    
    def _solve_gains(visibilities, matrix, **kwargs):
        """
        SOLVE THE EQUATION M * G = V FOR G
        where M is the pointing matrix
              G are the gains per antenna
              V are the visibilities
        :param visibilities: the visibility computed for each pointing
        :type visibilities: numpy.matrix
        :param matrix: the pointing matrix containing the phase delay for each pointing and antenna
        :type matrix: numpy.matrix
        :return: the gains for each antenna
        """
        try:
            gains, residual = _invert_matrix(matrix, visibilities, **kwargs)
            noise = abs(matrix * gains - visibilities) / abs(visibilities)
    
            return dict(gains=gains, residual=residual, relative_error=noise,
                        flag=False)
        except numpy.linalg.LinAlgError as e:
            logger.warning('error solving for the gains: %s', e)
            __empty = dict(gains=numpy.array(numpy.nan),
                           residual=numpy.array(numpy.nan),
                           relative_error=numpy.array(numpy.nan),
                           flag=numpy.array(True))
            return __empty
    
    
    def _solve_gains_per_frequency(dataset, datatable, frequency, direct_complex=False, **kwargs):
        """
        SOLVE THE EQUATION M * G = V FOR G
        :param dataset:
        :type dataset: datacontainers.HolographyDataset
        :param datatable:
        :param frequency:
        :type frequency: float
        :return:
        """
        matrix = _compute_pointing_matrices_per_station_frequency_beam(dataset, datatable, frequency)
        n_beams = len(datatable)
    
        result = dict()
        for polarization in ['XX', 'XY', 'YX', 'YY']:
            visibilities = numpy.matrix([datatable[str(i)]['mean'][polarization] for i in range(n_beams)])
            if direct_complex is True:
                result[polarization] = _solve_gains(visibilities, matrix, **kwargs)
            else:
                result[polarization] = _solve_gains_complex(visibilities, matrix, **kwargs)
    
        return result
    
    
    def solve_gains_per_datatable(dataset, datatable, **kwargs):
        """
        Solve for the gains the given datatable
        :param dataset: dataset containing the specified datatable
        :type dataset: datacontainers.HolographyDataset
        :param datatable: table containing the data
        :type dataset: dict(dict(numpy.ndarray))
        :return: a nested dict containing the gain matrix per station, frequency, polarization
        :rtype: dict(dict(dict(numpy.ndarray)))
        """
        result = dict()
    
        for station in datatable:
            result_per_station = dict()
            result[station] = result_per_station
            data_per_station = datatable[station]
            for frequency in data_per_station:
                data_per_frequency = data_per_station[frequency]
                result_per_station[str(frequency)] = \
                    _solve_gains_per_frequency(dataset, data_per_frequency, float(frequency), **kwargs)
    
        return result