Skip to content
Snippets Groups Projects
Select Git revision
  • 3ad2dfcf981564ca3e700e7046e3c4e8841eddf4
  • master default protected
  • gec-95-add-filter-skymodel
  • gec-95-migrate-filter-skymodel-utils
  • add-coverage-badge
  • releases/1.6
  • releases/1.5
  • rap-457-implement-ci-v2
  • test_heroku
  • action-python-publish
  • v1.7.1
  • v1.7.0
  • v1.6.2
  • v1.6.1
  • v1.6.post1
  • v1.6
  • v1.5.1
  • v1.5.0
  • v1.4.11
  • v1.4.10
  • v1.4.9
  • v1.4.8
  • v1.4.7
  • v1.4.6
  • v1.4.5
  • v1.4.4
  • v1.4.3
  • v1.4.2
  • v1.4.1
  • v1.4.0
30 results

_tessellate.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    _tessellate.py 19.76 KiB
    # -*- coding: utf-8 -*-
    #
    # Defines the tessellate functions used by the group operation
    #
    # Based on the IDL version of the Voronoi binning method of:
    #   Cappellari M., Copin Y., 2003, MNRAS, 342, 345
    # Modified by David Rafferty as required for integration into LSMTool
    #
    # This software is provided as is without any warranty whatsoever.
    # Permission to use, copy and distribute unmodified copies for
    # non-commercial purposes is granted. Permission to modify for
    # personal or internal use is granted, provided this copyright
    # and disclaimer are included unchanged at the beginning of
    # the file. All other rights are reserved.
    
    
    from __future__ import print_function
    import numpy as np
    from numpy import sum, sqrt, min, max, any
    from numpy import argmax, argmin, mean, abs
    from numpy import int32 as Nint
    from numpy import float32 as Nfloat
    import copy
    
    
    def get_skymodel(skymodel_fn):
        """Returns x, y, flux arrays for input skymodel
        """
        import math as m
        from astropy import WCS
    
        y = []
        x = []
        fluxes = []
    
        # Make wcs object to handle transformation from ra and dec to pixel coords.
        w = WCS(naxis=2)
        w.wcs.crpix = [-234.75, 8.3393]
        w.wcs.cdelt = np.array([-0.066667, 0.066667])
        w.wcs.crval = [0, -90]
        w.wcs.ctype = ["RA---TAN", "DEC--TAN"]
        w.wcs.set_pv([(2, 1, 45.0)])
        arcsec_per_pix = abs(w.wcs.cdelt[0]) * 3600.0 # arcsec/pixel
    
        for line in open(skymodel_fn):
            sline=line.split(',')
            if line.startswith("FORMAT"): continue
            if line[0] == '#': continue
            if line[0] == '\n': continue
            name = str(sline[0])
            srctype = str(sline[1])
            ra_src = str(sline[2]).split(':')
            ra_deg = float(ra_src[0])*15.0 + (float(ra_src[1])/60.0)*15.0 + (float(ra_src[2])
                /3600.0)*15.0
            dec_src = str(sline[3]).split('.')
            if len(dec_src) == 3:
                dec_deg = float(dec_src[0]) + (float(dec_src[1])/60.0) + (float(dec_src[2])/3600.0)
            else:
                dec_deg = float(dec_src[0]) + (float(dec_src[1])/60.0) + (float(dec_src[2]
                    + '.' + dec_src[3])/3600.0)
            flux = str(sline[4])
            fluxes.append(float(flux))
            ra_dec = np.array([[ra_deg, dec_deg]])
            try:
                x.append(w.wcs_world2pix(ra_dec, 1)[0][0])
                y.append(w.wcs_world2pix(ra_dec, 1)[0][1])
            except AttributeError:
                x.append(w.wcs_sky2pix(ra_dec, 1)[0][0])
                y.append(w.wcs_sky2pix(ra_dec, 1)[0][1])
    
        minx = np.min(x)
        miny = np.min(y)
        x += abs(minx)
        y += abs(miny)
        return x, y, np.array(fluxes), arcsec_per_pix
    
    
    def dist2(x1,y1,x2,y2, scale=1.0) :
        return ((x1-x2)**2 + (y1-y2)**2)/scale**2
    
    
    def guess_regular_grid(xnodes, ynodes, pixelsize=None) :
        """
        Return a regular grid guessed on an irregular one (Voronoi)
        xnodes, ynodes: arrays of Voronoi bins
    
        Return: xunb, yunb = regular grid for x and y (unbinned)
        """
        ## First deriving a pixel size
        xn_rav, yn_rav = xnodes.ravel(), ynodes.ravel()
        if pixelsize is None :
            pixelsize = derive_pixelsize(xnodes, ynodes)
        minxn = int(np.min(xn_rav) / pixelsize) * pixelsize
        minyn = int(np.min(yn_rav) / pixelsize) * pixelsize
        xunb, yunb = np.meshgrid(np.arange(minxn, np.max(xn_rav)+pixelsize, pixelsize),
                               np.arange(minyn, np.max(yn_rav)+pixelsize, pixelsize))
    
        return xunb, yunb
    
    
    def derive_unbinned_field(xnodes, ynodes, data, xunb=None, yunb=None) :
        """
           Provide an array of the same shape as the input xunb, and yunb
           with the values derived from the Voronoi binned data
    
           xnodes, ynodes: 2 arrays providing the nodes from the binning
           data : values for each node
           xunb, yunb: x and y coordinates of the unbinned data
                     if not provided (default) they will be guessed from the nodes
    
           Return: xunb, yunb, and unbinned_data arrays with the same shape as xunb,
        """
        if xunb is None :
            xunb, yunb = guess_regular_grid(xnodes, ynodes)
    
        x_rav, y_rav = xunb.ravel(), yunb.ravel()
        xnodes_rav, ynodes_rav = xnodes.ravel(), ynodes.ravel()
        data_rav = data.ravel()
        unbinned_data = np.zeros_like(x_rav)
        for i in xrange(len(x_rav)) :
            indclosestBin = argmin(dist2(x_rav[i], y_rav[i], xnodes_rav, ynodes_rav))
            unbinned_data[i] = data_rav[indclosestBin]
    
        return xunb, yunb, unbinned_data.reshape(xunb.shape)
    
    
    listmethods = ["voronoi", "quadtree"]
    class bin2D:
        """
        Class for Voronoi binning of a set of x and y coordinates
        using given data and potential associated noise
        """
        def __init__(self, xin, yin, data, target_flux=1.0, pixelsize=None,
            method="Voronoi", cvt=1, wvt=0):
            self.xin = xin.ravel()
            self.yin = yin.ravel()
            self.data = data.ravel()
    
            ## Sort all pixels by their distance to the maximum flux
            if pixelsize is None :
                self.pixelsize = derive_pixelsize(self.xin, self.yin, verbose=1)
            else :
                self.pixelsize = pixelsize
    
            self.target_flux = target_flux
    
            ## Binning method and options
            self.method = str.lower(method)
            self.cvt = cvt
            self.wvt = wvt
            self.scale = 1.0
            self._check_input()
    
        def _check_input(self):
            """
            Check consistency of input data
            """
            # Basic checks of the data
            # First about the dimensions of the datasets
            self.npix = len(self.xin)
    
        ### Accrete the bins ============================================
        def bin2d_accretion(self, verbose=0):
            """ Accrete the bins according to their flux
            """
            ## Status holds the bin number when assigned
            self.status = np.zeros(self.npix, dtype=Nint)
            ## Good is 1 if the bin was accepted
            self.good = np.zeros(self.xin.size, dtype=Nint)
    
            ## Start with the max flux in the data
            currentBin = [argmax(self.data)]
    
            for ind in range(1,self.npix+1):  ## Running over the index of the Voronoi BIN
                ## Only one pixel at this stage
                current_flux = sum(self.data[currentBin])
                if verbose :
                    print("Bin %d"%(ind)) # TODO: Change to logging
    
                self.status[currentBin] = ind   # only one pixel at this stage
                ## Barycentric centroid for 1 pixel...
                xbar, ybar = self.xin[currentBin], self.yin[currentBin]
    
                ## Indices of remaining unbinned data
                unBinned = np.where(self.status == 0)[0]
                ##+++++++++++++++++++++++++++++++++++++++++++++++++
                ## STOP THE WHILE Loop if all pixels are now binned
                while len(unBinned) > 0 :
                ##+++++++++++++++++++++++++++++++++++++++++++++++++
    
                    ## Coordinates of the Remaining unbinned pixels
                    xunBin = self.xin[unBinned]
                    yunBin = self.yin[unBinned]
    
                    ## Location of current bin
                    xcurrent = self.xin[currentBin]
                    ycurrent = self.yin[currentBin]
    
                    ## Closest unbinned pixel to the centroid making sure the highest flux is used
                    indclosestBar = argmin(dist2(xbar,ybar, xunBin, yunBin))
                    xclosest = xunBin[indclosestBar]
                    yclosest = yunBin[indclosestBar]
    
                    ## Distance between this closest pixel and the current pixel
                    currentSqrtDist= sqrt(min(dist2(xclosest,yclosest, xcurrent,ycurrent)))
    
                    ## Add new pixel
                    possibleBin = currentBin + [unBinned[indclosestBar]]
    
                    ## Transfer new flux to current value
                    old_flux = current_flux
                    current_flux = sum(self.data[possibleBin])
    
                    ## Test if getting better for flux
                    if (current_flux > 0.8 * self.target_flux):
                        if (current_flux < 1.2 * self.target_flux):
                            self.good[possibleBin] = 1
                            break
                        elif (abs(current_flux-self.target_flux) > abs(old_flux-self.target_flux)):
                            if (old_flux > 0.8 * self.target_flux):
                                self.good[currentBin] = 1
                            break
                    ##++++++++++++++++++++++++++++++++++++++++++
    
                    ## If the new Bin is ok we associate the number of the bin to that one
                    self.status[unBinned[indclosestBar]] = ind
                    ##   ... and we use that now as the current Bin
                    currentBin = possibleBin
    
                    ## And update the values
                    ## First the centroid
                    xbar, ybar = mean(self.xin[currentBin]), mean(self.yin[currentBin])
    
                    ## New set of unbinned pixels
                    unBinned = np.where(self.status == 0)[0]
                    ## ----- End of While Loop --------------------------------------------
    
                ## Unbinned pixels
                unBinned = np.where(self.status == 0)[0]
                ## Break if all pixels are binned
                if len(unBinned) == 0 :
                    break
                ## When the while loop is finished for this new BIN
                ## Find the centroid of all Binned pixels
                Binned = np.where(self.status != 0)[0]
                xbar, ybar = mean(self.xin[Binned]), mean(self.yin[Binned])
    
                ## Closest unbinned pixel to the centroid of all Binned pixels
                xunBin = self.xin[unBinned]
                yunBin = self.yin[unBinned]
                indclosestBar = argmin(dist2(xbar,ybar, xunBin, yunBin))
                ## Take now this closest pixel as the next one to look at
                currentBin = [unBinned[indclosestBar]]
    
            ## Set to zero all bins that did not reach the target flux
            self.status *= self.good
    
        ### Compute centroid of bins ======================================
        def bin2d_centroid(self, verbose=0):
            ## Compute the area for each node
            self.Areanode, self.bins = np.histogram(self.status, bins=np.arange(np.max(self.status+0.5))+0.5)
            ## Select the ones which have at least one bin
            self.indgoodbins = np.where(self.Areanode > 0)[0]
            self.Areanode = self.Areanode[self.indgoodbins]
            ngoodbins = self.indgoodbins.size
            ## Reset the xnode, ynode, SNnode, and statusnode (which provides the number for the node)
            self.xnode = np.zeros(ngoodbins, Nfloat)
            self.ynode = np.zeros_like(self.xnode)
            self.flux_node = np.zeros_like(self.xnode)
            self.statusnode = np.zeros_like(self.xnode)
            self.listbins = []
            for i in range(ngoodbins) :
                ## indgoodbins[i] provides the bin of the Areanode, so indgoodbins[i] + 1 is the status number
                self.statusnode[i] = self.indgoodbins[i]+1
                ## List of bins which have statusnode as a status
                listbins = np.where(self.status==self.statusnode[i])[0]
                ## Centroid of the node
                self.xnode[i], self.ynode[i] = mean(self.xin[listbins]), mean(self.yin[listbins])
                self.flux_node[i] = sum(self.data[listbins])
                self.listbins.append(listbins)
    
        ### Compute WEIGHTED centroid of bins ======================================
        def bin2d_weighted_centroid(self, weight=None, verbose=0):
            if weight is not None : self.weight = weight
    
            self.Areanode, self.bins = np.histogram(self.status, bins=np.arange(np.max(self.status+0.5))+0.5)
            ## Select the ones which have at least one bin
            self.indgoodbins = np.where(self.Areanode > 0)[0]
            self.Areanode = self.Areanode[self.indgoodbins]
            ngoodbins = self.indgoodbins.size
            ## Reset the xnode, ynode, SNnode, and statusnode (which provides the number for the node)
            self.xnode = np.zeros(ngoodbins, Nfloat)
            self.ynode = np.zeros_like(self.xnode)
            self.flux_node = np.zeros_like(self.xnode)
            self.statusnode = np.zeros_like(self.xnode)
            self.listbins = []
            for i in range(ngoodbins) :
                ## indgoodbins[i] provides the bin of the Areanode, so indgoodbins[i] + 1 is the status number
                self.statusnode[i] = self.indgoodbins[i]+1
                ## List of bins which have statusnode as a status
                listbins = np.where(self.status==self.statusnode[i])[0]
                ## Weighted centroid of the node
                self.xnode[i], self.ynode[i] = np.average(self.xin[listbins], weights=self.weight[listbins]), np.average(self.yin[listbins], weights=self.weight[listbins])
                self.flux_node[i] = sum(self.data[listbins])
                self.listbins.append(listbins)
    
        ### Assign bins ============================================
        def bin2d_assign_bins(self, sel_pixels=None, scale=None, verbose=0) :
            """
            Assign the bins when the nodes are derived With Scaling factor
            """
            if scale is not None: self.scale = scale
            if sel_pixels is None : sel_pixels = range(self.xin.size)
            for i in sel_pixels :
                minind = argmin(dist2(self.xin[i], self.yin[i], self.xnode, self.ynode, scale=self.scale))
                self.status[i] = self.statusnode[minind]
                if verbose :
                    print("Pixel ",  self.status[i], self.xin[i], self.yin[i], self.xnode[minind], self.ynode[minind]) # TODO: Change to logging
    
            ## reDerive the centroid
            self.bin2d_centroid()
    
        ### Do  CV tesselation ============================================
        def bin2d_cvt_equal_mass(self, wvt=None, verbose=1) :
            """
            Produce a CV Tesselation
    
            wvt: default is None (will use preset value, see self.wvt)
            """
    
            ## Reset the status and statusnode for all nodes
            self.status = np.zeros(self.npix, dtype=Nint)
            self.statusnode = np.arange(self.xnode.size) + 1
    
            if wvt is not None : self.wvt = wvt
            if self.wvt : self.weight = np.ones_like(self.data)
            else : self.weight = self.data**4
    
            self.scale = 1.0
    
            self.niter = 0
            ## WHILE LOOP: stop when the nodes do not move anymore ============
            Oldxnode, Oldynode = copy.copy(self.xnode[-1]), copy.copy(self.ynode[-1])
            while (not np.array_equiv(self.xnode, Oldxnode)) | (not np.array_equiv(self.ynode, Oldynode)):
                Oldxnode, Oldynode = copy.copy(self.xnode), copy.copy(self.ynode)
                ## Assign the closest centroid to each bin
                self.bin2d_assign_bins()
    
                ## New nodes weighted centroids
                self.bin2d_weighted_centroid()
    
                ## Eq. (4) of Diehl & Statler (2006)
                if self.wvt : self.scale = sqrt(self.Areanode/self.flux_node)
                self.niter += 1
    
        ### Do Voronoi binning ============================================
        def bin_voronoi(self, wvt=None, cvt=None, verbose=0) :
            """ Actually do the Voronoi binning
    
            wvt: default is None (will use preset value, see self.wvt)
            cvt: default is None (will use preset value, see self.cvt)
            """
            if cvt is not None : self.cvt = cvt
            if wvt is not None : self.wvt = wvt
    
            self.bin2d_accretion(verbose=verbose)
            self.bin2d_centroid()
            ## Get the bad pixels, not assigned and assign them
            badpixels = np.where(self.status == 0)[0]
            self.bin2d_assign_bins(badpixels)
            if self.cvt :
                self.bin2d_cvt_equal_mass()
            else : self.scale = 1.0
            ## Final nodes weighted centroids after assigning to the final nodes
            self.bin2d_assign_bins()
            if self.wvt : self.weight = np.ones_like(self.data)
            else : self.weight = self.data
            self.bin2d_weighted_centroid()
    
    
        def show_voronoibin(self, datain=None, shownode=1, mycmap=None) :
            """
            Display the voronoi bins on a map
    
            datain: if None (Default), will use random colors to display the bins
                    if provided, will display that with a jet (or specified mycmap) cmap
                       (should be either the length of the voronoi nodes array or the size of the initial pixels)
            shownode: default is 1 -> show the voronoi nodes, otherwise ignore (0)
            mycmap: in case datain is provide, will use that cmpa to display the bins
            """
            from distutils import version
            try:
                import matplotlib
            except ImportError:
                raise Exception("matplotlib 0.99.0 or later is required for this routine")
    
            if version.LooseVersion(matplotlib.__version__) < version.LooseVersion('0.99.0'):
                raise Exception("matplotlib 0.99.0 or later is required for this routine")
    
            from matplotlib.collections import PatchCollection
            import matplotlib.patches as mpatches
            import matplotlib.pyplot as plt
            fig = plt.figure(1,figsize=(7,7))
            plt.clf()
            ax = plt.gca()
            patches = []
            binsize = self.pixelsize
            for i in range(len(self.xin)) :
                patches.append(mpatches.Rectangle((self.xin[i],self.yin[i]), binsize*10, binsize*10))
    
            if datain is None :
                dataout = self.status
                mycmap = 'prism'
            else :
                if len(datain) == self.xnode.size :
                    dataout = np.zeros(self.xin.size, Nfloat)
                    for i in range(self.xnode.size) :
                        listbins = self.listbins[i]
                        dataout[listbins] = [datain[i]]*len(listbins)
                elif len(datain) == self.xin.size :
                    dataout = datain
                if mycmap is None : mycmap = 'jet'
    
            colors = dataout * 100.0 / max(dataout)
            collection = PatchCollection(patches, cmap=mycmap)
            collection.set_array(np.array(colors))
            ax.add_collection(collection)
            if shownode :
                plt.scatter(self.xnode,self.ynode, marker='o', edgecolors='k', facecolors='none', s=100)
                for i in range(self.xnode.size):
                    ax.annotate(i, (self.xnode[i], self.ynode[i]))
                    listbins = self.listbins[i]
    
            plt.axis('image')
            plt.xlabel("X axis")
            plt.ylabel("Y axis")
            plt.title("Voronoi Map")
    
        def bin_data(self, datain=None, noisein=None) :
            """
            Return a Voronoi adaptive binning of your data.
    
            datain: if provided, will be used as data input
                    if not provided (None = default), will use self.data
            noisein: if provided, will be used as noise input
                    if not provided (None = default), will use self.noise
    
            Output = xnode, ynode, bindata, S/N
            """
    
            if datain is None: datain = copy.copy(self.data)
    
            dataout = np.zeros(self.xnode.size, Nfloat)
            xout = np.zeros_like(dataout)
            yout = np.zeros_like(dataout)
            flux_out = np.zeros_like(dataout)
            for i in range(self.xnode.size) :
                listbins = self.listbins[i]
                xout[i] = np.average(self.xin.ravel()[listbins], weights=datain[listbins])
                yout[i] = np.average(self.yin.ravel()[listbins], weights=datain[listbins])
                dataout[i] = mean(datain[listbins])
                flux_out[i] = sum(datain[listbins])
    
            return xout, yout, dataout, flux_out
    
    
    def derive_pixelsize(x, y, verbose=0) :
        """ Find the pixelsize by looking at the minimum distance between
            pairs of x,y
            x: xaxis coordinates
            y: yaxis coordinates
            Return: pixelsize
        """
        pixelsize = 1.e30
        for i in range(len(x)-1) :
            mindist = np.min(dist2(x[i], y[i], x[i+1:], y[i+1:]))
            pixelsize = np.minimum(mindist, pixelsize)
        pixelsize = np.sqrt(pixelsize)
        return pixelsize
    
    
    def bins2Patches(bin_obj, root='Patch', pad_index=False):
        """Returns a patch column based on binning"""
        patchNames = []
        for src_idx in range(len(bin_obj.data)):
            for i in range(bin_obj.xnode.size):
                listbins = bin_obj.listbins[i]
                if src_idx in listbins:
                    if pad_index:
                        patchNames.append('{0}_{1}'.format(root,
                            str(i).zfill(int(np.ceil(np.log10(bin_obj.xnode.size))))))
                    else:
                        patchNames.append('{0}_{1}'.format(root, i))
        return np.array(patchNames)