# input_validation.py
import logging
import os.path
logger = logging.getLogger(__name__)
# copied from ldv_ingest module
# https://git.astron.nl/astron-sdc/ldv-services/-/blob/master/atdb_services_pip/atdb_services/ldv_ingest.py?ref_type=heads#L1721
def get_filename_without_hash(file_name):
"""
Remove magic number (hash) between latest '_' and '.'
The hash should be 8 characters long otherwise the original file_name will be returned
e.g. LOFAR_PULSAR_ARCHIVE_locus004_L231550_red_c122bb36.tar will result in
LOFAR_PULSAR_ARCHIVE_locus004_L231550_red.tar
:param file_name:
:return file_name_without_hash:
"""
file_name_without_hash = file_name
if "_" and "." in file_name:
file_start_of_hash = file_name.rsplit("_", 1)[0]
file_end_of_hash = file_name.rsplit(".", 1)[1]
file_name_without_hash = f"{file_start_of_hash}.{file_end_of_hash}"
        # The difference in length should be 8 hash characters plus '_' (9 in total)
if (len(file_name) - len(file_name_without_hash)) != 9:
logger.warning(f"No correct hash found in '{file_name_without_hash}'. Use the original filename")
file_name_without_hash = file_name
# only return the filename without the path
filename = os.path.basename(file_name_without_hash)
return filename
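# Illustrative example (comment only, not executed), based on the docstring above:
#
#   get_filename_without_hash("LOFAR_PULSAR_ARCHIVE_locus004_L231550_red_c122bb36.tar")
#   -> 'LOFAR_PULSAR_ARCHIVE_locus004_L231550_red.tar'
#
#   # full surls work as well, since the path is stripped via os.path.basename:
#   get_filename_without_hash(".../432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar")
#   -> 'L432340_SB000_uv.dppp.MS.tar'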
def check_duplicates(data):
"""
task.inputs can contain double entries (caused by faulty ingests during the 'mom era'). Remove them.
The following is an example of an inputs section with a duplicate 'surl', the duplicate will be removed
:return: found_duplicates is True when a duplicate was found
:return:
[
{
"size": 33638400,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
"type": "File",
"location": "srm.grid.sara.nl"
},
{
"size": 33638400,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar",
"type": "File",
"location": "srm.grid.sara.nl"
},
{
"size": 33638400,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
"type": "File",
"location": "srm.grid.sara.nl"
}]
"""
    logger.info('check_duplicates')
if not data:
# tasks without inputs should just return
        return False, None, None
found_duplicates = False
seen_surls = set()
unique_inputs = []
duplicates = []
for item in data:
filename_without_hash = get_filename_without_hash(item["surl"])
if filename_without_hash not in seen_surls:
unique_inputs.append(item)
seen_surls.add(filename_without_hash)
else:
duplicates.append(filename_without_hash)
found_duplicates = True
logger.info(f'duplicate found: {filename_without_hash}')
    return found_duplicates, duplicates, unique_inputs
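# Illustrative example (comment only): applied to the three inputs shown in the docstring
# above, check_duplicates returns
#   found_duplicates = True
#   duplicates       = ['L432340_SB000_uv.dppp.MS.tar']
#   unique_inputs    = the first two entries (SB000 and SB001)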
def recalculate_size(data):
"""
Operators or ATDB could have removed files from the inputs json blob.
If that has happened, the task.size_to_process needs to be recalculated
"""
    logger.info('recalculate_size')
    new_size = sum(e["size"] for e in data)
return new_size
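# Illustrative example (comment only): for the three 33638400-byte inputs from the
# check_duplicates docstring, recalculate_size returns 3 * 33638400 = 100915200 bytes.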
def validate_inputs(task):
"""
    Check whether task.inputs is valid.
- currently only duplicates are checked, but this functionality can be expanded
"""
# --- check for duplicate files in task.inputs ---
    found_duplicates, duplicates, unique_inputs = check_duplicates(task.inputs)
# if a duplicate was found, recalculate the size_to_process and update the inputs list of the task
if found_duplicates:
# set a failed state.
new_status = task.new_status + "_failed"
task.new_status = new_status
# add some information for the user
        # (abuse the unused 'task.environment' field as a 'cache' to transport this to a 'signal', to avoid circular imports)
task.environment = f"inputs_duplicates::{duplicates}"
# --- recalculate sizes ---
size_to_process = recalculate_size(task.inputs)
task.size_to_process = size_to_process
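# Minimal manual check (a sketch, not part of the ATDB service): 'task' below is a
# SimpleNamespace standing in for the real ATDB task object, assumed here to carry the
# 'inputs', 'new_status', 'environment' and 'size_to_process' fields used above; the
# status value is a placeholder.
if __name__ == "__main__":
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)
    task = SimpleNamespace(
        inputs=[
            {"size": 33638400,
             "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
             "type": "File",
             "location": "srm.grid.sara.nl"},
            {"size": 33638400,
             "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
             "type": "File",
             "location": "srm.grid.sara.nl"},
        ],
        new_status="defined",   # placeholder status; '_failed' is appended when duplicates are found
        environment="",
        size_to_process=0,
    )
    validate_inputs(task)
    # With the duplicated SB000 entry, task.new_status becomes 'defined_failed' and
    # task.environment starts with 'inputs_duplicates::'
    print(task.new_status, task.size_to_process, task.environment)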