Skip to content
Snippets Groups Projects
Commit f63db0b9 authored by James Petley's avatar James Petley
Browse files

Split directions

parent 22700a8f
No related branches found
No related tags found
1 merge request!25Split directions
Showing
with 672 additions and 30 deletions
......@@ -41,5 +41,3 @@ def plugin_main(args, **kwargs):
return 0
pass
\ No newline at end of file
import os
import numpy as np
import pyrap.tables, math
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.table import Table
def plugin_main(**kwargs):
......@@ -20,23 +15,32 @@ def plugin_main(**kwargs):
-------
result : dict
Output coordinates
"""
# parse the input
target_file = kwargs['target_file']
mode = kwargs['mode']
# read in the catalogue to get source_id, RA, and DEC
t = Table.read(target_file, format='csv')
RA_val = t['RA'].data[0]
DEC_val = t['DEC'].data[0]
Source_id = t['Source_id'].data[0]
if str(Source_id)[0:1] == 'I':
RA_val = t['RA'].data
DEC_val = t['DEC'].data
Source_id = t['Source_id'].data
if mode: # == 'single':
RA_val = [RA_val[0]]
DEC_val = [DEC_val[0]]
Source_id = Source_id[:1]
if str(Source_id[0])[0:1] == 'I':
pass
elif str(Source_id)[0:1] == 'S':
elif str(Source_id[0])[0:1] == 'S':
pass
else:
Source_id = 'S' + str(Source_id)
Source_id = ['S' + str(x) for x in Source_id[0]]
# make a string of coordinates for the NDPPP command
ss = '["' + str(RA_val) + 'deg","' + str(DEC_val) + 'deg"]'
result = {'name' : Source_id, 'coords' : ss}
ss = [ '[' + str(x) + 'deg,' + str(y) + 'deg]' for x, y in zip(RA_val, DEC_val) ]
result = {'name' : ",".join(Source_id),
'coords' : ss[0] if len(ss) == 1 else "[" + ",".join(ss) + "]"}
return result
......@@ -176,4 +176,3 @@ if __name__ == '__main__':
args = parser.parse_args()
main(args.MSfile, args.min_separation, args.outputimage)
\ No newline at end of file
class: CommandLineTool
cwlVersion: v1.2
id: dp3_target_phaseup
label: DP3 Target Phaseup
doc: This tool applies the delay solutions to the target source and phase up to the various
target directions. It appends commands to a parset created in the previous step.
baseCommand: DP3
inputs:
- id: parset
type: File
doc: Input parset file.
default: dp3_explode.parset
inputBinding:
position: 0
- id: msin
type: Directory
doc: Input measurement set.
inputBinding:
position: 1
prefix: msin=
separate: false
shellQuote: false
- id: delay_solset
type: File
doc: Input delay solution set.
inputBinding:
position: 2
prefix: applycal.parmdb=
separate: false
shellQuote: false
- id: max_dp3_threads
type: int?
default: 8
doc: Maximum number of threads to use for DP3.
inputBinding:
position: 3
prefix: numthreads=
separate: false
shellQuote: false
outputs:
- id: msout
type: Directory[]
outputBinding:
glob: "*.mstargetphaseup"
doc: Output measurement set which has been phaseshifted,
averaged and had solutions applied.
- id: logfile
type: File
outputBinding:
glob: dp3_target_phaseup.log
doc: DP3 processing log file.
- id: errorfile
type: File
outputBinding:
glob: dp3_target_phaseup_err.log
doc: DP3 processing error log file.
hints:
- class: DockerRequirement
dockerPull: vlbi-cwl
- class: ResourceRequirement
coresMax: $(inputs.max_dp3_threads)
coresMin: 2
stdout: dp3_target_phaseup.log
stderr: dp3_target_phaseup_err.log
\ No newline at end of file
cwlVersion: v1.2
class: ExpressionTool
id: flatten_array
label: Flatten Array
doc: "Flatten a nested array of 'Any' type into an array. Taken from https://github.com/common-workflow-library/cwl-patterns/tree/main"
requirements:
InlineJavascriptRequirement: {}
inputs:
nestedarray:
type:
type: array
items:
type: array
items: ["null", Any]
outputs:
flattenedarray:
type:
type: array
items: Any
expression: |
${
var flattenedarray = [];
for (var i = 0; i < inputs.nestedarray.length; i++) {
for (var j = 0; j < inputs.nestedarray[i].length; j++) {
if (inputs.nestedarray[i][j] != null) {
flattenedarray.push(inputs.nestedarray[i][j]);
}
}
}
return {"flattenedarray": flattenedarray};
}
\ No newline at end of file
cwlVersion: v1.2
class: CommandLineTool
id: generate_filenames
label: Generate direction filenames
doc: |
Take a MeasurementSet and a list of target sources, and
creates a list of strings where the MeasurementSet name
and target names are concatenated.
baseCommand: [python3, concatenate.py]
inputs:
- id: msin
type: Directory
doc: A MeasurementSet to extract the name from.
- id: source_ids
type: string
doc: A string containing a list of target source IDs.
outputs:
- id: msout_names
type: string
doc: |
a string containing the names for the MeasurementSets
for each direction.
outputBinding:
loadContents: true
glob: out.json
outputEval: $(JSON.parse(self[0].contents).filenames)
requirements:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entryname: concatenate.py
entry: |
import json
msin = "$(inputs.msin.basename)".split(".")[0]
source_ids = "$(inputs.source_ids)".split(",")
list = [ msin + x + ".mstargetphase" for x in source_ids]
result = {'filenames' : "[" + ",".join(list) + "]"}
with open('./out.json', 'w') as fp:
json.dump(result, fp)
cwlVersion: v1.2
class: CommandLineTool
id: generate_parset_split_directions
label: Generate DP3 parset for split directions
doc: |
Generates a DP3 parameterset to split off
directions from a given MeasurementSet.
baseCommand: [cat, input.parset]
stdout: dp3_explode.parset
inputs:
- id: msout_names
type: string
doc: |
A string of names, one for each direction to image.
- id: phase_centers
type: string
doc: |
A string of pairs of right ascension and declination
coordinates, one for each direction to image.
- id: frequency_resolution
type: string?
default: '390.56kHz'
doc: |
Frequency resolution for the third averaging.
- id: time_resolution
type: string?
default: '32.'
doc: |
Time resolution in seconds for the third averaging.
outputs:
- id: parset
type: File
doc: A DP3 parameterset file.
outputBinding:
glob: dp3_explode.parset
requirements:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entryname: input.parset
entry: |+
steps = [split]
split.replaceparms = [phaseshift.phasecenter, applybeam.direction, msout.name]
split.steps = [phaseshift, averager1, applybeam, averager2, applycal, averager3, msout]
phaseshift.type = phaseshift
phaseshift.phasecenter = $(inputs.phase_centers)
averager1.type = averager
averager1.freqresolution = 48.82kHz
averager1.timeresolution = 4.
applybeam.type = applybeam
applybeam.direction = $(inputs.phase_centers)
applybeam.beammode = full
averager2.type = averager
averager2.freqresolution = 390.56kHz
averager2.timeresolution = 32
# Apply solutions and more average_target
applycal.type = applycal
applycal.correction = fulljones
applycal.soltab = [amplitude000, phase000]
averager3.type = averager
averager3.freqresolution = $(inputs.frequency_resolution)
averager3.timeresolution = $(inputs.time_resolution)
msout.storagemanager = dysco
msout.name = $(inputs.msout_names)
msout.overwrite = True
class: CommandLineTool
cwlVersion: v1.1
id: order_by_direction
label: Order by Direction
doc: |
This tool takes an array of arrays of directories containing MeasurementSet files which are in groups of frequency.
It re-orders them such that they are in groups of direction ready to be concatenated.
baseCommand:
- python3
- order_by_direction.py
inputs:
- id: msin
type:
type: array
items:
type: array
items: Directory
inputBinding:
position: 0
doc: Array of arrays of directories containing the MeasurementSet files to be ordered
requirements:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entryname: order_by_direction.py
entry: |
import sys
import numpy as np
import json
mss = $(inputs)['msin']
print(mss)
#The line below does the re-ordering. It performs the transpose of a list.
output = list(map(list, zip(*mss)))
cwl_output = {}
cwl_output['msout'] = output
print(cwl_output)
with open('./out.json', 'w') as fp:
json.dump(cwl_output, fp)
outputs:
- id: msout
type:
type: array
items:
type: array
items: Directory
outputBinding:
loadContents: true
glob: out.json
outputEval: $(JSON.parse(self[0].contents).msout)
doc: Array of arrays of directories containing the MeasurementSet
files ordered by direction.
......@@ -11,6 +11,9 @@ inputs:
- id: delay_calibrator
type: File
doc: file containing target info.
- id: extract_single
type: boolean?
default: false
requirements:
- class: InlineJavascriptRequirement
......@@ -25,8 +28,9 @@ requirements:
inputs = json.loads(r"""$(inputs)""")
target_file = inputs['delay_calibrator']['path']
mode = inputs['extract_single']
output = targetListToCoords(target_file=target_file)
output = targetListToCoords(target_file=target_file, mode=mode)
coords = output['coords']
name = output['name']
......
class: CommandLineTool
cwlVersion: v1.2
id: sort_concatmap
label: sort_concatmap
label: Sort Concatmap
baseCommand:
- python3
......@@ -50,6 +50,7 @@ requirements:
from sort_times_into_freqGroups import main as sort_times_into_freqGroups
mss = sys.argv[1:]
inputs = json.loads(r"""$(inputs)""")
numbands = inputs['numbands']
......@@ -59,8 +60,10 @@ requirements:
truncateLastSBs = inputs['truncateLastSBs']
firstSB = inputs['firstSB']
output = sort_times_into_freqGroups(mss, numbands, NDPPPfill, stepname, mergeLastGroup, truncateLastSBs, firstSB)
print(mss, numbands, NDPPPfill, stepname, mergeLastGroup, truncateLastSBs, firstSB)
output = sort_times_into_freqGroups(mss, numbands, NDPPPfill, stepname, mergeLastGroup, truncateLastSBs, firstSB)
print(output)
filenames = output['filenames']
groupnames = output['groupnames']
total_bandwidth = output['total_bandwidth']
......
cwlVersion: v1.2
class: CommandLineTool
id: target_solve
label: Target Solve
doc: |
This tool performs selfcalibration on the target source.
It uses the facetselfcal.py script from the LOFAR helper scripts with the --auto setting.
baseCommand:
- python3
- delay_solve.py
inputs:
- id: msin
type: Directory
doc: Delay calibrator measurement set.
- id: configfile
type: File
doc: Configuration options for self-calibration.
- id: selfcal
type: Directory
doc: External self-calibration script.
- id: h5merger
type: Directory
doc: External LOFAR helper scripts for merging h5 files.
outputs:
- id: images
type: File[]
outputBinding:
glob: '*.png'
- id: fits_images
type: File[]
outputBinding:
glob: '*MFS-image.fits'
- id: logfile
type: File[]
outputBinding:
glob: target_solve*.log
requirements:
- class: ShellCommandRequirement
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entry: $(inputs.configfile)
- entry: $(inputs.msin)
- entry: $(inputs.h5merger)
- entryname: delay_solve.py
entry: |
import subprocess
import sys
import json
inputs = json.loads(r"""$(inputs)""")
msin = inputs['msin']['basename']
configfile = inputs['configfile']['path']
selfcal = inputs['selfcal']['path']
h5merge = inputs['h5merger']['path']
imagename = msin.split('.copy')[0]
subprocess.run(f'python3 {selfcal}/facetselfcal.py {msin} --helperscriptspath {selfcal}
--helperscriptspathh5merge {h5merge} --weightspectrum-clipvalue 30.0 --auto --imagename {imagename}', shell = True)
hints:
- class: DockerRequirement
dockerPull: vlbi-cwl
- class: ResourceRequirement
coresMin: 6
stdout: target_solve.log
stderr: target_solve_err.log
imsize = 1600
pixelscale = 0.075
auto = True
\ No newline at end of file
......@@ -72,6 +72,8 @@ steps:
in:
- id: delay_calibrator
source: delay_calibrator
- id: extract_single
default: true
out:
- id: source_id
- id: coordinates
......
class: Workflow
cwlVersion: v1.2
id: split-directions
label: Split Directions
doc: |
This is a workflow for the LOFAR-VLBI pipeline that
splits a LOFAR MeasurementSet into various target directions, applies delay calibrator solutions and then optionally performs
self-calibration on the target directions.
This step should be run after the delay calibration workflow.
requirements:
- class: SubworkflowFeatureRequirement
- class: MultipleInputFeatureRequirement
- class: ScatterFeatureRequirement
inputs:
- id: msin
type: Directory[]
doc: The input MS. This should have coverage of the target directions.
- id: delay_solset
type: File
doc: The solution tables generated by the VLBI delay calibration workflow in an HDF5 format.
- id: image_cat
type: File
doc: The image catalogue (in CSV format) containing the target directions.
default: lotss_catalogue.csv
- id: max_dp3_threads
type: int?
default: 4
doc: Number of cores to use per job for
tasks with high I/O or memory.
- id: numbands
type: int?
default: -1
doc: The number of bands to group. -1 means all bands.
- id: do_flagging
type: boolean?
default: false
doc: Whether to flag the data before splitting.
- id: truncateLastSBs
type: boolean?
default: true
doc: Whether to truncate the last subbands of the
MSs to the same length.
- id: do_selfcal
type: boolean?
default: false
doc: Whether to do selfcal on the direction concat MSs.
- id: configfile
type: File
doc: The configuration file to be used to run
facetselfcal.py during the target_solve step.
- id: h5merger
type: Directory
doc: The h5merger directory.
- id: selfcal
type: Directory
doc: The selfcal directory.
steps:
- id: target_phaseup
label: Target Phaseup
in:
- id: msin
source: msin
- id: image_cat
source: image_cat
out:
- id: parset
run: ./subworkflows/split_parset.cwl
scatter: msin
- id: dp3_target_phaseup
label: DP3 Target Phaseup
in:
- id: msin
source: msin
- id: parset
source: target_phaseup/parset
linkMerge: merge_flattened
- id: delay_solset
source: delay_solset
- id: max_dp3_threads
source: max_dp3_threads
out:
- id: msout
run: ../steps/dp3_target_phaseup.cwl
scatter: [msin, parset]
scatterMethod: dotproduct
- id: order_by_direction
label: Order by Direction
in:
- id: msin
source: dp3_target_phaseup/msout
out:
- id: msout
run: ../steps/order_by_direction.cwl
- id: sort_concatmap
label: Sort Concatmap
in:
- id: msin
source: order_by_direction/msout
- id: numbands
source: numbands
- id: truncateLastSBs
source: truncateLastSBs
out:
- id: filenames
- id: groupnames
run: ../steps/sort_concatmap.cwl
scatter: msin
- id: flatten_groupnames
label: Flatten Groupnames
in:
- id: nestedarray
source: sort_concatmap/groupnames
out:
- id: flattenedarray
run: ../steps/flatten.cwl
- id: concatenation
label: concatenation
in:
- id: msin
source: order_by_direction/msout
- id: groups_specification
source: sort_concatmap/filenames
- id: group_id
source: flatten_groupnames/flattenedarray
- id: do_flagging
source: do_flagging
out:
- id: msout
run: ./subworkflows/concatenation.cwl
scatter: [msin, groups_specification, group_id]
scatterMethod: dotproduct
- id: target_selfcal
label: Target Selfcal
in:
- id: msin
source: concatenation/msout
- id: configfile
source: configfile
- id: h5merger
source: h5merger
- id: selfcal
source: selfcal
- id: do_selfcal
source: do_selfcal
out:
- id: images
- id: fits_images
when: $(inputs.do_selfcal)
run: ../steps/target_solve.cwl
scatter: msin
outputs:
- id: msout_phaseup
type:
type: array
items:
type: array
items: Directory
outputSource: dp3_target_phaseup/msout
- id: msout_concat
type: Directory[]
outputSource: concatenation/msout
- id: images
type:
type: array
items:
type: array
items: File
outputSource:
- target_selfcal/images
pickValue: all_non_null
- id: fits_images
type:
type: array
items:
type: array
items: File
outputSource:
- target_selfcal/fits_images
pickValue: all_non_null
cwlVersion: v1.2
class: Workflow
id: split_parset
label: Create parset split directions
doc: |
This workflow does the following:
* It creates a list of target direction coordinates (RA, Dec).
* For each direction it creates a name for the MeasurementSet
* It creates a parameter set file for DP3 to phase shift the
target data to each direction, and to store that phase-shifted
data in a MeasurementSet with the name generated before.
The output is a DP3 parameter set file.
inputs:
- id: msin
type: Directory
doc: The input data in MeasurementSet format.
- id: image_cat
type: File
doc: |
The image catalogue in CSV format,
containing the target directions.
outputs:
- id: parset
type: File
doc: The parameterset file for DP3.
outputSource: generate_parset/parset
steps:
- id: get_coordinates
label: Get target ID and coordinates
in:
- id: delay_calibrator
source: image_cat
out:
- id: source_id
- id: coordinates
- id: logfile
run: ../../steps/prep_delay.cwl
- id: generate_filenames
label: Generate MeasurementSet output names
in:
- id: msin
source: msin
- id: source_ids
source: get_coordinates/source_id
out:
- id: msout_names
run: ../../steps/generate_filenames.cwl
- id: generate_parset
label: Generate direction parset.
in:
- id: msout_names
source: generate_filenames/msout_names
- id: phase_centers
source: get_coordinates/coordinates
out:
- id: parset
run: ../../steps/generate_parset_split.cwl
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment