diff --git a/.gitignore b/.gitignore index a047a9475d58e13a9b65c3e7f237c80f8b05ea2d..3758912e3aca385e97a83ad833d945eb6675402a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea/ -__pycache__/ \ No newline at end of file +.vscode +__pycache__/ diff --git a/README.md b/README.md index f150411a3a307a83e3cc07120905e0108e51e1aa..1585d34d9b68be33c8c7aa2b99a77d9101bb4ba2 100644 --- a/README.md +++ b/README.md @@ -5,58 +5,63 @@ LDV Specification Application. For filling ATDB-LDV with processing tasks for LO ## Documentation (Confluence) * The plan: https://support.astron.nl/confluence/pages/viewpage.action?pageId=84215267 * https://support.astron.nl/confluence/display/SDCP/LDV+Documentation - + * deployment diagram of the current situation (in production) - https://drive.google.com/file/d/1_j9Fp505pZTxcmzAEdgftdPkoIFrKfAX/view?usp=sharing - + ## Collaborate * create `your branch` from `main` * add your functionality * test your functionality locally - - * merge `main` into `your branch` before creating a MR + + * merge `main` into `your branch` before creating a MR * merge with `main` * deploy in test, and test it * deploy in production, and test it - + ### Local update -After a collegue has made changes, then locally: +After a colleague has made changes, then locally: ``` > git pull > pip install -r requirements\dev.txt - > python manage.py migrate --settings=ldvspec.settings.dev + > python manage.py migrate --settings=ldvspec.settings.dev ``` - + ## Local Development Environment ### Postgres Database in Docker Run `docker-compose -f docker-compose-postgres-dev.yml up -d` with the following compose file to spin up a new Postgres container. See the `docker-compose-postgres-dev.yml` file in the `docker` directory. -(not that port 5433 is used. You can change that at will, but then also change it in `dev.py`) +(the shown configuration is based on the `dev.py` file. You can change, but then also change it in `dev.py`) ```yaml version: "3.7" services: - + ldv-spec-db: image: postgres:14 container_name: ldv-spec-postgres - expose: - - 5433 ports: - - 5433:5432 + - "5432:5432" environment: - POSTGRES_PASSWORD: "secret" - POSTGRES_USER: "postgres" - POSTGRES_DB: "ldv-spec-db" + POSTGRES_PASSWORD: "atdb123" + POSTGRES_USER: "atdb_admin" + POSTGRES_DB: "ldv-spec-db" volumes: - ldv-spec-db:/var/lib/postgresql/data restart: always - + + rabbitmq: + container_name: ldv-spec-rabbit + image: rabbitmq:3-management + ports: + - "5672:5672" + volumes: ldv-spec-db: + ``` ### Django Application @@ -64,10 +69,10 @@ volumes: * open the project in Pycharm * create a venv (File -> Settings -> Project -> Project Interpreter -> (click cog) -> add) * pip install -r requirements\dev.txt - - * check and/or change the database connection in settings/dev/py. In this example it connects to a database server on 'raspiastro', - you have to change that to the server where you run your Postgres Docker container (localhost?) - + + * check and/or change the database connection in settings/dev/py. In this example it connects to a database server on 'raspiastro', + you have to change that to the server where you run your Postgres Docker container (localhost?) + ```python DATABASES = { 'default': { @@ -83,10 +88,14 @@ DATABASES = { ``` > python manage.py migrate --settings=ldvspec.settings.dev - > python manage.py runserver --settings=ldvspec.settings.dev > python manage.py createsuperuser --settings=ldvspec.settings.dev + > python manage.py runserver --settings=ldvspec.settings.dev + + # In another terminal (for background tasks): + > celery -A ldvspec worker -l INFO + # Note: for windows you might need to add the `--pool=solo` parameter ``` - + ## Test Environment * https://sdc-dev.astron.nl/ldvspec/ @@ -101,13 +110,13 @@ This will change to a database on the sdc-db machine. ## admin user * admin:admin - + ## Build & Deploy -The CI/CD pipeline creates 2 Docker containers: +The CI/CD pipeline creates 2 Docker containers: * ldv-specification : The Django application * ldv-spec-postgres : The Postgres database - + The database can also be accessed externally: * host : sdc-dev.astron.nl / sdc.astron.nl * port : 12000 @@ -117,9 +126,9 @@ The CI/CD pipeline creates 2 Docker containers: Log into the ldv-specification container. (using the portainer GUI or with the docker exec) * https://sdc-dev.astron.nl/portainer/ * https://sdc.astron.nl/portainer/#/home - + ``` -> cd /src +> cd /src > python manage.py migrate --settings=ldvspec.settings.docker_sdc > python manage.py createsuperuser --settings=ldvspec.settings.docker_sdc ``` @@ -130,13 +139,13 @@ Log into the ldv-specification container. (using the portainer GUI or with the d See also: * https://support.astron.nl/confluence/display/SDCP/API+dynamics - + ### Add a work specification With this url you can specify work * http://127.0.0.1:8000/ldvspec/api/v1/workspecification/ -This is an example of structure of the LOFAR data in the `ldv-spec-db` database. +This is an example of structure of the LOFAR data in the `ldv-spec-db` database. Which also shows which fields can be used to filter on. * http://127.0.0.1:8000/ldvspec/api/v1/data/ @@ -199,17 +208,17 @@ Vary: Accept The workspecification endpoint now shows an overview of specified work, which is ready to be sent to ATDB-LDV: * http://127.0.0.1:8000/ldvspec/api/v1/workspecification/ - - + + ### Troubleshooting *Q: OperationalError at /ldvspec/api/v1/workspecification/* [WinError 10061] No connection could be made because the target machine actively refused it A: make sure that you have a connection to a celery broker (RabbitMQ) when running the application in development mode. Example on Windows machine: -``` +``` SET CELERY_BROKER_URL=amqp://guest@raspiastro:5672 python manage.py runserver --settings=ldvspec.settings.dev ``` ---- \ No newline at end of file +--- diff --git a/ldvspec/lofardata/migrations/0009_auto_20220906_0856.py b/ldvspec/lofardata/migrations/0009_auto_20220906_0856.py new file mode 100644 index 0000000000000000000000000000000000000000..0cdf2d5888e692f0acde71d512883aa47cbeba71 --- /dev/null +++ b/ldvspec/lofardata/migrations/0009_auto_20220906_0856.py @@ -0,0 +1,38 @@ +# Generated by Django 3.2 on 2022-09-06 08:56 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('lofardata', '0008_atdbprocessingsite_access_token'), + ] + + operations = [ + migrations.AddField( + model_name='workspecification', + name='batch_size', + field=models.IntegerField(default=0), + ), + migrations.AddField( + model_name='workspecification', + name='is_auto_submit', + field=models.BooleanField(default=False), + ), + migrations.AddField( + model_name='workspecification', + name='predecessor_task', + field=models.IntegerField(null=True), + ), + migrations.AddField( + model_name='workspecification', + name='purge_policy', + field=models.CharField(choices=[('yes', 'yes'), ('no', 'no'), ('do', 'do')], default='no', max_length=16), + ), + migrations.AddField( + model_name='workspecification', + name='submission_status', + field=models.CharField(choices=[('N', 'not submitted'), ('S', 'submitted'), ('D', 'defining'), ('E', 'error')], default='N', max_length=1), + ), + ] diff --git a/ldvspec/lofardata/models.py b/ldvspec/lofardata/models.py index d49b10f697778bed8a6682f63642cb5c730bf25a..15f513231024e81455c74f684f3d26c4ea6312be 100644 --- a/ldvspec/lofardata/models.py +++ b/ldvspec/lofardata/models.py @@ -1,8 +1,10 @@ -from django.db import models from urllib.parse import urlsplit + +from celery.result import AsyncResult from django.contrib.auth.models import User from django.contrib.postgres.fields import ArrayField -from celery.result import AsyncResult +from django.db import models +from django.utils.translation import gettext_lazy as _ class DataLocation(models.Model): @@ -35,7 +37,7 @@ class DataLocation(models.Model): class DataProduct(models.Model): obs_id = models.CharField(verbose_name="OBS_ID", max_length=15) - oid_source = models.CharField(verbose_name='OBS_ID_SOURCE', max_length=15) + oid_source = models.CharField(verbose_name="OBS_ID_SOURCE", max_length=15) dataproduct_source = models.CharField(max_length=20) dataproduct_type = models.CharField(max_length=50) project = models.CharField(max_length=50) @@ -46,28 +48,33 @@ class DataProduct(models.Model): additional_meta = models.JSONField() @staticmethod - def insert_dataproduct(obs_id, - oid_source, - dataproduct_source, - dataproduct_type, - project, - activity, - surl, - filesize, - additional_meta): + def insert_dataproduct( + obs_id, + oid_source, + dataproduct_source, + dataproduct_type, + project, + activity, + surl, + filesize, + additional_meta, + ): scheme, netloc, *_ = urlsplit(surl) - dp = DataProduct(obs_id=obs_id, - oid_source=oid_source, - dataproduct_source=dataproduct_source, - dataproduct_type=dataproduct_type, - project=project, - location=DataLocation.insert_location_from_string('://'.join((scheme, netloc))), - activity=activity, - filesize=filesize, - additional_meta=additional_meta, - surl=surl - ) + dp = DataProduct( + obs_id=obs_id, + oid_source=oid_source, + dataproduct_source=dataproduct_source, + dataproduct_type=dataproduct_type, + project=project, + location=DataLocation.insert_location_from_string( + "://".join((scheme, netloc)) + ), + activity=activity, + filesize=filesize, + additional_meta=additional_meta, + surl=surl, + ) dp.save() return dp @@ -84,24 +91,78 @@ class ATDBProcessingSite(models.Model): access_token = models.CharField(max_length=1000, null=True) +class SUBMISSION_STATUS(models.TextChoices): + """Status of Work Specifcation to ATDB""" + + NOT_SUBMITTED = "N", _("not submitted") + # Tasks are submitted, check ATDB for current status + SUBMITTED = "S", _("submitted") + # Task need to be submitted (can change predecessor etc...) + DEFINING = "D", _("defining") + # Check log, retry submission + ERROR = "E", _("error") + + +class PURGE_POLICY(models.TextChoices): + """ATDB Purge Policy""" + + # @Mancini, what are these? + YES = "yes", _("yes") + NO = "no", _("no") + DO = "do", _("do") # only purge, no workflow execution + + class WorkSpecification(models.Model): + """Work Specification""" + created_on = models.DateTimeField(auto_now_add=True) created_by = models.ForeignKey(User, on_delete=models.DO_NOTHING, null=True) filters = models.JSONField(null=True) + # Input data containing sizes/urls for submission to ATDB inputs = models.JSONField(null=True) + + # ATDB Workflow URL selected_workflow = models.CharField(max_length=500, null=True) + + # Task ID's that were created in ATDB related_tasks = ArrayField(models.IntegerField(), null=True) + predecessor_task = models.IntegerField(null=True) + + # The query for gathering files has been executed is_ready = models.BooleanField(default=False) is_defined = models.BooleanField(default=False) + + # Should automatically submit the task after defining + is_auto_submit = models.BooleanField(default=False) + async_task_result = models.CharField(max_length=100, null=True) - processing_site = models.ForeignKey(ATDBProcessingSite, null=True, on_delete=models.DO_NOTHING) + processing_site = models.ForeignKey( + ATDBProcessingSite, null=True, on_delete=models.DO_NOTHING + ) + purge_policy = models.CharField( + max_length=16, choices=PURGE_POLICY.choices, default=PURGE_POLICY.NO + ) - def save(self, force_insert=False, force_update=False, using=None, - update_fields=None): - super(WorkSpecification, self).save(force_insert=force_insert, force_update=force_update, using=using, - update_fields=update_fields) + # How many files per task. 0 is single task with all files + batch_size = models.IntegerField(default=0, null=False, blank=False) + submission_status = models.CharField( + max_length=1, + choices=SUBMISSION_STATUS.choices, + default=SUBMISSION_STATUS.NOT_SUBMITTED, + ) + + def save( + self, force_insert=False, force_update=False, using=None, update_fields=None + ): + super(WorkSpecification, self).save( + force_insert=force_insert, + force_update=force_update, + using=using, + update_fields=update_fields, + ) if self.async_task_result is None: from lofardata.tasks import define_work_specification + res: AsyncResult = define_work_specification.delay(self.pk) self.async_task_result = res.id - super(WorkSpecification, self).save(update_fields=['async_task_result']) + super(WorkSpecification, self).save(update_fields=["async_task_result"]) diff --git a/ldvspec/lofardata/tasks.py b/ldvspec/lofardata/tasks.py index 0aca20b69df114961403e2b11cad8d800ee3ddfd..148097203af20b8f310dc1e6699474ed10cf3dec 100644 --- a/ldvspec/lofardata/tasks.py +++ b/ldvspec/lofardata/tasks.py @@ -1,5 +1,38 @@ +import re +from typing import Any, List, Optional +from urllib.parse import urlparse + +import requests from ldvspec.celery import app -from lofardata.models import WorkSpecification, DataProduct +from requests.auth import AuthBase +from requests.exceptions import RequestException + +from lofardata.models import ( + SUBMISSION_STATUS, + ATDBProcessingSite, + DataProduct, + WorkSpecification, +) + + +class RequestNotOk(Exception): + pass + + +class WorkSpecificationNoSite(Exception): + pass + + +class SessionStore: + """requests.Session Singleton""" + + _session = None + + @classmethod + def get_session(cls) -> requests.Session: + if cls._session == None: + cls._session = requests.Session() + return cls._session @app.task @@ -8,7 +41,214 @@ def define_work_specification(workspecification_id): filters = specification.filters dataproducts = DataProduct.objects.filter(**filters) - inputs = {'surls': [dataproduct.surl for dataproduct in dataproducts]} + inputs = { + "surls": [ + {"surl": dataproduct.surl, "size": dataproduct.filesize} + for dataproduct in dataproducts + ] + } specification.inputs = inputs specification.is_ready = True specification.save() + + +def _parse_surl(surl: str) -> dict: + parsed = urlparse(surl) + host = parsed.hostname + path = parsed.path + pattern = r"^.*/projects\/(?P<project>.*_\d*)\/(?P<sas_id>\w\d*)\/" + data = re.match(pattern, path).groupdict() + data["location"] = host + return data + + +def _prepare_request_payload( + entries: List[dict], + filter_id: str, + workflow_url: str, + purge_policy: str = "no", + predecessor: int = None, +): + + # Parse a single surl for info: + # project, sas_id & location + # This does assume that a task consists of at most 1 project, sas_id and location! + parsed = _parse_surl(entries[0]["surl"]) + project_id = parsed["project"] + sas_id = parsed["sas_id"] + + inputs = [ + { + "size": e["size"], + "surl": e["surl"], + "type": "File", + "location": parsed["location"], + } + for e in entries + ] + + data = { + "project": project_id, + "sas_id": sas_id, + "task_type": "regular", + "filter": filter_id, + "purge_policy": purge_policy, + "new_status": "defining", + "new_workflow_uri": workflow_url, + "size_to_process": sum([e["size"] for e in entries]), + "inputs": inputs, + } + + if predecessor: + data["predecessor"] = predecessor + + return data + + +class TokenAuth(AuthBase): + """Basic Token Auth + + Adds a: `Authorization: Token <token>` header""" + + def __init__(self, token: str): + self._token = token + + def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest: + r.headers["Authorization"] = f"Token {self._token}" + return r + + +def split_entries_to_batches(entries: List[Any], batch_size: int) -> List[List[Any]]: + """Split the list of entries into batches of at most `batch_size` entries""" + n_entries = len(entries) + + # NOTE: Think about using file size instead of amount of files + batches: List[List[Any]] = [] + if batch_size == 0: + batches.append(entries) + elif batch_size > 0: + # Calculate amount of required batches + num_batches = n_entries // batch_size + num_batches += 1 if n_entries % batch_size else 0 + + for n in range(num_batches): + batches.append(entries[n * batch_size : (n + 1) * batch_size]) + + return batches + + +@app.task +def insert_task_into_atdb(workspecification_id: int): + """This creates the task in ATDB and set's it to defining""" + sess = SessionStore.get_session() + + work_spec: WorkSpecification = WorkSpecification.objects.get( + pk=workspecification_id + ) + + entries: List[dict] = work_spec.inputs["surls"] + + batches = split_entries_to_batches(entries, work_spec.batch_size) + + site: Optional[ATDBProcessingSite] = work_spec.processing_site + if site is None: + raise WorkSpecificationNoSite() + url = site.url + "tasks/" + + try: + for batch in batches: + payload = _prepare_request_payload( + entries=batch, + filter_id=f"ldv-spec:{work_spec.pk}", + workflow_url=work_spec.selected_workflow, + purge_policy=work_spec.purge_policy, + predecessor=work_spec.predecessor_task, + ) + + res = sess.post(url, json=payload, auth=TokenAuth(site.access_token)) + + if not res.ok: + raise RequestNotOk() + + # Store ATDB Task ID in related_tasks + if work_spec.related_tasks is None: + work_spec.related_tasks = [] + + data = res.json() + work_spec.related_tasks.append(data["id"]) + + # All went well + work_spec.submission_status = SUBMISSION_STATUS.DEFINING + if work_spec.is_auto_submit: + set_tasks_defined.delay(workspecification_id) + except (RequestException, RequestNotOk): + work_spec.submission_status = SUBMISSION_STATUS.ERROR + finally: + work_spec.save() + + +def update_related_tasks( + work_spec: WorkSpecification, + delete: bool, + data: Optional[dict], + on_success_status: SUBMISSION_STATUS, +): + sess = SessionStore.get_session() + + site: Optional[ATDBProcessingSite] = work_spec.processing_site + if site is None: + raise WorkSpecificationNoSite() + url = site.url + "tasks/" + + task_ids: List[int] = work_spec.related_tasks + try: + for task_id in task_ids: + if delete: + res = sess.delete( + url + str(task_id) + "/", auth=TokenAuth(site.access_token) + ) + else: + res = sess.put( + url + str(task_id) + "/", + json=data, + auth=TokenAuth(site.access_token), + ) + + if not res.ok: + raise RequestNotOk() + + # All went well + work_spec.submission_status = on_success_status + if delete: + work_spec.related_tasks = None + except (RequestException, RequestNotOk): + work_spec.submission_status = SUBMISSION_STATUS.ERROR + finally: + work_spec.save() + + +@app.task +def set_tasks_defined(workspecification_id: int): + """This sets tasks to defined so they can be picked up by ATDB services""" + + work_spec: WorkSpecification = WorkSpecification.objects.get( + pk=workspecification_id + ) + + if work_spec.submission_status != SUBMISSION_STATUS.DEFINING: + raise ValueError("Invalid WorkSpecification state") + + update_related_tasks( + work_spec, False, {"new_status": "defined"}, SUBMISSION_STATUS.SUBMITTED + ) + + +@app.task +def delete_tasks_from_atdb(workspecification_id: int): + """Removes related tasks from ATDB (for retrying)""" + + work_spec: WorkSpecification = WorkSpecification.objects.get( + pk=workspecification_id + ) + + update_related_tasks(work_spec, True, None, SUBMISSION_STATUS.NOT_SUBMITTED) diff --git a/ldvspec/lofardata/tests/test_atdb_delete.py b/ldvspec/lofardata/tests/test_atdb_delete.py new file mode 100644 index 0000000000000000000000000000000000000000..1814d565b317213d66c9e00926de9b5ac84d3cb8 --- /dev/null +++ b/ldvspec/lofardata/tests/test_atdb_delete.py @@ -0,0 +1,108 @@ +import re +from unittest import mock + +import rest_framework.test as rtest +from lofardata.models import SUBMISSION_STATUS, ATDBProcessingSite, WorkSpecification +from lofardata.tasks import SessionStore, delete_tasks_from_atdb +from lofardata.tests.util import MockResponse, mocked_delay + +task_detail_uri = re.compile(r"^.*/atdb/tasks/(?P<task_id>\d+)/") +# This is needed to initialize a session +SessionStore.get_session() + +# ATDB Post Requests +def mocked_requests_delete(*args, **kwargs): + + url = args[0] + + # Broken task + if "500" in url: + return MockResponse(None, 500, "Server Error.") + + if task_detail_uri.match(url): + # TODO: add real ATDB response + return MockResponse(None, 204) + + return MockResponse(None, 404) + + +@mock.patch( + "lofardata.tasks.SessionStore._session.delete", side_effect=mocked_requests_delete +) +class TestATDBDeleteRequest(rtest.APITestCase): + @mock.patch( + "lofardata.tasks.define_work_specification.delay", side_effect=mocked_delay + ) + def setUp(self, mock_delay: mock.MagicMock): + site: ATDBProcessingSite = ATDBProcessingSite.objects.create( + name="Dwingeloo", url="https://example.com/atdb/", access_token="DummyToken" + ) + + # Create Work Specification + ws: WorkSpecification = WorkSpecification.objects.create(processing_site=site) + mock_delay.assert_called_once_with(ws.pk) # not really needed here actually + ws.refresh_from_db() + + # Simulated interaction where the workflow was set + ws.selected_workflow = "https://example.com/atdb/workflow/1/" + ws.save() + self.work_spec: WorkSpecification = ws + + def test_delete(self, mocked_delete: mock.MagicMock): + self.work_spec.related_tasks = [1] # just a single task + self.work_spec.save() + + delete_tasks_from_atdb(self.work_spec.pk) + + mocked_delete.assert_called_once_with( + f"https://example.com/atdb/tasks/{self.work_spec.related_tasks[0]}/", + auth=mock.ANY, + ) + + self.work_spec.refresh_from_db() + self.assertEqual( + self.work_spec.submission_status, + SUBMISSION_STATUS.NOT_SUBMITTED, + "Status should be not submitted after delete", + ) + self.assertIsNone( + self.work_spec.related_tasks, + "Related tasks should be deleted after delete request", + ) + + def test_delete_of_batches(self, mocked_delete: mock.MagicMock): + self.work_spec.related_tasks = [1, 2, 3, 4] + self.work_spec.save() + + delete_tasks_from_atdb(self.work_spec.pk) + + call_list = [ + mock.call(f"https://example.com/atdb/tasks/{task_id}/", auth=mock.ANY) + for task_id in self.work_spec.related_tasks + ] + + mocked_delete.assert_has_calls(call_list, any_order=True) + + self.work_spec.refresh_from_db() + self.assertEqual( + self.work_spec.submission_status, + SUBMISSION_STATUS.NOT_SUBMITTED, + "Status should be not submitted after delete", + ) + self.assertIsNone( + self.work_spec.related_tasks, + "All related tasks should be deleted after delete request", + ) + + def test_delete_of_batches_with_failing(self, _: mock.MagicMock): + self.work_spec.related_tasks = [1, 2, 500, 4] + self.work_spec.save() + + delete_tasks_from_atdb(self.work_spec.pk) + + self.work_spec.refresh_from_db() + self.assertEqual( + self.work_spec.submission_status, + SUBMISSION_STATUS.ERROR, + "Status should be error after delete failed", + ) diff --git a/ldvspec/lofardata/tests/test_atdb_insert.py b/ldvspec/lofardata/tests/test_atdb_insert.py new file mode 100644 index 0000000000000000000000000000000000000000..82786029a4f722de69b12fc902ef82bf846c0557 --- /dev/null +++ b/ldvspec/lofardata/tests/test_atdb_insert.py @@ -0,0 +1,289 @@ +from unittest import mock + +import rest_framework.test as rtest +from django.contrib.auth import get_user_model +from django.urls import reverse +from lofardata.models import SUBMISSION_STATUS, ATDBProcessingSite, WorkSpecification +from lofardata.tasks import SessionStore, insert_task_into_atdb +from lofardata.tests.util import MockResponse, mocked_delay +from rest_framework import status + +# This is needed to initialize a session +SessionStore.get_session() + + +# ATDB Get Requests +def mocked_requests_get(*args, **kwargs): + if args[0] == "http://someurl.com/test.json": + return MockResponse({"key1": "value1"}, 200) + elif args[0] == "http://someotherurl.com/anothertest.json": + return MockResponse({"key2": "value2"}, 200) + + return MockResponse(None, 404) + + +# ATDB Post Requests +def mocked_requests_post(*args, **kwargs): + + if kwargs["json"]["project"] == "kaput_42": + return MockResponse({"detail": "not ok"}, 500) + + if args[0] == "https://example.com/atdb/tasks/": + return MockResponse({"id": 42}, 201) + + return MockResponse(None, 404) + + +class TestATDBInsertRequest(rtest.APITestCase): + def setUp(self): + self.user = get_user_model().objects.create_superuser("admin") + self.client.force_authenticate(self.user) + + def test_ws_submit_request(self): + """Test submitting a workspecification to ATDB""" + with mock.patch("lofardata.tasks.insert_task_into_atdb.delay") as mocked_delay: + data = {} + res = self.client.post( + reverse("workspecification-submit", kwargs={"pk": 1}), data=data + ) + + mocked_delay.assert_called_once_with("1") + + self.assertEqual( + res.status_code, + status.HTTP_202_ACCEPTED, + "Submitting job failed:\n" + str(res.content), + ) + + +@mock.patch( + "lofardata.tasks.SessionStore._session.post", side_effect=mocked_requests_post +) +class TestATDBInsertTask(rtest.APITestCase): + @mock.patch( + "lofardata.tasks.define_work_specification.delay", side_effect=mocked_delay + ) + def setUp(self, mock_delay: mock.MagicMock): + site: ATDBProcessingSite = ATDBProcessingSite.objects.create( + name="Dwingeloo", url="https://example.com/atdb/", access_token="DummyToken" + ) + + # Create Work Specification + ws: WorkSpecification = WorkSpecification.objects.create(processing_site=site) + mock_delay.assert_called_once_with(ws.pk) # not really needed here actually + ws.refresh_from_db() + + # Simulated interaction where the workflow was set + ws.selected_workflow = "https://example.com/atdb/workflow/1/" + ws.save() + self.work_spec: WorkSpecification = ws + + def test_insert_task_into_atdb( + self, + mock_post: mock.MagicMock, + ): + + self.work_spec.inputs = { + "surls": [ + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + } + ] + } + self.work_spec.save() + + EXAMPLE_TASK = { + "project": "lc0_012", + "sas_id": "161482", + "task_type": "regular", + "filter": f"ldv-spec:{self.work_spec.pk}", + "purge_policy": "no", + "new_status": "defining", + "new_workflow_uri": "https://example.com/atdb/workflow/1/", + "size_to_process": 7395952640, + "inputs": [ + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + "type": "File", + "location": "srm.grid.sara.nl", + } + ], + } + + insert_task_into_atdb(workspecification_id=self.work_spec.pk) + + mock_post.assert_called_once_with( + "https://example.com/atdb/tasks/", json=EXAMPLE_TASK, auth=mock.ANY + ) + + self.work_spec.refresh_from_db() + self.assertEquals( + self.work_spec.submission_status, + SUBMISSION_STATUS.DEFINING, + "Work Spec should be in defining state", + ) + self.assertEqual(len(self.work_spec.related_tasks), 1) # expect 1 task + + def test_batching_of_files(self, mock_post: mock.MagicMock): + """Tests batching of multiple files""" + + self.work_spec.inputs = { + "surls": [ + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + "type": "File", + "location": "srm.grid.sara.nl", + } + ] + * 10 # ten entries + } + + self.work_spec.batch_size = 3 # expect 4 batches + self.work_spec.save() + + insert_task_into_atdb(workspecification_id=self.work_spec.pk) + + self.assertEqual(mock_post.call_count, 4, "Invalid amount of calls to ATDB") + + self.work_spec.refresh_from_db() + self.assertEquals( + self.work_spec.submission_status, + SUBMISSION_STATUS.DEFINING, + "Work Spec should be in defining state", + ) + self.assertEqual(len(self.work_spec.related_tasks), 4) # expect 4 tasks + + def test_batching_of_files_exact_fit(self, mock_post: mock.MagicMock): + """Tests batching of multiple files which fit the bath_size exactly""" + + self.work_spec.inputs = { + "surls": [ + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + "type": "File", + "location": "srm.grid.sara.nl", + } + ] + * 9 + } + + self.work_spec.batch_size = 3 # expect 3 batches + self.work_spec.save() + + insert_task_into_atdb(workspecification_id=self.work_spec.pk) + + self.assertEqual(mock_post.call_count, 3, "Invalid amount of calls to ATDB") + + self.work_spec.refresh_from_db() + self.assertEquals( + self.work_spec.submission_status, + SUBMISSION_STATUS.DEFINING, + "Work Spec should be in defining state", + ) + self.assertEqual(len(self.work_spec.related_tasks), 3) # expect 3 tasks + + def test_atdb_kaput(self, mock_post: mock.MagicMock): + """Test handling of ATDB Failure""" + self.work_spec.inputs = { + "surls": [ + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/kaput_42/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + } + ] + } + self.work_spec.save() + + insert_task_into_atdb(workspecification_id=self.work_spec.pk) + + self.work_spec.refresh_from_db() + self.assertEquals( + self.work_spec.submission_status, + SUBMISSION_STATUS.ERROR, + "Work Spec should be in error state", + ) + + def test_atdb_kaput_batch(self, mock_post: mock.MagicMock): + """Test handling of ATDB Failure""" + self.work_spec.inputs = { + "surls": [ + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + }, + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/kaput_42/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + }, + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + }, + ] + } + self.work_spec.batch_size = 1 # use a small batch size + self.work_spec.save() + + insert_task_into_atdb(workspecification_id=self.work_spec.pk) + + self.work_spec.refresh_from_db() + self.assertEquals( + self.work_spec.submission_status, + SUBMISSION_STATUS.ERROR, + "Work Spec should be in error state", + ) + + def test_auto_submit_after_success(self, mock_post): + """Test whether submission get's triggered""" + + self.work_spec.inputs = { + "surls": [ + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + "type": "File", + "location": "srm.grid.sara.nl", + } + ] + * 10 # ten entries + } + + self.work_spec.batch_size = 3 # expect 4 batches + self.work_spec.is_auto_submit = True + self.work_spec.save() + + with mock.patch("lofardata.tasks.set_tasks_defined.delay") as submit_delay: + insert_task_into_atdb(workspecification_id=self.work_spec.pk) + submit_delay.assert_called_once_with(self.work_spec.pk) + + def test_auto_submit_after_failure(self, mock_post): + """Test whether submission is not done with a failure in a batch""" + + self.work_spec.inputs = { + "surls": [ + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + }, + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/kaput_42/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + }, + { + "size": 7395952640, + "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", + }, + ] + } + + self.work_spec.batch_size = 1 + self.work_spec.is_auto_submit = True + self.work_spec.save() + + with mock.patch("lofardata.tasks.set_tasks_defined.delay") as submit_delay: + insert_task_into_atdb(workspecification_id=self.work_spec.pk) + submit_delay.assert_not_called() diff --git a/ldvspec/lofardata/tests/test_atdb_status_change.py b/ldvspec/lofardata/tests/test_atdb_status_change.py new file mode 100644 index 0000000000000000000000000000000000000000..198b4197dde05d82720aa68e6ec17a2d1abdcd25 --- /dev/null +++ b/ldvspec/lofardata/tests/test_atdb_status_change.py @@ -0,0 +1,111 @@ +import re +from unittest import mock + +import rest_framework.test as rtest +from lofardata.models import SUBMISSION_STATUS, ATDBProcessingSite, WorkSpecification +from lofardata.tasks import SessionStore, set_tasks_defined +from lofardata.tests.util import MockResponse, mocked_delay + +task_detail_uri = re.compile(r"^.*/atdb/tasks/(?P<task_id>\d+)/") +# This is needed to initialize a session +SessionStore.get_session() + +# ATDB Post Requests +def mocked_requests_put(*args, **kwargs): + + url = args[0] + + # Broken task + if "500" in url: + return MockResponse(None, 500, "Server Error.") + + if task_detail_uri.match(url): + # TODO: add real ATDB response + return MockResponse({"id": 42}, 200) + + return MockResponse(None, 404) + + +@mock.patch( + "lofardata.tasks.SessionStore._session.put", side_effect=mocked_requests_put +) +class TestATDBTaskStatusChange(rtest.APITestCase): + @mock.patch( + "lofardata.tasks.define_work_specification.delay", side_effect=mocked_delay + ) + def setUp(self, mock_delay: mock.MagicMock): + site: ATDBProcessingSite = ATDBProcessingSite.objects.create( + name="Dwingeloo", url="https://example.com/atdb/", access_token="DummyToken" + ) + + # Create Work Specification + ws: WorkSpecification = WorkSpecification.objects.create(processing_site=site) + mock_delay.assert_called_once_with(ws.pk) # not really needed here actually + ws.refresh_from_db() + + # Simulated interaction where the workflow was set + ws.selected_workflow = "https://example.com/atdb/workflow/1/" + ws.save() + self.work_spec: WorkSpecification = ws + + def test_request_submission(self, mocked_put: mock.MagicMock): + + self.work_spec.related_tasks = [1] # just a single task + self.work_spec.submission_status = SUBMISSION_STATUS.DEFINING + self.work_spec.save() + + expected_payload = {"new_status": "defined"} + + set_tasks_defined(self.work_spec.pk) + + mocked_put.assert_called_once_with( + f"https://example.com/atdb/tasks/{self.work_spec.related_tasks[0]}/", + json=expected_payload, + auth=mock.ANY, + ) + + self.work_spec.refresh_from_db() + self.assertAlmostEqual( + self.work_spec.submission_status, SUBMISSION_STATUS.SUBMITTED + ) + + def test_request_submission_of_batches(self, mocked_put: mock.MagicMock): + + self.work_spec.related_tasks = [1, 2, 3] + self.work_spec.submission_status = SUBMISSION_STATUS.DEFINING + self.work_spec.save() + + expected_payload = {"new_status": "defined"} + + set_tasks_defined(self.work_spec.pk) + + call_list = [ + mock.call( + f"https://example.com/atdb/tasks/{task_id}/", + json=expected_payload, + auth=mock.ANY, + ) + for task_id in self.work_spec.related_tasks + ] + + mocked_put.assert_has_calls(call_list, any_order=True) + + self.work_spec.refresh_from_db() + self.assertAlmostEqual( + self.work_spec.submission_status, SUBMISSION_STATUS.SUBMITTED + ) + + def test_request_submission_of_batches_with_failing_task(self, _: mock.MagicMock): + + self.work_spec.related_tasks = [1, 2, 500, 4] + self.work_spec.submission_status = SUBMISSION_STATUS.DEFINING + self.work_spec.save() + + set_tasks_defined(self.work_spec.pk) + + self.work_spec.refresh_from_db() + self.assertEquals( + self.work_spec.submission_status, + SUBMISSION_STATUS.ERROR, + "Status should be error after put failure", + ) diff --git a/ldvspec/lofardata/tests/test_util_funcs.py b/ldvspec/lofardata/tests/test_util_funcs.py new file mode 100644 index 0000000000000000000000000000000000000000..300db5501e26386839423b9a0815a59877615d29 --- /dev/null +++ b/ldvspec/lofardata/tests/test_util_funcs.py @@ -0,0 +1,27 @@ +import unittest + +from lofardata.tasks import split_entries_to_batches + + +class SplitEntries(unittest.TestCase): + def test_no_splitting(self): + res = split_entries_to_batches([*range(10)], 0) + self.assertListEqual(res, [[*range(10)]]) + + def test_splitting_empty_array(self): + res = split_entries_to_batches([], 2) + self.assertListEqual(res, []) + + def test_splitting_bs1(self): + res = split_entries_to_batches([*range(10)], 1) + self.assertEqual(len(res), 10) + # TODO: check contents + + def test_splitting_exact_match(self): + res = split_entries_to_batches([*range(9)], 3) + self.assertEqual(len(res), 3) + + def test_splitting_with_left_over(self): + res = split_entries_to_batches([*range(10)], 3) + self.assertEqual(len(res), 4) + self.assertEqual(len(res[-1]), 1) diff --git a/ldvspec/lofardata/tests/test_workrequest.py b/ldvspec/lofardata/tests/test_workrequest.py index 736b91cff7b3fb6ac538ad2c392525dc0d983a2b..8f7157c9a7ac7186598a0131e34cfd047e626671 100644 --- a/ldvspec/lofardata/tests/test_workrequest.py +++ b/ldvspec/lofardata/tests/test_workrequest.py @@ -1,15 +1,18 @@ -import django.test as dtest -import rest_framework.test as rtest import unittest.mock -from django.contrib.auth.models import User -import rest_framework.status as response_status -from lofardata.models import WorkSpecification, DataProduct + +import django.test as dtest import lofardata.tasks as tasks +import rest_framework.status as response_status +import rest_framework.test as rtest +from django.contrib.auth import get_user_model +from lofardata.models import DataProduct, WorkSpecification +# NOTE: Instead of specifying the exact addresses, you could use `reverse` from Django +# to create the addresses for you class TestWorkSpecificationRequest(rtest.APITestCase): def setUp(self): - self.user = User.objects.create_superuser('admin') + self.user = get_user_model().objects.create_superuser('admin') self.client.force_authenticate(self.user) @unittest.mock.patch('lofardata.tasks.define_work_specification.delay') @@ -33,7 +36,7 @@ class TestWorkSpecificationRequest(rtest.APITestCase): tasks.define_work_specification(inserted_work_specification['id']) work_spec_object = WorkSpecification.objects.get(pk=inserted_work_specification['id']) self.assertTrue(work_spec_object.is_ready) - self.assertEqual({'surls': ['srm://lofarlta/mynice_file.tar.gz']}, work_spec_object.inputs) + self.assertEqual({'surls': [{"size": 123456, "surl": 'srm://lofarlta/mynice_file.tar.gz'}]}, work_spec_object.inputs) @unittest.mock.patch('lofardata.tasks.define_work_specification.delay') def test_insert_work_request_nested_fields(self, mocked_task): @@ -61,5 +64,4 @@ class TestWorkSpecificationRequest(rtest.APITestCase): tasks.define_work_specification(inserted_work_specification['id']) work_spec_object = WorkSpecification.objects.get(pk=inserted_work_specification['id']) self.assertTrue(work_spec_object.is_ready) - self.assertEqual({'surls': ['srm://lofarlta/mynice_file_with_rhythm.tar.gz']}, work_spec_object.inputs) - + self.assertEqual({'surls': [{"size": 123456, "surl":'srm://lofarlta/mynice_file_with_rhythm.tar.gz' }]}, work_spec_object.inputs) diff --git a/ldvspec/lofardata/tests/util.py b/ldvspec/lofardata/tests/util.py new file mode 100644 index 0000000000000000000000000000000000000000..5320b82289a4d3f03a53188ed117dc68cdec7ed3 --- /dev/null +++ b/ldvspec/lofardata/tests/util.py @@ -0,0 +1,80 @@ +"""Utility functions for testing""" + +from typing import Optional + +from lofardata.models import WorkSpecification, AsyncResult + +def mocked_delay(*args) -> AsyncResult: + """Mock the delay used in saving a WorkSpecification""" + ws: WorkSpecification = WorkSpecification.objects.get(pk=args[0]) + ws.is_ready = True + ws.async_task_result = 42 # prevents infinite recursion in task + ws.save() + + return AsyncResult(id="Test", backend=None) + + +class MockResponse: + """Mocked requests.Response class + + Args: + json_data (dict): dictionary containing the response + status_code (int): HTTP Status code + text (str): Optional str representation, useful for HTTP 50X errors + + Implemented methods/properties + - json() -> dict + - status_code -> int + - ok -> boolean (set automatically; status_code < 400) + - text -> str (optional, specifically for errors) + + Examples: + + - `MockResponse({"status": "approved"}, 200)`= + - `MockResponse({"detail": "authentication details not provided"}, 401)` + - `MockResponse({}, 500, "Server Error")` + + Usage with `mock.patch`: + ``` + def mocked_get(*args, **kwargs): + url = args[0] + + if url == "https://example.com/: + return MockResponse({"ok": True}, 200) + + return MockResponse(None, 404) + + def mocked_post(*args, **kwargs): + url = args[0] + + # Can check the json body + data = kwargs.get("json") + + if "foo" in data: + return MockResponse({"foo": "bar"}, 200) + elif "tea" in data: + return MockResponse(None, 400, "Coffee") + + return MockResponse(None, 404) + + @mock.patch("requests.post", side_effect=mocked_post) + @mock.patch("requests.get", side_effect=mocked_get) + def test_something(self, mocked_get, mocked_post): + # Can check the mocked_* for calls and arguments + # tip: mock.ANY can be used to allow all arguments + pass + + ``` + """ + + def __init__( + self, json_data: Optional[dict], status_code: int, text: Optional[str] = None + ): + self.json_data = json_data + self.status_code = status_code + self.ok = status_code < 400 + self.text = text + + def json(self) -> Optional[dict]: + """Dictionary of response data""" + return self.json_data diff --git a/ldvspec/lofardata/urls.py b/ldvspec/lofardata/urls.py index e322ebe7a642f10b5c2e7ba11fa5f2e8be5f03e0..2d4d4b2dac7b429ccdb35215d54dfdd9a1dd17df 100644 --- a/ldvspec/lofardata/urls.py +++ b/ldvspec/lofardata/urls.py @@ -1,11 +1,18 @@ -from django.urls import include, path from django.contrib.auth import views as auth_views +from django.urls import include, path +from rest_framework.routers import DefaultRouter from rest_framework.schemas import get_schema_view from . import views +router = DefaultRouter() +router.register(r'api/v1/workspecification', views.WorkSpecificationViewset, basename="workspecification") + urlpatterns = [ + # Workaround for injecting the urls from the ModelViewSet, which requires a "Router" + *router.urls, + # Perhaps both accounts and login could be moved to the ldv-spec main urls file? # authentication path('accounts/', include('django.contrib.auth.urls')), @@ -17,8 +24,6 @@ urlpatterns = [ path('api/v1/insert_dataproduct/', views.InsertMultiDataproductView.as_view(), name='dataproduct-insert'), path('api/v1/data-location/', views.DataLocationView.as_view(), name='datalocation'), path('api/v1/data/<int:pk>/', views.DataProductDetailsView.as_view(), name='dataproduct-detail-view-api'), - path('api/v1/workspecification/', views.InsertWorkSpecification.as_view(), - name='workspecification'), path('api/v1/uws/', include('uws.urls')), path('api/v1/openapi/', get_schema_view( diff --git a/ldvspec/lofardata/views.py b/ldvspec/lofardata/views.py index e3939b73fee4c795c888dd37d229f3533f9f6792..ad50d95a09e351722a2849f64e0b2da67e7cd281 100644 --- a/ldvspec/lofardata/views.py +++ b/ldvspec/lofardata/views.py @@ -1,15 +1,26 @@ -from django.shortcuts import render from django.conf import settings from django.contrib.auth.models import User - -from rest_framework import generics +from django.shortcuts import render from django_filters import rest_framework as filters +from rest_framework import generics, status, viewsets +from rest_framework.decorators import action +from rest_framework.response import Response from rest_framework.schemas.openapi import AutoSchema -from .models import DataProduct, DataProductFilter, DataLocation, WorkSpecification, ATDBProcessingSite -from .serializers import DataProductSerializer, \ - DataProductFlatSerializer, DataLocationSerializer, \ - WorkSpecificationSerializer +from .models import ( + ATDBProcessingSite, + DataLocation, + DataProduct, + DataProductFilter, + WorkSpecification, +) +from .serializers import ( + DataLocationSerializer, + DataProductFlatSerializer, + DataProductSerializer, + WorkSpecificationSerializer, +) +from .tasks import insert_task_into_atdb class DynamicFilterSet(filters.FilterSet): @@ -22,7 +33,7 @@ class DynamicFilterSet(filters.FilterSet): def _load_filters(self): if self.Meta.filter_class is None: - raise Exception('Define filter_class meta attribute') + raise Exception("Define filter_class meta attribute") for item in self.Meta.filter_class.objects.all(): field_obj = self.Meta.model._meta.get_field(item.field) filter_class, *_ = self.filter_for_lookup(field_obj, item.lookup_type) @@ -35,15 +46,16 @@ class DataProductFilterSet(DynamicFilterSet): model = DataProduct filter_class = DataProductFilter fields = { - 'obs_id': ['exact', 'icontains'], + "obs_id": ["exact", "icontains"], } # ---------- GUI Views ----------- + def index(request): - atdb_hosts = ATDBProcessingSite.objects.values('name', 'url') - return render(request, "lofardata/index.html", {'atdb_hosts': atdb_hosts}) + atdb_hosts = ATDBProcessingSite.objects.values("name", "url") + return render(request, "lofardata/index.html", {"atdb_hosts": atdb_hosts}) # ---------- REST API views ---------- @@ -51,7 +63,7 @@ class DataProductView(generics.ListCreateAPIView): model = DataProduct serializer_class = DataProductSerializer - queryset = DataProduct.objects.all().order_by('obs_id') + queryset = DataProduct.objects.all().order_by("obs_id") # using the Django Filter Backend - https://django-filter.readthedocs.io/en/latest/index.html filter_backends = (filters.DjangoFilterBackend,) @@ -67,30 +79,31 @@ class DataProductDetailsView(generics.RetrieveUpdateDestroyAPIView): class DataLocationView(generics.ListCreateAPIView): model = DataLocation serializer_class = DataLocationSerializer - queryset = DataLocation.objects.all().order_by('name') + queryset = DataLocation.objects.all().order_by("name") class InsertWorkSpecificationSchema(AutoSchema): def get_operation_id_base(self, path, method, action): - return 'createDataProductMulti' + return "createDataProductMulti" class InsertMultiDataproductView(generics.CreateAPIView): """ Add single DataProduct """ + queryset = DataProduct.objects.all() serializer_class = DataProductFlatSerializer schema = InsertWorkSpecificationSchema() def get_serializer(self, *args, **kwargs): - """ if an array is passed, set serializer to many """ - if isinstance(kwargs.get('data', {}), list): - kwargs['many'] = True + """if an array is passed, set serializer to many""" + if isinstance(kwargs.get("data", {}), list): + kwargs["many"] = True return super().get_serializer(*args, **kwargs) -class InsertWorkSpecification(generics.CreateAPIView, generics.ListCreateAPIView): +class WorkSpecificationViewset(viewsets.ModelViewSet): queryset = WorkSpecification.objects.all() serializer_class = WorkSpecificationSerializer @@ -100,3 +113,14 @@ class InsertWorkSpecification(generics.CreateAPIView, generics.ListCreateAPIView return self.queryset.filter(created_by=current_user.id) else: return self.queryset + + @action(detail=True, methods=["POST"]) + def submit(self, request, pk=None) -> Response: + + # TODO: check that there are some matches in the request? + + # TODO: how to specify the filter? + + res = insert_task_into_atdb.delay(pk) + + return Response({"detail": "accepted"}, status=status.HTTP_202_ACCEPTED)