Skip to content
Snippets Groups Projects
Commit 2b9a8ddb authored by Nico Vermaas's avatar Nico Vermaas
Browse files

SDC-1663 add input validation

parent 61308a7d
No related branches found
No related tags found
1 merge request!388Sdc 1663 inputs validation
Pipeline #106278 failed
...@@ -9,6 +9,7 @@ import logging ...@@ -9,6 +9,7 @@ import logging
from .services import calculated_qualities as qualities from .services import calculated_qualities as qualities
from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN
from .services.specification.input_validation import validate_inputs
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -271,6 +272,10 @@ class Task(models.Model): ...@@ -271,6 +272,10 @@ class Task(models.Model):
# make sure that every task has an activity (also for backward compatibility) # make sure that every task has an activity (also for backward compatibility)
associate_task_with_activity(self) associate_task_with_activity(self)
# when a task comes in as DEFINING some validation needs to be done.
if (self.status != State.DEFINING.value) & (self.new_status == State.DEFINING.value):
validate_inputs(self)
# when a task goes to PROCESSED... handle the (potential) aggregation functionality # when a task goes to PROCESSED... handle the (potential) aggregation functionality
if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value): if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value):
self.handle_aggregation() self.handle_aggregation()
......
...@@ -9,6 +9,7 @@ from enum import Enum ...@@ -9,6 +9,7 @@ from enum import Enum
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class State(Enum): class State(Enum):
UNKNOWN = "unknown" UNKNOWN = "unknown"
DEFINING = "defining"
DEFINED = "defined" DEFINED = "defined"
STAGED = "staged" STAGED = "staged"
FETCHED = "fetched" FETCHED = "fetched"
......
import logging;
logger = logging.getLogger(__name__)
def remove_duplicates(data):
"""
task.inputs can contain double entries (caused by faulty ingests during the 'mom era'). Remove them.
The following is an example of an inputs section with a duplicate 'surl', the duplicate will be removed
[
{
"size": 33638400,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
"type": "File",
"location": "srm.grid.sara.nl"
},
{
"size": 33638400,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar",
"type": "File",
"location": "srm.grid.sara.nl"
},
{
"size": 33638400,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
"type": "File",
"location": "srm.grid.sara.nl"
}]
"""
logger.info(f'check_for_duplicates')
recalculate = False
seen_surls = set()
unique_inputs = []
for item in data:
if item["surl"] not in seen_surls:
unique_inputs.append(item)
seen_surls.add(item["surl"])
else:
recalculate = True
logger.info(f'duplicate found: {item["surl"]}')
return recalculate,unique_inputs
def recalculate_size(data):
"""
Operators or ATDB could have removed files from the inputs json blob.
If that has happened, the task.size_to_process needs to be recalculated
"""
logger.info(f'recalculate_sums')
new_size = sum([e["size"] for e in data])
return new_size
def validate_inputs(task):
"""
check if the task.inputs is valid
- currently only duplicates are checked, but this functionality can be expanded
"""
inputs = task.inputs
# remove duplicates
recalculate,unique_inputs = remove_duplicates(inputs)
# if a duplicate was found, recalculate the size_to_process and update the inputs list of the task
if recalculate:
task.inputs = unique_inputs
recalculate_size(unique_inputs)
task.size_to_process = recalculate_size(unique_inputs)
\ No newline at end of file
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
{% include 'taskdatabase/pagination.html' %} {% include 'taskdatabase/pagination.html' %}
</div> </div>
</div> </div>
<p class="footer"> Version 30 Jan 2025</p> <p class="footer"> Version 3 Feb 2025</p>
</div> </div>
{% include 'taskdatabase/refresh.html' %} {% include 'taskdatabase/refresh.html' %}
......
from django.test import TestCase
from django.utils import timezone
from datetime import datetime
from taskdatabase.models import LogEntry, Task, Workflow, Job, Configuration
from taskdatabase.services.specification import input_validation
from unittest.mock import Mock, MagicMock, patch
class TestInputValidation(TestCase):
def setUp(self):
# used to test the get_size calculation, this uses a database
self.workflow = Workflow(id=22, workflow_uri="psrfits_requantisation")
self.workflow.save()
self.inputs_in = [
{
"size": 100,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
"type": "File",
"location": "srm.grid.sara.nl"
},
{
"size": 200,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar",
"type": "File",
"location": "srm.grid.sara.nl"
},
{
"size": 300,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
"type": "File",
"location": "srm.grid.sara.nl"
}]
self.task = Task.objects.create(sas_id='5432', status='defining', new_status='defining',
size_to_process = 100, workflow=self.workflow, inputs = self.inputs_in)
self.task.save()
def test_validate_inputs(self):
# arrange
expected_size = 300
expected_inputs = [
{
"size": 100,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
"type": "File",
"location": "srm.grid.sara.nl"
},
{
"size": 200,
"surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar",
"type": "File",
"location": "srm.grid.sara.nl"
}]
# act
input_validation.validate_inputs(self.task)
inputs = self.task.inputs
size = self.task.size_to_process
# assert
self.assertEqual(inputs, expected_inputs)
self.assertEqual(size, expected_size)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment