From 4647b2e66a80c1dccdcdde7c7c9cf383242e9d62 Mon Sep 17 00:00:00 2001 From: Vermaas <vermaas@astron.nl> Date: Tue, 4 Feb 2025 16:23:44 +0100 Subject: [PATCH] adapted the functionality based on review --- atdb/atdb/static/taskdatabase/style.css | 2 +- atdb/taskdatabase/models.py | 2 +- atdb/taskdatabase/services/signals.py | 27 +++++++ .../specification/input_validation.py | 78 +++++++++++++++---- .../static/taskdatabase/style.css | 2 +- .../templates/taskdatabase/index.html | 2 +- .../tests/test_input_validation.py | 28 ++++--- 7 files changed, 106 insertions(+), 35 deletions(-) diff --git a/atdb/atdb/static/taskdatabase/style.css b/atdb/atdb/static/taskdatabase/style.css index 258fea6f..b7bc0ad4 100644 --- a/atdb/atdb/static/taskdatabase/style.css +++ b/atdb/atdb/static/taskdatabase/style.css @@ -27,7 +27,7 @@ TD { background-color: lightgreen; } -.aggregate_failed,.aggregating_failed { +.aggregate_failed,.aggregating_failed,.defining_failed,.defined_failed { color: red; font-weight: bold; } diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py index 0ed582db..293a41e1 100644 --- a/atdb/taskdatabase/models.py +++ b/atdb/taskdatabase/models.py @@ -9,7 +9,7 @@ import logging from .services import calculated_qualities as qualities from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN -from .services.specification.input_validation import validate_inputs +from .services.specification.input_validation import validate_inputs,recalculate_size logger = logging.getLogger(__name__) diff --git a/atdb/taskdatabase/services/signals.py b/atdb/taskdatabase/services/signals.py index 2442e3d7..9a81796f 100644 --- a/atdb/taskdatabase/services/signals.py +++ b/atdb/taskdatabase/services/signals.py @@ -5,15 +5,32 @@ from django.core.signals import request_started, request_finished from django.contrib.auth.models import User from django.dispatch import receiver from django.contrib.contenttypes.models import ContentType +from django.utils import timezone from taskdatabase.models import Task, Workflow, LogEntry, Status from .activities_handler import update_activity + """ Signals sent from different parts of the backend are centrally defined and handled here. """ logger = logging.getLogger(__name__) +def add_logentry(task, step, status, description, service='ATDB'): + """ + add log entry + usually this functionality is called by external services directly using the REST API to report to ATDB + but there are also 'internal services', like input_validation, that can report to the user this way. + """ + logentry = LogEntry( + task=task, + service=service, + step_name=step, + status=status, + description=description, + timestamp=timezone.now()) + logentry.save() + #--- Task signals------------- @@ -67,6 +84,16 @@ def handle_post_save(sender, **kwargs): update_activity(task) + if task.environment: + step = task.environment.split('::')[0] + description = task.environment.split('::')[1] + add_logentry(task,step,task.status,description) + + # clear the 'cache' + task.environment='' + disconnect_signals() + task.save() + connect_signals() def connect_signals(): #logger.info("connect_signals") diff --git a/atdb/taskdatabase/services/specification/input_validation.py b/atdb/taskdatabase/services/specification/input_validation.py index 9ed544bc..8000ad8b 100644 --- a/atdb/taskdatabase/services/specification/input_validation.py +++ b/atdb/taskdatabase/services/specification/input_validation.py @@ -1,10 +1,41 @@ import logging; +import os.path + logger = logging.getLogger(__name__) -def remove_duplicates(data): +# copied from ldv_ingest module +# https://git.astron.nl/astron-sdc/ldv-services/-/blob/master/atdb_services_pip/atdb_services/ldv_ingest.py?ref_type=heads#L1721 +def get_filename_without_hash(file_name): + """ + Remove magic number (hash) between latest '_' and '.' + The hash should be 8 characters long otherwise the original file_name will be returned + e.g. LOFAR_PULSAR_ARCHIVE_locus004_L231550_red_c122bb36.tar will result in + LOFAR_PULSAR_ARCHIVE_locus004_L231550_red.tar + :param file_name: + :return file_name_without_hash: + """ + file_name_without_hash = file_name + if "_" and "." in file_name: + file_start_of_hash = file_name.rsplit("_", 1)[0] + file_end_of_hash = file_name.rsplit(".", 1)[1] + file_name_without_hash = f"{file_start_of_hash}.{file_end_of_hash}" + + # Check difference in length should be 8 char + '_' + if (len(file_name) - len(file_name_without_hash)) != 9: + logger.warning(f"No correct hash found in '{file_name_without_hash}'. Use the original filename") + file_name_without_hash = file_name + + # only return the filename without the path + filename = os.path.basename(file_name_without_hash) + return filename + +def check_duplicates(data): """ task.inputs can contain double entries (caused by faulty ingests during the 'mom era'). Remove them. The following is an example of an inputs section with a duplicate 'surl', the duplicate will be removed + + :return: found_duplicates is True when a duplicate was found + :return: [ { "size": 33638400, @@ -26,25 +57,30 @@ def remove_duplicates(data): }] """ - logger.info(f'check_for_duplicates') + logger.info(f'check_duplicates') if not data: # tasks without inputs should just return return False,None - recalculate = False + found_duplicates = False seen_surls = set() unique_inputs = [] + duplicates = [] for item in data: - if item["surl"] not in seen_surls: + + filename_without_hash = get_filename_without_hash(item["surl"]) + + if filename_without_hash not in seen_surls: unique_inputs.append(item) - seen_surls.add(item["surl"]) + seen_surls.add(filename_without_hash) else: - recalculate = True - logger.info(f'duplicate found: {item["surl"]}') + duplicates.append(filename_without_hash) + found_duplicates = True + logger.info(f'duplicate found: {filename_without_hash}') - return recalculate,unique_inputs + return found_duplicates,duplicates, unique_inputs def recalculate_size(data): @@ -52,22 +88,32 @@ def recalculate_size(data): Operators or ATDB could have removed files from the inputs json blob. If that has happened, the task.size_to_process needs to be recalculated """ - logger.info(f'recalculate_sums') + logger.info(f'recalculate_size') new_size = sum([e["size"] for e in data]) return new_size + def validate_inputs(task): """ check if the task.inputs is valid - currently only duplicates are checked, but this functionality can be expanded """ - inputs = task.inputs - # remove duplicates - recalculate,unique_inputs = remove_duplicates(inputs) + # --- check for duplicate files in task.inputs --- + found_duplicates,duplicates,unique_inputs = check_duplicates(task.inputs) # if a duplicate was found, recalculate the size_to_process and update the inputs list of the task - if recalculate: - task.inputs = unique_inputs - recalculate_size(unique_inputs) - task.size_to_process = recalculate_size(unique_inputs) \ No newline at end of file + if found_duplicates: + + # set a failed state. + new_status = task.new_status + "_failed" + task.new_status = new_status + + # add some information for the user + # (abuse the unused 'task.environment' field as a 'cache' to transport this to a 'signal' (to avoid circular imports) + task.environment = f"inputs_duplicates::{duplicates}" + + + # --- recalculate sizes --- + size_to_process = recalculate_size(task.inputs) + task.size_to_process = size_to_process \ No newline at end of file diff --git a/atdb/taskdatabase/static/taskdatabase/style.css b/atdb/taskdatabase/static/taskdatabase/style.css index 258fea6f..b7bc0ad4 100644 --- a/atdb/taskdatabase/static/taskdatabase/style.css +++ b/atdb/taskdatabase/static/taskdatabase/style.css @@ -27,7 +27,7 @@ TD { background-color: lightgreen; } -.aggregate_failed,.aggregating_failed { +.aggregate_failed,.aggregating_failed,.defining_failed,.defined_failed { color: red; font-weight: bold; } diff --git a/atdb/taskdatabase/templates/taskdatabase/index.html b/atdb/taskdatabase/templates/taskdatabase/index.html index 080cda67..5bc1d629 100644 --- a/atdb/taskdatabase/templates/taskdatabase/index.html +++ b/atdb/taskdatabase/templates/taskdatabase/index.html @@ -31,7 +31,7 @@ {% include 'taskdatabase/pagination.html' %} </div> </div> - <p class="footer"> Version 3 Feb 2025</p> + <p class="footer"> Version 4 Feb 2025</p> </div> {% include 'taskdatabase/refresh.html' %} diff --git a/atdb/taskdatabase/tests/test_input_validation.py b/atdb/taskdatabase/tests/test_input_validation.py index 21e7f080..a3e768f7 100644 --- a/atdb/taskdatabase/tests/test_input_validation.py +++ b/atdb/taskdatabase/tests/test_input_validation.py @@ -1,9 +1,8 @@ from django.test import TestCase from django.utils import timezone from datetime import datetime -from taskdatabase.models import LogEntry, Task, Workflow, Job, Configuration +from taskdatabase.models import LogEntry, Task, Workflow from taskdatabase.services.specification import input_validation -from unittest.mock import Mock, MagicMock, patch class TestInputValidation(TestCase): @@ -51,21 +50,21 @@ class TestInputValidation(TestCase): size_to_process = 100, workflow=self.workflow, inputs = self.inputs_with_duplicate) self.task.save() - def test_validate_inputs(self): + + def test_check_duplicates(self): """ - run the validates_inputs function with a task.inputs that contains a duplicate. - The duplicate should be removed, and the size recalculated + test if duplicates are detected """ - # arrange - expected_size = 300 - expected_inputs = self.inputs_validated + expected_size = 600 + expected_duplicates = ['L432340_SB000_uv.dppp.MS.tar'] # act input_validation.validate_inputs(self.task) + found_duplicates, duplicates, unique_inputs = input_validation.check_duplicates(self.inputs_with_duplicate) # assert - self.assertEqual(self.task.inputs, expected_inputs) + self.assertEqual(duplicates, expected_duplicates) self.assertEqual(self.task.size_to_process, expected_size) @@ -76,33 +75,32 @@ class TestInputValidation(TestCase): # arrange expected_size = 300 - expected_inputs = self.inputs_validated # set to 'wrong' inputs containing a duplicate - self.task.inputs = self.inputs_with_duplicate + self.task.inputs = self.inputs_validated # act self.task.new_status="defined" self.task.save() # assert - self.assertEqual(self.task.inputs, expected_inputs) self.assertEqual(self.task.size_to_process, expected_size) def test_no_trigger_on_staged(self): """ test that the functionality does not trigger on other statusses, like 'staged' + the inputs.validated has a total sum of 300, but 'staged' should not recalculate the new size. """ # arrange - expected_inputs = self.inputs_with_duplicate + expected_size = 600 # set to 'wrong' inputs containing a duplicate - self.task.inputs = self.inputs_with_duplicate + self.task.inputs = self.inputs_validated # act self.task.new_status="staged" self.task.save() # assert - self.assertEqual(self.task.inputs, expected_inputs) + self.assertEqual(self.task.size_to_process, expected_size) -- GitLab