Commit 4647b2e6 authored by Nico Vermaas

adapted the functionality based on review

parent 96bff0fd
1 merge request: !388 SDC-1663 inputs validation
Pipeline #106377 failed
......@@ -27,7 +27,7 @@ TD {
background-color: lightgreen;
}
.aggregate_failed,.aggregating_failed {
.aggregate_failed,.aggregating_failed,.defining_failed,.defined_failed {
color: red;
font-weight: bold;
}
......
......@@ -9,7 +9,7 @@ import logging
from .services import calculated_qualities as qualities
from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN
from .services.specification.input_validation import validate_inputs
from .services.specification.input_validation import validate_inputs,recalculate_size
logger = logging.getLogger(__name__)
......
......@@ -5,15 +5,32 @@ from django.core.signals import request_started, request_finished
from django.contrib.auth.models import User
from django.dispatch import receiver
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone
from taskdatabase.models import Task, Workflow, LogEntry, Status
from .activities_handler import update_activity
"""
Signals sent from different parts of the backend are centrally defined and handled here.
"""
logger = logging.getLogger(__name__)
def add_logentry(task, step, status, description, service='ATDB'):
"""
add a log entry for this task.
Usually this functionality is used by external services that report to ATDB directly through the REST API,
but 'internal services', like input_validation, can also report to the user this way.
"""
logentry = LogEntry(
task=task,
service=service,
step_name=step,
status=status,
description=description,
timestamp=timezone.now())
logentry.save()
#--- Task signals-------------
......@@ -67,6 +84,16 @@ def handle_post_save(sender, **kwargs):
update_activity(task)
if task.environment:
step = task.environment.split('::')[0]
description = task.environment.split('::')[1]
add_logentry(task,step,task.status,description)
# clear the 'cache'
task.environment=''
disconnect_signals()
task.save()
connect_signals()
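# A minimal sketch of the 'step::description' convention handled above (the value is
# made up): input_validation stores "step::description" in the otherwise unused
# task.environment field, and this handler splits it apart and turns it into a LogEntry.
#
#     environment = "inputs_duplicates::['L123456_SB000_uv.tar']"
#     step, description = environment.split('::', 1)
#     # step        -> 'inputs_duplicates'
#     # description -> "['L123456_SB000_uv.tar']"
#
# Note that indexing split('::')[1], as done above, would truncate a description that
# itself contains '::'; splitting with maxsplit=1 keeps it intact.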
def connect_signals():
#logger.info("connect_signals")
......
import logging
import os.path
logger = logging.getLogger(__name__)
def remove_duplicates(data):
# copied from ldv_ingest module
# https://git.astron.nl/astron-sdc/ldv-services/-/blob/master/atdb_services_pip/atdb_services/ldv_ingest.py?ref_type=heads#L1721
def get_filename_without_hash(file_name):
"""
Remove the magic number (hash) between the last '_' and '.'
The hash should be 8 characters long, otherwise the original file_name is returned.
e.g. LOFAR_PULSAR_ARCHIVE_locus004_L231550_red_c122bb36.tar will result in
LOFAR_PULSAR_ARCHIVE_locus004_L231550_red.tar
:param file_name:
:return file_name_without_hash:
"""
file_name_without_hash = file_name
if "_" and "." in file_name:
file_start_of_hash = file_name.rsplit("_", 1)[0]
file_end_of_hash = file_name.rsplit(".", 1)[1]
file_name_without_hash = f"{file_start_of_hash}.{file_end_of_hash}"
# the difference in length should be 8 hash characters + '_' (9 in total)
if (len(file_name) - len(file_name_without_hash)) != 9:
logger.warning(f"No correct hash found in '{file_name_without_hash}', using the original filename")
file_name_without_hash = file_name
# only return the filename without the path
filename = os.path.basename(file_name_without_hash)
return filename
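# A couple of hedged examples of the intended behaviour, using the filename from the
# docstring above and the fixture filename from the tests below (the surl prefix is invented):
#
#     # the 8-character hash '_c122bb36' is stripped and the path is dropped by basename:
#     get_filename_without_hash(
#         "srm://srm.grid.sara.nl/dir/LOFAR_PULSAR_ARCHIVE_locus004_L231550_red_c122bb36.tar")
#     # -> 'LOFAR_PULSAR_ARCHIVE_locus004_L231550_red.tar'
#
#     # here the length check fails (the difference is not 9), so a warning is logged
#     # and the original name is returned unchanged:
#     get_filename_without_hash("L432340_SB000_uv.dppp.MS.tar")
#     # -> 'L432340_SB000_uv.dppp.MS.tar'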
def check_duplicates(data):
"""
task.inputs can contain duplicate entries (caused by faulty ingests during the 'MoM era').
The following is an example of an inputs section with a duplicate 'surl'; duplicates are reported and left out of the returned unique_inputs.
:return: found_duplicates - True when a duplicate was found
:return: duplicates - list of (hash-stripped) filenames that occur more than once
:return: unique_inputs - the inputs with the duplicates removed
[
{
"size": 33638400,
......@@ -26,25 +57,30 @@ def remove_duplicates(data):
}]
"""
logger.info(f'check_for_duplicates')
logger.info(f'check_duplicates')
if not data:
# tasks without inputs should just return
return False, None, None
recalculate = False
found_duplicates = False
seen_surls = set()
unique_inputs = []
duplicates = []
for item in data:
if item["surl"] not in seen_surls:
filename_without_hash = get_filename_without_hash(item["surl"])
if filename_without_hash not in seen_surls:
unique_inputs.append(item)
seen_surls.add(item["surl"])
seen_surls.add(filename_without_hash)
else:
recalculate = True
logger.info(f'duplicate found: {item["surl"]}')
duplicates.append(filename_without_hash)
found_duplicates = True
logger.info(f'duplicate found: {filename_without_hash}')
return recalculate,unique_inputs
return found_duplicates,duplicates, unique_inputs
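# A hedged usage example (not part of this module): calling check_duplicates on an
# inputs list in which two surls differ only in their hash; surls and sizes are invented.
#
#     inputs = [
#         {"size": 300, "surl": "srm://srm.grid.sara.nl/dir/L123456_SB000_uv_aabbccdd.tar"},
#         {"size": 300, "surl": "srm://srm.grid.sara.nl/dir/L123456_SB000_uv_11223344.tar"},
#     ]
#     found_duplicates, duplicates, unique_inputs = check_duplicates(inputs)
#
#     # found_duplicates -> True
#     # duplicates       -> ['L123456_SB000_uv.tar']   (the hash-stripped filename)
#     # unique_inputs    -> [inputs[0]]                (only the first occurrence is kept)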
def recalculate_size(data):
......@@ -52,22 +88,32 @@ def recalculate_size(data):
Operators or ATDB could have removed files from the inputs json blob.
If that has happened, the task.size_to_process needs to be recalculated
"""
logger.info(f'recalculate_sums')
logger.info(f'recalculate_size')
new_size = sum([e["size"] for e in data])
return new_size
def validate_inputs(task):
"""
check if the task.inputs is valid
- currently only duplicates are checked, but this functionality can be expanded
"""
inputs = task.inputs
# remove duplicates
recalculate,unique_inputs = remove_duplicates(inputs)
# --- check for duplicate files in task.inputs ---
found_duplicates,duplicates,unique_inputs = check_duplicates(task.inputs)
# if a duplicate was found, recalculate the size_to_process and update the inputs list of the task
if recalculate:
task.inputs = unique_inputs
recalculate_size(unique_inputs)
task.size_to_process = recalculate_size(unique_inputs)
\ No newline at end of file
if found_duplicates:
# set a failed state.
new_status = task.new_status + "_failed"
task.new_status = new_status
# add some information for the user
# ('abuse' the otherwise unused 'task.environment' field as a 'cache' to transport this message to a signal, avoiding circular imports)
task.environment = f"inputs_duplicates::{duplicates}"
# --- recalculate sizes ---
size_to_process = recalculate_size(task.inputs)
task.size_to_process = size_to_process
\ No newline at end of file
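Taken together, a rough sketch of what validate_inputs now does to a task whose inputs contain a duplicate; the trigger on the 'defined' status comes from the tests further below, the rest is illustrative and not part of this commit:
# task.new_status == 'defined' and task.inputs contains two surls with the same
# hash-stripped filename
validate_inputs(task)
# task.new_status      -> 'defined_failed'  (rendered red by the new CSS classes above)
# task.environment     -> "inputs_duplicates::[...]"  (picked up by the post_save signal
#                                                      in signals.py and logged as a LogEntry)
# task.size_to_process -> sum of the 'size' fields in task.inputs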
......@@ -27,7 +27,7 @@ TD {
background-color: lightgreen;
}
.aggregate_failed,.aggregating_failed {
.aggregate_failed,.aggregating_failed,.defining_failed,.defined_failed {
color: red;
font-weight: bold;
}
......
......@@ -31,7 +31,7 @@
{% include 'taskdatabase/pagination.html' %}
</div>
</div>
<p class="footer"> Version 3 Feb 2025</p>
<p class="footer"> Version 4 Feb 2025</p>
</div>
{% include 'taskdatabase/refresh.html' %}
......
from django.test import TestCase
from django.utils import timezone
from datetime import datetime
from taskdatabase.models import LogEntry, Task, Workflow, Job, Configuration
from taskdatabase.models import LogEntry, Task, Workflow
from taskdatabase.services.specification import input_validation
from unittest.mock import Mock, MagicMock, patch
class TestInputValidation(TestCase):
......@@ -51,21 +50,21 @@ class TestInputValidation(TestCase):
size_to_process = 100, workflow=self.workflow, inputs = self.inputs_with_duplicate)
self.task.save()
def test_validate_inputs(self):
def test_check_duplicates(self):
"""
run the validates_inputs function with a task.inputs that contains a duplicate.
The duplicate should be removed, and the size recalculated
test if duplicates are detected
"""
# arrange
expected_size = 300
expected_inputs = self.inputs_validated
expected_size = 600
expected_duplicates = ['L432340_SB000_uv.dppp.MS.tar']
# act
input_validation.validate_inputs(self.task)
found_duplicates, duplicates, unique_inputs = input_validation.check_duplicates(self.inputs_with_duplicate)
# assert
self.assertEqual(self.task.inputs, expected_inputs)
self.assertEqual(duplicates, expected_duplicates)
self.assertEqual(self.task.size_to_process, expected_size)
......@@ -76,33 +75,32 @@ class TestInputValidation(TestCase):
# arrange
expected_size = 300
expected_inputs = self.inputs_validated
# set to 'wrong' inputs containing a duplicate
self.task.inputs = self.inputs_with_duplicate
self.task.inputs = self.inputs_validated
# act
self.task.new_status="defined"
self.task.save()
# assert
self.assertEqual(self.task.inputs, expected_inputs)
self.assertEqual(self.task.size_to_process, expected_size)
def test_no_trigger_on_staged(self):
"""
test that the functionality does not trigger on other statuses, like 'staged'.
the inputs_validated fixture has a total size of 300, but 'staged' should not recalculate the new size.
"""
# arrange
expected_inputs = self.inputs_with_duplicate
expected_size = 600
# set to 'wrong' inputs containing a duplicate
self.task.inputs = self.inputs_with_duplicate
self.task.inputs = self.inputs_validated
# act
self.task.new_status="staged"
self.task.save()
# assert
self.assertEqual(self.task.inputs, expected_inputs)
self.assertEqual(self.task.size_to_process, expected_size)