Commit 7c354bf5 authored by Mattia Mancini

Merge branch 'update_metadata' into 'master'

Update metadata

See merge request !3
parents cbce9ab2 bd1ada5a
Pipeline #37651 passed in 51 seconds
@@ -15,12 +15,12 @@ from argparse import ArgumentParser
from bf_pulp_utils.ldv_obs import *
from bf_pulp_utils.ldv_pulp import *
# directory with all PulP log- and feedback files
# when executed on Spider from the folder '/project/ldv/Data/beamformed/', this relative path resolves there
DEFAULT_ROOTDIR = "./pulp-logs"
def main():
# dictionaries
observation = {}
@@ -36,6 +36,7 @@ def main():
parser = ArgumentParser(description='Collect metadata from pulp-log files')
parser.add_argument('obs_id')
parser.add_argument('filename')
parser.add_argument('--rootdir', help="Specify rootdir (absolute path)")
args = parser.parse_args()
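    # For illustration, a hypothetical invocation (obs id and paths are made up; DEFAULT_ROOTDIR
    # defined above is the fallback when --rootdir is omitted):
    #   args = parser.parse_args(['L192549', 'L192549.json', '--rootdir', '/project/ldv/pulp-logs'])
    #   rootdir = args.rootdir if args.rootdir else DEFAULT_ROOTDIR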
@@ -104,7 +105,7 @@ def main():
observation["Parset"] = parset.split("/")[-1] # removing the path
# populating pipeline info
pulp = populating_pipeline(sasid, pulp, loglines, feedlines)
pulp = populating_pipeline(sasid, args.filename, pulp, loglines, feedlines)
pulp["Project"] = observation["Project"]
pulp["Creator"] = observation["Creator"]
pulp["Privileges"] = observation["Privileges"]
......
#!/usr/bin/env python
#
import datetime as dt
from datetime import datetime
import sys
from datetime import datetime
# populating observation info
def populating_observation (sasid, observation, parsetlines, loglines, feedlines):
def populating_observation(sasid, observation, parsetlines, loglines, feedlines):
# getting Project
# It exists for Unspecified Process as well
try:
res=[ii for ii in loglines if "Project:" in ii]
project=res[-1].split("Project:", 1)[-1].split("PI:", 1)[0].strip()
res = [ii for ii in loglines if "Project:" in ii]
project = res[-1].split("Project:", 1)[-1].split("PI:", 1)[0].strip()
observation["Project"] = project
except:
print("(E) Bad logfile for SASid %s. Pipelines has probably failed" % (sasid))
@@ -27,16 +28,16 @@ def populating_observation (sasid, observation, parsetlines, loglines, feedlines
# Release Date
# It exists for Unspecified Process as well
# should be read from Unspecified Process
observation["Release Date"] = ""
observation["Release Date"] = ""
# SAS Id
# It exists for Unspecified Process as well
# should be read from Unspecified Process
observation["SAS Id"] = ""
observation["SAS Id"] = str(sasid)
# SAS Id Source
observation["SAS Id Source"] = "SAS"
# Process Identifier Name
res = [ii for ii in loglines if "Target" in ii]
target = res[0].split("Target: ")[1].split(" ")[0].strip()
@@ -63,15 +64,15 @@ def populating_observation (sasid, observation, parsetlines, loglines, feedlines
observation["Instrument Filter"] = filter
# Channel Width and Nr. Channels per Sub
res=[ii for ii in loglines if "SubWidth:" in ii]
subwidth=float(res[0].split("SubWidth:")[1].split("kHz")[0].strip()) / 1000. # in MHz
res=[ii for ii in loglines if "chans/sub:" in ii]
nchan_per_sub=int(res[0].split("/sub:")[1].split("Downsample")[0].strip())
res = [ii for ii in loglines if "SubWidth:" in ii]
subwidth = float(res[0].split("SubWidth:")[1].split("kHz")[0].strip()) / 1000. # in MHz
res = [ii for ii in loglines if "chans/sub:" in ii]
nchan_per_sub = int(res[0].split("/sub:")[1].split("Downsample")[0].strip())
if nchan_per_sub == 0:
if len(parsetlines) > 0:
res=[ii for ii in parsetlines if "Observation.channelsPerSubband" in ii]
nchan_per_sub=int(res[0].split("=")[1].strip())
else:
res = [ii for ii in parsetlines if "Observation.channelsPerSubband" in ii]
nchan_per_sub = int(res[0].split("=")[1].strip())
else:
print("(W) Parset file is not available for SASid %s" % (sasid))
nchan_per_sub = 1
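    # Sketch of the slicing above on made-up log lines (the exact log format is an assumption):
    #   "SubWidth: 195.3125 kHz"      -> subwidth = 195.3125 / 1000. = 0.1953125 MHz
    #   "chans/sub: 16 Downsample: 1" -> nchan_per_sub = 16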
@@ -83,13 +84,13 @@ def populating_observation (sasid, observation, parsetlines, loglines, feedlines
res = [ii for ii in loglines if "SAPs:" in ii]
nsaps = int(res[0].split("SAPs:")[1].split("Target")[0].strip())
observation["Number of SubArray Pointings"] = nsaps
    # Number of subbands (not in the observation metadata - only for the specific part of a beam)
    # Collected here for testing purposes: the total number of subbands (nsubs * nsaps) tells
    # whether more than 244 subbands were used, and hence whether 8-bit mode was needed
res=[ii for ii in loglines if "subbands:" in ii]
nsubs=int(res[0].split(":")[1].split("[")[0].strip())
#observation["Total number of subbands"] = nsubs # * nsaps (NOT needed, as in the logfile is already the total number of subs)
res = [ii for ii in loglines if "subbands:" in ii]
nsubs = int(res[0].split(":")[1].split("[")[0].strip())
# observation["Total number of subbands"] = nsubs # * nsaps (NOT needed, as in the logfile is already the total number of subs)
# Start Time
res = [ii for ii in loglines if "Start UTC:" in ii]
@@ -109,7 +110,7 @@ def populating_observation (sasid, observation, parsetlines, loglines, feedlines
else:
duration = 0
observation["Duration [s]"] = duration
# End Time
# 2014-05-02 06:21:00.000000000 get rid of the usec in the starttime string
st = datetime.strptime(starttime.rsplit(".")[0], '%Y-%m-%d %H:%M:%S')
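    # Worked through with the timestamp from the comment above:
    #   "2014-05-02 06:21:00.000000000".rsplit(".")[0] -> "2014-05-02 06:21:00"
    # strptime('%Y-%m-%d %H:%M:%S') then parses it; %f would reject the 9-digit nanosecond
    # field, hence the fractional part is dropped first. The end time presumably follows
    # as st + dt.timedelta(seconds=duration).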
@@ -119,49 +120,51 @@ def populating_observation (sasid, observation, parsetlines, loglines, feedlines
# Bits Per Sample (reading parset file)
if len(parsetlines) > 0:
res=[ii for ii in parsetlines if "Observation.nrBitsPerSample" in ii]
res = [ii for ii in parsetlines if "Observation.nrBitsPerSample" in ii]
if len(res) != 0:
observation["Bits Per Sample"] = int(res[0].split("=")[1].strip())
else:
res=[ii for ii in parsetlines if "OLAP.nrBitsPerSample" in ii]
res = [ii for ii in parsetlines if "OLAP.nrBitsPerSample" in ii]
if len(res) != 0:
observation["Bits Per Sample"] = int(res[0].split("=")[1].strip())
else:
res=[ii for ii in parsetlines if "Observation.ObservationControl.OnlineControl.OLAP.nrBitsPerSample" in ii]
res = [ii for ii in parsetlines if
"Observation.ObservationControl.OnlineControl.OLAP.nrBitsPerSample" in ii]
if len(res) != 0:
observation["Bits Per Sample"] = int(res[0].split("=")[1].strip())
else:
if nsubs > 244:
observation["Bits Per Sample"] = 8
else:
observation["Bits Per Sample"] = 16
    else: # if the parset file is not available, use the number of subbands to guesstimate
                observation["Bits Per Sample"] = 16
    else: # if the parset file is not available, use the number of subbands to guesstimate
if nsubs > 244:
observation["Bits Per Sample"] = 8
else:
observation["Bits Per Sample"] = 16
# Observing Description
observation["Observing Description"] = "%s, %s, %s, %d bit, %d ch/sub, Nsubs=%d" % (antenna, filter, dur, observation["Bits Per Sample"], nchan_per_sub, nsubs)
observation["Observing Description"] = "%s, %s, %s, %d bit, %d ch/sub, Nsubs=%d" % (
antenna, filter, dur, observation["Bits Per Sample"], nchan_per_sub, nsubs)
# Clock
res=[ii for ii in loglines if "Clock:" in ii]
clock=res[-1].split("Clock:")[1].split("MHz")[0].strip()
res = [ii for ii in loglines if "Clock:" in ii]
clock = res[-1].split("Clock:")[1].split("MHz")[0].strip()
observation["Clock [MHz]"] = clock
# Station Selection - is it always 'Custom'?
observation["Station Selection"] = "Custom"
# Number of stations (core, remote, international)
res=[ii for ii in loglines if "stations:" in ii]
nstat=int(res[-1].split("stations:")[1].split("[")[0].strip())
ncore=int(res[-1].split("[")[1].split("CS")[0].strip())
nremote=int(res[-1].split(",")[1].split("RS")[0].strip())
res = [ii for ii in loglines if "stations:" in ii]
nstat = int(res[-1].split("stations:")[1].split("[")[0].strip())
ncore = int(res[-1].split("[")[1].split("CS")[0].strip())
nremote = int(res[-1].split(",")[1].split("RS")[0].strip())
ninternational = nstat - ncore - nremote
observation["Number of Stations"] = nstat
observation["Nr Stations Core"] = ncore
observation["Nr Stations Remote"] = nremote
observation["Nr Stations International"] = ninternational
observation["Nr Stations International"] = ninternational
# Number of Correlated DataProducts - always 0?
observation["Number of Correlated DataProducts"] = 0
@@ -179,28 +182,28 @@ def populating_observation (sasid, observation, parsetlines, loglines, feedlines
observation["Time System"] = "UTC"
# Number of BeamFormed DataProducts
res=[ii for ii in parsetlines if "Observation.DataProducts.Output_Beamformed.filenames" in ii]
res = [ii for ii in parsetlines if "Observation.DataProducts.Output_Beamformed.filenames" in ii]
if len(res) > 0:
temp=res[-1].split("=")[1].strip().split("[")[1].split("]")[0].strip()
temp = res[-1].split("=")[1].strip().split("[")[1].split("]")[0].strip()
if len(temp) == 0:
nbeamfiles=0
nbeamfiles = 0
else:
nbeamfiles=len(temp.split(","))
nbeamfiles = len(temp.split(","))
observation["Number of BeamFormed DataProducts"] = nbeamfiles
else:
res1=[ii for ii in parsetlines if "Observation.DataProducts.Output_CoherentStokes.filenames" in ii]
res2=[ii for ii in parsetlines if "Observation.DataProducts.Output_IncoherentStokes.filenames" in ii]
res1 = [ii for ii in parsetlines if "Observation.DataProducts.Output_CoherentStokes.filenames" in ii]
res2 = [ii for ii in parsetlines if "Observation.DataProducts.Output_IncoherentStokes.filenames" in ii]
if len(res1) + len(res2) > 0:
temp=res1[-1].split("=")[1].strip().split("[")[1].split("]")[0].strip()
temp = res1[-1].split("=")[1].strip().split("[")[1].split("]")[0].strip()
if len(temp) == 0:
ncsfiles=0
ncsfiles = 0
else:
ncsfiles=len(temp.split(","))
temp=res2[-1].split("=")[1].strip().split("[")[1].split("]")[0].strip()
ncsfiles = len(temp.split(","))
temp = res2[-1].split("=")[1].strip().split("[")[1].split("]")[0].strip()
if len(temp) == 0:
nisfiles=0
nisfiles = 0
else:
nisfiles=len(temp.split(","))
nisfiles = len(temp.split(","))
observation["Number of BeamFormed DataProducts"] = ncsfiles + nisfiles
else:
observation["Number of BeamFormed DataProducts"] = 0
......
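The beamformed dataproduct counting above reduces to splitting the bracketed parset list on commas. A minimal sketch with a hypothetical parset line:

line = "Observation.DataProducts.Output_Beamformed.filenames = [L186113_S0_bf.h5, L186113_S1_bf.h5]"
temp = line.split("=")[1].strip().split("[")[1].split("]")[0].strip()
nbeamfiles = 0 if len(temp) == 0 else len(temp.split(","))  # 2 here; an empty list '[]' yields 0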
@@ -4,8 +4,61 @@ import sys
from datetime import datetime
def set_on_path(obj, path, value):
leaf = obj
for item in path[:-1]:
if item.strip() not in leaf:
leaf[item.strip()] = {}
leaf = leaf[item.strip()]
else:
leaf = leaf[item.strip()]
leaf[path[-1].strip()] = value.strip()
def parse_feedback_lines(feedlines):
parsed_log_lines = {}
for line in feedlines:
if line == '':
continue
key, value = line.split('=')
path = key.split('.')
set_on_path(parsed_log_lines, path, value)
return parsed_log_lines
def get_path_to_nested_dict(obj, path):
path_keys = path.split('.')
leaf = obj
for path_key in path_keys:
leaf = leaf[path_key]
return leaf
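# Taken together: parse_feedback_lines() expands flat 'a.b.c=value' feedback lines into
# nested dicts, and get_path_to_nested_dict() walks back down into them. A sketch with a
# made-up feedback line (the key shape is assumed, but matches the '[' test used below):
#   parsed = parse_feedback_lines(
#       ["LOFAR.ObsSW.Observation.DataProducts.Output_Beamformed_[0].filename=L186113_S0_bf.h5"])
#   get_path_to_nested_dict(parsed, 'LOFAR.ObsSW.Observation.DataProducts')
#   -> {'Output_Beamformed_[0]': {'filename': 'L186113_S0_bf.h5'}}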
def match_filename(filename, value):
if 'plots' in filename and 'summary' in value:
return True
elif 'plots' not in filename:
return True
else:
return False
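# In other words: a 'plots' tarball only matches the summary dataproduct, while any other
# archive accepts the first dataproduct found. For example (names modelled on the test data):
#   match_filename("L192549_plots.tar", "L192549_summaryCS.tar")             -> True
#   match_filename("L192549_plots.tar", "L192549_SAP000_B000_S0_bf.h5")      -> False
#   match_filename("LOFAR_PULSAR_ARCHIVE_locus014_L192549_red.tar", "x.h5")  -> True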
# populating pipeline info
def populating_pipeline(sasid, pulp, loglines, feedlines):
def populating_pipeline(sasid, filename, pulp, loglines, feedlines):
parsed_feedback = parse_feedback_lines(feedlines)
dataproducts = get_path_to_nested_dict(parsed_feedback, 'LOFAR.ObsSW.Observation.DataProducts')
dataproduct = None
dataproduct_key = None
for key, value in dataproducts.items():
if '[' not in key:
continue
if match_filename(filename, value['filename']):
dataproduct = value
dataproduct_key = key
break
pulp['dataType'] = dataproduct['datatype']
# single-pulse on/off
res = [ii for ii in loglines if "Single-pulse" in ii]
if len(res) == 0:
......
[{
"path": "tar_archive",
"file_name": "LOFAR_PULSAR_ARCHIVE_locus014_L192549_red.tar",
"metadata": {
"Observation": {
"Parset": "L192549.parset",
"SAS Id": "",
"Creator": "AWTIER0",
"Project": "LC1_027",
"End Time": "2013-12-04 10:13:00",
"Privileges": 4,
"Start Time": "2013-12-04 10:03:00",
"Antenna Set": "HBA Dual",
"Clock [MHz]": "200",
"Time System": "UTC",
"Duration [s]": 600,
"Release Date": "",
"SAS Id Source": "SAS",
"Strategy Name": "BeamObservation",
"Observing Mode": "Beam Observation",
"Bits Per Sample": 8,
"Nr Stations Core": 46,
"Instrument Filter": "110-190 MHz",
"Station Selection": "Custom",
"Nr Stations Remote": 0,
"Number of Stations": 46,
"Channel Width [MHz]": 0.195312,
"Channels Per Subband": 1,
"Strategy Description": "default",
"Observing Description": "HBA Dual, 110-190 MHz, 10.0m, 8 bit, 1 ch/sub, Nsubs=400",
"Process Identifier Name": "B1642-03",
"Nr Stations International": 0,
"Process Identifier Source": "MoM",
"Number of SubArray Pointings": 1,
"Number of BeamFormed DataProducts": 80,
"Number of Correlated DataProducts": 0,
"Number of Transient BufferBoard Events": -1
},
"fileContent": [
"L192549_red/1642-03.par",
"L192549_red/rawvoltages/SAP0/BEAM0/L192549_SAP000_B000_S0_P010_bf.h5",
"L192549_red/rawvoltages/SAP0/BEAM0/L192549_SAP000_B000_S2_P010_bf.h5",
"L192549_red/rawvoltages/SAP0/BEAM0/L192549_SAP000_B000_S1_P010_bf.h5",
"L192549_red/rawvoltages/SAP0/BEAM0/L192549_SAP000_B000_S3_P010_bf.h5",
"L192549_red/rawvoltages/SAP0/BEAM0/B1642-03_L192549_SAP0_BEAM0_PART10.ar",
"L192549_red/L192549_sap000_beam0000_part10.log"
],
"Pulsar Pipeline": {
"Creator": "AWTIER0",
"Project": "LC1_027",
"Pulsars": 0,
"End Time": "2013-12-05 00:32:45",
"Privileges": 4,
"Start Time": "2013-12-04 11:20:03",
"Duration [s]": -38838,
"Release Date": "",
"skipPrepfold": 0,
"Pipeline Name": "B1642-03/PULP",
"Strategy Name": "Pulsar Pipeline",
"skipDataFolding": 0,
"skipRFIExcision": 0,
"Pipeline Version": "n/a",
"Pulsar Selection": "Pulsars in observation specs, file or SAP",
"convertRawTo8bit": 0,
"skipDynamicSpectrum": 0,
"Strategy Description": "default",
"doSinglePulseAnalysis": 0,
"Process Identifier Name": "B1642-03/PULP",
"subIntegrationLength [s]": -1,
"skipOptimizePulsarProfile": 1,
"skipConvertRawIntoFoldedPSRFITS": 1,
"runRotationalRAdioTransientsAnalysis": 0
}
}
},
{
"path": "tar_archive",
"metadata": {
"dataProduct": {
"size": 12345,
"_type": "PulpDataProduct",
"fileName": "LOFAR_PULSAR_ARCHIVE_locus014_L186113_red.tar",
"md5checksum": "94c31d5780125a4b2cc0a3cb0616c2f6"
},
"pipelineRun": {
"_type": "PulsarPipeline",
"Observation": {
"Parset": "L186113.parset",
"SAS Id": "",
"Creator": "AWTIER0",
"Project": "LC0_011",
"End Time": "2013-11-09 17:56:00",
"Privileges": 4,
"Start Time": "2013-11-09 17:46:00",
"Antenna Set": "HBA Dual",
"Clock [MHz]": "200",
"Time System": "UTC",
"Duration [s]": 600,
"Release Date": "",
"SAS Id Source": "SAS",
"Strategy Name": "BeamObservation",
"Observing Mode": "Beam Observation",
"Bits Per Sample": 8,
"Nr Stations Core": 46,
"Instrument Filter": "110-190 MHz",
"Station Selection": "Custom",
"Nr Stations Remote": 0,
"Number of Stations": 46,
"Channel Width [MHz]": 0.195312,
"Channels Per Subband": 1,
"Strategy Description": "default",
"Observing Description": "HBA Dual, 110-190 MHz, 10.0m, 8 bit, 1 ch/sub, Nsubs=400",
"Process Identifier Name": "J1923+2515",
"Nr Stations International": 0,
"Process Identifier Source": "MoM",
"Number of SubArray Pointings": 1,
"Number of BeamFormed DataProducts": 80,
"Number of Correlated DataProducts": 0,
"Number of Transient BufferBoard Events": -1
},
"fileContent": [
"L186113_red/1923+2515.par",
"L186113_red/rawvoltages/SAP0/BEAM0/L186113_SAP000_B000_S0_P006_bf.h5",
"L186113_red/rawvoltages/SAP0/BEAM0/L186113_SAP000_B000_S2_P006_bf.h5",
"L186113_red/rawvoltages/SAP0/BEAM0/L186113_SAP000_B000_S1_P006_bf.h5",
"L186113_red/rawvoltages/SAP0/BEAM0/L186113_SAP000_B000_S3_P006_bf.h5",
"L186113_red/rawvoltages/SAP0/BEAM0/J1923+2515_L186113_SAP0_BEAM0_PART6.ar",
"L186113_red/L186113_sap000_beam0000_part06.log"
],
"pipelineName": "beamformed",
"strategyName": "Beamformed fixing",
"Pulsar Pipeline": {
"Creator": "AWTIER0",
"Project": "LC0_011",
"Pulsars": 0,
"End Time": "2013-11-18 19:54:18",
"Privileges": 4,
"Start Time": "2013-11-18 14:47:05",
"Duration [s]": 18433,
"Release Date": "",
"skipPrepfold": 0,
"Pipeline Name": "J1923+2515/PULP",
"Strategy Name": "Pulsar Pipeline",
"skipDataFolding": 0,
"skipRFIExcision": 0,
"Pipeline Version": "n/a",
"Pulsar Selection": "Pulsars in observation specs, file or SAP",
"convertRawTo8bit": 0,
"skipDynamicSpectrum": 0,
"Strategy Description": "default",
"doSinglePulseAnalysis": 0,
"Process Identifier Name": "J1923+2515/PULP",
"subIntegrationLength [s]": -1,
"skipOptimizePulsarProfile": 0,
"skipConvertRawIntoFoldedPSRFITS": 1,
"runRotationalRAdioTransientsAnalysis": 0
},
"pipelineVersion": "v0.0.0",
"strategyDescription": "Pulsar Pipeline"
}
},
"file_name": "LOFAR_PULSAR_ARCHIVE_locus014_L186113_red.tar"
}]
\ No newline at end of file
cwlVersion: v1.2
class: CommandLineTool
inputs:
- id: archive
doc: Archive to compute md5 checksum for
type: File
inputBinding:
position: 1
stdout: std.out
outputs:
- id: md5sum
doc: MD5 sum
type: string
outputBinding:
glob: "std.out"
loadContents: true
outputEval: $(self[0].contents.split(' ')[0])
baseCommand:
- md5sum
requirements:
- class: InlineJavascriptRequirement
hints:
- class: DockerRequirement
dockerPull: git.astron.nl:5000/ldv/bf_double_tgz:latest
\ No newline at end of file
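The outputEval expression keeps only the first whitespace-separated token of md5sum's stdout, i.e. the checksum without the trailing file name. A rough Python equivalent of what the tool computes (the archive path is a placeholder):

import hashlib

def md5sum(path="archive.tar"):  # placeholder path
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):  # stream in 1 MiB chunks
            h.update(chunk)
    return h.hexdigest()  # the same token outputEval extracts from the 'md5sum' output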
@@ -8,6 +8,11 @@ inputs:
type: string
inputBinding:
position: 0
- id: filename
doc: File name
type: string
inputBinding:
position: 1
- id: log_root_folder
doc: The absolute path where the pulp-logs are located
type: Directory
......
@@ -9,6 +9,10 @@ inputs:
type: string
- id: file_name
type: string
- id: md5sum
type: string
- id: filesize
type: int
outputs:
- id: ingest
type: Any
@@ -16,11 +20,26 @@ requirements:
- class: InlineJavascriptRequirement
expression: |
${
inputs.metadata['fileContent'] = inputs.file_content
var dataProduct = {
"size": inputs.filesize,
"_type": "PulpDataProduct",
"fileName": inputs.file_name,
"md5checksum": inputs.md5sum,
"fileFormat": "PULP",
"storageWriter": "Unknown",
"dataProductType": "Pulsar pipeline output",
"storageWriterVersion": "Unknown"
}
var pipelineRun = inputs.metadata
pipelineRun['fileContent'] = inputs.file_content
return { "ingest": {
"path": inputs.output_name,
"file_name": inputs.file_name,
"metadata": inputs.metadata
"metadata": {
"dataProduct": dataProduct,
"pipelineRun": pipelineRun
}
}
}
}
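The net effect of the new expression: instead of emitting the metadata object directly, the ingest record now nests it under pipelineRun next to a freshly built dataProduct block (compare the two JSON examples above). Roughly, in Python (input names as in the tool, values as placeholders):

dataProduct = {"size": inputs["filesize"], "_type": "PulpDataProduct",
               "fileName": inputs["file_name"], "md5checksum": inputs["md5sum"],
               "fileFormat": "PULP", "storageWriter": "Unknown",
               "dataProductType": "Pulsar pipeline output", "storageWriterVersion": "Unknown"}
pipelineRun = dict(inputs["metadata"], fileContent=inputs["file_content"])
ingest = {"path": inputs["output_name"], "file_name": inputs["file_name"],
          "metadata": {"dataProduct": dataProduct, "pipelineRun": pipelineRun}}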
@@ -32,8 +32,18 @@ steps:
valueFrom: $(self.basename.match('L([0-9]+)')[1])
- id: log_root_folder
source: pulp_log_folder
- id: filename
source: bf_tar_archive
valueFrom: $(self.basename)
out:
- id: output_json
- id: compute_md5sum
run: ../steps/computemd5.cwl
in:
- id: archive
source: eliminate_double_tgz/output_tar
out:
- id: md5sum
- id: format_ingest
run: ../steps/format_ingest.cwl
in:
@@ -43,9 +53,14 @@ steps:
source: eliminate_double_tgz/file_content
- id: output_name
default: tar_archive
- id: md5sum
source: compute_md5sum/md5sum
- id: file_name
source: eliminate_double_tgz/output_tar
valueFrom: $(self.basename)
- id: filesize
source: eliminate_double_tgz/output_tar
valueFrom: $(self.size)
out:
- id: ingest
requirements:
......