Skip to content
Snippets Groups Projects
Commit 0384de4b authored by Nico Vermaas's avatar Nico Vermaas
Browse files

check for STORED instead of PROCESSED

added unit tests
parent 99a4bf1d
No related branches found
No related tags found
1 merge request!343added mechanism and structure to check if a task is a 'summary' task
Pipeline #73241 failed
......@@ -217,7 +217,7 @@ class Task(models.Model):
self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)
# nv:20feb2024, check if this task is a summary task
if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value):
if (self.status != State.STORED.value) & (self.new_status == State.STORED.value):
self.is_summary = check_if_summary(self)
# nv:20feb2024, same as above, but for backward compatibilty reasons.
......
......@@ -31,7 +31,7 @@
{% include 'taskdatabase/pagination.html' %}
</div>
</div>
<p class="footer"> Version 20 Feb 2024
<p class="footer"> Version 22 Feb 2024
</div>
{% include 'taskdatabase/refresh.html' %}
......
from django.test import TestCase
import json
from taskdatabase.models import Task, Workflow, Activity
from taskdatabase.services.common import State
class TestSummaryTasks(TestCase):
def setUp(self):
"""
initialize test data
"""
self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation")
self.workflow_requantisation.save()
self.no_summary_task = Task.objects.create(sas_id=77777, new_status=State.DEFINED.value, workflow=self.workflow_requantisation,
outputs={"tar_archive": [{"size": 4885985280, "basename": "L621240_SAP002_B073_P000_bf.tar", "nameroot": "L621240_SAP002_B073_P000_bf"}]})
self.summary_task_defined = Task.objects.create(sas_id=77777, new_status=State.DEFINED.value, workflow=self.workflow_requantisation,
outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
self.summary_task_stored = Task.objects.create(sas_id=77777, new_status=State.STORED.value, workflow=self.workflow_requantisation,
outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
def test_no_summary_task(self):
"""
test task that is not a summary task
"""
actual = self.no_summary_task.is_summary
self.assertEqual(actual, False)
def test_summary_task_defined(self):
"""
test summary task, but before it knows that it becomes a summary task (which it only knows when 'processed')
"""
actual = self.summary_task_defined.is_summary
self.assertEqual(actual, False)
def test_summary_task_stored(self):
"""
test summary task, at 'stored' it should know that it is a summary task and return True)
"""
self.summary_task_stored.save()
actual = self.summary_task_stored.is_summary
self.assertEqual(actual, True)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment