import json

from django.test import TestCase

from taskdatabase.models import Task, Workflow, Activity
from taskdatabase.services.common import State


class TestUpdateActivity(TestCase):
    """Check the Activity attached to Tasks after the tasks have been saved.

    Fixture layout (built in ``setUp``):

    * sas_id 12345 -- task1..task5, created with an explicit ``status``
    * sas_id 111   -- task6..task8, created with ``new_status`` and saved again
    * sas_id 112   -- task9..task10, created with ``new_status`` and saved again
    """

    # Size keyword arguments shared by every task from task3 onwards.
    _SIZES = {"size_to_process": 1000, "size_processed": 500}

    @staticmethod
    def _good_qualities():
        """Return a fresh ``calculated_qualities`` payload rated 'good'."""
        return {"per_task": "good", "per_sasid": "good"}

    @staticmethod
    def _lta_archive():
        """Return a fresh ``archive`` payload as used by task3 and task4."""
        return {
            "path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
            "lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
            "sas_id_archived": "1219995",
        }

    @staticmethod
    def _summary_outputs():
        """Return a fresh ``outputs`` payload holding a single tar archive."""
        return {
            "tar_archive": [
                {
                    "size": 4885985280,
                    "basename": "L185619_summaryCS.tar",
                    "nameroot": "L185619_summaryCS",
                }
            ]
        }

    def setUp(self):
        """Create one workflow and ten tasks spread over three sas_ids.

        The creation order is significant and must not be changed: each test
        reads ``taskN.activity`` from the task instance built here.
        """
        workflow = Workflow(id=22, workflow_uri="psrfits_requantisation")
        workflow.save()
        self.workflow_requantisation = workflow

        # --- sas_id 12345: tasks created with an explicit 'status' ---
        self.task1 = Task.objects.create(
            sas_id=12345,
            filter="test_blabla",
            status='stored',
            workflow=workflow,
            calculated_qualities=self._good_qualities(),
        )
        self.task2 = Task.objects.create(
            sas_id=12345,
            status='scrubbed',
            workflow=workflow,
            calculated_qualities=self._good_qualities(),
        )
        self.task3 = Task.objects.create(
            sas_id=12345,
            status='archived',
            workflow=workflow,
            archive=self._lta_archive(),
            **self._SIZES,
        )
        self.task4 = Task.objects.create(
            sas_id=12345,
            status='finished',
            workflow=workflow,
            archive=self._lta_archive(),
            **self._SIZES,
        )
        self.task5 = Task.objects.create(
            sas_id=12345,
            status='archived_failed',
            workflow=workflow,
            **self._SIZES,
        )

        # --- sas_id 111: tasks created with 'new_status' ---
        # NOTE(review): create() already persists the row; the extra save()
        # is kept as-is -- presumably it re-runs the save-time status
        # handling. Confirm against the Task model before removing it.
        self.task6 = Task.objects.create(
            sas_id=111,
            new_status='stored',
            workflow=workflow,
            **self._SIZES,
        )
        self.task6.save()

        self.task7 = Task.objects.create(
            sas_id=111,
            new_status='processed',
            workflow=workflow,
            **self._SIZES,
        )
        self.task7.save()

        self.task8 = Task.objects.create(
            sas_id=111,
            new_status='processed',
            workflow=workflow,
            **self._SIZES,
            outputs=self._summary_outputs(),
        )
        self.task8.save()

        # --- sas_id 112: one task still processing, one processed ---
        self.task9 = Task.objects.create(
            sas_id=112,
            new_status='processing',
            workflow=workflow,
            **self._SIZES,
        )
        self.task9.save()

        self.task10 = Task.objects.create(
            sas_id=112,
            new_status='processed',
            workflow=workflow,
            **self._SIZES,
            outputs=self._summary_outputs(),
        )
        self.task10.save()

    def test_created_activity(self):
        """task1 has an activity carrying the task's sas_id."""
        self.assertEqual(self.task1.activity.sas_id, 12345)

    def test_stored(self):
        """The activity of the stored task1 reports calculated_quality 'good'."""
        self.assertEqual(self.task1.activity.calculated_quality, "good")

    def test_scrubbed(self):
        """The activity of the scrubbed task2 counts one scrubbed task and
        has an ingested_fraction of 0."""
        activity = self.task2.activity
        self.assertEqual(activity.ingestq_status, {'scrubbed': 1})
        self.assertEqual(activity.ingested_fraction, 0)

    def test_archived(self):
        """The activity of the archived task3 counts scrubbed and archived
        tasks and exposes 'sas_id_archived' from the archive json."""
        activity = self.task3.activity
        self.assertEqual(activity.ingestq_status, {'scrubbed': 1, 'archived': 1})
        self.assertEqual(activity.archive['sas_id_archived'], "1219995")

    def test_finished(self):
        """The activity of the finished task4 keeps the ingestq counts and
        exposes 'sas_id_archived' from the archive json."""
        activity = self.task4.activity
        self.assertEqual(activity.ingestq_status, {'scrubbed': 1, 'archived': 1})
        self.assertEqual(activity.archive['sas_id_archived'], "1219995")

    def test_failed(self):
        """The activity of the archived_failed task5 reports the expected
        finished_fraction, total_size and remaining size."""
        activity = self.task5.activity
        self.assertEqual(activity.finished_fraction, 33)
        self.assertEqual(activity.total_size, 3000)
        self.assertEqual(activity.remaining, 2000)

    def test_filter_and_workflow(self):
        """The activity of task1 carries the task's filter and workflow id."""
        activity = self.task1.activity
        self.assertEqual(activity.filter, "test_blabla")
        self.assertEqual(activity.workflow_id, 22)

    def test_is_not_processed(self):
        """task9 is still 'processing' (task10 is 'processed'), so the
        activity seen from task9 is not processed."""
        self.assertEqual(self.task9.activity.is_processed, False)

    def test_is_processed(self):
        """The activity seen from task6 (sas_id 111, shared with task7 and
        task8) is processed and its status equals State.AGGREGATE."""
        activity = self.task6.activity
        self.assertEqual(activity.is_processed, True)
        self.assertEqual(activity.status, State.AGGREGATE.value)