-
Mattia Mancini authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_atdb_insert.py 11.90 KiB
from unittest import mock
import rest_framework.test as rtest
from django.contrib.auth import get_user_model
from django.urls import reverse
from lofardata.models import SUBMISSION_STATUS, ATDBProcessingSite, WorkSpecification
from lofardata.tasks import SessionStore, insert_task_into_atdb
from lofardata.tests.util import MockResponse, mocked_delay
from rest_framework import status
# Initialise the shared session once, at import time.
# NOTE(review): the class-level patch further down targets
# ``lofardata.tasks.SessionStore._session.post``; presumably ``_session`` only
# exists after this call -- confirm against ``lofardata.tasks``.
SessionStore.get_session()
# ATDB Get Requests
def mocked_requests_get(*args, **kwargs):
    """Stand-in for a GET call: canned JSON for two known URLs, 404 otherwise.

    The URL is taken from the first positional argument; keyword arguments
    are accepted but ignored.
    """
    canned_payloads = (
        ("http://someurl.com/test.json", {"key1": "value1"}),
        ("http://someotherurl.com/anothertest.json", {"key2": "value2"}),
    )
    requested_url = args[0]
    for known_url, payload in canned_payloads:
        if requested_url == known_url:
            return MockResponse(payload, 200)
    # Anything else is treated as "not found".
    return MockResponse(None, 404)
# ATDB Post Requests
def mocked_requests_post(*args, **kwargs):
    """Stand-in for a POST call to ATDB.

    * a JSON body whose ``project`` is ``"kaput_42"`` yields a 500 response;
    * otherwise a POST to the tasks endpoint yields 201 with ``{"id": 42}``;
    * every other URL yields 404.
    """
    body = kwargs["json"]
    # The failure trigger is checked first, before the URL is even looked at.
    if body["project"] == "kaput_42":
        return MockResponse({"detail": "not ok"}, 500)

    endpoint = args[0]
    if endpoint == "https://example.com/atdb/tasks/":
        reply = MockResponse({"id": 42}, 201)
    else:
        reply = MockResponse(None, 404)
    return reply
class TestATDBInsertRequest(rtest.APITestCase):
    """API-level test of the ``workspecification-submit`` route."""

    def setUp(self):
        """Create a superuser and authenticate the test client as that user."""
        admin = get_user_model().objects.create_superuser("admin")
        self.user = admin
        self.client.force_authenticate(admin)

    def test_ws_submit_request(self):
        """Test submitting a workspecification to ATDB"""
        submit_url = reverse("workspecification-submit", kwargs={"pk": 1})

        # The task's ``delay`` is patched so nothing is actually queued; the
        # test only checks that it is called with the primary key as a string.
        with mock.patch("lofardata.tasks.insert_task_into_atdb.delay") as delay_mock:
            res = self.client.post(submit_url, data={})
            delay_mock.assert_called_once_with("1")

        self.assertEqual(
            res.status_code,
            status.HTTP_202_ACCEPTED,
            "Submitting job failed:\n" + str(res.content),
        )
@mock.patch(
    "lofardata.tasks.SessionStore._session.post", side_effect=mocked_requests_post
)
class TestATDBInsertTask(rtest.APITestCase):
    """Exercise ``insert_task_into_atdb`` against a mocked ATDB POST endpoint.

    The class-level patch replaces ``SessionStore._session.post`` with a mock
    whose side effect is ``mocked_requests_post``; that mock is handed to every
    test method as ``mock_post``.
    """

    # Size (in bytes) used for every input file in these tests.
    FILE_SIZE = 7395952640

    _SURL_TEMPLATE = (
        "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/"
        "{project}/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar"
    )
    # SURL under project "lc0_012"; the first test expects that project name
    # in the posted task.
    GOOD_SURL = _SURL_TEMPLATE.format(project="lc0_012")
    # SURL under project "kaput_42"; ``mocked_requests_post`` answers 500 for
    # a task whose project is "kaput_42".
    KAPUT_SURL = _SURL_TEMPLATE.format(project="kaput_42")

    @mock.patch(
        "lofardata.tasks.define_work_specification.delay", side_effect=mocked_delay
    )
    def setUp(self, mock_delay: mock.MagicMock):
        """Create a processing site and a work specification with a workflow set."""
        site: ATDBProcessingSite = ATDBProcessingSite.objects.create(
            name="Dwingeloo", url="https://example.com/atdb/", access_token="DummyToken"
        )

        # Create Work Specification
        ws: WorkSpecification = WorkSpecification.objects.create(processing_site=site)
        mock_delay.assert_called_once_with(ws.pk)  # not really needed here actually
        ws.refresh_from_db()

        # Simulated interaction where the workflow was set
        ws.selected_workflow = "https://example.com/atdb/workflow/1/"
        ws.save()
        self.work_spec: WorkSpecification = ws

    def _surl_entry(self, surl, *, described=False):
        """Build one ``surls`` item; ``described`` adds the type/location keys."""
        entry = {"size": self.FILE_SIZE, "surl": surl}
        if described:
            entry["type"] = "File"
            entry["location"] = "srm.grid.sara.nl"
        return entry

    def _assert_submission_status(self, expected, message):
        """Reload the work specification and check its submission status."""
        self.work_spec.refresh_from_db()
        self.assertEqual(self.work_spec.submission_status, expected, message)

    def test_insert_task_into_atdb(
        self,
        mock_post: mock.MagicMock,
    ):
        """A single input file results in exactly one POST with the expected task."""
        self.work_spec.inputs = {"surls": [self._surl_entry(self.GOOD_SURL)]}
        self.work_spec.save()

        expected_task = {
            "project": "lc0_012",
            "sas_id": "161482",
            "task_type": "regular",
            "filter": f"ldv-spec:{self.work_spec.pk}",
            "purge_policy": "no",
            "new_status": "defining",
            "new_workflow_uri": "https://example.com/atdb/workflow/1/",
            "size_to_process": self.FILE_SIZE,
            "inputs": [self._surl_entry(self.GOOD_SURL, described=True)],
        }

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        mock_post.assert_called_once_with(
            "https://example.com/atdb/tasks/", json=expected_task, auth=mock.ANY
        )
        self._assert_submission_status(
            SUBMISSION_STATUS.DEFINING, "Work Spec should be in defining state"
        )
        self.assertEqual(len(self.work_spec.related_tasks), 1)  # expect 1 task

    def test_batching_of_files(self, mock_post: mock.MagicMock):
        """Tests batching of multiple files"""
        self.work_spec.inputs = {
            "surls": [self._surl_entry(self.GOOD_SURL, described=True)]
            * 10  # ten entries
        }
        self.work_spec.batch_size = 3  # expect 4 batches
        self.work_spec.save()

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self.assertEqual(mock_post.call_count, 4, "Invalid amount of calls to ATDB")
        self._assert_submission_status(
            SUBMISSION_STATUS.DEFINING, "Work Spec should be in defining state"
        )
        self.assertEqual(len(self.work_spec.related_tasks), 4)  # expect 4 tasks

    def test_batching_of_files_with_options(self, mock_post: mock.MagicMock):
        """Tests batching of multiple files when the inputs carry an extra option"""
        self.work_spec.inputs = {
            "optional": True,
            "surls": [self._surl_entry(self.GOOD_SURL, described=True)]
            * 10,  # ten entries
        }
        self.work_spec.batch_size = 3  # expect 4 batches
        self.work_spec.save()

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self.assertEqual(mock_post.call_count, 4, "Invalid amount of calls to ATDB")
        # Every batch must carry the non-surl option along with its files.
        for call_arg in mock_post.call_args_list:
            posted_inputs = call_arg.kwargs["json"]["inputs"]
            self.assertIn("optional", posted_inputs)
            self.assertTrue(posted_inputs["optional"])

        self._assert_submission_status(
            SUBMISSION_STATUS.DEFINING, "Work Spec should be in defining state"
        )
        self.assertEqual(len(self.work_spec.related_tasks), 4)  # expect 4 tasks

    def test_batching_of_files_exact_fit(self, mock_post: mock.MagicMock):
        """Tests batching of multiple files which fit the batch_size exactly"""
        self.work_spec.inputs = {
            "surls": [self._surl_entry(self.GOOD_SURL, described=True)] * 9
        }
        self.work_spec.batch_size = 3  # expect 3 batches
        self.work_spec.save()

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self.assertEqual(mock_post.call_count, 3, "Invalid amount of calls to ATDB")
        self._assert_submission_status(
            SUBMISSION_STATUS.DEFINING, "Work Spec should be in defining state"
        )
        self.assertEqual(len(self.work_spec.related_tasks), 3)  # expect 3 tasks

    def test_atdb_kaput(self, mock_post: mock.MagicMock):
        """Test handling of ATDB Failure"""
        self.work_spec.inputs = {"surls": [self._surl_entry(self.KAPUT_SURL)]}
        self.work_spec.save()

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self._assert_submission_status(
            SUBMISSION_STATUS.ERROR, "Work Spec should be in error state"
        )

    def test_atdb_kaput_batch(self, mock_post: mock.MagicMock):
        """Test handling of ATDB Failure for one batch out of several"""
        self.work_spec.inputs = {
            "surls": [
                self._surl_entry(self.GOOD_SURL),
                self._surl_entry(self.KAPUT_SURL),
                self._surl_entry(self.GOOD_SURL),
            ]
        }
        self.work_spec.batch_size = 1  # use a small batch size
        self.work_spec.save()

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self._assert_submission_status(
            SUBMISSION_STATUS.ERROR, "Work Spec should be in error state"
        )

    def test_auto_submit_after_success(self, mock_post: mock.MagicMock):
        """Test whether submission gets triggered"""
        self.work_spec.inputs = {
            "surls": [self._surl_entry(self.GOOD_SURL, described=True)]
            * 10  # ten entries
        }
        self.work_spec.batch_size = 3  # expect 4 batches
        self.work_spec.is_auto_submit = True
        self.work_spec.save()

        with mock.patch("lofardata.tasks.set_tasks_defined.delay") as submit_delay:
            insert_task_into_atdb(workspecification_id=self.work_spec.pk)
            submit_delay.assert_called_once_with(self.work_spec.pk)

    def test_auto_submit_after_failure(self, mock_post: mock.MagicMock):
        """Test whether submission is not done with a failure in a batch"""
        self.work_spec.inputs = {
            "surls": [
                self._surl_entry(self.GOOD_SURL),
                self._surl_entry(self.KAPUT_SURL),
                self._surl_entry(self.GOOD_SURL),
            ]
        }
        self.work_spec.batch_size = 1
        self.work_spec.is_auto_submit = True
        self.work_spec.save()

        with mock.patch("lofardata.tasks.set_tasks_defined.delay") as submit_delay:
            insert_task_into_atdb(workspecification_id=self.work_spec.pk)
            submit_delay.assert_not_called()