From 55b3c05aeecdd93d570d7a5674ecbfa857a69d52 Mon Sep 17 00:00:00 2001
From: mancini <mancini@astron.nl>
Date: Fri, 3 Sep 2021 16:06:46 +0200
Subject: [PATCH] Add inspect subworkflow

---
 compress_pipeline.cwl                 | 18 ++++-
 download_and_compress_pipeline.cwl    | 12 +++-
 steps/DPPP.cwl                        |  2 +-
 steps/compress.cwl                    |  2 +-
 steps/define_parset.cwl               |  7 +-
 steps/extract_sip_meta.cwl            |  2 +-
 steps/fetch_data.cwl                  |  2 +-
 steps/inspect_compression_quality.cwl |  2 +-
 steps/inspect_flagging_dataloss.cwl   | 98 +++++++++++++++++++++++++--
 steps/plot_uvw_coverage.cwl           | 42 ++++++++++++
 steps/read_surl_list.cwl              |  2 +-
 11 files changed, 172 insertions(+), 17 deletions(-)
 create mode 100644 steps/plot_uvw_coverage.cwl

diff --git a/compress_pipeline.cwl b/compress_pipeline.cwl
index 74d6348..f83ba79 100644
--- a/compress_pipeline.cwl
+++ b/compress_pipeline.cwl
@@ -1,5 +1,5 @@
 class: Workflow
-cwlVersion: v1.0
+cwlVersion: v1.1
 id: compress_pipeline_cwl
 label: compress_pipeline.cwl
 inputs:
@@ -24,6 +24,14 @@ outputs:
     outputSource:
       - format_ingest/ingest
     type: Any
+  - id: uv_plot
+    type: File
+    outputSource:
+      - plot_uvw_coverage/uv_plot
+  - id: uv_coverage
+    type: File
+    outputSource:
+      - inspect_flagging_dataloss/flags_output
 steps:
   - id: extract_sip_meta
     in:
@@ -76,8 +84,16 @@ steps:
         source: msin
     out:
       - id: output
+      - id: flags_output
     run: steps/inspect_flagging_dataloss.cwl
     label: inspect_flagging_dataloss
+  - id: plot_uvw_coverage
+    in:
+      - id: input_file
+        source: inspect_flagging_dataloss/flags_output
+    out:
+      - id: uv_plot
+    run: steps/plot_uvw_coverage.cwl
   - id: compress
     in:
       - id: directory
diff --git a/download_and_compress_pipeline.cwl b/download_and_compress_pipeline.cwl
index a90a90f..fc5dad3 100644
--- a/download_and_compress_pipeline.cwl
+++ b/download_and_compress_pipeline.cwl
@@ -1,5 +1,5 @@
 class: Workflow
-cwlVersion: v1.0
+cwlVersion: v1.1
 id: compress_pipeline_cwl
 label: compress_pipeline.cwl
 inputs:
@@ -28,6 +28,14 @@ outputs:
     outputSource:
       - compress/inspect
     type: Any[]
+  - id: uv_coverage
+    type: File[]
+    outputSource:
+      - compress/uv_coverage
+  - id: uv_plot
+    type: File[]
+    outputSource:
+      - compress/uv_plot
 steps:
   - id: fetch_data
     in:
@@ -52,6 +60,8 @@ steps:
       - id: inspect
       - id: logfile
       - id: ingest
+      - id: uv_coverage
+      - id: uv_plot
     run: ./compress_pipeline.cwl
 requirements:
   - class: ScatterFeatureRequirement
diff --git a/steps/DPPP.cwl b/steps/DPPP.cwl
index aa1aab4..08d519b 100755
--- a/steps/DPPP.cwl
+++ b/steps/DPPP.cwl
@@ -1,5 +1,5 @@
 class: CommandLineTool
-cwlVersion: v1.0
+cwlVersion: v1.1
 id: dppp
 baseCommand:
   - DPPP
diff --git a/steps/compress.cwl b/steps/compress.cwl
index 1c9b461..8e884a1 100644
--- a/steps/compress.cwl
+++ b/steps/compress.cwl
@@ -1,7 +1,7 @@
 id: compress
 label: compress
 class: CommandLineTool
-cwlVersion: v1.0
+cwlVersion: v1.1
 inputs: 
   - id: directory
     type: Directory
diff --git a/steps/define_parset.cwl b/steps/define_parset.cwl
index 2422157..a938095 100644
--- a/steps/define_parset.cwl
+++ b/steps/define_parset.cwl
@@ -1,7 +1,5 @@
 class: CommandLineTool
-cwlVersion: v1.0
-$namespaces:
-  sbg: 'https://www.sevenbridges.com/'
+cwlVersion: v1.1
 id: define_parset
 baseCommand:
   - cp
@@ -30,7 +28,7 @@ requirements:
     listing:
       - entryname: input.parset
         entry: |+
-          steps=[flagedge,flagelev,aoflag,flagamp]
+          steps=[flagedge,flagelev,aoflag,flagamp $(inputs.demix? ',demix': '')]
           #
           flagedge.chan=[0..nchan/32-1,31*nchan/32..nchan-1]
           flagedge.type=preflagger
@@ -41,6 +39,7 @@ requirements:
           aoflag.autocorr=$(inputs.flag_autocorrelation?'True':'False')
           aoflag.strategy=/usr/local/share/aoflagger/strategies/lofar-default.lua
           #
+
           flagbaseline.type=preflagger
           flagbaseline.baseline=[]
           #
diff --git a/steps/extract_sip_meta.cwl b/steps/extract_sip_meta.cwl
index e218822..f669e50 100644
--- a/steps/extract_sip_meta.cwl
+++ b/steps/extract_sip_meta.cwl
@@ -1,5 +1,5 @@
 class: CommandLineTool
-cwlVersion: v1.0
+cwlVersion: v1.1
 id: inspect_flagging_dataloss
 baseCommand:
   - bash
diff --git a/steps/fetch_data.cwl b/steps/fetch_data.cwl
index 10e7825..454530e 100644
--- a/steps/fetch_data.cwl
+++ b/steps/fetch_data.cwl
@@ -1,7 +1,7 @@
 id: fetchdata
 label: fetch_data
 class: CommandLineTool
-cwlVersion: v1.0
+cwlVersion: v1.1
 inputs: 
   - id: surl_link
     type: string
diff --git a/steps/inspect_compression_quality.cwl b/steps/inspect_compression_quality.cwl
index 699b9e2..4bd65b8 100644
--- a/steps/inspect_compression_quality.cwl
+++ b/steps/inspect_compression_quality.cwl
@@ -1,5 +1,5 @@
 class: CommandLineTool
-cwlVersion: v1.0
+cwlVersion: v1.1
 $namespaces:
   sbg: 'https://www.sevenbridges.com/'
 id: inspect_compression_quality
diff --git a/steps/inspect_flagging_dataloss.cwl b/steps/inspect_flagging_dataloss.cwl
index 5668af8..4214dd5 100644
--- a/steps/inspect_flagging_dataloss.cwl
+++ b/steps/inspect_flagging_dataloss.cwl
@@ -1,7 +1,5 @@
 class: CommandLineTool
-cwlVersion: v1.0
-$namespaces:
-  sbg: 'https://www.sevenbridges.com/'
+cwlVersion: v1.1
 id: inspect_flagging_dataloss
 baseCommand:
   - python3
@@ -12,11 +10,20 @@ inputs:
     inputBinding:
       position: 0
       shellQuote: false
+  - id: uvw_sampling
+    type: int?
+    default: 100
+    inputBinding:
+      position: 1
 outputs:
   - id: output
     type: 'File'
     outputBinding:
       glob: metrics.json
+  - id: flags_output
+    type: 'File'
+    outputBinding:
+      glob: flags.h5
 label: inspect_flagging_dataloss
 requirements:
   - class: ShellCommandRequirement
@@ -26,11 +33,92 @@ requirements:
        entry: |
          import sys
          from casacore.tables import table
+         import itertools
+         import json
+         import h5py
+         import numpy as np
+         import re
+         from scipy.constants import c as LIGHT_SPEED
          input_ms_path = sys.argv[1]
+         sampling = int(sys.argv[2])
+         main_table = table(input_ms_path, 'r')
+
+         antennas = table(input_ms_path + '/ANTENNA', 'r')
+         spec_window = table(input_ms_path + '/SPECTRAL_WINDOW', 'r')
+         observation = table(input_ms_path + '/OBSERVATION', 'r')
+
+         sas_id, sap, sb = re.search('L(\d*).*SAP(\d{3}).*SB(\d{3}).*',
+                   observation.getcell('LOFAR_FILENAME', 0)).groups()
+         antenna_names = antennas.getcol('NAME')
+         antenna_positions = antennas.getcol('POSITION')
+
+         ref_frequency = spec_window.getcell('REF_FREQUENCY', 0)
+
+         TO_UVW = ref_frequency / LIGHT_SPEED  # 1 / (c/nu)
+
+         flags = np.array(main_table.getcol('FLAG'))
+         vis = np.array(main_table.getcol('DATA'))
+
+         uvw = np.array(main_table.getcol('UVW') * TO_UVW)
+
+         antenna1 = np.array(main_table.getcol('ANTENNA1'))
+         antenna2 = np.array(main_table.getcol('ANTENNA2'))
+         time = np.array(main_table.getcol('TIME'))
+         n_antennas = antennas.nrows()
+         n_baselines = int((n_antennas + 1) * n_antennas * .5)
+         n_times = flags.shape[0] // n_baselines
+         dataloss = np.array(vis == 0, dtype=np.int)
 
+         dataloss_matrix, (urange, vrange, wrange) = np.histogramdd(uvw, bins=[sampling, sampling, 1], weights=np.nanmean(dataloss, axis=(1,2)))
+         flag_matrix, (urange, vrange, wrange) = np.histogramdd(uvw, bins=[sampling, sampling, 1], weights=np.nanmean(flags, axis=(1,2)))
+         coverage, (urange, vrange, wrange) = np.histogramdd(uvw, bins=[sampling, sampling, 1])
+
+         flags = flags.reshape(n_times, n_baselines, *flags.shape[1:], order='C')
+         dataloss = dataloss.reshape(n_times, n_baselines, *flags.shape[2:], order='C')
+         uvw = uvw.reshape(n_times, n_baselines, *uvw.shape[1:], order='C')
+         timestamp = time.reshape(n_times, n_baselines, order='C')[:, 0]
+
+         baselines = [
+          "%s,%s" % (antenna_names[i], antenna_names[j])
+          for i, j in itertools.product(range(n_antennas), range(n_antennas)) if i >= j]
+
+         dataloss_as_function_of_time = np.nanmean(dataloss, axis=(1, 2, 3))
+         dataloss_as_function_of_baseline = np.nanmean(dataloss, axis=(0, 2, 3))
+
+
+         flags_as_function_of_time = np.nanmean(flags, axis=(1,2,3))
+         flags_as_function_of_baseline = np.nanmean(flags, axis=(0,2,3))
+
+         metrics = dict(baselines=baselines,
+                        dataloss_vs_time=dataloss_as_function_of_time.tolist(),
+                        dataloss_vs_baseline=dataloss_as_function_of_baseline.tolist(),
+                        flags_vs_time=flags_as_function_of_time.tolist(),
+                        flags_vs_baseline=flags_as_function_of_baseline.tolist(),
+                        timestamp=timestamp.tolist(),
+                        sap=sap,
+                        subband=sb
+                        )
 
          with open('metrics.json', 'w') as f_stream:
-             pass
+           json.dump(metrics, f_stream)
+
+         with h5py.File('flags.h5', 'w') as flags_file:
+           flags_file['/FLAGS'] = flag_matrix
+           flags_file['/COVERAGE'] = coverage
+           flags_file['/DATALOSS'] = dataloss_matrix
+
+           flags_file['/'].attrs['REF_FREQUENCY'] = ref_frequency
+           flags_file['/'].attrs['BASELINES'] = baselines
+           flags_file['/'].attrs['PROJECT'] = observation.getcell('PROJECT', 0)
+           flags_file['/'].attrs['TELESCOPE'] = observation.getcell('TELESCOPE_NAME', 0)
+           flags_file['/'].attrs['SAS_ID'] = sas_id
+           flags_file['/'].attrs['SAP'] = sap
+           flags_file['/'].attrs['SUBBAND'] = sb
+
+           flags_file['/URANGE'] = urange
+           flags_file['/VRANGE'] = vrange
+           flags_file['/WRANGE'] = wrange
+
 
   - class: DockerRequirement
-    dockerPull: 'lofareosc/lofar-pipeline:latest'
+    dockerPull: 'astronsdc/lofar-ms-software:latest'
diff --git a/steps/plot_uvw_coverage.cwl b/steps/plot_uvw_coverage.cwl
new file mode 100644
index 0000000..f188063
--- /dev/null
+++ b/steps/plot_uvw_coverage.cwl
@@ -0,0 +1,42 @@
+cwlVersion: v1.1
+class: CommandLineTool
+baseCommand:
+  - python3
+  - script.py
+inputs:
+  - id: input_file
+    type: File
+    inputBinding:
+      position: 0
+  - id: output_name
+    type: string?
+    default: 'uv_coverage.png'
+    inputBinding:
+      position: 1
+outputs:
+  - id: uv_plot
+    type: File
+    outputBinding:
+      glob: $(inputs.output_name)
+requirements:
+  - class: InlineJavascriptRequirement
+  - class: InitialWorkDirRequirement
+    listing:
+      - entryname: script.py
+        entry: |
+          import h5py
+          import matplotlib.pylab as plt
+          import sys
+          import numpy as np
+          from matplotlib import cm
+
+          h5_file = h5py.File(sys.argv[1], 'r')
+          output_name = sys.argv[2]
+          u = h5_file['URANGE']
+          v = h5_file['VRANGE']
+
+          plt.imshow(np.log10(h5_file['COVERAGE'][:, :, 0]), extent= [u[0], u[-1], v[0], v[-1]], cmap=cm.Blues)
+          plt.imshow(np.log10(h5_file['FLAGS'][:, :, 0]), extent= [u[0], u[-1], v[0], v[-1]] , cmap=cm.Reds)
+          plt.imshow(np.log10(h5_file['DATALOSS'][:, :, 0]), extent= [u[0], u[-1], v[0], v[-1]] , cmap=cm.Greens)
+
+          plt.savefig(output_name)
diff --git a/steps/read_surl_list.cwl b/steps/read_surl_list.cwl
index b1527d0..43c7a42 100644
--- a/steps/read_surl_list.cwl
+++ b/steps/read_surl_list.cwl
@@ -1,5 +1,5 @@
 class: ExpressionTool
-cwlVersion: v1.0
+cwlVersion: v1.1
 id: read_surl_file
 inputs:
   - id: surl_list
-- 
GitLab