Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_atdb_insert.py 11.90 KiB
from unittest import mock

import rest_framework.test as rtest
from django.contrib.auth import get_user_model
from django.urls import reverse
from lofardata.models import SUBMISSION_STATUS, ATDBProcessingSite, WorkSpecification
from lofardata.tasks import SessionStore, insert_task_into_atdb
from lofardata.tests.util import MockResponse, mocked_delay
from rest_framework import status

# This is needed to initialize a session.
# It runs at import time, before any test class is set up; presumably this
# makes sure ``SessionStore._session`` exists by the time the class-level
# ``mock.patch("lofardata.tasks.SessionStore._session.post", ...)`` further
# down resolves its target -- TODO confirm against ``lofardata.tasks``.
SessionStore.get_session()


# ATDB Get Requests
def mocked_requests_get(*args, **kwargs):
    """Fake GET handler that answers two known URLs with canned payloads.

    The requested URL is taken from the first positional argument; keyword
    arguments are accepted but ignored.

    Returns:
        ``MockResponse(payload, 200)`` for a known URL, otherwise
        ``MockResponse(None, 404)``.

    Raises:
        IndexError: if no positional argument is supplied.
    """
    known_responses = (
        ("http://someurl.com/test.json", {"key1": "value1"}),
        ("http://someotherurl.com/anothertest.json", {"key2": "value2"}),
    )

    requested_url = args[0]
    for known_url, payload in known_responses:
        if requested_url == known_url:
            return MockResponse(payload, 200)

    # Anything else is treated as "not found".
    return MockResponse(None, 404)


# ATDB Post Requests
def mocked_requests_post(*args, **kwargs):
    """Fake POST handler used as ``side_effect`` for the patched session.

    The JSON payload is inspected first: a payload whose ``project`` is
    ``"kaput_42"`` always yields a 500 response, whatever the URL. Otherwise
    the URL (first positional argument) decides between 201 and 404.

    Returns:
        ``MockResponse({"detail": "not ok"}, 500)`` for project ``kaput_42``,
        ``MockResponse({"id": 42}, 201)`` for the ATDB tasks endpoint,
        ``MockResponse(None, 404)`` for any other URL.

    Raises:
        KeyError: if no ``json`` keyword (or no ``project`` key) is supplied.
        IndexError: if no positional URL is supplied.
    """
    payload = kwargs["json"]
    simulate_failure = payload["project"] == "kaput_42"
    if simulate_failure:
        return MockResponse({"detail": "not ok"}, 500)

    hits_tasks_endpoint = args[0] == "https://example.com/atdb/tasks/"
    if hits_tasks_endpoint:
        return MockResponse({"id": 42}, 201)

    return MockResponse(None, 404)


class TestATDBInsertRequest(rtest.APITestCase):
    """API-level test of the ``workspecification-submit`` endpoint."""

    def setUp(self):
        """Create a superuser named ``admin`` and authenticate the client as it."""
        self.user = get_user_model().objects.create_superuser("admin")
        self.client.force_authenticate(self.user)

    def test_ws_submit_request(self):
        """Test submitting a workspecification to ATDB"""
        # The patch target is bound to ``submit_delay`` rather than
        # ``mocked_delay`` so it does not shadow the ``mocked_delay`` helper
        # imported at module level.
        with mock.patch("lofardata.tasks.insert_task_into_atdb.delay") as submit_delay:
            res = self.client.post(
                reverse("workspecification-submit", kwargs={"pk": 1}), data={}
            )

            # Check the HTTP status first: if the request was rejected, this
            # failure message carries the response body, which is far more
            # useful than a bare "expected call not found".
            self.assertEqual(
                res.status_code,
                status.HTTP_202_ACCEPTED,
                "Submitting job failed:\n" + str(res.content),
            )

            # The view is expected to hand the pk over as the string "1".
            submit_delay.assert_called_once_with("1")


@mock.patch(
    "lofardata.tasks.SessionStore._session.post", side_effect=mocked_requests_post
)
class TestATDBInsertTask(rtest.APITestCase):
    """Tests for the ``insert_task_into_atdb`` task.

    The class-level patch replaces the session's ``post`` with a mock whose
    ``side_effect`` is ``mocked_requests_post``. ``mock.patch`` applies a class
    decorator only to methods whose names start with ``test``, so each test
    method receives the mock as ``mock_post`` while ``setUp`` and the private
    helpers below do not.
    """

    # Size used for every simulated input file.
    FILE_SIZE = 7395952640

    # SURL located under project ``lc0_012``.
    SURL_OK = (
        "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/"
        "lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar"
    )

    # SURL located under project ``kaput_42``; the tests using it expect the
    # work specification to end up in the ERROR state.
    SURL_KAPUT = (
        "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/"
        "kaput_42/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar"
    )

    @mock.patch(
        "lofardata.tasks.define_work_specification.delay", side_effect=mocked_delay
    )
    def setUp(self, mock_delay: mock.MagicMock):
        """Create a processing site and a work specification with a workflow.

        The resulting work specification is stored as ``self.work_spec``.
        """
        site: ATDBProcessingSite = ATDBProcessingSite.objects.create(
            name="Dwingeloo", url="https://example.com/atdb/", access_token="DummyToken"
        )

        # Create Work Specification
        ws: WorkSpecification = WorkSpecification.objects.create(processing_site=site)
        mock_delay.assert_called_once_with(ws.pk)  # not really needed here actually
        ws.refresh_from_db()

        # Simulated interaction where the workflow was set
        ws.selected_workflow = "https://example.com/atdb/workflow/1/"
        ws.save()
        self.work_spec: WorkSpecification = ws

    # ------------------------------------------------------------------ helpers

    @classmethod
    def _plain_entry(cls, surl: str) -> dict:
        """Return an input entry carrying only ``size`` and ``surl``."""
        return {"size": cls.FILE_SIZE, "surl": surl}

    @classmethod
    def _annotated_entry(cls) -> dict:
        """Return a ``SURL_OK`` entry that also carries ``type`` and ``location``."""
        return {
            "size": cls.FILE_SIZE,
            "surl": cls.SURL_OK,
            "type": "File",
            "location": "srm.grid.sara.nl",
        }

    def _store_inputs(self, inputs: dict, *, batch_size=None, auto_submit=None):
        """Assign ``inputs`` (and optional settings) to the work spec and save it.

        ``batch_size`` and ``auto_submit`` are only written when given, so tests
        that omit them keep the model defaults.
        """
        self.work_spec.inputs = inputs
        if batch_size is not None:
            self.work_spec.batch_size = batch_size
        if auto_submit is not None:
            self.work_spec.is_auto_submit = auto_submit
        self.work_spec.save()

    def _assert_submission_status(self, expected_status, message: str):
        """Reload the work spec from the database and check its submission status."""
        self.work_spec.refresh_from_db()
        self.assertEqual(self.work_spec.submission_status, expected_status, message)

    # -------------------------------------------------------------------- tests

    def test_insert_task_into_atdb(
        self,
        mock_post: mock.MagicMock,
    ):
        """A single input file yields exactly one POST with the expected task."""
        self._store_inputs({"surls": [self._plain_entry(self.SURL_OK)]})

        expected_task = {
            "project": "lc0_012",
            "sas_id": "161482",
            "task_type": "regular",
            "filter": f"ldv-spec:{self.work_spec.pk}",
            "purge_policy": "no",
            "new_status": "defining",
            "new_workflow_uri": "https://example.com/atdb/workflow/1/",
            "size_to_process": self.FILE_SIZE,
            # The posted entry is expected to carry ``type`` and ``location``
            # even though the stored input only had ``size`` and ``surl``.
            "inputs": [self._annotated_entry()],
        }

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        mock_post.assert_called_once_with(
            "https://example.com/atdb/tasks/", json=expected_task, auth=mock.ANY
        )

        self._assert_submission_status(
            SUBMISSION_STATUS.DEFINING, "Work Spec should be in defining state"
        )
        self.assertEqual(len(self.work_spec.related_tasks), 1)  # expect 1 task

    def test_batching_of_files(self, mock_post: mock.MagicMock):
        """Tests batching of multiple files"""
        self._store_inputs(
            {"surls": [self._annotated_entry()] * 10},  # ten entries
            batch_size=3,  # expect 4 batches
        )

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self.assertEqual(mock_post.call_count, 4, "Invalid amount of calls to ATDB")

        self._assert_submission_status(
            SUBMISSION_STATUS.DEFINING, "Work Spec should be in defining state"
        )
        self.assertEqual(len(self.work_spec.related_tasks), 4)  # expect 4 tasks

    def test_batching_of_files_with_options(self, mock_post: mock.MagicMock):
        """Tests that an extra input option is present in every batch posted"""
        self._store_inputs(
            {
                "optional": True,
                "surls": [self._annotated_entry()] * 10,  # ten entries
            },
            batch_size=3,  # expect 4 batches
        )

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self.assertEqual(mock_post.call_count, 4, "Invalid amount of calls to ATDB")

        # Every individual POST must carry the ``optional`` flag in its inputs.
        for call_arg in mock_post.call_args_list:
            posted_inputs = call_arg.kwargs["json"]["inputs"]
            self.assertIn("optional", posted_inputs)
            self.assertTrue(posted_inputs["optional"])

        self._assert_submission_status(
            SUBMISSION_STATUS.DEFINING, "Work Spec should be in defining state"
        )
        self.assertEqual(len(self.work_spec.related_tasks), 4)  # expect 4 tasks

    def test_batching_of_files_exact_fit(self, mock_post: mock.MagicMock):
        """Tests batching of multiple files which fit the batch_size exactly"""
        self._store_inputs(
            {"surls": [self._annotated_entry()] * 9},
            batch_size=3,  # expect 3 batches
        )

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self.assertEqual(mock_post.call_count, 3, "Invalid amount of calls to ATDB")

        self._assert_submission_status(
            SUBMISSION_STATUS.DEFINING, "Work Spec should be in defining state"
        )
        self.assertEqual(len(self.work_spec.related_tasks), 3)  # expect 3 tasks

    def test_atdb_kaput(self, mock_post: mock.MagicMock):
        """Test handling of ATDB Failure"""
        self._store_inputs({"surls": [self._plain_entry(self.SURL_KAPUT)]})

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self._assert_submission_status(
            SUBMISSION_STATUS.ERROR, "Work Spec should be in error state"
        )

    def test_atdb_kaput_batch(self, mock_post: mock.MagicMock):
        """Test handling of ATDB Failure for one batch out of several"""
        self._store_inputs(
            {
                "surls": [
                    self._plain_entry(self.SURL_OK),
                    self._plain_entry(self.SURL_KAPUT),
                    self._plain_entry(self.SURL_OK),
                ]
            },
            batch_size=1,  # use a small batch size
        )

        insert_task_into_atdb(workspecification_id=self.work_spec.pk)

        self._assert_submission_status(
            SUBMISSION_STATUS.ERROR, "Work Spec should be in error state"
        )

    def test_auto_submit_after_success(self, mock_post):
        """Test whether submission gets triggered"""
        self._store_inputs(
            {"surls": [self._annotated_entry()] * 10},  # ten entries
            batch_size=3,  # expect 4 batches
            auto_submit=True,
        )

        with mock.patch("lofardata.tasks.set_tasks_defined.delay") as submit_delay:
            insert_task_into_atdb(workspecification_id=self.work_spec.pk)
            submit_delay.assert_called_once_with(self.work_spec.pk)

    def test_auto_submit_after_failure(self, mock_post):
        """Test whether submission is not done with a failure in a batch"""
        self._store_inputs(
            {
                "surls": [
                    self._plain_entry(self.SURL_OK),
                    self._plain_entry(self.SURL_KAPUT),
                    self._plain_entry(self.SURL_OK),
                ]
            },
            batch_size=1,
            auto_submit=True,
        )

        with mock.patch("lofardata.tasks.set_tasks_defined.delay") as submit_delay:
            insert_task_into_atdb(workspecification_id=self.work_spec.pk)
            submit_delay.assert_not_called()