Commit b45dc490 authored by Mattia Mancini's avatar Mattia Mancini

Merge branch 'create_workflow_sdc676' into 'master'

Create workflow sdc676

See merge request !2
parents d9afc8f5 e5554c59
Pipeline #37646 passed in 1 minute and 38 seconds
docker-build:
image: docker:stable
stage: build
services:
- docker:dind
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
# Note: only the :latest tag is built and pushed here; branches are not tagged with the commit ref slug
script:
- docker build --pull -t "$CI_REGISTRY_IMAGE:latest" .
- docker push "$CI_REGISTRY_IMAGE:latest"
# Run this job in a branch where a Dockerfile exists
rules:
- if: $CI_COMMIT_BRANCH
exists:
- Dockerfile
FROM python:3.10
COPY . /src
RUN cd /src/bf_pulp_utils && pip install .
RUN collect_unspecified_metadata --help && \
double_tgz_elimination --help
from importlib.metadata import version, PackageNotFoundError
try:
__version__ = version("bf_pulp_utils")
except PackageNotFoundError:
# package is not installed
pass
\ No newline at end of file
......@@ -5,33 +5,34 @@
#
# (c) Vlad Kondratiev - 01.11.2021
#
import os, sys, re, glob
from argparse import ArgumentParser
import glob
import json
import datetime as dt
from datetime import datetime
import time
from ldv_obs import *
from ldv_pulp import *
import os
import re
import sys
from argparse import ArgumentParser
from bf_pulp_utils.ldv_obs import *
from bf_pulp_utils.ldv_pulp import *
# directory with all PulP log- and feedback files
# when executed on Spider in the folder '/project/ldv/Data/beamformed/', the path is relative to that folder
DEFAULT_ROOTDIR = "./pulp-logs"
# dictionaries
observation = {}
pulp = {}
# to save to JSON file
global_dict = {
"Observation" : observation,
"Pulsar Pipeline" : pulp
}
# suffix for the filename of the output JSON file
json_filename_suffix = "_unspecified.json"
def main():
# dictionaries
observation = {}
pulp = {}
# main
if __name__ == "__main__":
# to save to JSON file
global_dict = {
"Observation": observation,
"Pulsar Pipeline": pulp
}
# suffix for the filename of the output JSON file
json_filename_suffix = "_unspecified.json"
parser = ArgumentParser(description='Collect metadata from pulp-log files')
parser.add_argument('obs_id')
......@@ -39,24 +40,17 @@ if __name__ == "__main__":
args = parser.parse_args()
if args.rootdir is None:
print("No rootdir as option is given, use default path - %s\n", DEFAULT_ROOTDIR)
print("No rootdir as option is given use default path")
rootdir = DEFAULT_ROOTDIR
else:
rootdir = args.rootdir
if args.parsetdir is None:
print("No parsetdir as option is given, use default path - %s\n", DEFAULT_PARSETDIR)
parsetsdir = DEFAULT_PARSETDIR
else:
parsetsdir = args.parsetdir
parsetsdir = os.path.join(rootdir, 'parsets')
# TODO: use an argument for this, e.g. obs_id == 'all' ?
# if ObsID is not given, we would collect all ObsIDs from the corresponding directories in the "root" directory
# for now we just print a message and exit...
sasid = args.obs_id
# checking if directory <ObsID> exists
......@@ -73,21 +67,22 @@ if __name__ == "__main__":
if not os.path.exists(feedbackfile):
feedbackfile = "%s/L%s/pulpL%s_feedback" % (rootdir, sasid, sasid)
if not os.path.exists(feedbackfile):
print("ERROR: Feedback file is not found (neither '%s/Observation%s_feedback' nor '%s')" % (rootdir, sasid, feedbackfile))
print("ERROR: Feedback file is not found (neither '%s/Observation%s_feedback' nor '%s')" % (
rootdir, sasid, feedbackfile))
sys.exit(1)
# checking if parset file exists
foundparsets=glob.glob("%s/parsets/*%s*" % (rootdir, sasid))
foundparsets = glob.glob("%s/*%s*" % (parsetsdir, sasid))
parset=""
if len(foundparsets) > 0:
parset = ""
if len(foundparsets) > 0:
parset = sorted(foundparsets, key=len)[0]
# reading log-file file into the list of lines
f = open(logfile, 'r')
# ignoring empty lines
#comments_and_empty=re.compile(r"(^\s*#+.*$)|(^\s*$)")
empty=re.compile(r"(^\s*$)")
# comments_and_empty=re.compile(r"(^\s*#+.*$)|(^\s*$)")
empty = re.compile(r"(^\s*$)")
loglines = [ff for ff in f.read().splitlines() if empty.search(ff) is None]
f.close()
# reading feedback-file file into the list of lines
......@@ -98,15 +93,16 @@ if __name__ == "__main__":
# reading parset-file file into the list of lines
if parset != "":
f = open(parset, 'r')
empty=re.compile(r"(^\s*$)")
empty = re.compile(r"(^\s*$)")
parsetlines = [ff for ff in f.read().splitlines() if empty.search(ff) is None]
f.close()
else: parsetlines = []
else:
parsetlines = []
# populating observation info
observation = populating_observation (sasid, observation, parsetlines, loglines, feedlines)
observation["Parset"] = parset.split("/")[-1] # removing the path
observation = populating_observation(sasid, observation, parsetlines, loglines, feedlines)
observation["Parset"] = parset.split("/")[-1] # removing the path
# populating pipeline info
pulp = populating_pipeline(sasid, pulp, loglines, feedlines)
pulp["Project"] = observation["Project"]
......@@ -120,3 +116,7 @@ if __name__ == "__main__":
json.dump(global_dict, outfile)
print("File %s created" % outname)
if __name__ == '__main__':
main()
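A hedged usage sketch of the console script above (the SAS id is made up, the option flag is assumed from args.rootdir, and the output filename follows json_filename_suffix together with the CWL glob 'L$(inputs.sas_id)_unspecified.json' further below):

# Hypothetical run:  collect_unspecified_metadata 123456 --rootdir ./pulp-logs
import json

with open("L123456_unspecified.json") as fh:
    meta = json.load(fh)
print(sorted(meta))                        # ['Observation', 'Pulsar Pipeline']
print(meta["Observation"].get("Project"))  # project name parsed from the pulp log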
......@@ -14,29 +14,27 @@
import os, sys, re
import tarfile
import glob
import fnmatch
import numpy as np
import optparse as opt
import shutil
# files that are not in the duplicate tarballs but we still need to keep them
excluded=["*_pulp.log", \
"*all_bestRMs.out", \
"*StokeStats.out", \
"*rmfit_results.out"]
excluded = ["*_pulp.log", \
"*all_bestRMs.out", \
"*StokeStats.out", \
"*rmfit_results.out"]
# main
if __name__=="__main__":
def main():
usage = "Usage: %prog [options] <input LTA tarball> <output tarball>"
cmdline = opt.OptionParser(usage)
# adding options
#cmdline.add_option('-o', '--output', dest='outfile', metavar='OUTPUT TARBALL NAME', help='If not given, \
#then "_new" suffix will be added to the name of the input tarball', default="", type='str')
# cmdline.add_option('-o', '--output', dest='outfile', metavar='OUTPUT TARBALL NAME', help='If not given, \
# then "_new" suffix will be added to the name of the input tarball', default="", type='str')
# reading cmd options
(opts,args) = cmdline.parse_args()
(opts, args) = cmdline.parse_args()
# check if input file is given
if len(args) == 0:
......@@ -48,15 +46,15 @@ if __name__=="__main__":
# output tarball
if len(args) < 2:
print ("The name of the output tarball is not given!")
print("The name of the output tarball is not given!")
sys.exit(1)
output_tarball = args[1]
# if opts.outfile == "":
# output_tarball = input_tarball.split(".tar")[0] + "_new" + ".tar"
# else:
# output_tarball = opts.outfile
# if opts.outfile == "":
# output_tarball = input_tarball.split(".tar")[0] + "_new" + ".tar"
# else:
# output_tarball = opts.outfile
# getting all *.tar.gz in the LTA tarball
matches = []
......@@ -66,6 +64,11 @@ if __name__=="__main__":
matches.append(filename)
inputtar.close()
ascii_name = output_tarball + "_filecontent.txt"
with open(ascii_name, "w") as outfile:
outfile.write("\n".join(dircontent))
# checking whether _all_ files in dircontent have the same prefix dirname in the path
dirprefix = dircontent[0].split("/")[0]
for ii in dircontent:
......@@ -75,10 +78,10 @@ if __name__=="__main__":
# checking if dirprefix has any one of these patterns: "_CVplots", "_CSplots", "_ISplots", "_redIS", "_red_locus"
# if this is the case, then we will create a new tarball with this prefix removed
is_restructured=False
pattern=re.compile(r"(_CVplots)|(_CSplots)|(_ISplots)|(_redIS)|(_red_locus)")
if pattern.search(dirprefix) is not None: # i.e. we need to rewrite the tarball
is_restructured=True
is_restructured = False
pattern = re.compile(r"(_CVplots)|(_CSplots)|(_ISplots)|(_redIS)|(_red_locus)")
if pattern.search(dirprefix) is not None: # i.e. we need to rewrite the tarball
is_restructured = True
inputtar = tarfile.open(input_tarball, "r")
outputtar = tarfile.open(output_tarball, "w")
for member in inputtar.getmembers():
......@@ -87,12 +90,12 @@ if __name__=="__main__":
outputtar.addfile(member, obj)
outputtar.close()
inputtar.close()
else: # making the tarball copy
else: # making the tarball copy
shutil.copyfile(input_tarball, output_tarball)
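The restructuring branch above rewrites the LTA tarball so the matched "_CVplots"/"_redIS"-style prefix directory disappears from the member paths; the elided lines presumably do something along these lines (a minimal sketch, not the project's code):

import tarfile

def strip_prefix_copy(src_path, dst_path, dirprefix):
    # copy every member into the new tarball with the prefix directory removed
    with tarfile.open(src_path, "r") as src, tarfile.open(dst_path, "w") as dst:
        for member in src.getmembers():
            fobj = src.extractfile(member) if member.isfile() else None
            member.name = member.name.split("%s/" % dirprefix, 1)[-1]
            dst.addfile(member, fobj)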
# if no *.tar.gz found, then there is nothing to do
if len(matches) == 0:
print ("No duplicate tarballs found. Output tarball is the same as input")
print("No duplicate tarballs found. Output tarball is the same as input")
inputtar.close()
sys.exit(0)
......@@ -101,7 +104,7 @@ if __name__=="__main__":
dircontent = [ii.split("%s/" % (dirprefix))[-1] for ii in dircontent]
# merging contents of all tarballs together in a single list
flist=[]
flist = []
inputtar = tarfile.open(input_tarball, "r")
for ff in matches:
tgz = inputtar.extractfile(ff)
......@@ -114,9 +117,9 @@ if __name__=="__main__":
# cross-checking files in the LTA tarball with files in the internal *.tar.gz files
# extra files will be added to to_delete list for removal
to_delete=[]
to_delete = []
for ff in dircontent:
is_excluded=[]
is_excluded = []
for jj in excluded:
is_excluded.extend(fnmatch.filter([ff], jj))
if len(is_excluded) != 0: continue
......@@ -125,19 +128,19 @@ if __name__=="__main__":
# getting files that are possibly only in the internal *.tar.gz, but not in the LTA tarball.
# these files will be extracted and kept
to_extract=list(set(flist)-set(flist).intersection(set(dircontent)))
to_extract = list(set(flist) - set(flist).intersection(set(dircontent)))
if len(to_extract) == 0:
print ("No extra files to extract from *.tar.gz")
print("No extra files to extract from *.tar.gz")
else:
# opening output tarball for writing
outputtar = tarfile.open(output_tarball, "a")
print ("To extract:")
print("To extract:")
# if we have only one internal *.tar.gz (usually)
if len(matches) == 1:
jj = inputtar.extractfile(matches[0])
tar = tarfile.open(matches[0], "r:gz", fileobj=jj)
for ii in to_extract:
print (ii)
print(ii)
# !!! here we should extract from an internal tgz into the LTA tarball !!!
# if dirprefix != "", then it should be added also for the extracted file
if dirprefix != "":
......@@ -155,12 +158,12 @@ if __name__=="__main__":
else:
# if we have several *.tar.gz files, we first need to know which tarball this file belongs to
for ii in to_extract:
print (ii, "[from: ", end='')
print(ii, "[from: ", end='')
for tgz in matches:
jj = inputtar.extractfile(tgz)
with tarfile.open(tgz, "r:gz", fileobj=jj) as tar:
if ii in tar.getnames():  # getnames() yields member names; getmembers() returns TarInfo objects
print (tgz, "]")
print(tgz, "]")
# !!!! here we should extract from an internal tgz into the LTA tarball !!!
# if dirprefix != "", then it should be added also for the extracted file
if dirprefix != "":
......@@ -183,9 +186,9 @@ if __name__=="__main__":
if dirprefix != "":
if not is_restructured:
to_delete = [dirprefix + "/" + ii for ii in to_delete]
print ("To delete: ")
print("To delete: ")
for ii in to_delete:
print (ii)
print(ii)
# The Python tarfile module does not support deleting members from an archive, so use a system call for now
os.system("tar -vf %s --delete %s" % (output_tarball, ii))
......@@ -193,6 +196,10 @@ if __name__=="__main__":
outputtar = tarfile.open(output_tarball, "r")
newtarcontent = [ii.name for ii in outputtar.getmembers()]
outputtar.close()
ascii_name=output_tarball + "_filecontent.txt"
ascii_name = output_tarball + "_filecontent.txt"
with open(ascii_name, "w") as outfile:
outfile.write("\n".join(newtarcontent))
if __name__ == '__main__':
main()
\ No newline at end of file
......@@ -2,6 +2,7 @@
#
import datetime as dt
from datetime import datetime
import sys
# populating observation info
def populating_observation (sasid, observation, parsetlines, loglines, feedlines):
......@@ -12,7 +13,7 @@ def populating_observation (sasid, observation, parsetlines, loglines, feedlines
project=res[-1].split("Project:", 1)[-1].split("PI:", 1)[0].strip()
observation["Project"] = project
except:
print "(E) Bad logfile for SASid %s. Pipelines has probably failed" % (sasid)
print("(E) Bad logfile for SASid %s. Pipelines has probably failed" % (sasid))
sys.exit(1)
# Creator - it is always AWTIER0 ??
......@@ -71,7 +72,7 @@ def populating_observation (sasid, observation, parsetlines, loglines, feedlines
res=[ii for ii in parsetlines if "Observation.channelsPerSubband" in ii]
nchan_per_sub=int(res[0].split("=")[1].strip())
else:
print "(W) Parset file is not available for SASid %s" % (sasid)
print("(W) Parset file is not available for SASid %s" % (sasid))
nchan_per_sub = 1
chanwidth = subwidth / nchan_per_sub
......@@ -98,12 +99,15 @@ def populating_observation (sasid, observation, parsetlines, loglines, feedlines
# Duration [s]
res = [ii for ii in loglines if "Start UTC:" in ii]
dur = res[0].split("Duration:")[1].strip()
if dur[-1] == "s":
duration = float(dur.split("s")[0])
if dur[-1] == "m":
elif dur[-1] == "m":
duration = float(dur.split("m")[0]) * 60.
if dur[-1] == "h":
elif dur[-1] == "h":
duration = float(dur.split("h")[0]) * 3600.
else:
duration = 0
observation["Duration [s]"] = duration
# End Time
......
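The duration normalisation above (suffix "s", "m" or "h" on the "Duration:" field, anything else treated as zero) could equally be written as a small helper; a sketch under the same assumptions about the log format:

def duration_seconds(dur):
    # "600s" -> 600.0, "10m" -> 600.0, "1h" -> 3600.0; unknown suffix -> 0.0
    scale = {"s": 1.0, "m": 60.0, "h": 3600.0}
    if not dur or dur[-1] not in scale:
        return 0.0
    return float(dur[:-1]) * scale[dur[-1]]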
#!/usr/bin/env python
#
import sys
from datetime import datetime
......@@ -110,13 +111,13 @@ def populating_pipeline(sasid, pulp, loglines, feedlines):
pulp["skipDynamicSpectrum"] = skipDynamicSpectrum
# pulp start, stop times and wall time
res=[ii for ii in loglines if "UTC" in ii and "Start" not in ii]
res = [ii for ii in loglines if "UTC" in ii and "Start" not in ii]
try:
st=res[-2].split("is:", 1)[-1].strip()
except:
print "(E) Pipeline for the SASid %s has probably failed" % (sasid)
st = res[-2].split("is:", 1)[-1].strip()
except:
print("(E) Pipeline for the SASid %s has probably failed" % (sasid))
sys.exit(1)
# Start Time
starttime = datetime.strptime(st, '%a %b %d %H:%M:%S %Y').strftime('%Y-%m-%d %H:%M:%S')
pt = datetime.strptime(st, '%a %b %d %H:%M:%S %Y')
......
[build-system]
requires = ["setuptools>=61.0", "wheel", "setuptools_scm[toml]>=6.2"]
build-backend = "setuptools.build_meta"
[project]
name = "bf_pulp_utils"
description = "Small set of utils to process PULP data"
dynamic = ['version']
requires-python = ">=3.7"
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent"
]
dependencies = [
"numpy"
]
[tool.setuptools_scm]
root=".."
[project.scripts]
collect_unspecified_metadata = "bf_pulp_utils.collect_unspecified_metadata:main"
double_tgz_elimination = "bf_pulp_utils.double_tgz_elimination:main"
\ No newline at end of file
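A quick, hedged sanity check of the packaging above (assumes the package was installed with `pip install .` from bf_pulp_utils/ on Python 3.10, as in the Dockerfile): the dynamic version is produced by setuptools_scm, and the two [project.scripts] entries should appear as console-script entry points.

from importlib.metadata import entry_points, version

print(version("bf_pulp_utils"))              # version derived by setuptools_scm from git metadata
eps = entry_points(group="console_scripts")  # selection API available from Python 3.10
print(sorted(ep.name for ep in eps if ep.value.startswith("bf_pulp_utils.")))
# expected: ['collect_unspecified_metadata', 'double_tgz_elimination']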
from setuptools import setup, find_packages
def readme():
with open('README.rst') as f:
return f.read()
setup(name='bcf_pipeline',
version='1.0.0',
description='Beamformed data compression pipeline',
url='https://git.astron.nl/ldv/bf_double_tgz/',
author='Vlad Kondratiev',
author_email='kondratiev@astron.nl',
license='Apache 2',
install_requires=[
'cwltool'],
packages=find_packages(),
include_package_data=True,
scripts=['scripts/double_tgz_elimination.py']
)
......@@ -6,20 +6,28 @@ inputs:
type: File
inputBinding:
position: 1
- id: output_filename
doc: output filename for the tar archive
default: 'output'
type: string?
inputBinding:
position: 2
arguments:
- valueFrom: $(inputs.source_tgz.basename.replace(/_\w{8}.tar/gm, '.tar'))
position: 2
outputs:
- id: output_tar
doc: output tar archive
type: File
outputBinding:
glob: $(inputs.output_filename)
baseCommand:
- double_tgz_elimination.py
glob: "*.tar"
- id: file_content
doc: list of files contained in the output tar archive
type: string[]
outputBinding:
glob: "*_filecontent.txt"
loadContents: true
outputEval: $(self[0].contents.split('\n'))
baseCommand:
- double_tgz_elimination
hints:
- class: DockerRequirement
dockerPull: git.astron.nl:5000/ldv/bf_double_tgz:latest
requirements: []
class: CommandLineTool
cwlVersion: v1.2
baseCommand:
- collect-unspecified-metadata.py
- collect_unspecified_metadata
inputs:
- id: sas_id
doc: SAS ID (ObservationID)
......@@ -17,8 +17,14 @@ inputs:
outputs:
- id: output_json
doc: Resulting metadata (parsed JSON) for the given SAS_ID
type: File
type: Any
outputBinding:
glob: 'L$(inputs.sas_id)_unspecified.json'
loadContents: true
outputEval: |
$(self[0] ? JSON.parse(self[0].contents): null)
requirements:
- class: InlineJavascriptRequirement
hints:
- class: DockerRequirement
dockerPull: git.astron.nl:5000/ldv/bf_double_tgz:latest
id: fetchdata
label: fetch_data
class: CommandLineTool
cwlVersion: v1.1
inputs:
- id: surl_link
type: string
inputBinding:
position: 0
outputs:
- id: tar_archive
type: File
outputBinding:
glob: 'out/*'
baseCommand:
- 'bash'
- 'fetch.sh'
doc: 'Fetch a tar archive from a SURL via GridFTP'
requirements:
InlineJavascriptRequirement: {}
InitialWorkDirRequirement:
listing:
- entryname: 'fetch.sh'
entry: |
#!/bin/bash
mkdir out
cd out
turl=`echo $1 | awk '{gsub("srm://srm.grid.sara.nl[:0-9]*","gsiftp://gridftp.grid.sara.nl"); print}'`
file_name=$(inputs.surl_link.split('/').pop())
echo "Downloading $turl to $file_name"
globus-url-copy $turl file://$PWD/$file_name
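For reference, a hedged Python equivalent of the awk substitution in fetch.sh above (host mapping taken verbatim from the script; the example SURL is made up and the function is not part of the workflow):

import re

def surl_to_turl(surl):
    # rewrite an SRM SURL into a GridFTP TURL, dropping any port on the SRM host
    return re.sub(r"srm://srm\.grid\.sara\.nl(:[0-9]+)?",
                  "gsiftp://gridftp.grid.sara.nl", surl)

print(surl_to_turl("srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/example.tar"))
# -> gsiftp://gridftp.grid.sara.nl/pnfs/grid.sara.nl/data/lofar/ops/example.tar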
cwlVersion: v1.2
class: ExpressionTool
inputs:
- id: metadata
type: Any
- id: file_content
type: string[]
- id: output_name
type: string
- id: file_name
type: string
outputs:
- id: ingest
type: Any
requirements:
- class: InlineJavascriptRequirement
expression: |
${
inputs.metadata['fileContent'] = inputs.file_content
return { "ingest": {
"path": inputs.output_name,
"file_name": inputs.file_name,
"metadata": inputs.metadata
}
}
}
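The ExpressionTool above simply merges the file listing into the metadata and wraps everything into a single "ingest" record; an equivalent sketch in Python, for readers less familiar with CWL JavaScript expressions:

def build_ingest(metadata, file_content, output_name, file_name):
    # attach the archive file listing to the metadata, then wrap it as the workflow does
    metadata = dict(metadata, fileContent=file_content)
    return {"ingest": {"path": output_name,
                       "file_name": file_name,
                       "metadata": metadata}}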
cwlVersion: v1.2
class: Workflow
inputs:
- id: bf_tar_archive
doc: Tar archive with the BeamFormed dataset
type: File
- id: pulp_log_folder
type: Directory
outputs:
- id: tar_archive