diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..b9a5796b61c3de67a6b8dd0c48df1cdc6171ea85 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.idea \ No newline at end of file diff --git a/scripts/direction_selection.py b/scripts/direction_selection.py new file mode 100755 index 0000000000000000000000000000000000000000..d3f4c9b4a267360c7deb83f146b79db53bc31bdd --- /dev/null +++ b/scripts/direction_selection.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +__author__ = "Jurjen de Jong" + +import os +import re +import sys +from argparse import ArgumentParser + +import pandas as pd +from astropy.coordinates import SkyCoord +from astropy import units as u + + +def filter_too_nearest_neighbours(csv: str = None, sep: float = 0.1): + """ + Identify sources that have a nearest neighbour within 0.1 degrees distance. + Keep the source with the highest spd_score. + + Args: + csv: CSV file with RA/DEC and spd_score + sep: separation threshold in degrees + + Returns: + + """ + + df = pd.read_csv(csv) + + # SkyCoord objects for all sources + coords = SkyCoord(ra=df['RA'].values * u.deg, dec=df['DEC'].values * u.deg) + + # Entries collection to remove + to_remove = set() + + for i, row in df.iterrows(): + if i in to_remove: + continue # Skip already marked rows + + # Separations between the current source and all others + separations = coords[i].separation(coords).degree + + # Sources within seperation threshold + close_sources = (separations < sep) & (separations > 0) + close_indices = df[close_sources].index + + # If there are neighbouring sources, compare the spd_scores + for j in close_indices: + if df.loc[j, 'spd_score'] < row['spd_score']: + # Remove the current source if its spd_score is lower + to_remove.add(i) + + # Remove rows marked for removal + print('Removing:') + print(df.iloc[list(to_remove)]) + filtered_df = df.drop(index=to_remove).reset_index(drop=True) + + return filtered_df + + +def parse_source_id(inp_str: str = None): + """ + Parse ILTJ... source_id string + + Args: + inp_str: ILTJ source_id + + Returns: parsed output + + """ + + parsed_inp = re.findall(r'ILTJ\d+\..\d+\+\d+.\d+', inp_str)[0] + + return parsed_inp + + +def match_source_id(mslist: list = None, source_id: str = None): + """ + Return MS name by matching source ID with items from list with MS names + + Parameters + ---------- + mslist: List with MS + source_id: Source ID + + Returns + ------- + Most similar element + """ + + for ms in mslist: + if parse_source_id(ms)==source_id: + return ms + + # If no match (should not arrive here) + sys.exit(f"ERROR: No matching MS for {source_id}") + + +def rename_folder(old_name, new_name): + """ + Rename folder name + + Parameters + ---------- + old_name: Old name + new_name: New name + """ + + try: + os.rename(old_name, new_name) + print(f"Folder renamed from '{old_name}' to '{new_name}'") + except OSError as e: + print(f"Error: {e}") + + +def parse_args(): + """ + Command line argument parser + + Returns + ------- + Parsed arguments + """ + + parser = ArgumentParser() + parser.add_argument('--csv', help='CSV with names and phasediff scores', default=None) + parser.add_argument('--ms', nargs="+", help='Input MS', default=None) + parser.add_argument('--best_score', type=float, + help='Optimal selection score (See Section 3.3.1 https://arxiv.org/pdf/2407.13247)', + default=2.3) + parser.add_argument('--suffix', help='suffix', default='_best') + return parser.parse_args() + + +def main(): + """ + Main function + """ + + args = parse_args() + + # Get dataframe after filtering for sources within 0.1 degrees distance from each other + df = filter_too_nearest_neighbours(args.csv) + + for source in df.set_index('source').iterrows(): + name = source[0] + score = source[1]['spd_score'] + if score < args.best_score: + ms_name = match_source_id(args.ms, name) + + # Rename folder to return best directions in CWL workflow + rename_folder(ms_name, ms_name.split('/')[-1]+args.suffix+'.ms') + + +if __name__ == '__main__': + main() diff --git a/scripts/get_delay_dir.py b/scripts/get_delay_dir.py index 17114ceac1703c9fa7a120e35cbb69998935f3ca..ecdd8c60281545fdc57e705d17ca3eb864582d36 100755 --- a/scripts/get_delay_dir.py +++ b/scripts/get_delay_dir.py @@ -25,8 +25,9 @@ def main(h5_in: str, solset: str, direction_name: str): delay_direction = SkyCoord(ss_dir[0], ss_dir[1], unit="rad") ra = delay_direction.ra.to("deg").value dec = delay_direction.dec.to("deg").value - print("Source_id,RA,DEC") - print(f"DelayCal,{ra},{dec}") + with open("delay_dir.csv", "w") as f: + f.write("Source_id,RA,DEC\n") + f.write(f"DelayCal,{ra},{dec}") if __name__ == "__main__": diff --git a/steps/dp3_parset.cwl b/steps/dp3_parset.cwl new file mode 100644 index 0000000000000000000000000000000000000000..d44ae6887a8d946ca78371b0a9c61f54985893ba --- /dev/null +++ b/steps/dp3_parset.cwl @@ -0,0 +1,46 @@ +cwlVersion: v1.2 +class: CommandLineTool +id: dp3_parset +label: DP3 with parset +doc: | + Run DP3 with a parset + +baseCommand: DP3 + +inputs: + - id: parset + type: File + doc: Parset for DP3 + inputBinding: + position: 0 + - id: msin + type: Directory[] + doc: input MS + +outputs: + - id: msout + type: Directory + doc: Output measurement set + outputBinding: + glob: "*.ms" + - id: logfile + type: File[] + outputBinding: + glob: dp3_parset*.log + doc: | + The files containing the stdout + and stderr from the step. + +requirements: + - class: InitialWorkDirRequirement + listing: + - entry: $(inputs.msin) + +hints: + - class: DockerRequirement + dockerPull: vlbi-cwl + - class: ResourceRequirement + coresMin: 6 + +stdout: dp3_parset.log +stderr: dp3_parset_err.log diff --git a/steps/dp3_prep_phasediff.cwl b/steps/dp3_prep_phasediff.cwl new file mode 100644 index 0000000000000000000000000000000000000000..6feb89992600751117c62624ef3cb8c77f802a9d --- /dev/null +++ b/steps/dp3_prep_phasediff.cwl @@ -0,0 +1,56 @@ +class: CommandLineTool +cwlVersion: v1.2 +id: pre_averaging_dp3 +label: DP3 Pre-averaging +doc: This step pre-averages measurement set for pulling phasediff scores from facetselfcal + + +baseCommand: DP3 + +inputs: + - id: msin + type: Directory + doc: Input measurement set + inputBinding: + prefix: msin= + position: 0 + shellQuote: false + separate: false + +outputs: + - id: phasediff_ms + type: Directory + doc: Output measurement sets + outputBinding: + glob: "*.phasediff.ms" + - id: logfile + type: File[] + doc: DP3 log files + outputBinding: + glob: dp3_prephasediff*.log + +arguments: + - steps=[avg] + - msin.datacolumn=DATA + - msout.storagemanager=dysco + - avg.type=averager + - avg.freqresolution=390.56kHz + - avg.timeresolution=60 + - msout=$(inputs.msin.path+".phasediff.ms") + +requirements: + - class: InlineJavascriptRequirement + - class: InitialWorkDirRequirement + listing: + - entry: $(inputs.msin) + writable: true + +hints: + - class: DockerRequirement + dockerPull: vlbi-cwl + - class: ResourceRequirement + coresMin: 6 + + +stdout: dp3_prephasediff.log +stderr: dp3_prephasediff_err.log diff --git a/steps/get_delay_cal_direction.cwl b/steps/get_delay_cal_direction.cwl index b8b77162699b6b9788834fcefc45ae60da6000b9..8ed7307d284a477404477ca706cf8a70eed10030 100644 --- a/steps/get_delay_cal_direction.cwl +++ b/steps/get_delay_cal_direction.cwl @@ -8,7 +8,7 @@ doc: | baseCommand: - get_delay_dir.py -stdout: delay_dir.csv +stdout: get_delay_dir.txt stderr: get_delay_dir_err.txt inputs: diff --git a/steps/get_phasediff.cwl b/steps/get_phasediff.cwl new file mode 100644 index 0000000000000000000000000000000000000000..9c7591607aa0f91b34604ee2524d70c2bf916b55 --- /dev/null +++ b/steps/get_phasediff.cwl @@ -0,0 +1,73 @@ +class: CommandLineTool +cwlVersion: v1.2 +id: get_phasediff +label: Polarization Phase Difference +doc: This step makes scalarphasediff solution files, needed for collecting source selection scores + +baseCommand: + - python3 + +inputs: + - id: phasediff_ms + type: Directory + doc: Input MeasurementSet + inputBinding: + position: 20 + - id: lofar_helpers + type: Directory + doc: The lofar_helpers directory. + - id: selfcal + type: Directory + doc: The facetselfcal directory. + +outputs: + - id: phasediff_h5out + type: File + doc: h5parm solution files with scalarphasediff solutions + outputBinding: + glob: "scalarphasediff*.h5" + - id: logfile + type: File[] + doc: log files from facetselfcal + outputBinding: + glob: phasediff*.log + +requirements: + - class: InlineJavascriptRequirement + - class: InitialWorkDirRequirement + listing: + - entry: $(inputs.phasediff_ms) + writable: true + - entry: $(inputs.lofar_helpers) + - entry: $(inputs.selfcal) + +arguments: + - $(inputs.selfcal.path + '/facetselfcal.py') + - -i + - phasediff + - --forwidefield + - --phaseupstations=core + - --skipbackup + - --uvmin=20000 + - --soltype-list=['scalarphasediff'] + - --solint-list=['10min'] + - --nchan-list=[6] + - --docircular + - --uvminscalarphasediff=0 + - --stop=1 + - --soltypecycles-list=[0] + - --imsize=1600 + - --skymodelpointsource=1.0 + - --helperscriptspath=$(inputs.selfcal.path) + - --helperscriptspathh5merge=$(inputs.lofar_helpers.path) + - --stopafterskysolve + - --phasediff_only + +hints: + - class: DockerRequirement + dockerPull: vlbi-cwl + - class: ResourceRequirement + coresMin: 2 + +stdout: phasediff.log +stderr: phasediff_err.log diff --git a/steps/get_selection_scores.cwl b/steps/get_selection_scores.cwl new file mode 100644 index 0000000000000000000000000000000000000000..1ee658893f84842563bbd848a15be166e95a626b --- /dev/null +++ b/steps/get_selection_scores.cwl @@ -0,0 +1,53 @@ +class: CommandLineTool +cwlVersion: v1.2 +id: get_source_scores +label: Source Scores +doc: | + This step calculates the scores from h5parm solution files to determine if a direction is suitable + for long-baseline calibration for wide-field imaging (See Section 3.3.1 https://arxiv.org/pdf/2407.13247) + +baseCommand: + - python3 + +inputs: + - id: phasediff_h5 + type: File[] + doc: H5parm solutions from facetselfcal. + inputBinding: + prefix: "--h5" + position: 1 + itemSeparator: " " + separate: true + - id: selfcal + type: Directory + doc: Path to facetselfcal directory. + +outputs: + - id: phasediff_score_csv + type: File + doc: csv with phase difference scores + outputBinding: + glob: "*.csv" + - id: logfile + type: File[] + doc: log files corresponding to this step + outputBinding: + glob: phasediff*.log + +requirements: + - class: InlineJavascriptRequirement + - class: InitialWorkDirRequirement + listing: + - entry: $(inputs.phasediff_h5) + writable: true + - entry: $(inputs.selfcal) + +arguments: + - $( inputs.selfcal.path + '/source_selection/phasediff_output.py' ) + +hints: + - class: DockerRequirement + dockerPull: vlbi-cwl + +stdout: phasediff.log +stderr: phasediff_err.log diff --git a/steps/make_concat_parsets.cwl b/steps/make_concat_parsets.cwl new file mode 100644 index 0000000000000000000000000000000000000000..db297165d13d45223784aefd19e18166779d2139 --- /dev/null +++ b/steps/make_concat_parsets.cwl @@ -0,0 +1,57 @@ +class: CommandLineTool +cwlVersion: v1.2 +id: makeparsets +label: Make concat parsets +doc: | + Generate direction concatenation parsets + +baseCommand: + - python3 + +inputs: + - id: msin + type: Directory[] + inputBinding: + prefix: "--msin" + position: 1 + separate: true + doc: Input data in MeasurementSet format. + - id: lofar_helpers + type: Directory + doc: Path to lofar_helpers directory. + +outputs: + - id: concat_parsets + doc: | + The output data with corrected + data in MeasurementSet format. + type: File[] + outputBinding: + glob: '*.parset' + + - id: logfile + type: File[] + outputBinding: + glob: python_concat*.log + doc: | + The files containing the stdout + and stderr from the step. + +arguments: + - $( inputs.lofar_helpers.path + '/ms_helpers/concat_with_dummies.py' ) + - --make_only_parset + - --only_basename + +requirements: + - class: InlineJavascriptRequirement + - class: InitialWorkDirRequirement + listing: + - entry: $(inputs.msin) + writable: false + +hints: + - class: DockerRequirement + dockerPull: vlbi-cwl + +stdout: python_concat.log +stderr: python_concat_err.log diff --git a/steps/order_by_direction.cwl b/steps/order_by_direction.cwl deleted file mode 100644 index 139f325ecb26eef05d0786a88087d547d46e69a1..0000000000000000000000000000000000000000 --- a/steps/order_by_direction.cwl +++ /dev/null @@ -1,61 +0,0 @@ -class: CommandLineTool -cwlVersion: v1.1 -id: order_by_direction -label: Order by Direction -doc: | - This tool takes an array of arrays of directories containing MeasurementSet files which are in groups of frequency. - It re-orders them such that they are in groups of direction ready to be concatenated. - -baseCommand: - - python3 - - order_by_direction.py - -inputs: - - id: msin - type: - type: array - items: - type: array - items: Directory - inputBinding: - position: 0 - doc: Array of arrays of directories containing the MeasurementSet files to be ordered - -requirements: - - class: InlineJavascriptRequirement - - class: InitialWorkDirRequirement - listing: - - entryname: order_by_direction.py - entry: | - import json - - mss = $(inputs)['msin'] - print(mss) - - #The line below does the re-ordering. It performs the transpose of a list. - output = list(map(list, zip(*mss))) - - cwl_output = {} - - cwl_output['msout'] = output - - print(cwl_output) - - # The results are written to this file to circumvent the size - # restrictions placed on files that can be parsed by outputEval. See - # https://www.commonwl.org/v1.2/CommandLineTool.html#CommandOutputBinding - with open('$(runtime.outdir)/cwl.output.json', 'w') as fp: - json.dump(cwl_output, fp) - -outputs: - - id: msout - type: - type: array - items: - type: array - items: Directory - outputBinding: - glob: $(runtime.outdir)/cwl.output.json - outputEval: $(self.msout) - doc: Array of arrays of directories containing the MeasurementSet - files ordered by direction. diff --git a/steps/select_best_directions.cwl b/steps/select_best_directions.cwl new file mode 100644 index 0000000000000000000000000000000000000000..a43ff4e97d70cef4848cbc4f84ed8d281ebc376d --- /dev/null +++ b/steps/select_best_directions.cwl @@ -0,0 +1,49 @@ +class: CommandLineTool +cwlVersion: v1.2 +id: select_best_directions +label: Select best directions +doc: This step uses the phasediff scores to select the best input directions by adding a suffix *_best.ms + +baseCommand: + - direction_selection.py + - --best_score=2.3 + +inputs: + - id: msin + type: Directory[] + doc: All input MS directions + inputBinding: + prefix: "--ms" + position: 1 + separate: true + - id: phasediff_csv + type: File + doc: CSV with phasediff source selection scores + inputBinding: + prefix: "--csv" + position: 2 + +outputs: + - id: best_ms + type: Directory[] + doc: Best directions + outputBinding: + glob: "*_best.ms" + - id: logfile + type: File[] + doc: Log files corresponding to this step + outputBinding: + glob: dir_selection*.log + +requirements: + - class: InitialWorkDirRequirement + listing: + - entry: $(inputs.msin) + writable: true + +hints: + - class: DockerRequirement + dockerPull: vlbi-cwl + +stdout: dir_selection.log +stderr: dir_selection_err.log diff --git a/steps/sort_concatmap.cwl b/steps/sort_concatmap.cwl index b1e84d9eb157d2154c8b523e068840a78e502a31..e400627dca73aeacc86d16e283925ec05d2e1a4f 100755 --- a/steps/sort_concatmap.cwl +++ b/steps/sort_concatmap.cwl @@ -133,3 +133,4 @@ outputs: stdout: sort_concatmap.log stderr: sort_concatmap_err.log + diff --git a/workflows/split-directions.cwl b/workflows/split-directions.cwl index b874deeba46556c2ff0fcb8a3b99ee99da2af4ee..7bc18f8d28e0c14e6f55732aedec88e41d0e94cc 100644 --- a/workflows/split-directions.cwl +++ b/workflows/split-directions.cwl @@ -4,9 +4,10 @@ id: split-directions label: Split Directions doc: | This is a workflow for the LOFAR-VLBI pipeline that - splits a LOFAR MeasurementSet into various target directions, applies delay calibrator solutions and then optionally performs - self-calibration on the target directions. - + * Splits a LOFAR MeasurementSet into various target directions + * Applies delay calibrator solutions + * Optionally (for wide-field imaging) performs direction-dependent calibrator selection + * Optionally performs self-calibration on the target directions This step should be run after the delay calibration workflow. requirements: @@ -17,7 +18,7 @@ requirements: inputs: - id: msin type: Directory[] - doc: The input MS. This should have coverage of the target directions. + doc: The input MS of the target directions. - id: delay_solset type: File doc: The solution tables generated by the VLBI delay calibration workflow in an HDF5 format. @@ -43,6 +44,10 @@ inputs: type: boolean? default: false doc: Whether to do selfcal on the direction concat MSs. + - id: dd_selection + type: boolean? + default: false + doc: If set to true the pipeline will perform direction-dependent calibrator selection. - id: configfile type: File doc: The configuration file to be used to run @@ -54,12 +59,6 @@ inputs: type: Directory doc: The selfcal directory. - - id: linc - type: Directory - doc: | - The installation directory for the - LOFAR INitial Calibration pipeline. - steps: - id: target_phaseup label: Target Phaseup @@ -93,77 +92,63 @@ steps: scatter: [msin, parset] scatterMethod: dotproduct - - id: order_by_direction - label: Order by Direction + - id: flatten_msout + label: Flatten msout in: - - id: msin + - id: nestedarray source: dp3_target_phaseup/msout out: - - id: msout - run: ../steps/order_by_direction.cwl + - id: flattenedarray + run: ../steps/flatten.cwl - - id: collect_linc_libraries - label: Collect neccesary LINC libraries + - id: make_concat_parset + label: Make parsets in: - - id: linc - source: linc - - id: library - default: - - scripts/sort_times_into_freqGroups.py + - id: msin + source: flatten_msout/flattenedarray + - id: lofar_helpers + source: h5merger out: - - id: libraries - scatter: library - run: ../steps/collect_linc_libraries.cwl + - id: concat_parsets + run: ../steps/make_concat_parsets.cwl - - - id: sort_concatmap - label: Sort Concatmap + - id: dp3_parset + label: dp3_parset in: + - id: parset + source: make_concat_parset/concat_parsets - id: msin - source: order_by_direction/msout - - id: numbands - source: numbands - - id: truncateLastSBs - source: truncateLastSBs - - id: linc_libraries - source: collect_linc_libraries/libraries - out: - - id: filenames - - id: groupnames - run: ../steps/sort_concatmap.cwl - scatter: msin - - - id: flatten_groupnames - label: Flatten Groupnames - in: - - id: nestedarray - source: sort_concatmap/groupnames + source: flatten_msout/flattenedarray out: - - id: flattenedarray - run: ../steps/flatten.cwl - + - id: msout + run: ../steps/dp3_parset.cwl + scatter: parset - - id: concatenation - label: concatenation + - id: ddcal_pre_selection + label: DD direction selection in: - id: msin - source: order_by_direction/msout - - id: groups_specification - source: sort_concatmap/filenames - - id: group_id - source: flatten_groupnames/flattenedarray + source: dp3_parset/msout + - id: lofar_helpers + source: h5merger + - id: selfcal + source: selfcal + - id: dd_selection + source: dd_selection out: - - id: msout - run: ./subworkflows/concatenation.cwl - scatter: [msin, groups_specification, group_id] - scatterMethod: dotproduct - + - id: phasediff_score_csv + - id: best_ms + when: $(inputs.dd_selection) + run: ./subworkflows/ddcal_pre_selection.cwl - id: target_selfcal label: Target Selfcal in: - id: msin - source: concatenation/msout + source: + - ddcal_pre_selection/best_ms + - dp3_parset/msout + pickValue: first_non_null - id: configfile source: configfile - id: h5merger @@ -181,16 +166,12 @@ steps: scatter: msin outputs: - - id: msout_phaseup - type: - type: array - items: - type: array - items: Directory - outputSource: dp3_target_phaseup/msout - id: msout_concat type: Directory[] - outputSource: concatenation/msout + outputSource: + - ddcal_pre_selection/best_ms + - dp3_parset/msout + pickValue: first_non_null - id: images type: type: array @@ -209,6 +190,9 @@ outputs: outputSource: - target_selfcal/fits_images pickValue: all_non_null + - id: phasediff_score_csv + type: File? + outputSource: ddcal_pre_selection/phasediff_score_csv - id: h5parm type: File[] outputSource: diff --git a/workflows/subworkflows/ddcal_pre_selection.cwl b/workflows/subworkflows/ddcal_pre_selection.cwl new file mode 100644 index 0000000000000000000000000000000000000000..da0bc53594e9fa01695416b458f98c1b67f87b15 --- /dev/null +++ b/workflows/subworkflows/ddcal_pre_selection.cwl @@ -0,0 +1,110 @@ +class: Workflow +cwlVersion: v1.2 +id: ddcal_pre_selection +label: DD direction selection +doc: | + This workflow does the following: + * DP3 prep to average measurement to the same freq/time resolution + * Get h5parm solutions with scalarphasediff corrections from facetselfcal + * Get solution scores using the circular standard deviation + * Select MS with scores below 2.3 + This selection metric is described in Section 3.3.1 from de Jong et al. (2024; https://arxiv.org/pdf/2407.13247) + +requirements: + - class: ScatterFeatureRequirement + - class: SubworkflowFeatureRequirement + +inputs: + - id: msin + type: Directory[] + doc: The input concatenated MS. + - id: lofar_helpers + type: Directory + doc: Path to lofar_helpers directory. + - id: selfcal + type: Directory + doc: Path to selfcal directory. + +steps: + - id: Phasediff + in: + - id: lofar_helpers + source: lofar_helpers + - id: selfcal + source: selfcal + - id: msin + source: msin + out: + - phasediff_h5out + scatter: msin + run: + # start of Phasediff + cwlVersion: v1.2 + class: Workflow + inputs: + - id: msin + type: Directory + - id: selfcal + type: Directory + - id: lofar_helpers + type: Directory + outputs: + - id: phasediff_h5out + type: File + outputSource: get_phasediff/phasediff_h5out + + steps: + - id: dp3_prep_phasediff + label: Pre-averaging with DP3 + in: + - id: msin + source: msin + out: + - phasediff_ms + run: ../../steps/dp3_prep_phasediff.cwl + + - id: get_phasediff + label: Get phase difference with facetselfcal + in: + - id: phasediff_ms + source: dp3_prep_phasediff/phasediff_ms + - id: lofar_helpers + source: lofar_helpers + - id: selfcal + source: selfcal + out: + - phasediff_h5out + run: ../../steps/get_phasediff.cwl + # end of Phasediff + + - id: get_selection_scores + label: Calculate phase difference score + in: + - id: phasediff_h5 + source: Phasediff/phasediff_h5out + - id: selfcal + source: selfcal + out: + - phasediff_score_csv + run: ../../steps/get_selection_scores.cwl + + - id: select_best_directions + label: Select best directions + in: + - id: phasediff_csv + source: get_selection_scores/phasediff_score_csv + - id: msin + source: msin + out: + - best_ms + run: ../../steps/select_best_directions.cwl + +outputs: + - id: phasediff_score_csv + type: File + outputSource: get_selection_scores/phasediff_score_csv + doc: csv with scores + - id: best_ms + type: Directory[] + outputSource: select_best_directions/best_ms + doc: Final MS selection