# Tests for the ingest status counts and ingested fraction exposed via task.activity.
from taskdatabase.models import Task, Workflow
class TestIngestFraction(TestCase):
    """Verify the ingest status counts and ingested fraction of an activity.

    The fixture consists of tasks that all share sas_id 54321 but differ in
    ``status`` and ``filter``; the test then reads the aggregated values from
    ``task.activity``.
    """

    def setUp(self):
        """Create one workflow and ten tasks for sas_id 54321 in mixed statuses."""
        workflow = Workflow(id=22, workflow_uri="psrfits_requantisation")
        workflow.save()

        # (filter, status) for every task, listed in creation order.
        task_specs = (
            ('a', 'stored'),
            ('a', 'archiving'),
            ('a', 'archived'),
            ('a', 'finishing'),
            ('a', 'finished'),
            ('b', 'finished'),
            ('a', 'discarded'),
            ('a', 'archived_failed'),
            ('a', 'scrubbed'),
            ('b', 'scrubbed'),
        )
        for task_filter, task_status in task_specs:
            Task.objects.get_or_create(
                filter=task_filter,
                sas_id=54321,
                status=task_status,
                workflow=workflow,
            )

    def test_ingest_fraction(self):
        """Check ``ingestq_status`` and ``ingested_fraction`` for sas_id 54321."""
        # Note that the 'stored' task does not appear in the expected counts.
        expected_counts = {
            'scrubbed': 2,
            'archiving': 1,
            'archived': 1,
            'finishing': 1,
            'finished': 2,
            'discarded': 1,
            'archived_failed': 1,
        }
        # NOTE(review): 38 is presumably a rounded percentage - confirm against
        # the ingested_fraction implementation.
        expected_fraction = 38

        # Collapse all tasks into a single representative task for this sas_id.
        tasks_for_sas_id = Task.objects.filter(sas_id=54321)
        task = tasks_for_sas_id.distinct('sas_id')[0]

        status_counts = task.activity.ingestq_status
        ingested_fraction = task.activity.ingested_fraction

        self.assertEqual(status_counts, expected_counts)
        self.assertEqual(ingested_fraction, expected_fraction)