from django.test import TestCase import json from taskdatabase.models import Task, Workflow, Activity from taskdatabase.services.common import State class TestProcessedSummary(TestCase): def setUp(self): """ initialize test data """ self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") self.workflow_requantisation.save() self.task1 = Task.objects.create(sas_id=222, filter="test_blabla", new_status=State.PROCESSED.value, workflow=self.workflow_requantisation, is_summary=False) self.task1.save() self.task2 = Task.objects.create(sas_id=222, new_status=State.PROCESSED.value, workflow=self.workflow_requantisation, outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) self.task2.save() def test_processed_not_on_hold(self): """ task 1 is processed, but not a summary dataproduct. Should NOT go on hold """ actual = self.task1.resume self.assertEqual(actual, True) def test_processed_on_hold(self): """ task 2 is processed, and a summary dataproduct. Should go on hold """ actual = self.task2.resume # this test fails, because "self.resume = False" is still commented out in models.py L249 self.assertEqual(actual, False) def test_activity_is_processed(self): """ both tasks are processed, the activity should have the is_processed flag now """ actual = self.task1.activity.is_processed self.assertEqual(actual, True)