Skip to content
Snippets Groups Projects
Commit f4f4afcc authored by Nico Vermaas's avatar Nico Vermaas
Browse files

ancillary changes for SDC-1324

parent d1151538
Branches
No related tags found
1 merge request!350SDC-1313 ancillary dataproducts to dcache (ATDB side)
Pipeline #77730 failed
......@@ -115,6 +115,9 @@ class Activity(models.Model):
# set by update_activity, used by Validation Page
is_verified = models.BooleanField(default=False)
# flag set by ATDB to indicate that all tasks of this Activity has been processed
is_processed = models.BooleanField(default=False)
# TODO: flag set by the 'validate' step in ATDB, used by ancillary service
is_validated = models.BooleanField(default=False)
......@@ -230,12 +233,19 @@ class Task(models.Model):
tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id)
self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)
# nv:20feb2024, check if this task is a summary task
# when a task goes to PROCESSED...
if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value):
# check if this task is a summary task
self.is_summary = check_if_summary(self)
# if so, temporarily put it on hold so that the ancillary service can play with it
if self.is_summary:
self.resume = False
# nv:20feb2024, same as above, but for backward compatibilty reasons.
# For tasks that are already beyond PROCESSED, but not yet ingested.
# But it has no use to put them on 'hold' for the ancillary service,
# because at this point the spider work directory has already been deleted by datamanager
if (self.status != State.VALIDATED.value) & (self.new_status == State.VALIDATED.value):
self.is_summary = check_if_summary(self)
......
import logging;
from .common import State, verified_statusses
from .common import State, verified_statusses, processed_statusses
from taskdatabase.models import Task, Activity
logger = logging.getLogger(__name__)
......@@ -129,6 +129,25 @@ def update_activity(task):
activity.remaining = result['remaining']
activity.save()
# check if all tasks of this SAS_ID have a status that is considered 'processed'
# this is used as a trigger for the ancillary service
if task.status in processed_statusses:
current_is_processed = activity.is_processed
activity.is_processed = True
for t in Task.objects.filter(sas_id=task.sas_id):
if t.status not in processed_statusses:
activity.is_processed = False
break
# only save when changed
if activity.is_processed != current_is_processed:
# if the whole activity has become processed, then set the status of this activity to 'HANDLE_ANCILLARY'
if activity.is_processed:
activity.status = State.HANDLE_ANCILLARY.value
activity.save()
# check if all tasks of this SAS_ID have a status that is considered 'verified'
# this is used for the Validation Page
current_is_verified = activity.is_verified
......
......@@ -8,10 +8,13 @@ from enum import Enum
logger = logging.getLogger(__name__)
class State(Enum):
UNKNOWN = "unknown"
DEFINED = "defined"
STAGED = "staged"
FETCHED = "fetched"
PROCESSED = "processed"
HANDLE_ANCILLARY = "handle_ancillary"
ANCILLARY_HANDLED = "ancillary_handled"
STORED = 'stored'
VALIDATED = "validated"
SCRUBBED = "scrubbed"
......
......@@ -31,7 +31,7 @@
{% include 'taskdatabase/pagination.html' %}
</div>
</div>
<p class="footer"> Version 29 Mar 2024
<p class="footer"> Version 1 Apr 2024
</div>
{% include 'taskdatabase/refresh.html' %}
......
from django.test import TestCase
import json
from taskdatabase.models import Task, Workflow, Activity
from taskdatabase.services.common import State
class TestProcessedSummary(TestCase):
def setUp(self):
"""
initialize test data
"""
self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation")
self.workflow_requantisation.save()
self.task1 = Task.objects.create(sas_id=222,
filter="test_blabla",
new_status=State.PROCESSED.value,
workflow=self.workflow_requantisation,
is_summary=False)
self.task1.save()
self.task2 = Task.objects.create(sas_id=222,
new_status=State.PROCESSED.value,
workflow=self.workflow_requantisation,
outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
self.task2.save()
def test_processed_not_on_hold(self):
"""
task 1 is processed, but not a summary dataproduct. Should NOT go on hold
"""
actual = self.task1.resume
self.assertEqual(actual, True)
def test_processed_on_hold(self):
"""
task 2 is processed, and a summary dataproduct. Should go on hold
"""
actual = self.task2.resume
self.assertEqual(actual, False)
def test_activity_is_processed(self):
"""
both tasks are processed, the activity should have the is_processed flag now
"""
actual = self.task1.activity.is_processed
self.assertEqual(actual, True)
\ No newline at end of file
......@@ -14,7 +14,7 @@ class TestSummaryTasks(TestCase):
outputs={"tar_archive": [{"size": 4885985280, "basename": "L621240_SAP002_B073_P000_bf.tar", "nameroot": "L621240_SAP002_B073_P000_bf"}]})
self.summary_task_defined = Task.objects.create(sas_id=77777, new_status=State.DEFINED.value, workflow=self.workflow_requantisation,
outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
self.summary_task_stored = Task.objects.create(sas_id=77777, new_status=State.STORED.value, workflow=self.workflow_requantisation,
self.summary_task_validated = Task.objects.create(sas_id=77777, new_status=State.VALIDATED.value, workflow=self.workflow_requantisation,
outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
def test_no_summary_task(self):
......@@ -37,6 +37,6 @@ class TestSummaryTasks(TestCase):
"""
test summary task, at 'stored' it should know that it is a summary task and return True)
"""
self.summary_task_stored.save()
actual = self.summary_task_stored.is_summary
self.summary_task_validated.save()
actual = self.summary_task_validated.is_summary
self.assertEqual(actual, True)
from django.test import TestCase
import json
from taskdatabase.models import Task, Workflow, Activity
from taskdatabase.services.common import State
class TestUpdateActivity(TestCase):
......@@ -46,6 +47,42 @@ class TestUpdateActivity(TestCase):
size_to_process=1000,
size_processed=500)
self.task6 = Task.objects.create(sas_id=111,
new_status='stored',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task6.save()
self.task7 = Task.objects.create(sas_id=111,
new_status='processed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task7.save()
self.task8 = Task.objects.create(sas_id=111,
new_status='processed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500,
outputs={"tar_archive": [
{"size": 4885985280, "basename": "L185619_summaryCS.tar",
"nameroot": "L185619_summaryCS"}]}
)
self.task8.save()
self.task9 = Task.objects.create(sas_id=112,
new_status='processing',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task9.save()
self.task10 = Task.objects.create(sas_id=112,
new_status='processed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500,
outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
self.task10.save()
def test_created_activity(self):
"""
test if activity is created
......@@ -137,3 +174,27 @@ class TestUpdateActivity(TestCase):
actual = activity.workflow_id
self.assertEqual(actual, 22)
def test_is_not_processed(self):
"""
task 9 is not processed, task 10 is processed.
The activity.is_processed should be false
"""
activity = self.task9.activity
actual = activity.is_processed
self.assertEqual(actual, False)
def test_is_processed(self):
"""
task 6, 7 and 8 are processed,
activity.is_processed should be true and activity status should go to 'handle_ancillary'
"""
activity = self.task6.activity
actual = activity.is_processed
self.assertEqual(actual, True)
actual = activity.status
self.assertEqual(actual, State.HANDLE_ANCILLARY.value)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment