Newer
Older
from django.test import TestCase
import json
from taskdatabase.models import Task, Workflow, Activity
from taskdatabase.services.common import State
# Fixture statements; the enclosing method's `def` line is not part of this view.
# One Workflow (explicit id 22) is saved and attached to every task created below.
self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation")
self.workflow_requantisation.save()
# Tasks 1-5 all use sas_id 12345 and differ in status and payload.
# task1: status 'stored', calculated qualities "good" per task and per sas_id.
self.task1 = Task.objects.create(sas_id=12345,
status='stored',
workflow=self.workflow_requantisation,
calculated_qualities={"per_task": "good", "per_sasid": "good"})
# task2: status 'scrubbed', same calculated qualities as task1.
self.task2 = Task.objects.create(sas_id=12345,
status='scrubbed',
workflow=self.workflow_requantisation,
calculated_qualities={"per_task": "good", "per_sasid": "good"})
# task3: status 'archived', with an archive dict (sas_id_archived "1219995")
# and size_to_process=1000 / size_processed=500.
self.task3 = Task.objects.create(sas_id=12345,
status='archived',
workflow=self.workflow_requantisation,
archive={
"path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
"lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
"sas_id_archived": "1219995"
},
size_to_process=1000,
size_processed=500)
# task4: status 'finished', same archive dict and sizes as task3.
self.task4 = Task.objects.create(sas_id=12345,
status='finished',
workflow=self.workflow_requantisation,
archive={
"path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
"lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
"sas_id_archived": "1219995"
},
size_to_process=1000,
size_processed=500)
# task5: status 'archived_failed', sizes only (no archive dict).
self.task5 = Task.objects.create(sas_id=12345,
status='archived_failed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# Fixture statements; the enclosing method's `def` line is not part of this view.
# Tasks 6-8 use sas_id 111 and are created through `new_status` (not `status`).
# NOTE(review): objects.create() already persists each row; the explicit save()
# that follows every create presumably re-runs the model's save-time logic —
# confirm it is required before removing it.
# task6: new_status 'stored'.
self.task6 = Task.objects.create(sas_id=111,
new_status='stored',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task6.save()
# task7: new_status 'processed'.
self.task7 = Task.objects.create(sas_id=111,
new_status='processed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task7.save()
# task8: new_status 'processed', with an outputs dict holding one tar_archive entry.
self.task8 = Task.objects.create(sas_id=111,
new_status='processed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500,
outputs={"tar_archive": [
{"size": 4885985280, "basename": "L185619_summaryCS.tar",
"nameroot": "L185619_summaryCS"}]}
)
self.task8.save()
# Tasks 9-10 use sas_id 112.
# task9: new_status 'processing'.
self.task9 = Task.objects.create(sas_id=112,
new_status='processing',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
self.task9.save()
# task10: new_status 'processed', with the same tar_archive output entry as task8.
self.task10 = Task.objects.create(sas_id=112,
new_status='processed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500,
outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
self.task10.save()
# The next three fragments are test bodies whose `def` lines are not part of
# this view; each reads the activity through one fixture task.

# Fragment 1: the activity reached through task1 must carry sas_id 12345,
# the sas_id task1 was created with.
activity = self.task1.activity
# test if an activity with the correct sas_id was created
actual = activity.sas_id
self.assertEqual(actual, 12345)
# Fragment 2: calculated_quality on task1's activity must be "good".
"""
test if the activity gets the calculated quality of the sas_id of the stored task
"""
activity = self.task1.activity
actual = activity.calculated_quality
self.assertEqual(actual, "good")
# Fragment 3: through task2 ('scrubbed') the activity must report exactly one
# scrubbed task in ingestq_status and an ingested_fraction of 0.
# NOTE(review): task2 and task3 share sas_id 12345, yet test_archived expects
# 'archived' to be counted as well; presumably each task holds the activity as
# it was when that task was saved — confirm.
"""
test if the activity gets the ingested_fraction of 0 when scrubbed
"""
activity = self.task2.activity
actual = activity.ingestq_status
self.assertEqual(actual, {'scrubbed': 1})
actual = activity.ingested_fraction
self.assertEqual(actual, 0)
def test_archived(self):
    """
    Check the activity reached through task3 (status 'archived').

    ingestq_status must count one 'scrubbed' and one 'archived' task, and
    archive['sas_id_archived'] must be "1219995".
    """
    task_activity = self.task3.activity

    self.assertEqual(task_activity.ingestq_status, {'scrubbed': 1, 'archived': 1})
    self.assertEqual(task_activity.archive['sas_id_archived'], "1219995")
# Body of a test whose `def` line is not part of this view.
# Reads the activity through task4 (created with status 'finished') and expects
# the same ingestq_status and sas_id_archived values that test_archived expects
# for task3.
"""
test if the activity gets the proper values from 'archived' json of a finished task
"""
activity = self.task4.activity
actual = activity.ingestq_status
self.assertEqual(actual, {'scrubbed': 1, 'archived': 1})
actual = activity.archive['sas_id_archived']
self.assertEqual(actual, "1219995")
def test_failed(self):
    """
    Check the size bookkeeping of the activity reached through task5
    (status 'archived_failed'): finished_fraction, total_size and remaining.
    """
    task_activity = self.task5.activity

    # Checked in this order; the first mismatch fails the test.
    expectations = (
        ("finished_fraction", 33),
        ("total_size", 3000),
        ("remaining", 2000),
    )
    for attribute, expected in expectations:
        self.assertEqual(getattr(task_activity, attribute), expected)
def test_filter_and_workflow(self):
    """
    Check that the activity reached through task1 exposes a filter and
    carries workflow id 22.

    Only workflow_id is compared against an expected value; filter is
    merely read.
    """
    activity = self.task1.activity

    # No expected filter value is available here, so the attribute is only
    # read (a missing attribute would still fail the test). It is kept apart
    # from the asserted value so it no longer looks as if it were checked.
    _ = activity.filter

    self.assertEqual(activity.workflow_id, 22)
def test_is_not_processed(self):
    """
    Task 9 is not processed while task 10 is, so the activity reached
    through task 9 must report is_processed as False.
    """
    task_activity = self.task9.activity

    self.assertEqual(task_activity.is_processed, False)
def test_is_processed(self):
    """
    Check the activity of sas_id 111 (tasks 6, 7 and 8), read through task 6:
    is_processed must be True and status must equal State.AGGREGATE.value.
    """
    task_activity = self.task6.activity

    self.assertEqual(task_activity.is_processed, True)
    self.assertEqual(task_activity.status, State.AGGREGATE.value)