Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_update_activity.py 9.50 KiB
from django.test import TestCase
import json
from taskdatabase.models import Configuration, Task, Workflow, Activity
from taskdatabase.services.common import State, AggregationStrategy
class TestUpdateActivity(TestCase):
def setUp(self):
"""
initialize test data
"""
# used to create the activity.storage_location
Configuration.objects.create(key="executor:workdir", value="/project/ldv/Share/run/")
self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation")
self.workflow_requantisation.save()
self.workflow_imaging_compression = Workflow(id=28, workflow_uri="imaging_compression",
aggregation_strategy = AggregationStrategy.COLLECT_H5.value)
self.workflow_imaging_compression.save()
self.task1 = Task.objects.create(sas_id=12345,
filter="test_blabla",
status='stored',
workflow=self.workflow_requantisation,
calculated_qualities={"per_task": "good", "per_sasid": "good"})
self.task2 = Task.objects.create(sas_id=12345,
status='scrubbed',
workflow=self.workflow_requantisation,
calculated_qualities={"per_task": "good", "per_sasid": "good"})
self.task3 = Task.objects.create(sas_id=12345,
status='archived',
workflow=self.workflow_requantisation,
archive={
"path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
"lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
"sas_id_archived": "1219995"
},
size_to_process=1000,
size_processed=500)
self.task4 = Task.objects.create(sas_id=12345,
status='finished',
workflow=self.workflow_requantisation,
archive={
"path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
"lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
"sas_id_archived": "1219995"
},
size_to_process=1000,
size_processed=500)
self.task5 = Task.objects.create(sas_id=12345,
status='archived_failed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task6 = Task.objects.create(sas_id=111,
new_status='stored',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task6.save()
self.task7 = Task.objects.create(sas_id=111,
new_status='processed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task7.save()
self.task8 = Task.objects.create(sas_id=111,
new_status='processed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500,
outputs={"tar_archive": [
{"size": 4885985280, "basename": "L185619_summaryCS.tar",
"nameroot": "L185619_summaryCS"}]}
)
self.task8.save()
self.task9 = Task.objects.create(sas_id=112,
new_status='processing',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task9.save()
self.task10 = Task.objects.create(sas_id=112,
new_status='processed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500,
outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
self.task10.save()
# to test imaging
self.task11 = Task.objects.create(sas_id=113,
new_status='fetched',
workflow=self.workflow_imaging_compression,
outputs={"inspect": {"location": "file:///project/ldv/Share/run/2023/3/26/331_30608/inspect.h5", "basename": "inspect.h5", "nameroot": "inspect"}})
self.task11.save()
def test_created_activity(self):
"""
test if activity is created
"""
activity = self.task1.activity
# test if an activity with the correct sas_id was created
self.assertEqual(activity.sas_id, 12345)
def test_scrubbed(self):
"""
test if the activity gets the ingested_fraction of 0 when scrubbed
"""
activity = self.task2.activity
self.assertEqual(activity.ingestq_status, {'scrubbed': 1})
self.assertEqual(activity.ingested_fraction, 0)
def test_archived(self):
"""
test if the activity gets the ingested_fraction of an archived task
"""
activity = self.task3.activity
self.assertEqual(activity.ingestq_status, {'scrubbed': 1, 'archived': 1})
self.assertEqual(activity.archive['sas_id_archived'], "1219995")
def test_finished(self):
"""
test if the activity gets the proper values from 'archived' json of a finished task
"""
activity = self.task4.activity
self.assertEqual(activity.ingestq_status, {'scrubbed': 1, 'finished': 1, 'archived': 1})
self.assertEqual(activity.archive['sas_id_archived'], "1219995")
def test_failed(self):
"""
test if the activity gets the ingested_fraction of an archived task
"""
activity = self.task5.activity
self.assertEqual(activity.finished_fraction, 33)
self.assertEqual(activity.total_size, 3000)
self.assertEqual(activity.remaining, 2000)
def test_filter_and_workflow(self):
"""
test if the activity gets the filter and workflow_id of updated tasks
"""
activity = self.task1.activity
self.assertEqual(activity.filter, "test_blabla")
self.assertEqual(activity.workflow_id, 22)
def test_is_not_processed(self):
"""
task 9 is not processed, task 10 is processed.
The activity.is_processed should be false
"""
activity = self.task9.activity
self.assertFalse(activity.is_processed)
def test_is_processed(self):
"""
task 6, 7 and 8 are processed,
activity.is_processed should be true and activity status should go to 'aggregate'
"""
activity = self.task6.activity
self.assertTrue(activity.is_processed)
self.assertEqual(activity.status, State.AGGREGATE.value)
def test_reset_activity(self):
"""
when a task is set to DEFINED or FETCHED, the Activity is reset
"""
activity = self.task1.activity
# simulate post aggregation state
activity.is_aggregated = True
activity.is_processed = True
# simulate task to FETCHED
self.task1.new_status = State.FETCHED.value
self.task1.save()
# check if the activity is reset
self.assertFalse(activity.is_aggregated)
self.assertFalse(activity.is_processed)
def test_create_storage_location(self):
"""
create activity.storage_location
WHEN a task goes to processed,
and its workflow has the COLLECT_H5 aggregation strategy
and its activity does not have a 'storage_location' yet
"""
activity = self.task11.activity
# check initial state
self.assertEqual(activity.storage_location, None)
# simulate task to PROCESSED
self.task11.new_status = State.PROCESSED.value
self.task11.save()
expected = "/project/ldv/Share/aggregate/113"
actual = self.task11.activity.storage_location
self.assertEqual(actual, expected)