Skip to content
Snippets Groups Projects

Tossed Together Nearfield

  • Clone with SSH
  • Clone with HTTPS
  • Embed
  • Share
    The snippet can be accessed without any authentication.
    Authored by David McKenna
    Edited
    quick_nearfield.py 8.37 KiB
    # Cobbled together from lofarimaging and lofty
    from collections import defaultdict
    import numpy as np
    
    from tango import DeviceProxy, DevSource
    from lofarantpos.db import LofarAntennaDatabase
    import numexpr as ne
    import matplotlib.pyplot as plt
    
    # Configurations for HBA observations with a single dipole activated per tile.
    GENERIC_INT_201512 = [0, 5, 3, 1, 8, 3, 12, 15, 10, 13, 11, 5, 12, 12, 5, 2, 10, 8, 0, 3, 5, 1, 4, 0, 11, 6, 2, 4, 9,
                          14, 15, 3, 7, 5, 13, 15, 5, 6, 5, 12, 15, 7, 1, 1, 14, 9, 4, 9, 3, 9, 3, 13, 7, 14, 7, 14, 2, 8,
                          8, 0, 1, 4, 2, 2, 12, 15, 5, 7, 6, 10, 12, 3, 3, 12, 7, 4, 6, 0, 5, 9, 1, 10, 10, 11, 5, 11, 7, 9,
                          7, 6, 4, 4, 15, 4, 1, 15]
    GENERIC_CORE_201512 = [0, 10, 4, 3, 14, 0, 5, 5, 3, 13, 10, 3, 12, 2, 7, 15, 6, 14, 7, 5, 7, 9, 0, 15, 0, 10, 4, 3, 14,
                           0, 5, 5, 3, 13, 10, 3, 12, 2, 7, 15, 6, 14, 7, 5, 7, 9, 0, 15]
    GENERIC_REMOTE_201512 = [0, 13, 12, 4, 11, 11, 7, 8, 2, 7, 11, 2, 10, 2, 6, 3, 8, 3, 1, 7, 1, 15, 13, 1, 11, 1, 12, 7,
                             10, 15, 8, 2, 12, 13, 9, 13, 4, 5, 5, 12, 5, 5, 9, 11, 15, 12, 2, 15]
    SPEED_OF_LIGHT = 299792458.0
    
    def get_station_type(station_name: str) -> str:
        """
        Get the station type, one of 'intl', 'core' or 'remote'
    
        Args:
            station_name: Station name, e.g. "DE603LBA" or just "DE603"
    
        Returns:
            str: station type, one of 'intl', 'core' or 'remote'
    
        Example:
            >>> get_station_type("DE603")
            'intl'
        """
        if station_name[0] == "C":
            return "core"
        elif station_name[0] == "R" or station_name[:5] == "PL611":
            return "remote"
        else:
            return "intl"
    
    
    def get_station_pqr(full_station_name: str, activation_pattern: bool | str = False):
        """
        Get PQR coordinates for the relevant subset of antennas in a station.
    
        Args:
            station_name: Station name, e.g. 'DE603LBA' or 'DE603'
            rcu_mode: RCU mode (0 - 6, can be string)
            db: instance of LofarAntennaDatabase from lofarantpos
    
        Example:
            >>> from lofarantpos.db import LofarAntennaDatabase
            >>> db = LofarAntennaDatabase()
            >>> pqr = get_station_pqr("DE603", "outer", db)
            >>> pqr.shape
            (96, 3)
            >>> pqr[0, 0]
            1.7434713
    
            >>> pqr = get_station_pqr("LV614", "5", db)
            >>> pqr.shape
            (96, 3)
        """
        db = LofarAntennaDatabase()
        station_type = get_station_type(full_station_name)
    
        if 'LBA' in full_station_name or not activation_pattern:
            station_pqr = db.antenna_pqr(full_station_name)
        elif 'HBA' in full_station_name:
            antenna_set = activation_pattern
            selected_dipole_config = {
                'intl': GENERIC_INT_201512, 'remote': GENERIC_REMOTE_201512, 'core': GENERIC_CORE_201512
            }
            selected_dipoles = selected_dipole_config[station_type] + \
                np.arange(len(selected_dipole_config[station_type])) * 16
            print(selected_dipoles.shape, db.hba_dipole_pqr(full_station_name).shape, selected_dipoles, db.hba_dipole_pqr(full_station_name))
            if antenna_set == "HBA_SINGLE":
                station_pqr = db.hba_dipole_pqr(full_station_name)[selected_dipoles]
            elif antenna_set == "HBA0_SINGLE":
                station_pqr = db.hba_dipole_pqr(full_station_name)[selected_dipoles[:24]]
            elif antenna_set == "HBA1_SINGLE":
                station_pqr = db.hba_dipole_pqr(full_station_name)[selected_dipoles[24:] - 16 * 24]
            else:
                raise RuntimeError("AAAAAAAAAAAAAA")
        else:
            raise RuntimeError("Station name did not contain LBA or HBA, could not load antenna positions")
    
        return station_pqr.astype('float32')
    
    def nearfield_imager(visibilities, baseline_indices, freqs, npix_p, npix_q, extent, station_pqr, height=1.5,
                         max_memory_mb=200):
        """
        Nearfield imager
    
        Args:
            visibilities: Numpy array with visibilities, shape [num_visibilities x num_frequencies]
            baseline_indices: List with tuples of antenna numbers in visibilities, shape [2 x num_visibilities]
            freqs: List of frequencies
            npix_p: Number of pixels in p-direction
            npix_q: Number of pixels in q-direction
            extent: Extent (in m) that the image should span
            station_pqr: PQR coordinates of stations
            height: Height of image in metre
            max_memory_mb: Maximum amount of memory to use for the biggest array. Higher may improve performance.
    
        Returns:
            np.array(complex): Complex valued array of shape [npix_p, npix_q]
        """
        z = height
        x = np.linspace(extent[0], extent[1], npix_p)
        y = np.linspace(extent[2], extent[3], npix_q)
    
        posx, posy = np.meshgrid(x, y)
        posxyz = np.transpose(np.array([posx, posy, z * np.ones_like(posx)]), [1, 2, 0])
    
        diff_vectors = (station_pqr[:, None, None, :] - posxyz[None, :, :, :])
        distances = np.linalg.norm(diff_vectors, axis=3)
    
        vis_chunksize = max_memory_mb * 1024 * 1024 // (8 * npix_p * npix_q)
    
        bl_diff = np.zeros((vis_chunksize, npix_q, npix_p), dtype=np.float64)
        img = np.zeros((npix_q, npix_p), dtype=np.complex128)
        for vis_chunkstart in range(0, len(baseline_indices), vis_chunksize):
            vis_chunkend = min(vis_chunkstart + vis_chunksize, baseline_indices.shape[0])
            # For the last chunk, bl_diff_chunk is a bit smaller than bl_diff
            bl_diff_chunk = bl_diff[:vis_chunkend - vis_chunkstart, :]
            np.add(distances[baseline_indices[vis_chunkstart:vis_chunkend, 0]],
                   -distances[baseline_indices[vis_chunkstart:vis_chunkend, 1]], out=bl_diff_chunk)
    
            j2pi = 1j * 2 * np.pi
            for ifreq, freq in enumerate(freqs):
                v = visibilities[vis_chunkstart:vis_chunkend, ifreq][:, None, None]
                lamb = SPEED_OF_LIGHT / freq
    
                # v[:,np.newaxis,np.newaxis]*np.exp(-2j*np.pi*freq/SPEED_OF_LIGHT*groundbase_pixels[:,:,:]/SPEED_OF_LIGHT)
                # groundbase_pixels=nvis x npix x npix
                np.add(img, np.sum(ne.evaluate("v * exp(j2pi * bl_diff_chunk / lamb)"), axis=0), out=img)
        img /= len(freqs) * len(baseline_indices)
    
        return img
    
    def get_xst_statistics(antennafield: str):
        device = DeviceProxy(f"stat/xst/{antennafield}")
        device.set_source(DevSource.DEV)
    
        subband = device.xst_subband_r
        dataset = device.xst_real_r.astype(np.complex64) + 1.0j * device.xst_imag_r.astype(np.complex64)
    
        frequency = DeviceProxy(f"stat/sdp/{antennafield}").subband_frequency_r[0][subband]
    
        return dataset, frequency
    
    
    def wrap_nearfield_imager(antennafield: str, activation_pattern: bool = False, heights_m: float | list[float] = 1.5, npix_p: int = 96, npix_q: int = 96, extent_m: tuple[float, float] = (-100.0, 100.0, -100.0,100.0) , max_memory_mb: float = 200):
        """
        Wrap nearfield imager for single/multi-subband captures.
        """
        if not isinstance(heights_m, list):
            if not isinstance(heights_m, float):
                raise TypeError(f"Expected a float or list of floats for subbannds, got {heights_m=}")
            heights_m = [heights_m]
    
        results = defaultdict(lambda: dict())
        dataset, frequency = get_xst_statistics(antennafield)
        baseline_indices = np.array(np.tril_indices_from(dataset))
        dataset_selected = dataset[baseline_indices].ravel()[:, np.newaxis]
    
        manager_device = DeviceProxy("stat/stationmanager/1")
        station_pqr = get_station_pqr(f"{manager_device.station_name_r}{antennafield}".upper(), activation_pattern = activation_pattern)
    
        for height in heights_m:
            results[height] = nearfield_imager(dataset_selected, baseline_indices, freqs = [frequency], npix_p = npix_p, npix_q = npix_q, extent = extent_m, station_pqr = station_pqr, height = height, max_memory_mb = max_memory_mb)
    
        for key, result in results.items():
            plt.figure()
            plt.title(f"{antennafield} @ {str(key)}")
            plt.imshow(np.abs(result))
    
        plt.show()
        return results
    
    
    if __name__ == '__main__':
        wrap_nearfield_imager("LBA", extent_m = [-50, 50, -50, 50], heights_m = [0.5, 1.0, 1.5, 10.0])
        wrap_nearfield_imager("HBA1", extent_m = [-50, 50, -50, 50], heights_m = [0.5, 1.0, 1.5, 10.0])
        wrap_nearfield_imager("HBA1", activation_pattern = "HBA1_SINGLE", extent_m = [-50, 50, -50, 50], heights_m = [0.5, 1.0, 1.5, 10.0])
        wrap_nearfield_imager("HBA0", extent_m = [-50, 50, -50, 50], heights_m = [0.5, 1.0, 1.5, 10.0])
        wrap_nearfield_imager("HBA0", activation_pattern = "HBA0_SINGLE", extent_m = [-50, 50, -50, 50], heights_m = [0.5, 1.0, 1.5, 10.0])
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Please register or to comment