diff --git a/scripts/Ateamclipper.py b/scripts/Ateamclipper.py index 2227f92d830057ebe047563a768b11316ff5f01f..c7ba705c22edb4bbcba09cf378837553aed4f452 100755 --- a/scripts/Ateamclipper.py +++ b/scripts/Ateamclipper.py @@ -63,4 +63,4 @@ t.putcol('FLAG', flag) t.close() freq_tab.close() -os.system('echo ' + str(freq[0]) + ' ' + str(output_flags_xx - input_flags_xx) + ' ' + str(output_flags_yy - input_flags_yy) + ' >> Ateamclipper.txt') +os.system('echo ' + str(freq[0]) + ' ' + str(output_flags_xx - input_flags_xx) + ' ' + str(output_flags_yy - input_flags_yy) + ' >> Ateamclipper.txt') \ No newline at end of file diff --git a/scripts/PipelineStep_makeLosotoParset.py b/scripts/PipelineStep_makeLosotoParset.py index 58137e0492e82f79053c4b7d6d84c50b9e424c30..a40ad412d7aa9d962b65d1fc5f79be01047e4e13 100755 --- a/scripts/PipelineStep_makeLosotoParset.py +++ b/scripts/PipelineStep_makeLosotoParset.py @@ -40,6 +40,4 @@ def plugin_main(args, **kwargs): parset_file.close() return 0 - pass - - + pass \ No newline at end of file diff --git a/scripts/TargetListToCoords.py b/scripts/TargetListToCoords.py index 46654e2b54c0d781a2135cf6831a3f5ba9111b3b..b71aa0fd733b85dd3697596f9c81edcc8170b551 100644 --- a/scripts/TargetListToCoords.py +++ b/scripts/TargetListToCoords.py @@ -1,14 +1,9 @@ -import os -import numpy as np -import pyrap.tables, math -from astropy import units as u -from astropy.coordinates import SkyCoord from astropy.table import Table def plugin_main(**kwargs): """ Takes in a catalogue with a target and returns appropriate coordinates - + Parameters ---------- filename: str @@ -20,23 +15,32 @@ def plugin_main(**kwargs): ------- result : dict Output coordinates - """ + # parse the input target_file = kwargs['target_file'] - + mode = kwargs['mode'] + # read in the catalogue to get source_id, RA, and DEC t = Table.read(target_file, format='csv') - RA_val = t['RA'].data[0] - DEC_val = t['DEC'].data[0] - Source_id = t['Source_id'].data[0] - if str(Source_id)[0:1] == 'I': - pass - elif str(Source_id)[0:1] == 'S': - pass - else: - Source_id = 'S' + str(Source_id) + RA_val = t['RA'].data + DEC_val = t['DEC'].data + Source_id = t['Source_id'].data + if mode: # == 'single': + RA_val = [RA_val[0]] + DEC_val = [DEC_val[0]] + Source_id = Source_id[:1] + if str(Source_id[0])[0:1] == 'I': + pass + elif str(Source_id[0])[0:1] == 'S': + pass + else: + Source_id = ['S' + str(x) for x in Source_id[0]] + # make a string of coordinates for the NDPPP command - ss = '["' + str(RA_val) + 'deg","' + str(DEC_val) + 'deg"]' - result = {'name' : Source_id, 'coords' : ss} + ss = [ '[' + str(x) + 'deg,' + str(y) + 'deg]' for x, y in zip(RA_val, DEC_val) ] + + result = {'name' : ",".join(Source_id), + 'coords' : ss[0] if len(ss) == 1 else "[" + ",".join(ss) + "]"} + return result diff --git a/scripts/check_Ateam_separation.py b/scripts/check_Ateam_separation.py index d1549a8cd44bfbf8a38d007a1bb2a6eecadbc8e2..6e149f69bd36d4ade3f77ef2c4a5a0ec28438c50 100755 --- a/scripts/check_Ateam_separation.py +++ b/scripts/check_Ateam_separation.py @@ -175,5 +175,4 @@ if __name__ == '__main__': args = parser.parse_args() - main(args.MSfile, args.min_separation, args.outputimage) - + main(args.MSfile, args.min_separation, args.outputimage) \ No newline at end of file diff --git a/scripts/generate_input.sh b/scripts/generate_input.sh index 9e053603fb879ee0901e4c0a06a08ba40f99cc5d..9294e8d944fb8fdefe30b1342ecddb6c286157f0 100755 --- a/scripts/generate_input.sh +++ b/scripts/generate_input.sh @@ -88,4 +88,4 @@ EOF # Close output file exec 3>&- -echo "Wrote output to '${YAML}'" +echo "Wrote output to '${YAML}'" \ No newline at end of file diff --git a/scripts/make_summary.py b/scripts/make_summary.py index 41a4d4a489b121400acb28d347f8c05fb90c7be3..2ca7ca3462857b33b978e19534e1cacc3708246c 100755 --- a/scripts/make_summary.py +++ b/scripts/make_summary.py @@ -247,4 +247,4 @@ if __name__=='__main__': demix=args.demix, removed_bands=args.removed_bands, min_unflagged=args.min_unflagged, refant=args.refant) - sys.exit(0) + sys.exit(0) \ No newline at end of file diff --git a/steps/dp3_target_phaseup.cwl b/steps/dp3_target_phaseup.cwl new file mode 100644 index 0000000000000000000000000000000000000000..dfae68e24441a875562ce5d6ef0c6555acabe1b9 --- /dev/null +++ b/steps/dp3_target_phaseup.cwl @@ -0,0 +1,73 @@ +class: CommandLineTool +cwlVersion: v1.2 +id: dp3_target_phaseup +label: DP3 Target Phaseup +doc: This tool applies the delay solutions to the target source and phase up to the various + target directions. It appends commands to a parset created in the previous step. + + +baseCommand: DP3 + +inputs: + - id: parset + type: File + doc: Input parset file. + default: dp3_explode.parset + inputBinding: + position: 0 + - id: msin + type: Directory + doc: Input measurement set. + inputBinding: + position: 1 + prefix: msin= + separate: false + shellQuote: false + - id: delay_solset + type: File + doc: Input delay solution set. + inputBinding: + position: 2 + prefix: applycal.parmdb= + separate: false + shellQuote: false + - id: max_dp3_threads + type: int? + default: 8 + doc: Maximum number of threads to use for DP3. + inputBinding: + position: 3 + prefix: numthreads= + separate: false + shellQuote: false + +outputs: + - id: msout + type: Directory[] + outputBinding: + glob: "*.mstargetphaseup" + doc: Output measurement set which has been phaseshifted, + averaged and had solutions applied. + - id: logfile + type: File + outputBinding: + glob: dp3_target_phaseup.log + doc: DP3 processing log file. + - id: errorfile + type: File + outputBinding: + glob: dp3_target_phaseup_err.log + doc: DP3 processing error log file. + + + +hints: + - class: DockerRequirement + dockerPull: vlbi-cwl + - class: ResourceRequirement + coresMax: $(inputs.max_dp3_threads) + coresMin: 2 + + +stdout: dp3_target_phaseup.log +stderr: dp3_target_phaseup_err.log \ No newline at end of file diff --git a/steps/flatten.cwl b/steps/flatten.cwl new file mode 100644 index 0000000000000000000000000000000000000000..e42538ee0f5889f1e443fdbe32107cba0e44af93 --- /dev/null +++ b/steps/flatten.cwl @@ -0,0 +1,31 @@ +cwlVersion: v1.2 +class: ExpressionTool +id: flatten_array +label: Flatten Array +doc: "Flatten a nested array of 'Any' type into an array. Taken from https://github.com/common-workflow-library/cwl-patterns/tree/main" +requirements: + InlineJavascriptRequirement: {} +inputs: + nestedarray: + type: + type: array + items: + type: array + items: ["null", Any] +outputs: + flattenedarray: + type: + type: array + items: Any +expression: | + ${ + var flattenedarray = []; + for (var i = 0; i < inputs.nestedarray.length; i++) { + for (var j = 0; j < inputs.nestedarray[i].length; j++) { + if (inputs.nestedarray[i][j] != null) { + flattenedarray.push(inputs.nestedarray[i][j]); + } + } + } + return {"flattenedarray": flattenedarray}; + } \ No newline at end of file diff --git a/steps/generate_filenames.cwl b/steps/generate_filenames.cwl new file mode 100644 index 0000000000000000000000000000000000000000..fdf437118f871bbe81ed7b24f5b1dfc10f8deb38 --- /dev/null +++ b/steps/generate_filenames.cwl @@ -0,0 +1,45 @@ +cwlVersion: v1.2 +class: CommandLineTool +id: generate_filenames +label: Generate direction filenames +doc: | + Take a MeasurementSet and a list of target sources, and + creates a list of strings where the MeasurementSet name + and target names are concatenated. + +baseCommand: [python3, concatenate.py] + +inputs: + - id: msin + type: Directory + doc: A MeasurementSet to extract the name from. + + - id: source_ids + type: string + doc: A string containing a list of target source IDs. + +outputs: + - id: msout_names + type: string + doc: | + a string containing the names for the MeasurementSets + for each direction. + outputBinding: + loadContents: true + glob: out.json + outputEval: $(JSON.parse(self[0].contents).filenames) + +requirements: + - class: InlineJavascriptRequirement + - class: InitialWorkDirRequirement + listing: + - entryname: concatenate.py + entry: | + import json + + msin = "$(inputs.msin.basename)".split(".")[0] + source_ids = "$(inputs.source_ids)".split(",") + list = [ msin + x + ".mstargetphase" for x in source_ids] + result = {'filenames' : "[" + ",".join(list) + "]"} + with open('./out.json', 'w') as fp: + json.dump(result, fp) diff --git a/steps/generate_parset_split.cwl b/steps/generate_parset_split.cwl new file mode 100755 index 0000000000000000000000000000000000000000..63a15856a611af4fb89af40ae0bd2ce3a18d4abe --- /dev/null +++ b/steps/generate_parset_split.cwl @@ -0,0 +1,82 @@ +cwlVersion: v1.2 +class: CommandLineTool +id: generate_parset_split_directions +label: Generate DP3 parset for split directions +doc: | + Generates a DP3 parameterset to split off + directions from a given MeasurementSet. + +baseCommand: [cat, input.parset] + +stdout: dp3_explode.parset + +inputs: + - id: msout_names + type: string + doc: | + A string of names, one for each direction to image. + + - id: phase_centers + type: string + doc: | + A string of pairs of right ascension and declination + coordinates, one for each direction to image. + + - id: frequency_resolution + type: string? + default: '390.56kHz' + doc: | + Frequency resolution for the third averaging. + + - id: time_resolution + type: string? + default: '32.' + doc: | + Time resolution in seconds for the third averaging. + +outputs: + - id: parset + type: File + doc: A DP3 parameterset file. + outputBinding: + glob: dp3_explode.parset + +requirements: + - class: InlineJavascriptRequirement + - class: InitialWorkDirRequirement + listing: + - entryname: input.parset + entry: |+ + steps = [split] + + split.replaceparms = [phaseshift.phasecenter, applybeam.direction, msout.name] + split.steps = [phaseshift, averager1, applybeam, averager2, applycal, averager3, msout] + + phaseshift.type = phaseshift + phaseshift.phasecenter = $(inputs.phase_centers) + + averager1.type = averager + averager1.freqresolution = 48.82kHz + averager1.timeresolution = 4. + + applybeam.type = applybeam + applybeam.direction = $(inputs.phase_centers) + applybeam.beammode = full + + averager2.type = averager + averager2.freqresolution = 390.56kHz + averager2.timeresolution = 32 + + # Apply solutions and more average_target + + applycal.type = applycal + applycal.correction = fulljones + applycal.soltab = [amplitude000, phase000] + + averager3.type = averager + averager3.freqresolution = $(inputs.frequency_resolution) + averager3.timeresolution = $(inputs.time_resolution) + + msout.storagemanager = dysco + msout.name = $(inputs.msout_names) + msout.overwrite = True diff --git a/steps/order_by_direction.cwl b/steps/order_by_direction.cwl new file mode 100644 index 0000000000000000000000000000000000000000..08b2491865c0d97bbc83beba45695ada675f4e68 --- /dev/null +++ b/steps/order_by_direction.cwl @@ -0,0 +1,65 @@ +class: CommandLineTool +cwlVersion: v1.1 +id: order_by_direction +label: Order by Direction +doc: | + This tool takes an array of arrays of directories containing MeasurementSet files which are in groups of frequency. + It re-orders them such that they are in groups of direction ready to be concatenated. + +baseCommand: + - python3 + - order_by_direction.py + +inputs: + - id: msin + type: + type: array + items: + type: array + items: Directory + inputBinding: + position: 0 + doc: Array of arrays of directories containing the MeasurementSet files to be ordered + +requirements: + - class: InlineJavascriptRequirement + - class: InitialWorkDirRequirement + listing: + - entryname: order_by_direction.py + entry: | + import sys + import numpy as np + import json + + mss = $(inputs)['msin'] + print(mss) + + #The line below does the re-ordering. It performs the transpose of a list. + output = list(map(list, zip(*mss))) + + cwl_output = {} + + cwl_output['msout'] = output + + print(cwl_output) + + with open('./out.json', 'w') as fp: + json.dump(cwl_output, fp) + + + +outputs: + - id: msout + type: + type: array + items: + type: array + items: Directory + outputBinding: + loadContents: true + glob: out.json + outputEval: $(JSON.parse(self[0].contents).msout) + doc: Array of arrays of directories containing the MeasurementSet + files ordered by direction. + + diff --git a/steps/prep_delay.cwl b/steps/prep_delay.cwl index da56b4bb92e3e4cdd7b7ad062a4ce776bdbe7563..9c18409818b9a690c3baf3eed77e7fc40205e497 100644 --- a/steps/prep_delay.cwl +++ b/steps/prep_delay.cwl @@ -11,6 +11,9 @@ inputs: - id: delay_calibrator type: File doc: file containing target info. + - id: extract_single + type: boolean? + default: false requirements: - class: InlineJavascriptRequirement @@ -25,8 +28,9 @@ requirements: inputs = json.loads(r"""$(inputs)""") target_file = inputs['delay_calibrator']['path'] + mode = inputs['extract_single'] - output = targetListToCoords(target_file=target_file) + output = targetListToCoords(target_file=target_file, mode=mode) coords = output['coords'] name = output['name'] diff --git a/steps/sort_concatmap.cwl b/steps/sort_concatmap.cwl index 48f6f52ab08c9232a9362d583917e988353de8e7..cc9e2a7fb7e17fea01ce4da45f15f81a27ffe77d 100755 --- a/steps/sort_concatmap.cwl +++ b/steps/sort_concatmap.cwl @@ -1,7 +1,7 @@ class: CommandLineTool cwlVersion: v1.2 id: sort_concatmap -label: sort_concatmap +label: Sort Concatmap baseCommand: - python3 @@ -50,6 +50,7 @@ requirements: from sort_times_into_freqGroups import main as sort_times_into_freqGroups mss = sys.argv[1:] + inputs = json.loads(r"""$(inputs)""") numbands = inputs['numbands'] @@ -59,8 +60,10 @@ requirements: truncateLastSBs = inputs['truncateLastSBs'] firstSB = inputs['firstSB'] - output = sort_times_into_freqGroups(mss, numbands, NDPPPfill, stepname, mergeLastGroup, truncateLastSBs, firstSB) + print(mss, numbands, NDPPPfill, stepname, mergeLastGroup, truncateLastSBs, firstSB) + output = sort_times_into_freqGroups(mss, numbands, NDPPPfill, stepname, mergeLastGroup, truncateLastSBs, firstSB) + print(output) filenames = output['filenames'] groupnames = output['groupnames'] total_bandwidth = output['total_bandwidth'] diff --git a/steps/target_solve.cwl b/steps/target_solve.cwl new file mode 100644 index 0000000000000000000000000000000000000000..f1f16c55bf6706a1cb2cf91796e1e7450623c490 --- /dev/null +++ b/steps/target_solve.cwl @@ -0,0 +1,74 @@ +cwlVersion: v1.2 +class: CommandLineTool +id: target_solve +label: Target Solve +doc: | + This tool performs selfcalibration on the target source. + It uses the facetselfcal.py script from the LOFAR helper scripts with the --auto setting. + +baseCommand: + - python3 + - delay_solve.py + +inputs: + - id: msin + type: Directory + doc: Delay calibrator measurement set. + - id: configfile + type: File + doc: Configuration options for self-calibration. + - id: selfcal + type: Directory + doc: External self-calibration script. + - id: h5merger + type: Directory + doc: External LOFAR helper scripts for merging h5 files. + +outputs: + - id: images + type: File[] + outputBinding: + glob: '*.png' + - id: fits_images + type: File[] + outputBinding: + glob: '*MFS-image.fits' + - id: logfile + type: File[] + outputBinding: + glob: target_solve*.log + +requirements: + - class: ShellCommandRequirement + - class: InlineJavascriptRequirement + - class: InitialWorkDirRequirement + listing: + - entry: $(inputs.configfile) + - entry: $(inputs.msin) + - entry: $(inputs.h5merger) + - entryname: delay_solve.py + entry: | + import subprocess + import sys + import json + + inputs = json.loads(r"""$(inputs)""") + + msin = inputs['msin']['basename'] + configfile = inputs['configfile']['path'] + selfcal = inputs['selfcal']['path'] + h5merge = inputs['h5merger']['path'] + + imagename = msin.split('.copy')[0] + + subprocess.run(f'python3 {selfcal}/facetselfcal.py {msin} --helperscriptspath {selfcal} + --helperscriptspathh5merge {h5merge} --weightspectrum-clipvalue 30.0 --auto --imagename {imagename}', shell = True) + +hints: + - class: DockerRequirement + dockerPull: vlbi-cwl + - class: ResourceRequirement + coresMin: 6 + +stdout: target_solve.log +stderr: target_solve_err.log diff --git a/target_selfcal_config.txt b/target_selfcal_config.txt new file mode 100644 index 0000000000000000000000000000000000000000..ef0e1c7ccd9de55e6205897edd9af827dc5a94d6 --- /dev/null +++ b/target_selfcal_config.txt @@ -0,0 +1,3 @@ +imsize = 1600 +pixelscale = 0.075 +auto = True \ No newline at end of file diff --git a/workflows/phaseup-concat.cwl b/workflows/phaseup-concat.cwl index 6c7b5c7254ea9f63049d02655c19be39af52439c..eb45b9a8f5f258079ea74f986547d0244e0ba3ba 100644 --- a/workflows/phaseup-concat.cwl +++ b/workflows/phaseup-concat.cwl @@ -72,6 +72,8 @@ steps: in: - id: delay_calibrator source: delay_calibrator + - id: extract_single + default: true out: - id: source_id - id: coordinates diff --git a/workflows/split-directions.cwl b/workflows/split-directions.cwl new file mode 100644 index 0000000000000000000000000000000000000000..55fff245506eebb3688debf99290fabdfdfc12bd --- /dev/null +++ b/workflows/split-directions.cwl @@ -0,0 +1,194 @@ +class: Workflow +cwlVersion: v1.2 +id: split-directions +label: Split Directions +doc: | + This is a workflow for the LOFAR-VLBI pipeline that + splits a LOFAR MeasurementSet into various target directions, applies delay calibrator solutions and then optionally performs + self-calibration on the target directions. + + This step should be run after the delay calibration workflow. + + +requirements: + - class: SubworkflowFeatureRequirement + - class: MultipleInputFeatureRequirement + - class: ScatterFeatureRequirement + +inputs: + - id: msin + type: Directory[] + doc: The input MS. This should have coverage of the target directions. + - id: delay_solset + type: File + doc: The solution tables generated by the VLBI delay calibration workflow in an HDF5 format. + - id: image_cat + type: File + doc: The image catalogue (in CSV format) containing the target directions. + default: lotss_catalogue.csv + - id: max_dp3_threads + type: int? + default: 4 + doc: Number of cores to use per job for + tasks with high I/O or memory. + - id: numbands + type: int? + default: -1 + doc: The number of bands to group. -1 means all bands. + - id: do_flagging + type: boolean? + default: false + doc: Whether to flag the data before splitting. + - id: truncateLastSBs + type: boolean? + default: true + doc: Whether to truncate the last subbands of the + MSs to the same length. + - id: do_selfcal + type: boolean? + default: false + doc: Whether to do selfcal on the direction concat MSs. + - id: configfile + type: File + doc: The configuration file to be used to run + facetselfcal.py during the target_solve step. + - id: h5merger + type: Directory + doc: The h5merger directory. + - id: selfcal + type: Directory + doc: The selfcal directory. + + +steps: + - id: target_phaseup + label: Target Phaseup + in: + - id: msin + source: msin + - id: image_cat + source: image_cat + out: + - id: parset + run: ./subworkflows/split_parset.cwl + scatter: msin + + - id: dp3_target_phaseup + label: DP3 Target Phaseup + in: + - id: msin + source: msin + - id: parset + source: target_phaseup/parset + linkMerge: merge_flattened + - id: delay_solset + source: delay_solset + - id: max_dp3_threads + source: max_dp3_threads + out: + - id: msout + run: ../steps/dp3_target_phaseup.cwl + scatter: [msin, parset] + scatterMethod: dotproduct + + - id: order_by_direction + label: Order by Direction + in: + - id: msin + source: dp3_target_phaseup/msout + out: + - id: msout + run: ../steps/order_by_direction.cwl + + - id: sort_concatmap + label: Sort Concatmap + in: + - id: msin + source: order_by_direction/msout + - id: numbands + source: numbands + - id: truncateLastSBs + source: truncateLastSBs + out: + - id: filenames + - id: groupnames + run: ../steps/sort_concatmap.cwl + scatter: msin + + - id: flatten_groupnames + label: Flatten Groupnames + in: + - id: nestedarray + source: sort_concatmap/groupnames + out: + - id: flattenedarray + run: ../steps/flatten.cwl + + + - id: concatenation + label: concatenation + in: + - id: msin + source: order_by_direction/msout + - id: groups_specification + source: sort_concatmap/filenames + - id: group_id + source: flatten_groupnames/flattenedarray + - id: do_flagging + source: do_flagging + out: + - id: msout + run: ./subworkflows/concatenation.cwl + scatter: [msin, groups_specification, group_id] + scatterMethod: dotproduct + + + - id: target_selfcal + label: Target Selfcal + in: + - id: msin + source: concatenation/msout + - id: configfile + source: configfile + - id: h5merger + source: h5merger + - id: selfcal + source: selfcal + - id: do_selfcal + source: do_selfcal + out: + - id: images + - id: fits_images + when: $(inputs.do_selfcal) + run: ../steps/target_solve.cwl + scatter: msin + +outputs: + - id: msout_phaseup + type: + type: array + items: + type: array + items: Directory + outputSource: dp3_target_phaseup/msout + - id: msout_concat + type: Directory[] + outputSource: concatenation/msout + - id: images + type: + type: array + items: + type: array + items: File + outputSource: + - target_selfcal/images + pickValue: all_non_null + - id: fits_images + type: + type: array + items: + type: array + items: File + outputSource: + - target_selfcal/fits_images + pickValue: all_non_null diff --git a/workflows/subworkflows/split_parset.cwl b/workflows/subworkflows/split_parset.cwl new file mode 100644 index 0000000000000000000000000000000000000000..d0d3df30ca09f52a87cb291a670e3ac08d6887fc --- /dev/null +++ b/workflows/subworkflows/split_parset.cwl @@ -0,0 +1,65 @@ +cwlVersion: v1.2 +class: Workflow +id: split_parset +label: Create parset split directions +doc: | + This workflow does the following: + + * It creates a list of target direction coordinates (RA, Dec). + * For each direction it creates a name for the MeasurementSet + * It creates a parameter set file for DP3 to phase shift the + target data to each direction, and to store that phase-shifted + data in a MeasurementSet with the name generated before. + + The output is a DP3 parameter set file. + +inputs: + - id: msin + type: Directory + doc: The input data in MeasurementSet format. + + - id: image_cat + type: File + doc: | + The image catalogue in CSV format, + containing the target directions. + +outputs: + - id: parset + type: File + doc: The parameterset file for DP3. + outputSource: generate_parset/parset + +steps: + - id: get_coordinates + label: Get target ID and coordinates + in: + - id: delay_calibrator + source: image_cat + out: + - id: source_id + - id: coordinates + - id: logfile + run: ../../steps/prep_delay.cwl + + - id: generate_filenames + label: Generate MeasurementSet output names + in: + - id: msin + source: msin + - id: source_ids + source: get_coordinates/source_id + out: + - id: msout_names + run: ../../steps/generate_filenames.cwl + + - id: generate_parset + label: Generate direction parset. + in: + - id: msout_names + source: generate_filenames/msout_names + - id: phase_centers + source: get_coordinates/coordinates + out: + - id: parset + run: ../../steps/generate_parset_split.cwl