Skip to content
Snippets Groups Projects
Commit 232d170a authored by Mattia Mancini's avatar Mattia Mancini
Browse files

Add ingest and split subworkflow

parent 7e8409cb
Branches
Tags v0.1.3
No related merge requests found
...@@ -5,45 +5,44 @@ label: compress_pipeline.cwl ...@@ -5,45 +5,44 @@ label: compress_pipeline.cwl
inputs: inputs:
- id: flag_autocorrelation - id: flag_autocorrelation
type: boolean? type: boolean?
- id: surls - id: msin
type: 'string[]' type: Directory
outputs: outputs:
- id: output - id: inspect
outputSource: outputSource:
- inspect_flagging_dataloss/output - inspect_flagging_dataloss/output
type: 'File[]' type: 'File'
- id: logfile - id: logfile
outputSource: outputSource:
- dppp/logfile - dppp/logfile
type: type: File[]
type: array
items:
items: File
type: array
- id: compressed - id: compressed
outputSource: outputSource:
- compress/compressed - compress/compressed
type: File[] type: File
- id: ingest
outputSource:
- format_ingest/ingest
type: Any
steps: steps:
- id: fetch_data - id: extract_sip_meta
in: in:
- id: surl_link - id: msin
source: surls source: dppp/msout
- id: compressed_file
source: compress/compressed
out: out:
- id: uncompressed - id: ingest
run: steps/fetch_data.cwl run: steps/extract_sip_meta.cwl
label: fetch_data
scatter:
- surl_link
- id: dppp - id: dppp
in: in:
- id: parset - id: parset
source: define_parset/output source: define_parset/output
- id: msin - id: msin
source: fetch_data/uncompressed source: msin
- id: msout_name - id: msout_name
source: fetch_data/uncompressed source: msin
valueFrom: '$("COMPRESSED_" + self[0].basename)' valueFrom: '$("COMPRESSED_" + self.basename)'
- id: writefullresflag - id: writefullresflag
default: true default: true
- id: storagemanager - id: storagemanager
...@@ -54,8 +53,15 @@ steps: ...@@ -54,8 +53,15 @@ steps:
- id: msout - id: msout
- id: logfile - id: logfile
run: steps/DPPP.cwl run: steps/DPPP.cwl
scatter: - id: format_ingest
- msin in:
- id: metadata
source: extract_sip_meta/ingest
- id: output_name
default: compressed
out:
- id: ingest
run: steps/format_ingest.cwl
- id: define_parset - id: define_parset
in: in:
- id: flag_autocorrelation - id: flag_autocorrelation
...@@ -67,13 +73,11 @@ steps: ...@@ -67,13 +73,11 @@ steps:
- id: inspect_flagging_dataloss - id: inspect_flagging_dataloss
in: in:
- id: input - id: input
source: fetch_data/uncompressed source: msin
out: out:
- id: output - id: output
run: steps/inspect_flagging_dataloss.cwl run: steps/inspect_flagging_dataloss.cwl
label: inspect_flagging_dataloss label: inspect_flagging_dataloss
scatter:
- input
- id: compress - id: compress
in: in:
- id: directory - id: directory
...@@ -82,9 +86,6 @@ steps: ...@@ -82,9 +86,6 @@ steps:
- id: compressed - id: compressed
run: steps/compress.cwl run: steps/compress.cwl
label: compress label: compress
scatter:
- directory
requirements: requirements:
- class: ScatterFeatureRequirement
- class: StepInputExpressionRequirement - class: StepInputExpressionRequirement
- class: InlineJavascriptRequirement - class: InlineJavascriptRequirement
class: Workflow
cwlVersion: v1.0
id: compress_pipeline_cwl
label: compress_pipeline.cwl
inputs:
- id: flag_autocorrelation
type: boolean?
- id: surls
type: 'string[]'
outputs:
- id: compressed
outputSource:
- compress/compressed
type: File[]
- id: inspect
outputSource:
- compress/inspect
type: File[]
- id: logfile
outputSource:
- compress/logfile
type:
type: array
items:
type: array
items: File
- id: ingest
outputSource:
- compress/inspect
type: Any[]
steps:
- id: fetch_data
in:
- id: surl_link
source: surls
out:
- id: uncompressed
run: steps/fetch_data.cwl
label: fetch_data
scatter:
- surl_link
- id: compress
in:
- id: msin
source: fetch_data/uncompressed
- id: flag_autocorrelation
source: flag_autocorrelation
scatter:
- msin
out:
- id: compressed
- id: inspect
- id: logfile
- id: ingest
run: ./compress_pipeline.cwl
requirements:
- class: ScatterFeatureRequirement
- class: SubworkflowFeatureRequirement
- class: StepInputExpressionRequirement
- class: InlineJavascriptRequirement
...@@ -84,7 +84,7 @@ outputs: ...@@ -84,7 +84,7 @@ outputs:
glob: 'DPPP*.log' glob: 'DPPP*.log'
hints: hints:
- class: DockerRequirement - class: DockerRequirement
dockerPull: lofareosc/lofar-pipeline:latest dockerPull: astronsdc/lofar-ms-software:latest
stdout: DPPP.log stdout: DPPP.log
stderr: DPPP_err.log stderr: DPPP_err.log
requirements: requirements:
......
class: CommandLineTool
cwlVersion: v1.0
id: inspect_flagging_dataloss
baseCommand:
- bash
- script.sh
inputs:
- id: msin
type: Directory
inputBinding:
position: 0
- id: compressed_file
type: File
inputBinding:
position: 1
outputs:
- id: ingest
type: Any
outputBinding:
glob: metadata.json
loadContents: True
outputEval: |
${
return JSON.parse(self[0].contents)
}
label: inspect_flagging_dataloss
hints:
- class: DockerRequirement
dockerPull: 'astronsdc/lofar-ms-software:latest'
requirements:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entryname: pipeline_info.json
entry: |
{
"pipelineRun": {
"_type": "AveragingPipeline",
"demixing": "false",
"pipelineName": "compress_pipeline",
"strategyName": "",
"pipelineVersion": "v01.23",
"strategyDescription": "",
"timeIntegrationStep": "1",
"flagAutoCorrelations": "true",
"frequencyIntegrationStep": "1",
"numberOfCorrelatedDataProducts": "1"
}
}
- entryname: dataproduct.json
entry: |
{
"dataProduct":
{
"fileName": $(inputs.compressed_file.basename),
"size": $(inputs.compressed_file.size)
}
}
- entryname: script.sh
entry: |
md5=`md5sum $2 | awk '{ print $1 }'`
cat > md5_sum.json << EOF
{
"dataProduct":
{
"fileName": $(inputs.compressed_file.basename),
"size": $(inputs.compressed_file.size),
"md5checksum": $md5
}
}
EOF
lofar_sip_from_ms.py --json metadata.json $1
lofar_sip_merge.py metadata.json pipeline_info.json --json metadata.json
lofar_sip_merge.py metadata.json dataproduct.json --json metadata.json
lofar_sip_merge.py metadata.json md5_sum.json --json metadata.json
cwlVersion: v1.2
class: ExpressionTool
inputs:
- id: metadata
type: Any
- id: output_name
type: string
outputs:
- id: ingest
type: Any
requirements:
- class: InlineJavascriptRequirement
expression: |
${
return { "ingest": {
"path": inputs.output_name,
"metadata": inputs.metadata
}
}
}
...@@ -8,7 +8,7 @@ baseCommand: ...@@ -8,7 +8,7 @@ baseCommand:
- script.py - script.py
inputs: inputs:
- id: input - id: input
type: Directory? type: Directory
inputBinding: inputBinding:
position: 0 position: 0
shellQuote: false shellQuote: false
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment