import json

from django.test import TestCase

from taskdatabase.models import Task, Workflow, Activity


class TestUpdateActivity(TestCase):
    """Checks the ``activity`` object reachable from freshly created tasks.

    NOTE(review): the code that creates and fills the activity is not part of
    this file; presumably it runs in ``taskdatabase`` when a ``Task`` is
    saved -- confirm against the models/signals.
    """

    def setUp(self):
        # A single workflow with a fixed primary key, shared by both tasks.
        requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation")
        requantisation.save()
        self.workflow_requantisation = requantisation

        # Archive payload attached to the 'archived' task.
        lta_archive = {
            "path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
            "lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
            "sas_id_archived": "1219995",
        }

        # Both tasks use the same sas_id; they differ in status and payload.
        self.task1 = Task.objects.create(
            sas_id=12345,
            status='stored',
            workflow=requantisation,
            calculated_qualities={"per_task": "good", "per_sasid": "good"},
        )
        self.task2 = Task.objects.create(
            sas_id=12345,
            status='archived',
            workflow=requantisation,
            archive=lta_archive,
        )

    def test_created_activity(self):
        # The stored task must expose an activity carrying the task's sas_id.
        self.assertEqual(self.task1.activity.sas_id, 12345)

    def test_stored(self):
        # The activity of the stored task reports the "good" quality that was
        # supplied in the task's calculated_qualities.
        stored_activity = self.task1.activity
        self.assertEqual(stored_activity.calculated_quality, "good")

    def test_archived(self):
        # The activity of the archived task reports one archived entry in
        # ingestq_status and exposes the archive's sas_id_archived value.
        archived_activity = self.task2.activity
        self.assertEqual(archived_activity.ingestq_status, {'archived': 1})
        self.assertEqual(archived_activity.archive['sas_id_archived'], "1219995")