diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py index e3422b212b4e86c9f74a6a9cd78a6add8ae43866..4e661494319e8a9b45212b7b9a1a4bf2c03c352b 100644 --- a/atdb/taskdatabase/models.py +++ b/atdb/taskdatabase/models.py @@ -9,6 +9,7 @@ import logging from .services import calculated_qualities as qualities from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN +from .services.specification.input_validation import validate_inputs logger = logging.getLogger(__name__) @@ -271,6 +272,10 @@ class Task(models.Model): # make sure that every task has an activity (also for backward compatibility) associate_task_with_activity(self) + # when a task comes in as DEFINING some validation needs to be done. + if (self.status != State.DEFINING.value) & (self.new_status == State.DEFINING.value): + validate_inputs(self) + # when a task goes to PROCESSED... handle the (potential) aggregation functionality if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value): self.handle_aggregation() diff --git a/atdb/taskdatabase/services/common.py b/atdb/taskdatabase/services/common.py index a9b60ce7dfc3aec50adb219f40ba669cfaa653de..835dfc17efe55d1f01c08b4b664c42faa24b0b24 100644 --- a/atdb/taskdatabase/services/common.py +++ b/atdb/taskdatabase/services/common.py @@ -9,6 +9,7 @@ from enum import Enum logger = logging.getLogger(__name__) class State(Enum): UNKNOWN = "unknown" + DEFINING = "defining" DEFINED = "defined" STAGED = "staged" FETCHED = "fetched" diff --git a/atdb/taskdatabase/services/specification/input_validation.py b/atdb/taskdatabase/services/specification/input_validation.py new file mode 100644 index 0000000000000000000000000000000000000000..1cc13fce64a49dc994cb25db16cd49bda41cd0aa --- /dev/null +++ b/atdb/taskdatabase/services/specification/input_validation.py @@ -0,0 +1,69 @@ +import logging; +logger = logging.getLogger(__name__) + +def remove_duplicates(data): + """ + task.inputs can contain double entries 
(caused by faulty ingests during the 'mom era'). Remove them. + The following is an example of an inputs section with a duplicate 'surl', the duplicate will be removed + [ + { + "size": 33638400, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar", + "type": "File", + "location": "srm.grid.sara.nl" + }, + { + "size": 33638400, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar", + "type": "File", + "location": "srm.grid.sara.nl" + }, + { + "size": 33638400, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar", + "type": "File", + "location": "srm.grid.sara.nl" + }] + + """ + logger.info('check_for_duplicates') + + recalculate = False + seen_surls = set() + unique_inputs = [] + + for item in data: + if item["surl"] not in seen_surls: + unique_inputs.append(item) + seen_surls.add(item["surl"]) + else: + recalculate = True + logger.info(f'duplicate found: {item["surl"]}') + + return recalculate, unique_inputs + + +def recalculate_size(data): + """ + Operators or ATDB could have removed files from the inputs json blob. 
+ If that has happened, the task.size_to_process needs to be recalculated + """ + logger.info('recalculate_sums') + new_size = sum([e["size"] for e in data]) + return new_size + +def validate_inputs(task): + """ + check if the task.inputs is valid + - currently only duplicates are checked, but this functionality can be expanded + """ + inputs = task.inputs + + # remove duplicates + recalculate, unique_inputs = remove_duplicates(inputs) + + # if a duplicate was found, recalculate the size_to_process and update the inputs list of the task + if recalculate: + task.inputs = unique_inputs + new_size = recalculate_size(unique_inputs) + task.size_to_process = new_size \ No newline at end of file diff --git a/atdb/taskdatabase/templates/taskdatabase/index.html b/atdb/taskdatabase/templates/taskdatabase/index.html index 370d1dbb990984b016dc0e9134a03eb34e02e945..080cda67cb38c15d925b318855f881db17e863e9 100644 --- a/atdb/taskdatabase/templates/taskdatabase/index.html +++ b/atdb/taskdatabase/templates/taskdatabase/index.html @@ -31,7 +31,7 @@ {% include 'taskdatabase/pagination.html' %} </div> </div> - <p class="footer"> Version 30 Jan 2025</p> + <p class="footer"> Version 3 Feb 2025</p> </div> {% include 'taskdatabase/refresh.html' %} diff --git a/atdb/taskdatabase/tests/test_input_validation.py b/atdb/taskdatabase/tests/test_input_validation.py new file mode 100644 index 0000000000000000000000000000000000000000..f847fc72e25c9fd7510982b0a00f9a2cc5fed69c --- /dev/null +++ b/atdb/taskdatabase/tests/test_input_validation.py @@ -0,0 +1,69 @@ +from django.test import TestCase +from django.utils import timezone +from datetime import datetime +from taskdatabase.models import LogEntry, Task, Workflow, Job, Configuration +from taskdatabase.services.specification import input_validation +from unittest.mock import Mock, MagicMock, patch + +class TestInputValidation(TestCase): + + def setUp(self): + + # used to test the get_size calculation, this uses a database + 
self.workflow = Workflow(id=22, workflow_uri="psrfits_requantisation") + self.workflow.save() + + self.inputs_in = [ + { + "size": 100, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar", + "type": "File", + "location": "srm.grid.sara.nl" + }, + { + "size": 200, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar", + "type": "File", + "location": "srm.grid.sara.nl" + }, + { + "size": 300, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar", + "type": "File", + "location": "srm.grid.sara.nl" + }] + + + self.task = Task.objects.create(sas_id='5432', status='defining', new_status='defining', + size_to_process = 100, workflow=self.workflow, inputs = self.inputs_in) + self.task.save() + + def test_validate_inputs(self): + + # arrange + expected_size = 300 + expected_inputs = [ + { + "size": 100, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar", + "type": "File", + "location": "srm.grid.sara.nl" + }, + { + "size": 200, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar", + "type": "File", + "location": "srm.grid.sara.nl" + }] + + # act + input_validation.validate_inputs(self.task) + inputs = self.task.inputs + size = self.task.size_to_process + + # assert + self.assertEqual(inputs, expected_inputs) + self.assertEqual(size, expected_size) + + +