diff --git a/atdb/atdb/static/taskdatabase/style.css b/atdb/atdb/static/taskdatabase/style.css
index 258fea6f7d651f99114aa1a96a4fb24c02182932..b7bc0ad4864e16e52e057104d69501074f7c9921 100644
--- a/atdb/atdb/static/taskdatabase/style.css
+++ b/atdb/atdb/static/taskdatabase/style.css
@@ -27,7 +27,7 @@ TD {
     background-color: lightgreen;
 }
 
-.aggregate_failed,.aggregating_failed {
+.aggregate_failed,.aggregating_failed,.defining_failed,.defined_failed {
     color: red;
     font-weight: bold;
 }
diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py
index e3422b212b4e86c9f74a6a9cd78a6add8ae43866..293a41e1ef08deda41faa2386261b3fb342423ef 100644
--- a/atdb/taskdatabase/models.py
+++ b/atdb/taskdatabase/models.py
@@ -9,6 +9,7 @@ import logging
 
 from .services import calculated_qualities as qualities
 from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN
+from .services.specification.input_validation import validate_inputs, recalculate_size
 
 logger = logging.getLogger(__name__)
 
@@ -271,6 +272,10 @@ class Task(models.Model):
         # make sure that every task has an activity (also for backward compatibility)
         associate_task_with_activity(self)
 
+        # when a task becomes DEFINING or DEFINED some validation needs to be done
+        if self.new_status in [State.DEFINING.value, State.DEFINED.value]:
+            validate_inputs(self)
+
         # when a task goes to PROCESSED... handle the (potential) aggregation functionality
         if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value):
             self.handle_aggregation()
diff --git a/atdb/taskdatabase/services/common.py b/atdb/taskdatabase/services/common.py
index a9b60ce7dfc3aec50adb219f40ba669cfaa653de..835dfc17efe55d1f01c08b4b664c42faa24b0b24 100644
--- a/atdb/taskdatabase/services/common.py
+++ b/atdb/taskdatabase/services/common.py
@@ -9,6 +9,7 @@ from enum import Enum
 logger = logging.getLogger(__name__)
 class State(Enum):
     UNKNOWN = "unknown"
+    DEFINING = "defining"
     DEFINED = "defined"
     STAGED = "staged"
     FETCHED = "fetched"
diff --git a/atdb/taskdatabase/services/signals.py b/atdb/taskdatabase/services/signals.py
index 2442e3d78cb5bdb571aeab3969bc2a6f182dd420..9a81796f3563c837131e42fe021773ba7b857e54 100644
--- a/atdb/taskdatabase/services/signals.py
+++ b/atdb/taskdatabase/services/signals.py
@@ -5,15 +5,32 @@ from django.core.signals import request_started, request_finished
 from django.contrib.auth.models import User
 from django.dispatch import receiver
 from django.contrib.contenttypes.models import ContentType
+from django.utils import timezone
 
 from taskdatabase.models import Task, Workflow, LogEntry, Status
 from .activities_handler import update_activity
+
 
 """
 Signals sent from different parts of the backend are centrally defined and handled here.
 """
 
 logger = logging.getLogger(__name__)
 
+def add_logentry(task, step, status, description, service='ATDB'):
+    """
+    add log entry
+    usually this functionality is called by external services directly using the REST API to report to ATDB,
+    but there are also 'internal services', like input_validation, that can report to the user this way.
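+    for internal calls the 'step' and 'description' arguments are taken from the task.environment field
+    (format 'step::description') by the post_save signal handler below.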
+ """ + logentry = LogEntry( + task=task, + service=service, + step_name=step, + status=status, + description=description, + timestamp=timezone.now()) + logentry.save() + #--- Task signals------------- @@ -67,6 +84,16 @@ def handle_post_save(sender, **kwargs): update_activity(task) + if task.environment: + step = task.environment.split('::')[0] + description = task.environment.split('::')[1] + add_logentry(task,step,task.status,description) + + # clear the 'cache' + task.environment='' + disconnect_signals() + task.save() + connect_signals() def connect_signals(): #logger.info("connect_signals") diff --git a/atdb/taskdatabase/services/specification/input_validation.py b/atdb/taskdatabase/services/specification/input_validation.py new file mode 100644 index 0000000000000000000000000000000000000000..4acbff89bef248ad633df2fce9bd444161a50acb --- /dev/null +++ b/atdb/taskdatabase/services/specification/input_validation.py @@ -0,0 +1,124 @@ +import logging; +import os.path + +logger = logging.getLogger(__name__) + +# copied from ldv_ingest module +# https://git.astron.nl/astron-sdc/ldv-services/-/blob/master/atdb_services_pip/atdb_services/ldv_ingest.py?ref_type=heads#L1721 +def get_filename_without_hash(file_name): + """ + Remove magic number (hash) between latest '_' and '.' + The hash should be 8 characters long otherwise the original file_name will be returned + e.g. LOFAR_PULSAR_ARCHIVE_locus004_L231550_red_c122bb36.tar will result in + LOFAR_PULSAR_ARCHIVE_locus004_L231550_red.tar + :param file_name: + :return file_name_without_hash: + """ + file_name_without_hash = file_name + if "_" and "." in file_name: + file_start_of_hash = file_name.rsplit("_", 1)[0] + file_end_of_hash = file_name.rsplit(".", 1)[1] + file_name_without_hash = f"{file_start_of_hash}.{file_end_of_hash}" + + # Check difference in length should be 8 char + '_' + if (len(file_name) - len(file_name_without_hash)) != 9: + logger.warning(f"No correct hash found in '{file_name_without_hash}'. Use the original filename") + file_name_without_hash = file_name + + # only return the filename without the path + filename = os.path.basename(file_name_without_hash) + return filename + +def check_duplicates(data): + """ + task.inputs can contain double entries (caused by faulty ingests during the 'mom era'). Remove them. 
+    The following is an example of an inputs section with a duplicate 'surl'; the duplicate is left out
+    of the returned unique_inputs:
+
+    [
+     {
+        "size": 33638400,
+        "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
+        "type": "File",
+        "location": "srm.grid.sara.nl"
+     },
+     {
+        "size": 33638400,
+        "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar",
+        "type": "File",
+        "location": "srm.grid.sara.nl"
+     },
+     {
+        "size": 33638400,
+        "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
+        "type": "File",
+        "location": "srm.grid.sara.nl"
+     }]
+
+    :return: found_duplicates: True when at least one duplicate was found
+    :return: duplicates: list of duplicate filenames (without hash)
+    :return: unique_inputs: the inputs with the duplicates removed
+    """
+    logger.info(f'check_duplicates')
+
+    if not data:
+        # tasks without inputs should just return
+        return False, None, None
+
+    found_duplicates = False
+    seen_surls = set()
+    unique_inputs = []
+    duplicates = []
+
+    for item in data:
+
+        filename_without_hash = get_filename_without_hash(item["surl"])
+
+        if filename_without_hash not in seen_surls:
+            unique_inputs.append(item)
+            seen_surls.add(filename_without_hash)
+        else:
+            duplicates.append(filename_without_hash)
+            found_duplicates = True
+            logger.info(f'duplicate found: {filename_without_hash}')
+
+    return found_duplicates, duplicates, unique_inputs
+
+
+def recalculate_size(data):
+    """
+    Operators or ATDB could have removed files from the inputs json blob.
+    If that has happened, the task.size_to_process needs to be recalculated.
+    """
+    if not data:
+        # tasks without inputs should just return
+        return None
+
+    logger.info(f'recalculate_size')
+    new_size = sum([e["size"] for e in data])
+    return new_size
+
+
+def validate_inputs(task):
+    """
+    check if the task.inputs is valid
+    - currently only duplicates are checked, but this functionality can be expanded
+    """
+
+    # --- check for duplicate files in task.inputs ---
+    found_duplicates, duplicates, unique_inputs = check_duplicates(task.inputs)
+
+    # if duplicates were found, put the task in a failed state and report the duplicates to the user
+    if found_duplicates:
+
+        # set a failed state
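+        # ('defining' becomes 'defining_failed' and 'defined' becomes 'defined_failed';
+        #  these statuses are rendered in red by the style.css change in this patch)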
+        new_status = task.new_status + "_failed"
+        task.new_status = new_status
+
+        # add some information for the user
+        # (the otherwise unused 'task.environment' field is used as a 'cache' to transport this
+        #  information to a 'signal', to avoid circular imports)
+        task.environment = f"inputs_duplicates::{duplicates}"
+
+
+    # --- recalculate sizes ---
+    size_to_process = recalculate_size(task.inputs)
+    if size_to_process:
+        task.size_to_process = size_to_process
\ No newline at end of file
diff --git a/atdb/taskdatabase/static/taskdatabase/style.css b/atdb/taskdatabase/static/taskdatabase/style.css
index 258fea6f7d651f99114aa1a96a4fb24c02182932..b7bc0ad4864e16e52e057104d69501074f7c9921 100644
--- a/atdb/taskdatabase/static/taskdatabase/style.css
+++ b/atdb/taskdatabase/static/taskdatabase/style.css
@@ -27,7 +27,7 @@ TD {
     background-color: lightgreen;
 }
 
-.aggregate_failed,.aggregating_failed {
+.aggregate_failed,.aggregating_failed,.defining_failed,.defined_failed {
     color: red;
     font-weight: bold;
 }
diff --git a/atdb/taskdatabase/templates/taskdatabase/index.html b/atdb/taskdatabase/templates/taskdatabase/index.html
index 370d1dbb990984b016dc0e9134a03eb34e02e945..5bc1d629f39e1c6b7f457b52c2d1cbb447a9bdef 100644
--- a/atdb/taskdatabase/templates/taskdatabase/index.html
+++ b/atdb/taskdatabase/templates/taskdatabase/index.html
@@ -31,7 +31,7 @@
                 {% include 'taskdatabase/pagination.html' %}
             </div>
         </div>
-        <p class="footer"> Version 30 Jan 2025</p>
+        <p class="footer"> Version 4 Feb 2025</p>
     </div>
 
 {% include 'taskdatabase/refresh.html' %}
diff --git a/atdb/taskdatabase/tests/test_input_validation.py b/atdb/taskdatabase/tests/test_input_validation.py
new file mode 100644
index 0000000000000000000000000000000000000000..a3e768f7affa12c74bb335ba0faa3d656bdf276b
--- /dev/null
+++ b/atdb/taskdatabase/tests/test_input_validation.py
@@ -0,0 +1,106 @@
+from django.test import TestCase
+from django.utils import timezone
+from datetime import datetime
+from taskdatabase.models import LogEntry, Task, Workflow
+from taskdatabase.services.specification import input_validation
+
+class TestInputValidation(TestCase):
+
+    def setUp(self):
+
+        # used to test the get_size calculation; this uses the database
+        self.workflow = Workflow(id=22, workflow_uri="psrfits_requantisation")
+        self.workflow.save()
+
+        self.inputs_with_duplicate = [
+            {
+                "size": 100,
+                "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
+                "type": "File",
+                "location": "srm.grid.sara.nl"
+            },
+            {
+                "size": 200,
+                "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar",
+                "type": "File",
+                "location": "srm.grid.sara.nl"
+            },
+            {
+                "size": 300,
+                "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
+                "type": "File",
+                "location": "srm.grid.sara.nl"
+            }]
+
+        self.inputs_validated = [
+            {
+                "size": 100,
+                "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
+                "type": "File",
+                "location": "srm.grid.sara.nl"
+            },
+            {
+                "size": 200,
+                "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar",
+                "type": "File",
+                "location": "srm.grid.sara.nl"
+            }]
+
+        self.task = Task.objects.create(sas_id='5432', status='defining', new_status='defining',
+                                        size_to_process=100, workflow=self.workflow,
+                                        inputs=self.inputs_with_duplicate)
+        self.task.save()
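+        # note: validate_inputs (triggered by the 'defining' status above) flags the duplicate but does
+        # not remove it from task.inputs, so size_to_process is recalculated over all three entries (600)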
+
+
+    def test_check_duplicates(self):
+        """
+        test if duplicates are detected
+        """
+        # arrange
+        expected_size = 600
+        expected_duplicates = ['L432340_SB000_uv.dppp.MS.tar']
+
+        # act
+        input_validation.validate_inputs(self.task)
+        found_duplicates, duplicates, unique_inputs = input_validation.check_duplicates(self.inputs_with_duplicate)
+
+        # assert
+        self.assertEqual(duplicates, expected_duplicates)
+        self.assertEqual(self.task.size_to_process, expected_size)
+
+
+    def test_trigger_on_defined(self):
+        """
+        test that the functionality (also) triggers on 'defined'
+        """
+
+        # arrange
+        expected_size = 300
+
+        # set the validated inputs (no duplicates, total size 300)
+        self.task.inputs = self.inputs_validated
+
+        # act
+        self.task.new_status = "defined"
+        self.task.save()
+
+        # assert
+        self.assertEqual(self.task.size_to_process, expected_size)
+
+    def test_no_trigger_on_staged(self):
+        """
+        test that the functionality does not trigger on other statuses, like 'staged'.
+        inputs_validated has a total size of 300, but 'staged' should not recalculate the size,
+        so size_to_process keeps the value calculated during setUp (600).
+        """
+
+        # arrange
+        expected_size = 600
+
+        # set the validated inputs (no duplicates, total size 300)
+        self.task.inputs = self.inputs_validated
+
+        # act
+        self.task.new_status = "staged"
+        self.task.save()
+
+        # assert
+        self.assertEqual(self.task.size_to_process, expected_size)