From a77da43e36508eb6bedbf0cd35d8d02ad7ce03b9 Mon Sep 17 00:00:00 2001
From: Jurjen de Jong <jurjendejong@strw.leidenuniv.nl>
Date: Tue, 29 Oct 2024 10:31:49 +0000
Subject: [PATCH] Dd selection

---
 .gitignore                                    |   1 +
 scripts/direction_selection.py                | 157 ++++++++++++++++++
 scripts/get_delay_dir.py                      |   5 +-
 steps/dp3_parset.cwl                          |  46 +++++
 steps/dp3_prep_phasediff.cwl                  |  56 +++++++
 steps/get_delay_cal_direction.cwl             |   2 +-
 steps/get_phasediff.cwl                       |  73 ++++++++
 steps/get_selection_scores.cwl                |  53 ++++++
 steps/make_concat_parsets.cwl                 |  57 +++++++
 steps/order_by_direction.cwl                  |  61 -------
 steps/select_best_directions.cwl              |  49 ++++++
 steps/sort_concatmap.cwl                      |   1 +
 workflows/split-directions.cwl                | 124 ++++++--------
 .../subworkflows/ddcal_pre_selection.cwl      | 110 ++++++++++++
 14 files changed, 661 insertions(+), 134 deletions(-)
 create mode 100644 .gitignore
 create mode 100755 scripts/direction_selection.py
 create mode 100644 steps/dp3_parset.cwl
 create mode 100644 steps/dp3_prep_phasediff.cwl
 create mode 100644 steps/get_phasediff.cwl
 create mode 100644 steps/get_selection_scores.cwl
 create mode 100644 steps/make_concat_parsets.cwl
 delete mode 100644 steps/order_by_direction.cwl
 create mode 100644 steps/select_best_directions.cwl
 create mode 100644 workflows/subworkflows/ddcal_pre_selection.cwl

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..b9a5796b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+*.idea
\ No newline at end of file
diff --git a/scripts/direction_selection.py b/scripts/direction_selection.py
new file mode 100755
index 00000000..d3f4c9b4
--- /dev/null
+++ b/scripts/direction_selection.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+__author__ = "Jurjen de Jong"
+
+import os
+import re
+import sys
+from argparse import ArgumentParser
+
+import pandas as pd
+from astropy.coordinates import SkyCoord
+from astropy import units as u
+
+
+def filter_too_nearest_neighbours(csv: str = None, sep: float = 0.1):
+    """
+    Identify sources that have a nearest neighbour within 0.1 degrees distance.
+    Keep the source with the highest spd_score.
+
+    Args:
+        csv: CSV file with RA/DEC and spd_score
+        sep: separation threshold in degrees
+
+    Returns:
+
+    """
+
+    df = pd.read_csv(csv)
+
+    # SkyCoord objects for all sources
+    coords = SkyCoord(ra=df['RA'].values * u.deg, dec=df['DEC'].values * u.deg)
+
+    # Entries collection to remove
+    to_remove = set()
+
+    for i, row in df.iterrows():
+        if i in to_remove:
+            continue  # Skip already marked rows
+
+        # Separations between the current source and all others
+        separations = coords[i].separation(coords).degree
+
+        # Sources within seperation threshold
+        close_sources = (separations < sep) & (separations > 0)
+        close_indices = df[close_sources].index
+
+        # If there are neighbouring sources, compare the spd_scores
+        for j in close_indices:
+            if df.loc[j, 'spd_score'] < row['spd_score']:
+                # Remove the current source if its spd_score is lower
+                to_remove.add(i)
+
+    # Remove rows marked for removal
+    print('Removing:')
+    print(df.iloc[list(to_remove)])
+    filtered_df = df.drop(index=to_remove).reset_index(drop=True)
+
+    return filtered_df
+
+
+def parse_source_id(inp_str: str = None):
+    """
+    Parse ILTJ... source_id string
+
+    Args:
+        inp_str: ILTJ source_id
+
+    Returns: parsed output
+
+    """
+
+    parsed_inp = re.findall(r'ILTJ\d+\..\d+\+\d+.\d+', inp_str)[0]
+
+    return parsed_inp
+
+
+def match_source_id(mslist: list = None, source_id: str = None):
+    """
+    Return MS name by matching source ID with items from list with MS names
+
+    Parameters
+    ----------
+    mslist: List with MS
+    source_id: Source ID
+
+    Returns
+    -------
+    Most similar element
+    """
+
+    for ms in mslist:
+        if parse_source_id(ms)==source_id:
+            return ms
+
+    # If no match (should not arrive here)
+    sys.exit(f"ERROR: No matching MS for {source_id}")
+
+
+def rename_folder(old_name, new_name):
+    """
+    Rename folder name
+
+    Parameters
+    ----------
+    old_name: Old name
+    new_name: New name
+    """
+
+    try:
+        os.rename(old_name, new_name)
+        print(f"Folder renamed from '{old_name}' to '{new_name}'")
+    except OSError as e:
+        print(f"Error: {e}")
+
+
+def parse_args():
+    """
+    Command line argument parser
+
+    Returns
+    -------
+    Parsed arguments
+    """
+
+    parser = ArgumentParser()
+    parser.add_argument('--csv', help='CSV with names and phasediff scores', default=None)
+    parser.add_argument('--ms', nargs="+", help='Input MS', default=None)
+    parser.add_argument('--best_score', type=float,
+                        help='Optimal selection score (See Section 3.3.1 https://arxiv.org/pdf/2407.13247)',
+                        default=2.3)
+    parser.add_argument('--suffix', help='suffix', default='_best')
+    return parser.parse_args()
+
+
+def main():
+    """
+    Main function
+    """
+
+    args = parse_args()
+
+    # Get dataframe after filtering for sources within 0.1 degrees distance from each other
+    df = filter_too_nearest_neighbours(args.csv)
+
+    for source in df.set_index('source').iterrows():
+        name = source[0]
+        score = source[1]['spd_score']
+        if score < args.best_score:
+            ms_name = match_source_id(args.ms, name)
+
+            # Rename folder to return best directions in CWL workflow
+            rename_folder(ms_name, ms_name.split('/')[-1]+args.suffix+'.ms')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/scripts/get_delay_dir.py b/scripts/get_delay_dir.py
index 17114cea..ecdd8c60 100755
--- a/scripts/get_delay_dir.py
+++ b/scripts/get_delay_dir.py
@@ -25,8 +25,9 @@ def main(h5_in: str, solset: str, direction_name: str):
     delay_direction = SkyCoord(ss_dir[0], ss_dir[1], unit="rad")
     ra = delay_direction.ra.to("deg").value
     dec = delay_direction.dec.to("deg").value
-    print("Source_id,RA,DEC")
-    print(f"DelayCal,{ra},{dec}")
+    with open("delay_dir.csv", "w") as f:
+        f.write("Source_id,RA,DEC\n")
+        f.write(f"DelayCal,{ra},{dec}")
 
 
 if __name__ == "__main__":
diff --git a/steps/dp3_parset.cwl b/steps/dp3_parset.cwl
new file mode 100644
index 00000000..d44ae688
--- /dev/null
+++ b/steps/dp3_parset.cwl
@@ -0,0 +1,46 @@
+cwlVersion: v1.2
+class: CommandLineTool
+id: dp3_parset
+label: DP3 with parset
+doc: |
+    Run DP3 with a parset
+
+baseCommand: DP3
+
+inputs:
+  - id: parset
+    type: File
+    doc: Parset for DP3
+    inputBinding:
+      position: 0
+  - id: msin
+    type: Directory[]
+    doc: input MS
+
+outputs:
+  - id: msout
+    type: Directory
+    doc: Output measurement set
+    outputBinding:
+      glob: "*.ms"
+  - id: logfile
+    type: File[]
+    outputBinding:
+      glob: dp3_parset*.log
+    doc: |
+        The files containing the stdout
+        and stderr from the step.
+
+requirements:
+  - class: InitialWorkDirRequirement
+    listing:
+      - entry: $(inputs.msin)
+
+hints:
+  - class: DockerRequirement
+    dockerPull: vlbi-cwl
+  - class: ResourceRequirement
+    coresMin: 6
+
+stdout: dp3_parset.log
+stderr: dp3_parset_err.log
diff --git a/steps/dp3_prep_phasediff.cwl b/steps/dp3_prep_phasediff.cwl
new file mode 100644
index 00000000..6feb8999
--- /dev/null
+++ b/steps/dp3_prep_phasediff.cwl
@@ -0,0 +1,56 @@
+class: CommandLineTool
+cwlVersion: v1.2
+id: pre_averaging_dp3
+label: DP3 Pre-averaging
+doc: This step pre-averages measurement set for pulling phasediff scores from facetselfcal
+
+
+baseCommand: DP3
+
+inputs:
+    - id: msin
+      type: Directory
+      doc: Input measurement set
+      inputBinding:
+        prefix: msin=
+        position: 0
+        shellQuote: false
+        separate: false
+
+outputs:
+    - id: phasediff_ms
+      type: Directory
+      doc: Output measurement sets
+      outputBinding:
+        glob: "*.phasediff.ms"
+    - id: logfile
+      type: File[]
+      doc: DP3 log files
+      outputBinding:
+        glob: dp3_prephasediff*.log
+
+arguments:
+  - steps=[avg]
+  - msin.datacolumn=DATA
+  - msout.storagemanager=dysco
+  - avg.type=averager
+  - avg.freqresolution=390.56kHz
+  - avg.timeresolution=60
+  - msout=$(inputs.msin.path+".phasediff.ms")
+
+requirements:
+  - class: InlineJavascriptRequirement
+  - class: InitialWorkDirRequirement
+    listing:
+      - entry: $(inputs.msin)
+        writable: true
+
+hints:
+  - class: DockerRequirement
+    dockerPull: vlbi-cwl
+  - class: ResourceRequirement
+    coresMin: 6
+
+
+stdout: dp3_prephasediff.log
+stderr: dp3_prephasediff_err.log
diff --git a/steps/get_delay_cal_direction.cwl b/steps/get_delay_cal_direction.cwl
index b8b77162..8ed7307d 100644
--- a/steps/get_delay_cal_direction.cwl
+++ b/steps/get_delay_cal_direction.cwl
@@ -8,7 +8,7 @@ doc: |
 baseCommand:
     - get_delay_dir.py
 
-stdout: delay_dir.csv
+stdout: get_delay_dir.txt
 stderr: get_delay_dir_err.txt
 
 inputs:
diff --git a/steps/get_phasediff.cwl b/steps/get_phasediff.cwl
new file mode 100644
index 00000000..9c759160
--- /dev/null
+++ b/steps/get_phasediff.cwl
@@ -0,0 +1,73 @@
+class: CommandLineTool
+cwlVersion: v1.2
+id: get_phasediff
+label: Polarization Phase Difference
+doc: This step makes scalarphasediff solution files, needed for collecting source selection scores
+
+baseCommand:
+  - python3
+
+inputs:
+    - id: phasediff_ms
+      type: Directory
+      doc: Input MeasurementSet
+      inputBinding:
+        position: 20
+    - id: lofar_helpers
+      type: Directory
+      doc: The lofar_helpers directory.
+    - id: selfcal
+      type: Directory
+      doc: The facetselfcal directory.
+
+outputs:
+    - id: phasediff_h5out
+      type: File
+      doc: h5parm solution files with scalarphasediff solutions
+      outputBinding:
+        glob: "scalarphasediff*.h5"
+    - id: logfile
+      type: File[]
+      doc: log files from facetselfcal
+      outputBinding:
+        glob: phasediff*.log
+
+requirements:
+  - class: InlineJavascriptRequirement
+  - class: InitialWorkDirRequirement
+    listing:
+      - entry: $(inputs.phasediff_ms)
+        writable: true
+      - entry: $(inputs.lofar_helpers)
+      - entry: $(inputs.selfcal)
+
+arguments:
+  - $(inputs.selfcal.path + '/facetselfcal.py')
+  - -i
+  - phasediff
+  - --forwidefield
+  - --phaseupstations=core
+  - --skipbackup
+  - --uvmin=20000
+  - --soltype-list=['scalarphasediff']
+  - --solint-list=['10min']
+  - --nchan-list=[6]
+  - --docircular
+  - --uvminscalarphasediff=0
+  - --stop=1
+  - --soltypecycles-list=[0]
+  - --imsize=1600
+  - --skymodelpointsource=1.0
+  - --helperscriptspath=$(inputs.selfcal.path)
+  - --helperscriptspathh5merge=$(inputs.lofar_helpers.path)
+  - --stopafterskysolve
+  - --phasediff_only
+
+hints:
+  - class: DockerRequirement
+    dockerPull: vlbi-cwl
+  - class: ResourceRequirement
+    coresMin: 2
+
+stdout: phasediff.log
+stderr: phasediff_err.log
diff --git a/steps/get_selection_scores.cwl b/steps/get_selection_scores.cwl
new file mode 100644
index 00000000..1ee65889
--- /dev/null
+++ b/steps/get_selection_scores.cwl
@@ -0,0 +1,53 @@
+class: CommandLineTool
+cwlVersion: v1.2
+id: get_source_scores
+label: Source Scores
+doc: |
+       This step calculates the scores from h5parm solution files to determine if a direction is suitable
+       for long-baseline calibration for wide-field imaging (See Section 3.3.1 https://arxiv.org/pdf/2407.13247)
+
+baseCommand:
+  - python3
+
+inputs:
+    - id: phasediff_h5
+      type: File[]
+      doc: H5parm solutions from facetselfcal.
+      inputBinding:
+        prefix: "--h5"
+        position: 1
+        itemSeparator: " "
+        separate: true
+    - id: selfcal
+      type: Directory
+      doc: Path to facetselfcal directory.
+
+outputs:
+    - id: phasediff_score_csv
+      type: File
+      doc: csv with phase difference scores
+      outputBinding:
+        glob: "*.csv"
+    - id: logfile
+      type: File[]
+      doc: log files corresponding to this step
+      outputBinding:
+        glob: phasediff*.log
+
+requirements:
+  - class: InlineJavascriptRequirement
+  - class: InitialWorkDirRequirement
+    listing:
+      - entry: $(inputs.phasediff_h5)
+        writable: true
+      - entry: $(inputs.selfcal)
+
+arguments:
+  - $( inputs.selfcal.path + '/source_selection/phasediff_output.py' )
+
+hints:
+  - class: DockerRequirement
+    dockerPull: vlbi-cwl
+
+stdout: phasediff.log
+stderr: phasediff_err.log
diff --git a/steps/make_concat_parsets.cwl b/steps/make_concat_parsets.cwl
new file mode 100644
index 00000000..db297165
--- /dev/null
+++ b/steps/make_concat_parsets.cwl
@@ -0,0 +1,57 @@
+class: CommandLineTool
+cwlVersion: v1.2
+id: makeparsets
+label: Make concat parsets
+doc: |
+    Generate direction concatenation parsets
+
+baseCommand:
+  - python3
+
+inputs:
+  - id: msin
+    type: Directory[]
+    inputBinding:
+        prefix: "--msin"
+        position: 1
+        separate: true
+    doc: Input data in MeasurementSet format.
+  - id: lofar_helpers
+    type: Directory
+    doc: Path to lofar_helpers directory.
+
+outputs:
+  - id: concat_parsets
+    doc: |
+        The output data with corrected
+        data in MeasurementSet format.
+    type: File[]
+    outputBinding:
+      glob: '*.parset'
+
+  - id: logfile
+    type: File[]
+    outputBinding:
+      glob: python_concat*.log
+    doc: |
+        The files containing the stdout
+        and stderr from the step.
+
+arguments:
+  - $( inputs.lofar_helpers.path + '/ms_helpers/concat_with_dummies.py' )
+  - --make_only_parset
+  - --only_basename
+
+requirements:
+  - class: InlineJavascriptRequirement
+  - class: InitialWorkDirRequirement
+    listing:
+      - entry: $(inputs.msin)
+        writable: false
+
+hints:
+  - class: DockerRequirement
+    dockerPull: vlbi-cwl
+
+stdout: python_concat.log
+stderr: python_concat_err.log
diff --git a/steps/order_by_direction.cwl b/steps/order_by_direction.cwl
deleted file mode 100644
index 139f325e..00000000
--- a/steps/order_by_direction.cwl
+++ /dev/null
@@ -1,61 +0,0 @@
-class: CommandLineTool
-cwlVersion: v1.1
-id: order_by_direction
-label: Order by Direction
-doc: |
-  This tool takes an array of arrays of directories containing MeasurementSet files which are in groups of frequency.
-  It re-orders them such that they are in groups of direction ready to be concatenated.
-
-baseCommand: 
-  - python3
-  - order_by_direction.py
-
-inputs:
-  - id: msin
-    type:
-      type: array
-      items:
-        type: array
-        items: Directory
-    inputBinding:
-      position: 0
-    doc: Array of arrays of directories containing the MeasurementSet files to be ordered
-
-requirements:
-    - class: InlineJavascriptRequirement
-    - class: InitialWorkDirRequirement
-      listing:
-        - entryname: order_by_direction.py
-          entry: |
-            import json
-
-            mss = $(inputs)['msin']
-            print(mss)
-
-            #The line below does the re-ordering. It performs the transpose of a list.
-            output = list(map(list, zip(*mss))) 
-
-            cwl_output = {}
-
-            cwl_output['msout'] = output
-
-            print(cwl_output)
-
-            # The results are written to this file to circumvent the size
-            # restrictions placed on files that can be parsed by outputEval. See
-            # https://www.commonwl.org/v1.2/CommandLineTool.html#CommandOutputBinding
-            with open('$(runtime.outdir)/cwl.output.json', 'w') as fp:
-              json.dump(cwl_output, fp)
-
-outputs:
-  - id: msout
-    type:
-      type: array
-      items:
-        type: array
-        items: Directory
-    outputBinding:
-        glob: $(runtime.outdir)/cwl.output.json
-        outputEval: $(self.msout)
-    doc: Array of arrays of directories containing the MeasurementSet 
-      files ordered by direction.
diff --git a/steps/select_best_directions.cwl b/steps/select_best_directions.cwl
new file mode 100644
index 00000000..a43ff4e9
--- /dev/null
+++ b/steps/select_best_directions.cwl
@@ -0,0 +1,49 @@
+class: CommandLineTool
+cwlVersion: v1.2
+id: select_best_directions
+label: Select best directions
+doc: This step uses the phasediff scores to select the best input directions by adding a suffix *_best.ms
+
+baseCommand:
+  - direction_selection.py
+  - --best_score=2.3
+
+inputs:
+    - id: msin
+      type: Directory[]
+      doc: All input MS directions
+      inputBinding:
+        prefix: "--ms"
+        position: 1
+        separate: true
+    - id: phasediff_csv
+      type: File
+      doc: CSV with phasediff source selection scores
+      inputBinding:
+        prefix: "--csv"
+        position: 2
+
+outputs:
+    - id: best_ms
+      type: Directory[]
+      doc: Best directions
+      outputBinding:
+        glob: "*_best.ms"
+    - id: logfile
+      type: File[]
+      doc: Log files corresponding to this step
+      outputBinding:
+        glob: dir_selection*.log
+
+requirements:
+  - class: InitialWorkDirRequirement
+    listing:
+      - entry: $(inputs.msin)
+        writable: true
+
+hints:
+  - class: DockerRequirement
+    dockerPull: vlbi-cwl
+
+stdout: dir_selection.log
+stderr: dir_selection_err.log
diff --git a/steps/sort_concatmap.cwl b/steps/sort_concatmap.cwl
index b1e84d9e..e400627d 100755
--- a/steps/sort_concatmap.cwl
+++ b/steps/sort_concatmap.cwl
@@ -133,3 +133,4 @@ outputs:
 
 stdout: sort_concatmap.log
 stderr: sort_concatmap_err.log
+
diff --git a/workflows/split-directions.cwl b/workflows/split-directions.cwl
index b874deeb..7bc18f8d 100644
--- a/workflows/split-directions.cwl
+++ b/workflows/split-directions.cwl
@@ -4,9 +4,10 @@ id: split-directions
 label: Split Directions
 doc: |
   This is a workflow for the LOFAR-VLBI pipeline that
-  splits a LOFAR MeasurementSet into various target directions, applies delay calibrator solutions and then optionally performs
-  self-calibration on the target directions.
-
+    * Splits a LOFAR MeasurementSet into various target directions
+    * Applies delay calibrator solutions
+    * Optionally (for wide-field imaging) performs direction-dependent calibrator selection
+    * Optionally performs self-calibration on the target directions
   This step should be run after the delay calibration workflow.
 
 requirements:
@@ -17,7 +18,7 @@ requirements:
 inputs:
     - id: msin
       type: Directory[]
-      doc: The input MS. This should have coverage of the target directions.
+      doc: The input MS of the target directions.
     - id: delay_solset
       type: File
       doc: The solution tables generated by the VLBI delay calibration workflow in an HDF5 format.
@@ -43,6 +44,10 @@ inputs:
       type: boolean?
       default: false
       doc: Whether to do selfcal on the direction concat MSs.
+    - id: dd_selection
+      type: boolean?
+      default: false
+      doc: If set to true the pipeline will perform direction-dependent calibrator selection.
     - id: configfile
       type: File
       doc: The configuration file to be used to run
@@ -54,12 +59,6 @@ inputs:
       type: Directory
       doc: The selfcal directory.
 
-    - id: linc
-      type: Directory
-      doc: |
-        The installation directory for the
-        LOFAR INitial Calibration pipeline.
-
 steps:
     - id: target_phaseup
       label: Target Phaseup
@@ -93,77 +92,63 @@ steps:
       scatter: [msin, parset]
       scatterMethod: dotproduct
 
-    - id: order_by_direction
-      label: Order by Direction
+    - id: flatten_msout
+      label: Flatten msout
       in:
-        - id: msin
+        - id: nestedarray
           source: dp3_target_phaseup/msout
       out:
-        - id: msout
-      run: ../steps/order_by_direction.cwl
+        - id: flattenedarray
+      run: ../steps/flatten.cwl
 
-    - id: collect_linc_libraries
-      label: Collect neccesary LINC libraries
+    - id: make_concat_parset
+      label: Make parsets
       in:
-        - id: linc
-          source: linc
-        - id: library
-          default:
-            - scripts/sort_times_into_freqGroups.py
+         - id: msin
+           source: flatten_msout/flattenedarray
+         - id: lofar_helpers
+           source: h5merger
       out:
-        - id: libraries
-      scatter: library
-      run: ../steps/collect_linc_libraries.cwl
+         - id: concat_parsets
+      run: ../steps/make_concat_parsets.cwl
 
-
-    - id: sort_concatmap
-      label: Sort Concatmap
+    - id: dp3_parset
+      label: dp3_parset
       in:
+        - id: parset
+          source: make_concat_parset/concat_parsets
         - id: msin
-          source: order_by_direction/msout
-        - id: numbands
-          source: numbands
-        - id: truncateLastSBs
-          source: truncateLastSBs
-        - id: linc_libraries
-          source: collect_linc_libraries/libraries
-      out:
-        - id: filenames
-        - id: groupnames
-      run: ../steps/sort_concatmap.cwl
-      scatter: msin
-
-    - id: flatten_groupnames
-      label: Flatten Groupnames
-      in:
-        - id: nestedarray
-          source: sort_concatmap/groupnames
+          source: flatten_msout/flattenedarray
       out:
-        - id: flattenedarray
-      run: ../steps/flatten.cwl
-
+        - id: msout
+      run: ../steps/dp3_parset.cwl
+      scatter: parset
 
-    - id: concatenation
-      label: concatenation
+    - id: ddcal_pre_selection
+      label: DD direction selection
       in:
         - id: msin
-          source: order_by_direction/msout
-        - id: groups_specification
-          source: sort_concatmap/filenames
-        - id: group_id
-          source: flatten_groupnames/flattenedarray
+          source: dp3_parset/msout
+        - id: lofar_helpers
+          source: h5merger
+        - id: selfcal
+          source: selfcal
+        - id: dd_selection
+          source: dd_selection
       out:
-        - id: msout
-      run: ./subworkflows/concatenation.cwl
-      scatter: [msin, groups_specification, group_id]
-      scatterMethod: dotproduct
-
+        - id: phasediff_score_csv
+        - id: best_ms
+      when: $(inputs.dd_selection)
+      run: ./subworkflows/ddcal_pre_selection.cwl
 
     - id: target_selfcal
       label: Target Selfcal
       in:
         - id: msin
-          source: concatenation/msout
+          source:
+            - ddcal_pre_selection/best_ms
+            - dp3_parset/msout
+          pickValue: first_non_null
         - id: configfile
           source: configfile
         - id: h5merger
@@ -181,16 +166,12 @@ steps:
       scatter: msin
 
 outputs:
-    - id: msout_phaseup
-      type:
-        type: array
-        items:
-          type: array
-          items: Directory
-      outputSource: dp3_target_phaseup/msout
     - id: msout_concat
       type: Directory[]
-      outputSource: concatenation/msout
+      outputSource:
+        - ddcal_pre_selection/best_ms
+        - dp3_parset/msout
+      pickValue: first_non_null
     - id: images
       type:
         type: array
@@ -209,6 +190,9 @@ outputs:
       outputSource:
         - target_selfcal/fits_images
       pickValue: all_non_null
+    - id: phasediff_score_csv
+      type: File?
+      outputSource: ddcal_pre_selection/phasediff_score_csv
     - id: h5parm
       type: File[]
       outputSource:
diff --git a/workflows/subworkflows/ddcal_pre_selection.cwl b/workflows/subworkflows/ddcal_pre_selection.cwl
new file mode 100644
index 00000000..da0bc535
--- /dev/null
+++ b/workflows/subworkflows/ddcal_pre_selection.cwl
@@ -0,0 +1,110 @@
+class: Workflow
+cwlVersion: v1.2
+id: ddcal_pre_selection
+label: DD direction selection
+doc: |
+   This workflow does the following:
+        * DP3 prep to average measurement to the same freq/time resolution
+        * Get h5parm solutions with scalarphasediff corrections from facetselfcal
+        * Get solution scores using the circular standard deviation
+        * Select MS with scores below 2.3
+   This selection metric is described in Section 3.3.1 from de Jong et al. (2024; https://arxiv.org/pdf/2407.13247)
+
+requirements:
+  - class: ScatterFeatureRequirement
+  - class: SubworkflowFeatureRequirement
+
+inputs:
+    - id: msin
+      type: Directory[]
+      doc: The input concatenated MS.
+    - id: lofar_helpers
+      type: Directory
+      doc: Path to lofar_helpers directory.
+    - id: selfcal
+      type: Directory
+      doc: Path to selfcal directory.
+
+steps:
+    - id: Phasediff
+      in:
+        - id: lofar_helpers
+          source: lofar_helpers
+        - id: selfcal
+          source: selfcal
+        - id: msin
+          source: msin
+      out:
+        - phasediff_h5out
+      scatter: msin
+      run:
+        # start of Phasediff
+        cwlVersion: v1.2
+        class: Workflow
+        inputs:
+          - id: msin
+            type: Directory
+          - id: selfcal
+            type: Directory
+          - id: lofar_helpers
+            type: Directory
+        outputs:
+          - id: phasediff_h5out
+            type: File
+            outputSource: get_phasediff/phasediff_h5out
+
+        steps:
+          - id: dp3_prep_phasediff
+            label: Pre-averaging with DP3
+            in:
+              - id: msin
+                source: msin
+            out:
+              - phasediff_ms
+            run: ../../steps/dp3_prep_phasediff.cwl
+
+          - id: get_phasediff
+            label: Get phase difference with facetselfcal
+            in:
+              - id: phasediff_ms
+                source: dp3_prep_phasediff/phasediff_ms
+              - id: lofar_helpers
+                source: lofar_helpers
+              - id: selfcal
+                source: selfcal
+            out:
+              - phasediff_h5out
+            run: ../../steps/get_phasediff.cwl
+        # end of Phasediff
+
+    - id: get_selection_scores
+      label: Calculate phase difference score
+      in:
+        - id: phasediff_h5
+          source: Phasediff/phasediff_h5out
+        - id: selfcal
+          source: selfcal
+      out:
+        - phasediff_score_csv
+      run: ../../steps/get_selection_scores.cwl
+
+    - id: select_best_directions
+      label: Select best directions
+      in:
+        - id: phasediff_csv
+          source: get_selection_scores/phasediff_score_csv
+        - id: msin
+          source: msin
+      out:
+        - best_ms
+      run: ../../steps/select_best_directions.cwl
+
+outputs:
+    - id: phasediff_score_csv
+      type: File
+      outputSource: get_selection_scores/phasediff_score_csv
+      doc: csv with scores
+    - id: best_ms
+      type: Directory[]
+      outputSource: select_best_directions/best_ms
+      doc: Final MS selection
-- 
GitLab