Skip to content
Snippets Groups Projects
Commit a77da43e authored by Jurjen de Jong's avatar Jurjen de Jong Committed by Matthijs van der Wild
Browse files

Dd selection

parent df8ae840
No related branches found
No related tags found
1 merge request!66Dd selection
*.idea
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "Jurjen de Jong"
import os
import re
import sys
from argparse import ArgumentParser
import pandas as pd
from astropy.coordinates import SkyCoord
from astropy import units as u
def filter_too_nearest_neighbours(csv: str = None, sep: float = 0.1):
"""
Identify sources that have a nearest neighbour within 0.1 degrees distance.
Keep the source with the highest spd_score.
Args:
csv: CSV file with RA/DEC and spd_score
sep: separation threshold in degrees
Returns:
"""
df = pd.read_csv(csv)
# SkyCoord objects for all sources
coords = SkyCoord(ra=df['RA'].values * u.deg, dec=df['DEC'].values * u.deg)
# Entries collection to remove
to_remove = set()
for i, row in df.iterrows():
if i in to_remove:
continue # Skip already marked rows
# Separations between the current source and all others
separations = coords[i].separation(coords).degree
# Sources within seperation threshold
close_sources = (separations < sep) & (separations > 0)
close_indices = df[close_sources].index
# If there are neighbouring sources, compare the spd_scores
for j in close_indices:
if df.loc[j, 'spd_score'] < row['spd_score']:
# Remove the current source if its spd_score is lower
to_remove.add(i)
# Remove rows marked for removal
print('Removing:')
print(df.iloc[list(to_remove)])
filtered_df = df.drop(index=to_remove).reset_index(drop=True)
return filtered_df
def parse_source_id(inp_str: str = None):
"""
Parse ILTJ... source_id string
Args:
inp_str: ILTJ source_id
Returns: parsed output
"""
parsed_inp = re.findall(r'ILTJ\d+\..\d+\+\d+.\d+', inp_str)[0]
return parsed_inp
def match_source_id(mslist: list = None, source_id: str = None):
"""
Return MS name by matching source ID with items from list with MS names
Parameters
----------
mslist: List with MS
source_id: Source ID
Returns
-------
Most similar element
"""
for ms in mslist:
if parse_source_id(ms)==source_id:
return ms
# If no match (should not arrive here)
sys.exit(f"ERROR: No matching MS for {source_id}")
def rename_folder(old_name, new_name):
"""
Rename folder name
Parameters
----------
old_name: Old name
new_name: New name
"""
try:
os.rename(old_name, new_name)
print(f"Folder renamed from '{old_name}' to '{new_name}'")
except OSError as e:
print(f"Error: {e}")
def parse_args():
"""
Command line argument parser
Returns
-------
Parsed arguments
"""
parser = ArgumentParser()
parser.add_argument('--csv', help='CSV with names and phasediff scores', default=None)
parser.add_argument('--ms', nargs="+", help='Input MS', default=None)
parser.add_argument('--best_score', type=float,
help='Optimal selection score (See Section 3.3.1 https://arxiv.org/pdf/2407.13247)',
default=2.3)
parser.add_argument('--suffix', help='suffix', default='_best')
return parser.parse_args()
def main():
"""
Main function
"""
args = parse_args()
# Get dataframe after filtering for sources within 0.1 degrees distance from each other
df = filter_too_nearest_neighbours(args.csv)
for source in df.set_index('source').iterrows():
name = source[0]
score = source[1]['spd_score']
if score < args.best_score:
ms_name = match_source_id(args.ms, name)
# Rename folder to return best directions in CWL workflow
rename_folder(ms_name, ms_name.split('/')[-1]+args.suffix+'.ms')
if __name__ == '__main__':
main()
...@@ -25,8 +25,9 @@ def main(h5_in: str, solset: str, direction_name: str): ...@@ -25,8 +25,9 @@ def main(h5_in: str, solset: str, direction_name: str):
delay_direction = SkyCoord(ss_dir[0], ss_dir[1], unit="rad") delay_direction = SkyCoord(ss_dir[0], ss_dir[1], unit="rad")
ra = delay_direction.ra.to("deg").value ra = delay_direction.ra.to("deg").value
dec = delay_direction.dec.to("deg").value dec = delay_direction.dec.to("deg").value
print("Source_id,RA,DEC") with open("delay_dir.csv", "w") as f:
print(f"DelayCal,{ra},{dec}") f.write("Source_id,RA,DEC\n")
f.write(f"DelayCal,{ra},{dec}")
if __name__ == "__main__": if __name__ == "__main__":
......
cwlVersion: v1.2
class: CommandLineTool
id: dp3_parset
label: DP3 with parset
doc: |
Run DP3 with a parset
baseCommand: DP3
inputs:
- id: parset
type: File
doc: Parset for DP3
inputBinding:
position: 0
- id: msin
type: Directory[]
doc: input MS
outputs:
- id: msout
type: Directory
doc: Output measurement set
outputBinding:
glob: "*.ms"
- id: logfile
type: File[]
outputBinding:
glob: dp3_parset*.log
doc: |
The files containing the stdout
and stderr from the step.
requirements:
- class: InitialWorkDirRequirement
listing:
- entry: $(inputs.msin)
hints:
- class: DockerRequirement
dockerPull: vlbi-cwl
- class: ResourceRequirement
coresMin: 6
stdout: dp3_parset.log
stderr: dp3_parset_err.log
class: CommandLineTool
cwlVersion: v1.2
id: pre_averaging_dp3
label: DP3 Pre-averaging
doc: This step pre-averages measurement set for pulling phasediff scores from facetselfcal
baseCommand: DP3
inputs:
- id: msin
type: Directory
doc: Input measurement set
inputBinding:
prefix: msin=
position: 0
shellQuote: false
separate: false
outputs:
- id: phasediff_ms
type: Directory
doc: Output measurement sets
outputBinding:
glob: "*.phasediff.ms"
- id: logfile
type: File[]
doc: DP3 log files
outputBinding:
glob: dp3_prephasediff*.log
arguments:
- steps=[avg]
- msin.datacolumn=DATA
- msout.storagemanager=dysco
- avg.type=averager
- avg.freqresolution=390.56kHz
- avg.timeresolution=60
- msout=$(inputs.msin.path+".phasediff.ms")
requirements:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entry: $(inputs.msin)
writable: true
hints:
- class: DockerRequirement
dockerPull: vlbi-cwl
- class: ResourceRequirement
coresMin: 6
stdout: dp3_prephasediff.log
stderr: dp3_prephasediff_err.log
...@@ -8,7 +8,7 @@ doc: | ...@@ -8,7 +8,7 @@ doc: |
baseCommand: baseCommand:
- get_delay_dir.py - get_delay_dir.py
stdout: delay_dir.csv stdout: get_delay_dir.txt
stderr: get_delay_dir_err.txt stderr: get_delay_dir_err.txt
inputs: inputs:
......
class: CommandLineTool
cwlVersion: v1.2
id: get_phasediff
label: Polarization Phase Difference
doc: This step makes scalarphasediff solution files, needed for collecting source selection scores
baseCommand:
- python3
inputs:
- id: phasediff_ms
type: Directory
doc: Input MeasurementSet
inputBinding:
position: 20
- id: lofar_helpers
type: Directory
doc: The lofar_helpers directory.
- id: selfcal
type: Directory
doc: The facetselfcal directory.
outputs:
- id: phasediff_h5out
type: File
doc: h5parm solution files with scalarphasediff solutions
outputBinding:
glob: "scalarphasediff*.h5"
- id: logfile
type: File[]
doc: log files from facetselfcal
outputBinding:
glob: phasediff*.log
requirements:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entry: $(inputs.phasediff_ms)
writable: true
- entry: $(inputs.lofar_helpers)
- entry: $(inputs.selfcal)
arguments:
- $(inputs.selfcal.path + '/facetselfcal.py')
- -i
- phasediff
- --forwidefield
- --phaseupstations=core
- --skipbackup
- --uvmin=20000
- --soltype-list=['scalarphasediff']
- --solint-list=['10min']
- --nchan-list=[6]
- --docircular
- --uvminscalarphasediff=0
- --stop=1
- --soltypecycles-list=[0]
- --imsize=1600
- --skymodelpointsource=1.0
- --helperscriptspath=$(inputs.selfcal.path)
- --helperscriptspathh5merge=$(inputs.lofar_helpers.path)
- --stopafterskysolve
- --phasediff_only
hints:
- class: DockerRequirement
dockerPull: vlbi-cwl
- class: ResourceRequirement
coresMin: 2
stdout: phasediff.log
stderr: phasediff_err.log
class: CommandLineTool
cwlVersion: v1.2
id: get_source_scores
label: Source Scores
doc: |
This step calculates the scores from h5parm solution files to determine if a direction is suitable
for long-baseline calibration for wide-field imaging (See Section 3.3.1 https://arxiv.org/pdf/2407.13247)
baseCommand:
- python3
inputs:
- id: phasediff_h5
type: File[]
doc: H5parm solutions from facetselfcal.
inputBinding:
prefix: "--h5"
position: 1
itemSeparator: " "
separate: true
- id: selfcal
type: Directory
doc: Path to facetselfcal directory.
outputs:
- id: phasediff_score_csv
type: File
doc: csv with phase difference scores
outputBinding:
glob: "*.csv"
- id: logfile
type: File[]
doc: log files corresponding to this step
outputBinding:
glob: phasediff*.log
requirements:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entry: $(inputs.phasediff_h5)
writable: true
- entry: $(inputs.selfcal)
arguments:
- $( inputs.selfcal.path + '/source_selection/phasediff_output.py' )
hints:
- class: DockerRequirement
dockerPull: vlbi-cwl
stdout: phasediff.log
stderr: phasediff_err.log
class: CommandLineTool
cwlVersion: v1.2
id: makeparsets
label: Make concat parsets
doc: |
Generate direction concatenation parsets
baseCommand:
- python3
inputs:
- id: msin
type: Directory[]
inputBinding:
prefix: "--msin"
position: 1
separate: true
doc: Input data in MeasurementSet format.
- id: lofar_helpers
type: Directory
doc: Path to lofar_helpers directory.
outputs:
- id: concat_parsets
doc: |
The output data with corrected
data in MeasurementSet format.
type: File[]
outputBinding:
glob: '*.parset'
- id: logfile
type: File[]
outputBinding:
glob: python_concat*.log
doc: |
The files containing the stdout
and stderr from the step.
arguments:
- $( inputs.lofar_helpers.path + '/ms_helpers/concat_with_dummies.py' )
- --make_only_parset
- --only_basename
requirements:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entry: $(inputs.msin)
writable: false
hints:
- class: DockerRequirement
dockerPull: vlbi-cwl
stdout: python_concat.log
stderr: python_concat_err.log
class: CommandLineTool
cwlVersion: v1.1
id: order_by_direction
label: Order by Direction
doc: |
This tool takes an array of arrays of directories containing MeasurementSet files which are in groups of frequency.
It re-orders them such that they are in groups of direction ready to be concatenated.
baseCommand:
- python3
- order_by_direction.py
inputs:
- id: msin
type:
type: array
items:
type: array
items: Directory
inputBinding:
position: 0
doc: Array of arrays of directories containing the MeasurementSet files to be ordered
requirements:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entryname: order_by_direction.py
entry: |
import json
mss = $(inputs)['msin']
print(mss)
#The line below does the re-ordering. It performs the transpose of a list.
output = list(map(list, zip(*mss)))
cwl_output = {}
cwl_output['msout'] = output
print(cwl_output)
# The results are written to this file to circumvent the size
# restrictions placed on files that can be parsed by outputEval. See
# https://www.commonwl.org/v1.2/CommandLineTool.html#CommandOutputBinding
with open('$(runtime.outdir)/cwl.output.json', 'w') as fp:
json.dump(cwl_output, fp)
outputs:
- id: msout
type:
type: array
items:
type: array
items: Directory
outputBinding:
glob: $(runtime.outdir)/cwl.output.json
outputEval: $(self.msout)
doc: Array of arrays of directories containing the MeasurementSet
files ordered by direction.
class: CommandLineTool
cwlVersion: v1.2
id: select_best_directions
label: Select best directions
doc: This step uses the phasediff scores to select the best input directions by adding a suffix *_best.ms
baseCommand:
- direction_selection.py
- --best_score=2.3
inputs:
- id: msin
type: Directory[]
doc: All input MS directions
inputBinding:
prefix: "--ms"
position: 1
separate: true
- id: phasediff_csv
type: File
doc: CSV with phasediff source selection scores
inputBinding:
prefix: "--csv"
position: 2
outputs:
- id: best_ms
type: Directory[]
doc: Best directions
outputBinding:
glob: "*_best.ms"
- id: logfile
type: File[]
doc: Log files corresponding to this step
outputBinding:
glob: dir_selection*.log
requirements:
- class: InitialWorkDirRequirement
listing:
- entry: $(inputs.msin)
writable: true
hints:
- class: DockerRequirement
dockerPull: vlbi-cwl
stdout: dir_selection.log
stderr: dir_selection_err.log
...@@ -133,3 +133,4 @@ outputs: ...@@ -133,3 +133,4 @@ outputs:
stdout: sort_concatmap.log stdout: sort_concatmap.log
stderr: sort_concatmap_err.log stderr: sort_concatmap_err.log
...@@ -4,9 +4,10 @@ id: split-directions ...@@ -4,9 +4,10 @@ id: split-directions
label: Split Directions label: Split Directions
doc: | doc: |
This is a workflow for the LOFAR-VLBI pipeline that This is a workflow for the LOFAR-VLBI pipeline that
splits a LOFAR MeasurementSet into various target directions, applies delay calibrator solutions and then optionally performs * Splits a LOFAR MeasurementSet into various target directions
self-calibration on the target directions. * Applies delay calibrator solutions
* Optionally (for wide-field imaging) performs direction-dependent calibrator selection
* Optionally performs self-calibration on the target directions
This step should be run after the delay calibration workflow. This step should be run after the delay calibration workflow.
requirements: requirements:
...@@ -17,7 +18,7 @@ requirements: ...@@ -17,7 +18,7 @@ requirements:
inputs: inputs:
- id: msin - id: msin
type: Directory[] type: Directory[]
doc: The input MS. This should have coverage of the target directions. doc: The input MS of the target directions.
- id: delay_solset - id: delay_solset
type: File type: File
doc: The solution tables generated by the VLBI delay calibration workflow in an HDF5 format. doc: The solution tables generated by the VLBI delay calibration workflow in an HDF5 format.
...@@ -43,6 +44,10 @@ inputs: ...@@ -43,6 +44,10 @@ inputs:
type: boolean? type: boolean?
default: false default: false
doc: Whether to do selfcal on the direction concat MSs. doc: Whether to do selfcal on the direction concat MSs.
- id: dd_selection
type: boolean?
default: false
doc: If set to true the pipeline will perform direction-dependent calibrator selection.
- id: configfile - id: configfile
type: File type: File
doc: The configuration file to be used to run doc: The configuration file to be used to run
...@@ -54,12 +59,6 @@ inputs: ...@@ -54,12 +59,6 @@ inputs:
type: Directory type: Directory
doc: The selfcal directory. doc: The selfcal directory.
- id: linc
type: Directory
doc: |
The installation directory for the
LOFAR INitial Calibration pipeline.
steps: steps:
- id: target_phaseup - id: target_phaseup
label: Target Phaseup label: Target Phaseup
...@@ -93,77 +92,63 @@ steps: ...@@ -93,77 +92,63 @@ steps:
scatter: [msin, parset] scatter: [msin, parset]
scatterMethod: dotproduct scatterMethod: dotproduct
- id: order_by_direction - id: flatten_msout
label: Order by Direction label: Flatten msout
in: in:
- id: msin - id: nestedarray
source: dp3_target_phaseup/msout source: dp3_target_phaseup/msout
out: out:
- id: msout - id: flattenedarray
run: ../steps/order_by_direction.cwl run: ../steps/flatten.cwl
- id: collect_linc_libraries
label: Collect neccesary LINC libraries
in:
- id: linc
source: linc
- id: library
default:
- scripts/sort_times_into_freqGroups.py
out:
- id: libraries
scatter: library
run: ../steps/collect_linc_libraries.cwl
- id: sort_concatmap - id: make_concat_parset
label: Sort Concatmap label: Make parsets
in: in:
- id: msin - id: msin
source: order_by_direction/msout source: flatten_msout/flattenedarray
- id: numbands - id: lofar_helpers
source: numbands source: h5merger
- id: truncateLastSBs
source: truncateLastSBs
- id: linc_libraries
source: collect_linc_libraries/libraries
out: out:
- id: filenames - id: concat_parsets
- id: groupnames run: ../steps/make_concat_parsets.cwl
run: ../steps/sort_concatmap.cwl
scatter: msin
- id: flatten_groupnames - id: dp3_parset
label: Flatten Groupnames label: dp3_parset
in: in:
- id: nestedarray - id: parset
source: sort_concatmap/groupnames source: make_concat_parset/concat_parsets
- id: msin
source: flatten_msout/flattenedarray
out: out:
- id: flattenedarray - id: msout
run: ../steps/flatten.cwl run: ../steps/dp3_parset.cwl
scatter: parset
- id: concatenation - id: ddcal_pre_selection
label: concatenation label: DD direction selection
in: in:
- id: msin - id: msin
source: order_by_direction/msout source: dp3_parset/msout
- id: groups_specification - id: lofar_helpers
source: sort_concatmap/filenames source: h5merger
- id: group_id - id: selfcal
source: flatten_groupnames/flattenedarray source: selfcal
- id: dd_selection
source: dd_selection
out: out:
- id: msout - id: phasediff_score_csv
run: ./subworkflows/concatenation.cwl - id: best_ms
scatter: [msin, groups_specification, group_id] when: $(inputs.dd_selection)
scatterMethod: dotproduct run: ./subworkflows/ddcal_pre_selection.cwl
- id: target_selfcal - id: target_selfcal
label: Target Selfcal label: Target Selfcal
in: in:
- id: msin - id: msin
source: concatenation/msout source:
- ddcal_pre_selection/best_ms
- dp3_parset/msout
pickValue: first_non_null
- id: configfile - id: configfile
source: configfile source: configfile
- id: h5merger - id: h5merger
...@@ -181,16 +166,12 @@ steps: ...@@ -181,16 +166,12 @@ steps:
scatter: msin scatter: msin
outputs: outputs:
- id: msout_phaseup
type:
type: array
items:
type: array
items: Directory
outputSource: dp3_target_phaseup/msout
- id: msout_concat - id: msout_concat
type: Directory[] type: Directory[]
outputSource: concatenation/msout outputSource:
- ddcal_pre_selection/best_ms
- dp3_parset/msout
pickValue: first_non_null
- id: images - id: images
type: type:
type: array type: array
...@@ -209,6 +190,9 @@ outputs: ...@@ -209,6 +190,9 @@ outputs:
outputSource: outputSource:
- target_selfcal/fits_images - target_selfcal/fits_images
pickValue: all_non_null pickValue: all_non_null
- id: phasediff_score_csv
type: File?
outputSource: ddcal_pre_selection/phasediff_score_csv
- id: h5parm - id: h5parm
type: File[] type: File[]
outputSource: outputSource:
......
class: Workflow
cwlVersion: v1.2
id: ddcal_pre_selection
label: DD direction selection
doc: |
This workflow does the following:
* DP3 prep to average measurement to the same freq/time resolution
* Get h5parm solutions with scalarphasediff corrections from facetselfcal
* Get solution scores using the circular standard deviation
* Select MS with scores below 2.3
This selection metric is described in Section 3.3.1 from de Jong et al. (2024; https://arxiv.org/pdf/2407.13247)
requirements:
- class: ScatterFeatureRequirement
- class: SubworkflowFeatureRequirement
inputs:
- id: msin
type: Directory[]
doc: The input concatenated MS.
- id: lofar_helpers
type: Directory
doc: Path to lofar_helpers directory.
- id: selfcal
type: Directory
doc: Path to selfcal directory.
steps:
- id: Phasediff
in:
- id: lofar_helpers
source: lofar_helpers
- id: selfcal
source: selfcal
- id: msin
source: msin
out:
- phasediff_h5out
scatter: msin
run:
# start of Phasediff
cwlVersion: v1.2
class: Workflow
inputs:
- id: msin
type: Directory
- id: selfcal
type: Directory
- id: lofar_helpers
type: Directory
outputs:
- id: phasediff_h5out
type: File
outputSource: get_phasediff/phasediff_h5out
steps:
- id: dp3_prep_phasediff
label: Pre-averaging with DP3
in:
- id: msin
source: msin
out:
- phasediff_ms
run: ../../steps/dp3_prep_phasediff.cwl
- id: get_phasediff
label: Get phase difference with facetselfcal
in:
- id: phasediff_ms
source: dp3_prep_phasediff/phasediff_ms
- id: lofar_helpers
source: lofar_helpers
- id: selfcal
source: selfcal
out:
- phasediff_h5out
run: ../../steps/get_phasediff.cwl
# end of Phasediff
- id: get_selection_scores
label: Calculate phase difference score
in:
- id: phasediff_h5
source: Phasediff/phasediff_h5out
- id: selfcal
source: selfcal
out:
- phasediff_score_csv
run: ../../steps/get_selection_scores.cwl
- id: select_best_directions
label: Select best directions
in:
- id: phasediff_csv
source: get_selection_scores/phasediff_score_csv
- id: msin
source: msin
out:
- best_ms
run: ../../steps/select_best_directions.cwl
outputs:
- id: phasediff_score_csv
type: File
outputSource: get_selection_scores/phasediff_score_csv
doc: csv with scores
- id: best_ms
type: Directory[]
outputSource: select_best_directions/best_ms
doc: Final MS selection
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment