diff --git a/imcal.py b/imcal.py new file mode 100755 index 0000000000000000000000000000000000000000..7c27d3845b7de3cde65c8c561d33569881582e5c --- /dev/null +++ b/imcal.py @@ -0,0 +1,386 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +imaging and self-calibration pipeline for Apertif +""" + +import os +import sys +import matplotlib +matplotlib.use('agg') +import matplotlib.pyplot as plt +from matplotlib.patches import Circle +import numpy as np +import subprocess +from subprocess import Popen as Process, TimeoutExpired, PIPE + +import h5py +import pandas as pd +import glob +import logging +import yaml +import argparse +import distutils.spawn + +from astropy.coordinates import SkyCoord +from astropy.time import Time +import astropy.units as u +from astropy.io import fits + +# the executables: +dppp_bin = distutils.spawn.find_executable('DPPP') or '/home/offringa/Software/DP3/build/DP3' +wsclean_bin = distutils.spawn.find_executable('wsclean') or '/home/offringa/Software/wsclean/build/wsclean' +makesourcedb_bin = distutils.spawn.find_executable('makesourcedb') or '/home/offringa/Software/DP3/build/makesourcedb' +bbs2model_bin = distutils.spawn.find_executable('bbs2model') or '/home/offringa/Software/lofartools/build/bbs2model' +render_bin = distutils.spawn.find_executable('render') or '/home/offringa/Software/lofartools/build/render' + +_POOL_TIME = 300 # SECONDS +_MAX_TIME = 1 * 3600 # SECONDS +_MAX_POOL = _MAX_TIME // _POOL_TIME + + +def modify_filename(fname, string, ext=None): + """ name.ext --> name<string>.ext """ + fbase, fext = os.path.splitext(fname) + if ext is not None: + fext = ext + return fbase + string + fext + + +def wsclean(msin, outname=None, pixelsize=3, mgain=0.8, imagesize=3072, multifreq=8, autothresh=0.3, + automask=3, multiscale=False, kwstring=''): + """ + wsclean + """ + # wsclean_bin = subprocess.check_output('which wsclean; exit 0', shell=True).strip() or '/home/soft/wsclean/wsclean/build/wsclean' + # logging.debug('wsclean binary: {}'.format(wsclean_bin)) + msbase = os.path.splitext(msin)[0] + if outname is None: + outname = msbase + + if multiscale: + kwstring += ' -multiscale' + if autothresh is not None: + kwstring += f' -auto-threshold {autothresh}' + if multifreq: + kwstring += f' -join-channels -channels-out {multifreq} -fit-spectral-pol 2' + cmd = f'{wsclean_bin} -name {outname} -size {imagesize} {imagesize} -scale {pixelsize}asec -niter 1000000 \ + -auto-mask {automask} -save-source-list -mgain {mgain} {kwstring} {msin}' + # .\ + # format(wsclean=wsclean_bin, outname=outname, pix=pixelsize, autothresh=autothresh, + # automask=automask, imsize=imagesize, kwstring=kwstring, msin=msin) + cmd = " ".join(cmd.split()) + logging.debug("Running command: %s", cmd) + subprocess.call(cmd, shell=True) + + if multifreq: + for fname in glob.glob(msbase+'-*.fits'): + newname = fname.replace('MFS-', '') + os.rename(fname, newname) + return 0 + + +def makesourcedb(modelfile, out=None): + """ Make sourcedb file from a clustered model """ + # makesourcedb_bin = subprocess.check_output('which makesourcedb; exit 0', shell=True).strip() or makesourcedb_bin + # logging.debug('makesourcedb binary: {}'.format(makesourcedb_bin)) + out = out or os.path.splitext(modelfile)[0] + '.sourcedb' + cmd = '{} in={} out={}'.format(makesourcedb_bin, modelfile, out) + logging.debug("Running command: %s", cmd) + subprocess.call(cmd, shell=True) + return out + + +def bbs2model(inp, out=None): + """ Convert model file to AO format """ + # bbs2model_bin = subprocess.check_output('which bbs2model; exit 0', shell=True).strip() or bbs2model_bin + out = out or os.path.splitext(inp)[0] + '.ao' + cmd = '{} {} {}'.format(bbs2model_bin, inp, out) + logging.debug("Running command: %s", cmd) + subprocess.call(cmd, shell=True) + return out + + +def render(bkgr, model, out=None): + # render_bin = subprocess.check_output('which render; exit 0', shell=True).strip() or render_bin + out = out or os.path.split(bkgr)[0] + '/restored.fits' + cmd = '{} -a -r -t {} -o {} {}'.format(render_bin, bkgr, out, model) + logging.debug("Running command: %s", cmd) + subprocess.call(cmd, shell=True) + return out + + +def execute_dppp(args): + command = [f'{dppp_bin}'] + args + logging.debug('executing %s', ','.join(command)) + dppp_process = subprocess.Popen(command) + for i in range(_MAX_POOL): + try: + return_code = dppp_process.wait(_POOL_TIME) + logging.debug('DPPP process %s finished with status: %s', dppp_process.pid, return_code) + return return_code + except TimeoutExpired as e: + logging.debug('DPPP process %s still running', dppp_process.pid) + continue + + +def check_return_code(return_code): + if return_code > 0: + logging.error('An error occurred in the DPPP execution: %s', return_code) + raise SystemExit(return_code) + else: + pass + + +def split_ms(msin_path, startchan, nchan=0, msout_path=''): + """ + use casacore.tables.msutil.msconcat() to concat the new MS files + """ + if not msout_path: + msout_path = msin_path.replace('.MS', f'_split_{startchan}_{nchan}.MS') + logging.debug('Splitting file %s to %s', msin_path, msout_path) + command_args = ['steps=[]', + 'msout.overwrite=True', + f'msin={msin_path}', + f'msin.startchan={startchan}', + f'msin.nchan={nchan}', + f'msout={msout_path}'] + return_code = execute_dppp(command_args) + logging.debug('Split of %s returned status code %s', msin_path, return_code) + check_return_code(return_code) + return msout_path + + +def dical(msin, srcdb, msout=None, h5out=None, solint=1, startchan=0, split_nchan=0, + mode='phaseonly', cal_nchan=31, uvlambdamin=500): + """ direction independent calibration with DPPP """ + h5out = h5out or modify_filename(msin, f'_dical_dt{solint}_{mode}', ext='.h5') + msout = msout or modify_filename(msin, f'_dical_dt{solint}_{mode}') + command_args = [f'msin={msin}', + f'msout={msout}', + f'cal.caltype={mode}', + f'cal.sourcedb={srcdb}', + f'cal.solint={solint}', + f'cal.parmdb={h5out}', + f'cal.nchan={cal_nchan}', + f'cal.uvlambdamin={uvlambdamin}', + 'cal.applysolution=True', + 'cal.type=gaincal', + 'steps=[cal]'] + if startchan or split_nchan: + logging.info('Calibrating MS channels: %d - %d', startchan, split_nchan) + command_args += [f'msin.startchan={startchan}', f'msin.nchan={split_nchan}'] + return_code = execute_dppp(command_args) + logging.debug('DICAL returned status code %s', return_code) + check_return_code(return_code) + return msout + + +def ddecal(msin, srcdb, msout=None, h5out=None, solint=1, nfreq=15, + startchan=0, nchan=192, mode='diagonal', uvlambdamin=500, subtract=True): + """ Perform direction dependent calibration with DPPP """ + h5out = h5out or os.path.split(msin)[0] + '/ddcal.h5' + msbase = os.path.basename(msin).split('.')[0] + msout = msout or '{}_{}_{}.MS'.format(msbase,mode, solint) + cmd = '{dppp_bin} msin={msin} msout={msout} \ + msin.startchan={startchan} \ + msin.nchan={nchan} \ + msout.overwrite=true \ + cal.type=ddecal \ + cal.mode={mode} \ + cal.sourcedb={srcdb} \ + cal.solint={solint} \ + cal.h5parm={h5out} \ + cal.subtract={subtract} \ + cal.propagatesolutions=true \ + cal.propagateconvergedonly=true \ + cal.nchan={nfreq} \ + cal.uvlambdamin={uvlambdamin} \ + steps=[cal] \ + '.format(dppp_bin=dppp_bin, msin=msin, msout=msout, startchan=startchan, nchan=nchan, mode=mode, + srcdb=srcdb, solint=solint, h5out=h5out, subtract=subtract, nfreq=nfreq, + uvlambdamin=uvlambdamin) + cmd = " ".join(cmd.split()) + logging.debug("Running command: %s", cmd) + subprocess.call(cmd, shell=True) + return msout, h5out + + +def phase_shift(msin, new_center, msout=None): + """ new_center examples: [12h31m34.5, 52d14m07.34] or [187.5deg, 52.45deg] """ + msout = msout or '.' + cmd = "DPPP msin={msin} msout={msout} msout.overwrite=True steps=[phaseshift] \ + phaseshift.phasecenter={new_center}".format(**locals()) + cmd = " ".join(cmd.split()) + subprocess.call(cmd, shell=True) + + +def view_sols(h5param): + """ read and plot the gains """ + path = os.path.split(os.path.abspath(h5param))[0] + def plot_sols(h5param, key): + with h5py.File(h5param, 'r') as f: + grp = f['sol000/{}'.format(key)] + data = grp['val'][()] + time = grp['time'][()] + print(data.shape) + ants = ['RT2','RT3','RT4','RT5','RT6','RT7','RT8','RT9','RTA','RTB','RTC','RTD'] + fig = plt.figure(figsize=[20, 15]) + fig.suptitle('Freq. averaged {} gain solutions'.format(key.rstrip('000'))) + for i, ant in enumerate(ants): + ax = fig.add_subplot(4, 3, i+1) + ax.set_title(ant) + if len(data.shape) == 5: # several directions + # a = ax.imshow(data[:,:,i,1,0].T, aspect='auto') + # plt.colorbar(a) + gavg = np.nanmean(data, axis=1) + ax.plot((time-time[0])/60.0, gavg[:, i, :, 0], alpha=0.7) + elif len(data.shape) == 4: # a single direction + ax.plot((time-time[0])/3600.0, data[:, 0, i, 0], alpha=0.7) + + if i == 0: + ax.legend(['c{}'.format(_) for _ in range(data.shape[-2])]) + if i == 10: + ax.set_xlabel('Time (hrs)') + return fig, ax + + try: + fig, ax = plot_sols(h5param, 'amplitude000') + fig.savefig(path + '/amp_sols.png') + except: + logging.error('No amplitude solutions found') + + try: + fig, ax = plot_sols(h5param, 'phase000') + fig.savefig(path + '/phase_sols.png') + except: + logging.error('No phase solutions found') + + # plt.show() + +# TODO +def model_apply_threshold(model, threshold=0.0, out=None): + """ + Clip the model to be above the given threshold + + Parameters + ---------- + model : STR, model file name + the input model file name + threshold : FLOAT, optional + the threshold above which the components are kept. The default is 0.0. + out : STR, optional + The output model filename. The default is None (the model file will be overwritten). + + Returns + ------- + None. + """ + out = out or model + logging.warning('Overwriting the model') + df = pd.read_csv(model, skipinitialspace=True) + new = df.query('I>@threshold') + new.to_csv(out, index=False) + return out + + +def main(msin, outbase=None, cfgfile='imcal.yml'): + logging.info('Processing {}'.format(msin)) + logging.info('The config file: {}'.format(cfgfile)) + with open(cfgfile) as f: + cfg = yaml.safe_load(f) + +# define file names: + mspath = os.path.split(os.path.abspath(msin))[0] + msbase = os.path.splitext(msin)[0] + + if outbase is None: + outbase = msbase + + ms_split = msbase + '_splt.MS' + + img1 = outbase + '_1' + img2 = outbase + '_2' + img3 = outbase + '_3' + img_final = outbase + + model1 = outbase + '_model1.sourcedb' + model2 = outbase + '_model2.sourcedb' + model3 = outbase + '_model3.sourcedb' + + dical1 = outbase + '_dical1.MS' + dical2 = outbase + '_dical2.MS' + dical3 = outbase + '_dical3.MS' + + h5_1 = outbase + '_dical1.h5' + h5_2 = outbase + '_dical2.h5' + h5_3 = outbase + '_dical3.h5' + + + if os.path.exists(img_final+'-image.fits'): + logging.info('The final image exists. Exiting...') + return 0 + + if not os.path.exists(ms_split): + ms_split = split_ms(msin, msout_path=ms_split, **cfg['split1']) + +# Clean + DIcal + + # img_peak = fits.getdata(' + if not os.path.exists(img1 +'-image.fits') and (not os.path.exists(img1 +'-MFS-image.fits')): + wsclean(ms_split, outname=img1, **cfg['clean1']) # fast shallow clean + + if not os.path.exists(model1): + makesourcedb(img1+'-sources.txt', out=model1) + + if not os.path.exists(dical1): + dical1 = dical(ms_split, model1, msout=dical1, h5out=h5_1, **cfg['dical1']) + view_sols(h5_1) + + if (not os.path.exists(img2 +'-image.fits')) and (not os.path.exists(img2 +'-MFS-image.fits')): + wsclean(dical1, outname=img2, **cfg['clean2']) + + if not os.path.exists(model2): + makesourcedb(img2+'-sources.txt', out=model2) + + if not os.path.exists(dical2): + dical2 = dical(dical1, model2, msout=dical2, h5out=h5_2, **cfg['dical2']) + view_sols(h5_2) + + if (not os.path.exists(img3 +'-image.fits')) and (not os.path.exists(img3 +'-MFS-image.fits')): + wsclean(dical2, outname=img3, **cfg['clean3']) + + if not os.path.exists(model3): + makesourcedb(img3+'-sources.txt', out=model3) + + if not os.path.exists(dical3): + dical3 = dical(dical2, model3, msout=dical3, h5out=h5_3, **cfg['dical3']) + view_sols(h5_3) + + if (not os.path.exists(img_final +'-image.fits')) and (not os.path.exists(img_final +'-MFS-image.fits')): + wsclean(dical3, outname=img_final, **cfg['clean4']) + + return 0 + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + logging.info('Starting logger for {}'.format(__name__)) + logger = logging.getLogger(__name__) + + t0 = Time.now() + parser = argparse.ArgumentParser(description='DDCal Inputs') + parser.add_argument('msin', help='MS file to process') + parser.add_argument('-c', '--config', action='store', + dest='configfile', help='Config file', type=str) + parser.add_argument('-o', '--outbase', default=None, help='output prefix', type=str) + + args = parser.parse_args() + configfile = args.configfile or \ + os.path.join(os.path.dirname(os.path.realpath(__file__)), 'imcal.yml') + msin = args.msin + logging.info('Using config file: {}'.format(os.path.abspath(configfile))) + main(msin, outbase=args.outbase, cfgfile=configfile) + extime = Time.now() - t0 + print("Execution time: {:.1f} min".format(extime.to("minute").value)) + diff --git a/imcal.yml b/imcal.yml new file mode 100644 index 0000000000000000000000000000000000000000..334750a73e2368fcdd8537019f893edb4b4f7164 --- /dev/null +++ b/imcal.yml @@ -0,0 +1,96 @@ +#:=========================================================================== +# Settings for imcal +#:=========================================================================== +####################### DICAL IMAGING ####################### + +#global: +# dppp_bin: 'DPPP' # on blizzard +# wsclean_bin: 'wsclean' +# makesourcedb_bin: 'makesourcedb' +# bbs2model_bin: 'bbs2model' +# render_bin: 'render' + +split1: + startchan: 40 # start channel to split from + nchan: 0 # 0 means till the end + +clean1: # wsclean setup + imagesize: 3072 + pixelsize: 3 + multifreq: 8 + automask: 20 + autothresh: 5 + multiscale: False + kwstring: '-use-wgridder -parallel-deconvolution 1400 -parallel-gridding 8 -deconvolution-channels 3' # use this for additional wsclean options, e.g. '-weight uniform -use-idg' + +dical1: # DPPP setup for direction independent calibration + solint: 20 + mode: 'phaseonly' + uvlambdamin: 500 # Ignore baselines / channels with UV < uvlambdamin wavelengths. +# cal_nchan: 31 # number of chans with the same solutions + +clean2: + imagesize: 3072 + pixelsize: 3 + multifreq: 8 + automask: 6 + autothresh: 1 + multiscale: True + kwstring: '-use-wgridder -parallel-deconvolution 1400 -parallel-gridding 8 -deconvolution-channels 3' + +dical2: + solint: 1 + mode: 'phaseonly' + uvlambdamin: 500 # Ignore baselines / channels with UV < uvlambdamin wavelengths. + cal_nchan: 31 # number of chans with the same solutions + +clean3: + imagesize: 3072 + pixelsize: 3 + multifreq: 8 + automask: 6 + autothresh: 1 + multiscale: True + kwstring: '-use-wgridder -parallel-deconvolution 1400 -parallel-gridding 8 -deconvolution-channels 3' + +dical3: + solint: 1000 + mode: 'diagonal' + uvlambdamin: 500 # Ignore baselines / channels with UV < uvlambdamin wavelengths. + cal_nchan: 31 # number of chans with the same solutions + +clean4: + imagesize: 3072 + pixelsize: 3 + multifreq: 8 + automask: 4.5 + autothresh: 0.5 + multiscale: True + kwstring: '-use-wgridder -parallel-deconvolution 1400 -parallel-gridding 8 -deconvolution-channels 3 -weight briggs -1.0' + +### END + +####################### CLUSTERING ####################### +#cluster: +# nbright: 80 # number of brightest clean components (CC) to check for artefacts +# boxsize: 250 # the boxsize around CC in pixels where to check for artefacts +# nclusters: 10 # number of clusters ('auto' -- to set automatically) +# cluster_radius: 5 # arcmin +# cluster_overlap: 1.6 # if lower than 2 clusters can intersect +# auto: True +# add_manual: False +# +######################## DD CALIBRATION ####################### +#ddcal: # see DPPP/DDECal documentation +# solint: 120 # Solution interval in timesteps (1 ~ 30sec for Apertif). +# mode: 'diagonal' # Type of constraint to apply. +# nchan: 15 # Number of channels in each channel block, for which the solution is assumed to be constant. +# startchan: 0 +# nchan: 192 +# uvlambdamin: 500 # Ignore baselines / channels with UV < uvlambdamin wavelengths. +# +## TODO: +#plotsols: +# + +