Newer
Older
from django.test import TestCase
import json
from taskdatabase.models import Task, Workflow, Activity
class TestUpdateActivity(TestCase):
def setUp(self):
self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation")
self.workflow_requantisation.save()
self.task1 = Task.objects.create(sas_id=12345,
status='stored',
workflow=self.workflow_requantisation,
calculated_qualities={"per_task": "good", "per_sasid": "good"})
self.task2 = Task.objects.create(sas_id=12345,
status='scrubbed',
workflow=self.workflow_requantisation,
calculated_qualities={"per_task": "good", "per_sasid": "good"})
self.task3 = Task.objects.create(sas_id=12345,
status='archived',
workflow=self.workflow_requantisation,
archive={
"path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
"lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
"sas_id_archived": "1219995"
},
size_to_process=1000,
size_processed=500)
self.task4 = Task.objects.create(sas_id=12345,
status='finished',
workflow=self.workflow_requantisation,
archive={
"path_to_lta": "https://lta.lofar.eu//Lofar?project=ALL&mode=show_dataproducts_pipe&product=PulsarPipeline&pipeline_object_id=1101AB934B386BD5E063164A17AC38B9",
"lta_object_id": "1101AB934B386BD5E063164A17AC38B9",
"sas_id_archived": "1219995"
},
size_to_process=1000,
size_processed=500)
self.task5 = Task.objects.create(sas_id=12345,
status='archived_failed',
workflow=self.workflow_requantisation,
size_to_process=1000,
size_processed=500)
activity = self.task1.activity
# test if an activity with the correct sas_id was created
actual = activity.sas_id
self.assertEqual(actual, 12345)
"""
test if the activity gets the calculated quality of the sas_id of the stored task
"""
activity = self.task1.activity
actual = activity.calculated_quality
self.assertEqual(actual, "good")
"""
test if the activity gets the ingested_fraction of 0 when scrubbed
"""
activity = self.task2.activity
actual = activity.ingestq_status
self.assertEqual(actual, {'scrubbed': 1})
actual = activity.ingested_fraction
self.assertEqual(actual, 0)
def test_archived(self):
"""
test if the activity gets the ingested_fraction of an archived task
"""
activity = self.task3.activity
actual = activity.ingestq_status
self.assertEqual(actual, {'scrubbed': 1, 'archived': 1})
actual = activity.archive['sas_id_archived']
self.assertEqual(actual, "1219995")
"""
test if the activity gets the proper values from 'archived' json of a finished task
"""
activity = self.task4.activity
actual = activity.ingestq_status
self.assertEqual(actual, {'scrubbed': 1, 'archived': 1})
actual = activity.archive['sas_id_archived']
self.assertEqual(actual, "1219995")
def test_failed(self):
"""
test if the activity gets the ingested_fraction of an archived task
"""
activity = self.task5.activity
actual = activity.finished_fraction
self.assertEqual(actual, 33)
actual = activity.total_size
self.assertEqual(actual, 3000)
actual = activity.remaining
self.assertEqual(actual, 2000)
def test_filter_and_workflow(self):
"""
test if the activity gets the filter and workflow_id of updated tasks
"""
activity = self.task1.activity
actual = activity.filter
actual = activity.workflow_id
self.assertEqual(actual, 22)