Skip to content
Snippets Groups Projects
Select Git revision
  • 36a08c7e9eff4e6a76dad4c0dd9a1636c729f591
  • main default protected
  • dev
3 results

imcal.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_update_activity.py 9.50 KiB
    from django.test import TestCase
    import json
    from taskdatabase.models import Configuration, Task, Workflow, Activity
    from taskdatabase.services.common import State, AggregationStrategy
    
    class TestUpdateActivity(TestCase):
        """
        Tests for the activity that is reachable through ``task.activity``.

        setUp creates tasks for four sas_ids (12345, 111, 112 and 113); every test
        then inspects the activity of one of those tasks.
        """

        def setUp(self):
            """
            initialize test data

            NOTE(review): keep the order in which the tasks are created. The tests
            expect different ingestq_status counts for task3 and task4 although both
            have sas_id 12345, so the expected values appear to depend on which
            tasks already existed when each task was created.
            """
            # used to create the activity.storage_location
            Configuration.objects.create(key="executor:workdir", value="/project/ldv/Share/run/")

            self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation")
            self.workflow_requantisation.save()

            self.workflow_imaging_compression = Workflow(
                id=28,
                workflow_uri="imaging_compression",
                aggregation_strategy=AggregationStrategy.COLLECT_H5.value)
            self.workflow_imaging_compression.save()

            # 'archive' json used by both the archived task (task3) and the finished task (task4);
            # every task gets its own copy so that the tasks never share one dict instance
            lta_archive = {
                "path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
                "lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
                "sas_id_archived": "1219995"
            }

            # --- sas_id 12345: tasks created directly with a 'status' ---
            self.task1 = Task.objects.create(
                sas_id=12345,
                filter="test_blabla",
                status='stored',
                workflow=self.workflow_requantisation,
                calculated_qualities={"per_task": "good", "per_sasid": "good"})
            self.task2 = Task.objects.create(
                sas_id=12345,
                status='scrubbed',
                workflow=self.workflow_requantisation,
                calculated_qualities={"per_task": "good", "per_sasid": "good"})
            self.task3 = Task.objects.create(
                sas_id=12345,
                status='archived',
                workflow=self.workflow_requantisation,
                archive=dict(lta_archive),
                size_to_process=1000,
                size_processed=500)
            self.task4 = Task.objects.create(
                sas_id=12345,
                status='finished',
                workflow=self.workflow_requantisation,
                archive=dict(lta_archive),
                size_to_process=1000,
                size_processed=500)
            self.task5 = Task.objects.create(
                sas_id=12345,
                status='archived_failed',
                workflow=self.workflow_requantisation,
                size_to_process=1000,
                size_processed=500)

            # --- sas_id 111, 112 and 113: tasks created with a 'new_status' ---
            # NOTE(review): these tasks are saved again right after create();
            # presumably the extra save is needed to apply 'new_status' - confirm before removing it.
            self.task6 = Task.objects.create(
                sas_id=111,
                new_status='stored',
                workflow=self.workflow_requantisation,
                size_to_process=1000,
                size_processed=500)
            self.task6.save()
            self.task7 = Task.objects.create(
                sas_id=111,
                new_status='processed',
                workflow=self.workflow_requantisation,
                size_to_process=1000,
                size_processed=500)
            self.task7.save()
            self.task8 = Task.objects.create(
                sas_id=111,
                new_status='processed',
                workflow=self.workflow_requantisation,
                size_to_process=1000,
                size_processed=500,
                outputs={"tar_archive": [
                    {"size": 4885985280, "basename": "L185619_summaryCS.tar",
                     "nameroot": "L185619_summaryCS"}]})
            self.task8.save()

            self.task9 = Task.objects.create(
                sas_id=112,
                new_status='processing',
                workflow=self.workflow_requantisation,
                size_to_process=1000,
                size_processed=500)
            self.task9.save()
            self.task10 = Task.objects.create(
                sas_id=112,
                new_status='processed',
                workflow=self.workflow_requantisation,
                size_to_process=1000,
                size_processed=500,
                outputs={"tar_archive": [
                    {"size": 4885985280, "basename": "L185619_summaryCS.tar",
                     "nameroot": "L185619_summaryCS"}]})
            self.task10.save()

            # to test imaging
            self.task11 = Task.objects.create(
                sas_id=113,
                new_status='fetched',
                workflow=self.workflow_imaging_compression,
                outputs={"inspect": {
                    "location": "file:///project/ldv/Share/run/2023/3/26/331_30608/inspect.h5",
                    "basename": "inspect.h5",
                    "nameroot": "inspect"}})
            self.task11.save()

        def test_created_activity(self):
            """
            test if activity is created
            """
            activity = self.task1.activity

            # test if an activity with the correct sas_id was created
            self.assertEqual(activity.sas_id, 12345)

        def test_scrubbed(self):
            """
            test if the activity gets the ingested_fraction of 0 when scrubbed
            """
            activity = self.task2.activity

            self.assertEqual(activity.ingestq_status, {'scrubbed': 1})
            self.assertEqual(activity.ingested_fraction, 0)

        def test_archived(self):
            """
            test if the activity of an archived task gets the expected ingestq_status
            and the 'sas_id_archived' from the 'archive' json of that task
            """
            activity = self.task3.activity

            self.assertEqual(activity.ingestq_status, {'scrubbed': 1, 'archived': 1})
            self.assertEqual(activity.archive['sas_id_archived'], "1219995")

        def test_finished(self):
            """
            test if the activity gets the proper values from the 'archive' json of a finished task
            """
            activity = self.task4.activity

            self.assertEqual(activity.ingestq_status, {'scrubbed': 1, 'finished': 1, 'archived': 1})
            self.assertEqual(activity.archive['sas_id_archived'], "1219995")

        def test_failed(self):
            """
            test the size bookkeeping (finished_fraction, total_size, remaining)
            of the activity of a task with status 'archived_failed'
            """
            activity = self.task5.activity

            self.assertEqual(activity.finished_fraction, 33)
            self.assertEqual(activity.total_size, 3000)
            self.assertEqual(activity.remaining, 2000)

        def test_filter_and_workflow(self):
            """
            test if the activity gets the filter and workflow_id of updated tasks
            """
            activity = self.task1.activity

            self.assertEqual(activity.filter, "test_blabla")
            self.assertEqual(activity.workflow_id, 22)

        def test_is_not_processed(self):
            """
            task 9 is not processed (it is still 'processing'), task 10 is processed.
            The activity.is_processed should be false
            """
            activity = self.task9.activity

            self.assertFalse(activity.is_processed)

        def test_is_processed(self):
            """
            task 6 is 'stored', task 7 and 8 are 'processed',
            activity.is_processed should be true and activity status should go to 'aggregate'
            """
            activity = self.task6.activity

            self.assertTrue(activity.is_processed)
            self.assertEqual(activity.status, State.AGGREGATE.value)

        def test_reset_activity(self):
            """
            when a task is set to DEFINED or FETCHED, the Activity is reset
            (only the FETCHED case is exercised here)
            """
            activity = self.task1.activity

            # simulate post aggregation state
            # (the flags are only set on the in-memory object, they are not saved explicitly)
            activity.is_aggregated = True
            activity.is_processed = True

            # simulate task to FETCHED
            self.task1.new_status = State.FETCHED.value
            self.task1.save()

            # check if the activity is reset
            self.assertFalse(activity.is_aggregated)
            self.assertFalse(activity.is_processed)

        def test_create_storage_location(self):
            """
            create activity.storage_location

            WHEN a task goes to processed,
            and its workflow has the COLLECT_H5 aggregation strategy
            and its activity does not have a 'storage_location' yet
            """
            activity = self.task11.activity

            # check initial state
            self.assertIsNone(activity.storage_location)

            # simulate task to PROCESSED
            self.task11.new_status = State.PROCESSED.value
            self.task11.save()

            # NOTE(review): the expected path presumably is derived from the
            # 'executor:workdir' configuration and the sas_id (113) - confirm
            expected = "/project/ldv/Share/aggregate/113"
            actual = self.task11.activity.storage_location
            self.assertEqual(actual, expected)