Newer
Older
from django.test import TestCase
import json
from taskdatabase.models import Task, Workflow, Activity
from taskdatabase.services.common import State
class TestProcessedSummary(TestCase):
    """Tests around the ``resume`` flag and activity state of PROCESSED tasks."""

    def setUp(self):
        """
        Create one workflow and two tasks (same sas_id) in the PROCESSED state.

        task1 is explicitly flagged as not being a summary; task2 carries an
        ``outputs`` payload with a ``tar_archive`` entry.
        """
        requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation")
        requantisation.save()
        self.workflow_requantisation = requantisation

        processed = State.PROCESSED.value

        # Plain (non-summary) task.
        self.task1 = Task.objects.create(
            sas_id=222,
            filter="test_blabla",
            new_status=processed,
            workflow=requantisation,
            is_summary=False,
        )
        self.task1.save()

        # Task whose outputs describe a summary tarball.
        summary_outputs = {
            "tar_archive": [
                {
                    "size": 4885985280,
                    "basename": "L185619_summaryCS.tar",
                    "nameroot": "L185619_summaryCS",
                }
            ]
        }
        self.task2 = Task.objects.create(
            sas_id=222,
            new_status=processed,
            workflow=requantisation,
            outputs=summary_outputs,
        )
        self.task2.save()

    def test_processed_not_on_hold(self):
        """
        task1 is processed but is not a summary dataproduct, so it is
        expected NOT to be put on hold (resume stays True).
        """
        self.assertEqual(self.task1.resume, True)

    def test_processed_on_hold(self):
        """
        task2 is processed and is a summary dataproduct, so it is
        expected to be put on hold (resume becomes False).
        """
        # NOTE: this test fails, because "self.resume = False" is still
        # commented out in models.py L249
        self.assertEqual(self.task2.resume, False)

    def test_activity_is_processed(self):
        """
        With both tasks processed, the activity of task1 is expected to
        have its is_processed flag set.
        """
        self.assertEqual(self.task1.activity.is_processed, True)