Skip to content
Snippets Groups Projects
test_update_activity.py 8.06 KiB
Newer Older
Nico Vermaas's avatar
Nico Vermaas committed
from django.test import TestCase
import json
from taskdatabase.models import Task, Workflow, Activity
from taskdatabase.services.common import State
Nico Vermaas's avatar
Nico Vermaas committed

class TestUpdateActivity(TestCase):

    def setUp(self):
        """
        initialize test data
        """
Nico Vermaas's avatar
Nico Vermaas committed
        self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation")
        self.workflow_requantisation.save()

        self.task1 = Task.objects.create(sas_id=12345,
                                         filter="test_blabla",
Nico Vermaas's avatar
Nico Vermaas committed
                                         status='stored',
                                         workflow=self.workflow_requantisation,
                                         calculated_qualities={"per_task": "good", "per_sasid": "good"})
        self.task2 = Task.objects.create(sas_id=12345,
Nico Vermaas's avatar
Nico Vermaas committed
                                         status='scrubbed',
                                         workflow=self.workflow_requantisation,
                                         calculated_qualities={"per_task": "good", "per_sasid": "good"})
        self.task3 = Task.objects.create(sas_id=12345,
Nico Vermaas's avatar
Nico Vermaas committed
                                         status='archived',
                                         workflow=self.workflow_requantisation,
                                         archive={
Nico Vermaas's avatar
Nico Vermaas committed
                                            "path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
                                            "lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
                                            "sas_id_archived": "1219995"
                                         },
                                         size_to_process=1000,
                                         size_processed=500)
        self.task4 = Task.objects.create(sas_id=12345,
                                         status='finished',
                                         workflow=self.workflow_requantisation,
                                         archive={
                                             "path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
                                             "lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
                                             "sas_id_archived": "1219995"
                                         },
                                         size_to_process=1000,
                                         size_processed=500)
        self.task5 = Task.objects.create(sas_id=12345,
                                         status='archived_failed',
                                         workflow=self.workflow_requantisation,
                                         size_to_process=1000,
                                         size_processed=500)
Nico Vermaas's avatar
Nico Vermaas committed

        self.task6 = Task.objects.create(sas_id=111,
                                         new_status='stored',
                                         workflow=self.workflow_requantisation,
                                         size_to_process=1000,
                                         size_processed=500)
        self.task6.save()
        self.task7 = Task.objects.create(sas_id=111,
                                         new_status='processed',
                                         workflow=self.workflow_requantisation,
                                         size_to_process=1000,
                                         size_processed=500)
        self.task7.save()
        self.task8 = Task.objects.create(sas_id=111,
                                         new_status='processed',
                                         workflow=self.workflow_requantisation,
                                         size_to_process=1000,
                                         size_processed=500,
                                         outputs={"tar_archive": [
                                             {"size": 4885985280, "basename": "L185619_summaryCS.tar",
                                              "nameroot": "L185619_summaryCS"}]}
                                         )
        self.task8.save()
        self.task9 = Task.objects.create(sas_id=112,
                                         new_status='processing',
                                         workflow=self.workflow_requantisation,
                                         size_to_process=1000,
                                         size_processed=500)
        self.task9.save()
        self.task10 = Task.objects.create(sas_id=112,
                                         new_status='processed',
                                         workflow=self.workflow_requantisation,
                                         size_to_process=1000,
                                         size_processed=500,
                                         outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
        self.task10.save()

Nico Vermaas's avatar
Nico Vermaas committed
    def test_created_activity(self):
        """
        test if activity is created
        """
Nico Vermaas's avatar
Nico Vermaas committed
        activity = self.task1.activity

        # test if an activity with the correct sas_id was created
        actual = activity.sas_id
        self.assertEqual(actual, 12345)

Nico Vermaas's avatar
Nico Vermaas committed

Nico Vermaas's avatar
Nico Vermaas committed
    def test_stored(self):
        """
        test if the activity gets the calculated quality of the sas_id of the stored task
        """
Nico Vermaas's avatar
Nico Vermaas committed
        activity = self.task1.activity

        actual = activity.calculated_quality
        self.assertEqual(actual, "good")

Nico Vermaas's avatar
Nico Vermaas committed

    def test_scrubbed(self):
        """
        test if the activity gets the ingested_fraction of 0 when scrubbed
        """
Nico Vermaas's avatar
Nico Vermaas committed
        activity = self.task2.activity

        actual = activity.ingestq_status
Nico Vermaas's avatar
Nico Vermaas committed
        self.assertEqual(actual, {'scrubbed': 1})

        actual = activity.ingested_fraction
        self.assertEqual(actual, 0)


    def test_archived(self):
        """
        test if the activity gets the ingested_fraction of an archived task
        """
Nico Vermaas's avatar
Nico Vermaas committed
        activity = self.task3.activity

        actual = activity.ingestq_status
        self.assertEqual(actual,  {'scrubbed': 1, 'archived': 1})
Nico Vermaas's avatar
Nico Vermaas committed

        actual = activity.archive['sas_id_archived']
        self.assertEqual(actual, "1219995")
Nico Vermaas's avatar
Nico Vermaas committed


    def test_finished(self):
        """
        test if the activity gets the proper values from 'archived' json of a finished task
        """

Nico Vermaas's avatar
Nico Vermaas committed
        activity = self.task4.activity

        actual = activity.ingestq_status
        self.assertEqual(actual,  {'scrubbed': 1, 'archived': 1})

        actual = activity.archive['sas_id_archived']
        self.assertEqual(actual, "1219995")


    def test_failed(self):
        """
        test if the activity gets the ingested_fraction of an archived task
        """

Nico Vermaas's avatar
Nico Vermaas committed
        activity = self.task5.activity

        actual = activity.finished_fraction
        self.assertEqual(actual,  33)

        actual = activity.total_size
        self.assertEqual(actual, 3000)

        actual = activity.remaining
        self.assertEqual(actual, 2000)


    def test_filter_and_workflow(self):
        """
        test if the activity gets the filter and workflow_id of updated tasks
        """
Nico Vermaas's avatar
Nico Vermaas committed

        activity = self.task1.activity

        actual = activity.filter
        self.assertEqual(actual, "test_blabla")
Nico Vermaas's avatar
Nico Vermaas committed

        actual = activity.workflow_id
        self.assertEqual(actual, 22)

    def test_is_not_processed(self):
        """
        task 9 is not processed, task 10 is processed.
        The activity.is_processed should be false
        """

        activity = self.task9.activity

        actual = activity.is_processed
        self.assertEqual(actual,  False)


    def test_is_processed(self):
        """
        task 6, 7 and 8 are processed,
        activity.is_processed should be true and activity status should go to 'aggregate'
        """

        activity = self.task6.activity

        actual = activity.is_processed
        self.assertEqual(actual, True)

        actual = activity.status
        self.assertEqual(actual, State.AGGREGATE.value)