Skip to content
Snippets Groups Projects
Commit e1b872d3 authored by Alexander Kutkin's avatar Alexander Kutkin
Browse files

.

parent 9645e0d4
No related branches found
No related tags found
No related merge requests found
imcal.py 0 → 100755
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
imaging and self-calibration pipeline for Apertif
"""
import os
import sys
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
import numpy as np
import subprocess
from subprocess import Popen as Process, TimeoutExpired, PIPE
import h5py
import pandas as pd
import glob
import logging
import yaml
import argparse
import distutils.spawn
from astropy.coordinates import SkyCoord
from astropy.time import Time
import astropy.units as u
from astropy.io import fits
# the executables:
dppp_bin = distutils.spawn.find_executable('DPPP') or '/home/offringa/Software/DP3/build/DP3'
wsclean_bin = distutils.spawn.find_executable('wsclean') or '/home/offringa/Software/wsclean/build/wsclean'
makesourcedb_bin = distutils.spawn.find_executable('makesourcedb') or '/home/offringa/Software/DP3/build/makesourcedb'
bbs2model_bin = distutils.spawn.find_executable('bbs2model') or '/home/offringa/Software/lofartools/build/bbs2model'
render_bin = distutils.spawn.find_executable('render') or '/home/offringa/Software/lofartools/build/render'
_POOL_TIME = 300 # SECONDS
_MAX_TIME = 1 * 3600 # SECONDS
_MAX_POOL = _MAX_TIME // _POOL_TIME
def modify_filename(fname, string, ext=None):
""" name.ext --> name<string>.ext """
fbase, fext = os.path.splitext(fname)
if ext is not None:
fext = ext
return fbase + string + fext
def wsclean(msin, outname=None, pixelsize=3, mgain=0.8, imagesize=3072, multifreq=8, autothresh=0.3,
automask=3, multiscale=False, kwstring=''):
"""
wsclean
"""
# wsclean_bin = subprocess.check_output('which wsclean; exit 0', shell=True).strip() or '/home/soft/wsclean/wsclean/build/wsclean'
# logging.debug('wsclean binary: {}'.format(wsclean_bin))
msbase = os.path.splitext(msin)[0]
if outname is None:
outname = msbase
if multiscale:
kwstring += ' -multiscale'
if autothresh is not None:
kwstring += f' -auto-threshold {autothresh}'
if multifreq:
kwstring += f' -join-channels -channels-out {multifreq} -fit-spectral-pol 2'
cmd = f'{wsclean_bin} -name {outname} -size {imagesize} {imagesize} -scale {pixelsize}asec -niter 1000000 \
-auto-mask {automask} -save-source-list -mgain {mgain} {kwstring} {msin}'
# .\
# format(wsclean=wsclean_bin, outname=outname, pix=pixelsize, autothresh=autothresh,
# automask=automask, imsize=imagesize, kwstring=kwstring, msin=msin)
cmd = " ".join(cmd.split())
logging.debug("Running command: %s", cmd)
subprocess.call(cmd, shell=True)
if multifreq:
for fname in glob.glob(msbase+'-*.fits'):
newname = fname.replace('MFS-', '')
os.rename(fname, newname)
return 0
def makesourcedb(modelfile, out=None):
""" Make sourcedb file from a clustered model """
# makesourcedb_bin = subprocess.check_output('which makesourcedb; exit 0', shell=True).strip() or makesourcedb_bin
# logging.debug('makesourcedb binary: {}'.format(makesourcedb_bin))
out = out or os.path.splitext(modelfile)[0] + '.sourcedb'
cmd = '{} in={} out={}'.format(makesourcedb_bin, modelfile, out)
logging.debug("Running command: %s", cmd)
subprocess.call(cmd, shell=True)
return out
def bbs2model(inp, out=None):
""" Convert model file to AO format """
# bbs2model_bin = subprocess.check_output('which bbs2model; exit 0', shell=True).strip() or bbs2model_bin
out = out or os.path.splitext(inp)[0] + '.ao'
cmd = '{} {} {}'.format(bbs2model_bin, inp, out)
logging.debug("Running command: %s", cmd)
subprocess.call(cmd, shell=True)
return out
def render(bkgr, model, out=None):
# render_bin = subprocess.check_output('which render; exit 0', shell=True).strip() or render_bin
out = out or os.path.split(bkgr)[0] + '/restored.fits'
cmd = '{} -a -r -t {} -o {} {}'.format(render_bin, bkgr, out, model)
logging.debug("Running command: %s", cmd)
subprocess.call(cmd, shell=True)
return out
def execute_dppp(args):
command = [f'{dppp_bin}'] + args
logging.debug('executing %s', ','.join(command))
dppp_process = subprocess.Popen(command)
for i in range(_MAX_POOL):
try:
return_code = dppp_process.wait(_POOL_TIME)
logging.debug('DPPP process %s finished with status: %s', dppp_process.pid, return_code)
return return_code
except TimeoutExpired as e:
logging.debug('DPPP process %s still running', dppp_process.pid)
continue
def check_return_code(return_code):
if return_code > 0:
logging.error('An error occurred in the DPPP execution: %s', return_code)
raise SystemExit(return_code)
else:
pass
def split_ms(msin_path, startchan, nchan=0, msout_path=''):
"""
use casacore.tables.msutil.msconcat() to concat the new MS files
"""
if not msout_path:
msout_path = msin_path.replace('.MS', f'_split_{startchan}_{nchan}.MS')
logging.debug('Splitting file %s to %s', msin_path, msout_path)
command_args = ['steps=[]',
'msout.overwrite=True',
f'msin={msin_path}',
f'msin.startchan={startchan}',
f'msin.nchan={nchan}',
f'msout={msout_path}']
return_code = execute_dppp(command_args)
logging.debug('Split of %s returned status code %s', msin_path, return_code)
check_return_code(return_code)
return msout_path
def dical(msin, srcdb, msout=None, h5out=None, solint=1, startchan=0, split_nchan=0,
mode='phaseonly', cal_nchan=31, uvlambdamin=500):
""" direction independent calibration with DPPP """
h5out = h5out or modify_filename(msin, f'_dical_dt{solint}_{mode}', ext='.h5')
msout = msout or modify_filename(msin, f'_dical_dt{solint}_{mode}')
command_args = [f'msin={msin}',
f'msout={msout}',
f'cal.caltype={mode}',
f'cal.sourcedb={srcdb}',
f'cal.solint={solint}',
f'cal.parmdb={h5out}',
f'cal.nchan={cal_nchan}',
f'cal.uvlambdamin={uvlambdamin}',
'cal.applysolution=True',
'cal.type=gaincal',
'steps=[cal]']
if startchan or split_nchan:
logging.info('Calibrating MS channels: %d - %d', startchan, split_nchan)
command_args += [f'msin.startchan={startchan}', f'msin.nchan={split_nchan}']
return_code = execute_dppp(command_args)
logging.debug('DICAL returned status code %s', return_code)
check_return_code(return_code)
return msout
def ddecal(msin, srcdb, msout=None, h5out=None, solint=1, nfreq=15,
startchan=0, nchan=192, mode='diagonal', uvlambdamin=500, subtract=True):
""" Perform direction dependent calibration with DPPP """
h5out = h5out or os.path.split(msin)[0] + '/ddcal.h5'
msbase = os.path.basename(msin).split('.')[0]
msout = msout or '{}_{}_{}.MS'.format(msbase,mode, solint)
cmd = '{dppp_bin} msin={msin} msout={msout} \
msin.startchan={startchan} \
msin.nchan={nchan} \
msout.overwrite=true \
cal.type=ddecal \
cal.mode={mode} \
cal.sourcedb={srcdb} \
cal.solint={solint} \
cal.h5parm={h5out} \
cal.subtract={subtract} \
cal.propagatesolutions=true \
cal.propagateconvergedonly=true \
cal.nchan={nfreq} \
cal.uvlambdamin={uvlambdamin} \
steps=[cal] \
'.format(dppp_bin=dppp_bin, msin=msin, msout=msout, startchan=startchan, nchan=nchan, mode=mode,
srcdb=srcdb, solint=solint, h5out=h5out, subtract=subtract, nfreq=nfreq,
uvlambdamin=uvlambdamin)
cmd = " ".join(cmd.split())
logging.debug("Running command: %s", cmd)
subprocess.call(cmd, shell=True)
return msout, h5out
def phase_shift(msin, new_center, msout=None):
""" new_center examples: [12h31m34.5, 52d14m07.34] or [187.5deg, 52.45deg] """
msout = msout or '.'
cmd = "DPPP msin={msin} msout={msout} msout.overwrite=True steps=[phaseshift] \
phaseshift.phasecenter={new_center}".format(**locals())
cmd = " ".join(cmd.split())
subprocess.call(cmd, shell=True)
def view_sols(h5param):
""" read and plot the gains """
path = os.path.split(os.path.abspath(h5param))[0]
def plot_sols(h5param, key):
with h5py.File(h5param, 'r') as f:
grp = f['sol000/{}'.format(key)]
data = grp['val'][()]
time = grp['time'][()]
print(data.shape)
ants = ['RT2','RT3','RT4','RT5','RT6','RT7','RT8','RT9','RTA','RTB','RTC','RTD']
fig = plt.figure(figsize=[20, 15])
fig.suptitle('Freq. averaged {} gain solutions'.format(key.rstrip('000')))
for i, ant in enumerate(ants):
ax = fig.add_subplot(4, 3, i+1)
ax.set_title(ant)
if len(data.shape) == 5: # several directions
# a = ax.imshow(data[:,:,i,1,0].T, aspect='auto')
# plt.colorbar(a)
gavg = np.nanmean(data, axis=1)
ax.plot((time-time[0])/60.0, gavg[:, i, :, 0], alpha=0.7)
elif len(data.shape) == 4: # a single direction
ax.plot((time-time[0])/3600.0, data[:, 0, i, 0], alpha=0.7)
if i == 0:
ax.legend(['c{}'.format(_) for _ in range(data.shape[-2])])
if i == 10:
ax.set_xlabel('Time (hrs)')
return fig, ax
try:
fig, ax = plot_sols(h5param, 'amplitude000')
fig.savefig(path + '/amp_sols.png')
except:
logging.error('No amplitude solutions found')
try:
fig, ax = plot_sols(h5param, 'phase000')
fig.savefig(path + '/phase_sols.png')
except:
logging.error('No phase solutions found')
# plt.show()
# TODO
def model_apply_threshold(model, threshold=0.0, out=None):
"""
Clip the model to be above the given threshold
Parameters
----------
model : STR, model file name
the input model file name
threshold : FLOAT, optional
the threshold above which the components are kept. The default is 0.0.
out : STR, optional
The output model filename. The default is None (the model file will be overwritten).
Returns
-------
None.
"""
out = out or model
logging.warning('Overwriting the model')
df = pd.read_csv(model, skipinitialspace=True)
new = df.query('I>@threshold')
new.to_csv(out, index=False)
return out
def main(msin, outbase=None, cfgfile='imcal.yml'):
logging.info('Processing {}'.format(msin))
logging.info('The config file: {}'.format(cfgfile))
with open(cfgfile) as f:
cfg = yaml.safe_load(f)
# define file names:
mspath = os.path.split(os.path.abspath(msin))[0]
msbase = os.path.splitext(msin)[0]
if outbase is None:
outbase = msbase
ms_split = msbase + '_splt.MS'
img1 = outbase + '_1'
img2 = outbase + '_2'
img3 = outbase + '_3'
img_final = outbase
model1 = outbase + '_model1.sourcedb'
model2 = outbase + '_model2.sourcedb'
model3 = outbase + '_model3.sourcedb'
dical1 = outbase + '_dical1.MS'
dical2 = outbase + '_dical2.MS'
dical3 = outbase + '_dical3.MS'
h5_1 = outbase + '_dical1.h5'
h5_2 = outbase + '_dical2.h5'
h5_3 = outbase + '_dical3.h5'
if os.path.exists(img_final+'-image.fits'):
logging.info('The final image exists. Exiting...')
return 0
if not os.path.exists(ms_split):
ms_split = split_ms(msin, msout_path=ms_split, **cfg['split1'])
# Clean + DIcal
# img_peak = fits.getdata('
if not os.path.exists(img1 +'-image.fits') and (not os.path.exists(img1 +'-MFS-image.fits')):
wsclean(ms_split, outname=img1, **cfg['clean1']) # fast shallow clean
if not os.path.exists(model1):
makesourcedb(img1+'-sources.txt', out=model1)
if not os.path.exists(dical1):
dical1 = dical(ms_split, model1, msout=dical1, h5out=h5_1, **cfg['dical1'])
view_sols(h5_1)
if (not os.path.exists(img2 +'-image.fits')) and (not os.path.exists(img2 +'-MFS-image.fits')):
wsclean(dical1, outname=img2, **cfg['clean2'])
if not os.path.exists(model2):
makesourcedb(img2+'-sources.txt', out=model2)
if not os.path.exists(dical2):
dical2 = dical(dical1, model2, msout=dical2, h5out=h5_2, **cfg['dical2'])
view_sols(h5_2)
if (not os.path.exists(img3 +'-image.fits')) and (not os.path.exists(img3 +'-MFS-image.fits')):
wsclean(dical2, outname=img3, **cfg['clean3'])
if not os.path.exists(model3):
makesourcedb(img3+'-sources.txt', out=model3)
if not os.path.exists(dical3):
dical3 = dical(dical2, model3, msout=dical3, h5out=h5_3, **cfg['dical3'])
view_sols(h5_3)
if (not os.path.exists(img_final +'-image.fits')) and (not os.path.exists(img_final +'-MFS-image.fits')):
wsclean(dical3, outname=img_final, **cfg['clean4'])
return 0
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
logging.info('Starting logger for {}'.format(__name__))
logger = logging.getLogger(__name__)
t0 = Time.now()
parser = argparse.ArgumentParser(description='DDCal Inputs')
parser.add_argument('msin', help='MS file to process')
parser.add_argument('-c', '--config', action='store',
dest='configfile', help='Config file', type=str)
parser.add_argument('-o', '--outbase', default=None, help='output prefix', type=str)
args = parser.parse_args()
configfile = args.configfile or \
os.path.join(os.path.dirname(os.path.realpath(__file__)), 'imcal.yml')
msin = args.msin
logging.info('Using config file: {}'.format(os.path.abspath(configfile)))
main(msin, outbase=args.outbase, cfgfile=configfile)
extime = Time.now() - t0
print("Execution time: {:.1f} min".format(extime.to("minute").value))
#:===========================================================================
# Settings for imcal
#:===========================================================================
####################### DICAL IMAGING #######################
#global:
# dppp_bin: 'DPPP' # on blizzard
# wsclean_bin: 'wsclean'
# makesourcedb_bin: 'makesourcedb'
# bbs2model_bin: 'bbs2model'
# render_bin: 'render'
split1:
startchan: 40 # start channel to split from
nchan: 0 # 0 means till the end
clean1: # wsclean setup
imagesize: 3072
pixelsize: 3
multifreq: 8
automask: 20
autothresh: 5
multiscale: False
kwstring: '-use-wgridder -parallel-deconvolution 1400 -parallel-gridding 8 -deconvolution-channels 3' # use this for additional wsclean options, e.g. '-weight uniform -use-idg'
dical1: # DPPP setup for direction independent calibration
solint: 20
mode: 'phaseonly'
uvlambdamin: 500 # Ignore baselines / channels with UV < uvlambdamin wavelengths.
# cal_nchan: 31 # number of chans with the same solutions
clean2:
imagesize: 3072
pixelsize: 3
multifreq: 8
automask: 6
autothresh: 1
multiscale: True
kwstring: '-use-wgridder -parallel-deconvolution 1400 -parallel-gridding 8 -deconvolution-channels 3'
dical2:
solint: 1
mode: 'phaseonly'
uvlambdamin: 500 # Ignore baselines / channels with UV < uvlambdamin wavelengths.
cal_nchan: 31 # number of chans with the same solutions
clean3:
imagesize: 3072
pixelsize: 3
multifreq: 8
automask: 6
autothresh: 1
multiscale: True
kwstring: '-use-wgridder -parallel-deconvolution 1400 -parallel-gridding 8 -deconvolution-channels 3'
dical3:
solint: 1000
mode: 'diagonal'
uvlambdamin: 500 # Ignore baselines / channels with UV < uvlambdamin wavelengths.
cal_nchan: 31 # number of chans with the same solutions
clean4:
imagesize: 3072
pixelsize: 3
multifreq: 8
automask: 4.5
autothresh: 0.5
multiscale: True
kwstring: '-use-wgridder -parallel-deconvolution 1400 -parallel-gridding 8 -deconvolution-channels 3 -weight briggs -1.0'
### END
####################### CLUSTERING #######################
#cluster:
# nbright: 80 # number of brightest clean components (CC) to check for artefacts
# boxsize: 250 # the boxsize around CC in pixels where to check for artefacts
# nclusters: 10 # number of clusters ('auto' -- to set automatically)
# cluster_radius: 5 # arcmin
# cluster_overlap: 1.6 # if lower than 2 clusters can intersect
# auto: True
# add_manual: False
#
######################## DD CALIBRATION #######################
#ddcal: # see DPPP/DDECal documentation
# solint: 120 # Solution interval in timesteps (1 ~ 30sec for Apertif).
# mode: 'diagonal' # Type of constraint to apply.
# nchan: 15 # Number of channels in each channel block, for which the solution is assumed to be constant.
# startchan: 0
# nchan: 192
# uvlambdamin: 500 # Ignore baselines / channels with UV < uvlambdamin wavelengths.
#
## TODO:
#plotsols:
#
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment