diff --git a/.gitignore b/.gitignore index be432928e6cdac49de4f8f3055bece17ef8139b6..ff67b07ffac82f9ea55c74aa1b55d3a243798fd9 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,4 @@ integration/local/test/output venv log -output +output \ No newline at end of file diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f1e354a5f361bfd00ad7cc3c3ff28dbfe557db2d..83f898bb27d7b27faa3c5a0d51ad3dfa8f75b92f 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -50,8 +50,8 @@ docker-build: fi - echo "Build using $CI_REGISTRY_IMAGE/base:$HASH" # Try pulling the existing image for layer reuse; || true to ignore if it does not exist - - docker pull $CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG || true - - docker build --cache-from $CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG --build-arg BASE_IMAGE="$CI_REGISTRY_IMAGE/base:$HASH" --pull -t "$CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG" ldvspec + #- docker build --cache-from $CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG --build-arg BASE_IMAGE="$CI_REGISTRY_IMAGE/base:$HASH" --pull -t "$CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG" ldvspec + - docker build --pull -t "$CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG" ldvspec - docker push "$CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG" integration-test: diff --git a/README.md b/README.md index b47d5a895d1c431ae8ab31f338d5eba6fb2d4a5b..615dda6648d34e7b0020c23b051f02d2b62e98e8 100644 --- a/README.md +++ b/README.md @@ -168,8 +168,6 @@ Vary: Accept "inputs": null, "selected_workflow": "imaging_compress_pipeline_v02", "related_tasks": null, - "is_ready": false, - "is_defined": false, "async_task_result": "99622e7b-71f0-4f05-826d-23c13846642d", "created_by": 1, "processing_site": null diff --git a/integration/integration-test.robot b/integration/integration-test.robot index 70253d6edc1555167d3880ddf224e1b0181aea08..5cca3e57e6a76e17dd51fd88fb196146d5997a67 100644 --- a/integration/integration-test.robot +++ b/integration/integration-test.robot @@ -2,7 +2,6 @@ Library OperatingSystem Library SeleniumLibrary Resource resource/keywords.robot -Resource resource/variables.robot Suite Setup Setup ATDB and LDVSPEC # Keyword used for suite setup, local to this test suite @@ -26,6 +25,7 @@ Setup ATDB and LDVSPEC Test create and submit a work specification [Tags] Workspecification + Login to LDV Spec Admin Interface ${id}= Add Work Specification 5 Submit Work Specification ${id} diff --git a/integration/local/docker-compose-integration-local.yml b/integration/local/docker-compose-integration-local.yml index 95941b80e7ea4da7a59181cb58a1bbe88fc7f29e..bae92997a176f155fdff670c6aeb538db0fef9d9 100644 --- a/integration/local/docker-compose-integration-local.yml +++ b/integration/local/docker-compose-integration-local.yml @@ -25,7 +25,7 @@ services: restart: always atdb-backend: - container_name: atdb.backend + container_name: atdb.backend #Since there is URL validation in ATDB, it needs to contain a dot image: git.astron.nl:5000/astron-sdc/atdb-ldv:latest environment: DATABASE_HOST: atdb-db @@ -68,7 +68,7 @@ services: CELERY_BROKER_URL: amqp://guest@rabbitmq:5672 volumes: - ../scripts:/scripts/ - command: ["bash", "/scripts/ldv-spec-start.sh"] + command: [ "bash", "/scripts/ldv-spec-start.sh" ] depends_on: - spec-db - atdb-backend diff --git a/integration/local/test/keywords-tests.robot b/integration/local/test/keywords-tests.robot index e62124edc5086eeb26b25e255dc719b22b682b84..e34b0639219d3105180200e38507e1c940ea06ba 100644 --- a/integration/local/test/keywords-tests.robot +++ b/integration/local/test/keywords-tests.robot @@ -62,6 +62,15 @@ Submit Work Specification Submit Work Specification 1 [Teardown] Close Browser +Check if Task Exists in ATDB + [Tags] Keyword + [Documentation] ***Description*** + ... Prerequisite: submission of work specification with ID 1 succeeded + Login To ATDB Admin Interface + Check if Task Exists in ATDB ldv-spec:1 + [Teardown] Close Browser + + Add Group [Tags] Keyword Login to LDV Spec Admin Interface diff --git a/integration/resource/keywords.robot b/integration/resource/keywords.robot index 95bf40a82702dd9091029d25bb95859d119fcd43..c8a40bfd175d26270cb990649c5f03357b387cac 100644 --- a/integration/resource/keywords.robot +++ b/integration/resource/keywords.robot @@ -35,8 +35,8 @@ Submit Work Specification Go To ${LDVSPEC} Click Button //*[@data-test-id="submit-to-atdb-${specification_id}"] Go To ${LDVSPEC}specification/${specification_id}/ - Element Should Not Contain //*[@data-test-id="submission-status"] error - Wait Until Element Contains //*[@data-test-id="submission-status"] defining timeout=30s + Element Should Not Contain //*[@data-test-id="submission-status"] Error + Wait Until Element Contains //*[@data-test-id="submission-status"] Submitted timeout=30s Add Work Specification [Arguments] ${obs_id} diff --git a/ldvspec/Dockerfile b/ldvspec/Dockerfile index 0ca6b97c41b0939578db13113d38dc588c68dd17..241a4c9f3d035bca73976147e61cc3568a9ddfa7 100644 --- a/ldvspec/Dockerfile +++ b/ldvspec/Dockerfile @@ -1,6 +1,8 @@ # Build arg to select the correct base image -ARG BASE_IMAGE=git.astron.nl:5000/astron-sdc/ldv-specification/base:latest -FROM ${BASE_IMAGE} +FROM python:3.10 +ENV PYTHONUNBUFFERED 1 +# Main dependencies +RUN apt-get update && apt-get install -y postgresql-client build-essential # Make sure we are in the source directory WORKDIR /src diff --git a/ldvspec/docker/docker-compose-local.yml b/ldvspec/docker/docker-compose-local.yml index be9799910198e29d9b95b24b63a7183a2dd869a9..910a1dd5f9a399a7df145f356470463a5ced7722 100644 --- a/ldvspec/docker/docker-compose-local.yml +++ b/ldvspec/docker/docker-compose-local.yml @@ -42,7 +42,8 @@ services: ldv-specification-background: container_name: ldv-specification-background - image: git.astron.nl:5000/astron-sdc/ldv-specification:${LDVSPEC_VERSION:-latest} +# image: git.astron.nl:5000/astron-sdc/ldv-specification:${LDVSPEC_VERSION:-latest} + image: ldv-specification-background build: context: .. dockerfile: ./Dockerfile diff --git a/ldvspec/ldvspec/settings/local.py b/ldvspec/ldvspec/settings/local.py index aae226e0bd15b164892939e1f7934e637544ee3c..5965838089bb0355c75087e12a2c283acab51fd6 100644 --- a/ldvspec/ldvspec/settings/local.py +++ b/ldvspec/ldvspec/settings/local.py @@ -24,4 +24,6 @@ CACHES = { 'BACKEND': 'django.core.cache.backends.memcached.PyMemcacheCache', 'LOCATION': f'{os.environ.get("CACHE_HOST_SERVER", "localhost")}:{os.environ.get("CACHE_HOST_PORT", "11211")}', } -} \ No newline at end of file +} + +SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'http') \ No newline at end of file diff --git a/ldvspec/lofardata/custom_exceptions.py b/ldvspec/lofardata/custom_exceptions.py deleted file mode 100644 index 5cc0030075dd6d071b4ea4662477ae9b5282d4b3..0000000000000000000000000000000000000000 --- a/ldvspec/lofardata/custom_exceptions.py +++ /dev/null @@ -1,19 +0,0 @@ -from rest_framework.exceptions import APIException - - -class RequestNotOk(APIException): - status_code = 422 - default_detail = 'You are trying to submit a work specification with invalid data. Have you messed around in the data? If not, please contact team rainbow.' - default_code = 'Unprocessable Entity: invalid data' - - -class WorkSpecificationNoSite(APIException): - status_code = 422 - default_detail = 'You are trying to submit a work specification without a target processing site. We cannot send it into outer space but if you did not want that in the first place, please contact team rainbow.' - default_code = 'Unprocessable Entity: no site' - - -class InvalidPredecessor(APIException): - status_code = 422 - default_detail = 'You are trying to submit a work specification where the predecessor is faulty or no longer exists. Blame team rainbow for manually hacking around in the database.' - default_code = 'Unprocessable Entity: invalid predecessor' diff --git a/ldvspec/lofardata/filters.py b/ldvspec/lofardata/filters.py index b690a13c3c54d447d07691316687288c751fcb44..f453d610b277bccf40a2c9157e83484880cffe66 100644 --- a/ldvspec/lofardata/filters.py +++ b/ldvspec/lofardata/filters.py @@ -35,6 +35,6 @@ class DataProductFilterSet(DynamicFilterSet): def get_submittable_workspecifications(workspecfications: QuerySet) -> List[WorkSpecification]: workspecifications_with_surls = workspecfications.filter(inputs__surls__0__isnull=False) workspecifications_with_submittable_status = workspecifications_with_surls.filter( - submission_status__in=[SUBMISSION_STATUS.NOT_SUBMITTED, SUBMISSION_STATUS.ERROR]) + submission_status__in=[SUBMISSION_STATUS.READY, SUBMISSION_STATUS.ERROR]) workspecifications_with_surl_data = [x for x in workspecifications_with_submittable_status if x.get_file_info[0] != 0] return workspecifications_with_surl_data \ No newline at end of file diff --git a/ldvspec/lofardata/migrations/0021_rename_related_tasks_workspecification_atdb_task_ids_and_more.py b/ldvspec/lofardata/migrations/0021_rename_related_tasks_workspecification_atdb_task_ids_and_more.py new file mode 100644 index 0000000000000000000000000000000000000000..1d6c274e8c46babbd4a8698fafa26c08e18d43e0 --- /dev/null +++ b/ldvspec/lofardata/migrations/0021_rename_related_tasks_workspecification_atdb_task_ids_and_more.py @@ -0,0 +1,36 @@ +# Generated by Django 4.1.5 on 2023-02-03 10:39 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('lofardata', '0020_alter_workspecification_predecessor_set_null'), + ] + + operations = [ + migrations.RenameField( + model_name='workspecification', + old_name='related_tasks', + new_name='atdb_task_ids', + ), + migrations.RemoveField( + model_name='workspecification', + name='is_defined', + ), + migrations.RemoveField( + model_name='workspecification', + name='is_ready', + ), + migrations.AddField( + model_name='workspecification', + name='error_message', + field=models.CharField(blank=True, max_length=500, null=True), + ), + migrations.AlterField( + model_name='workspecification', + name='submission_status', + field=models.CharField(choices=[('L', 'loading'), ('R', 'ready'), ('D', 'defining'), ('S', 'submitted'), ('E', 'error')], default='L', max_length=1), + ), + ] diff --git a/ldvspec/lofardata/models.py b/ldvspec/lofardata/models.py index d03501600f941a3b9675dda4a30d861785825885..4ae2ace09240f082fd0176079390995ca5ef7c74 100644 --- a/ldvspec/lofardata/models.py +++ b/ldvspec/lofardata/models.py @@ -92,13 +92,15 @@ class ATDBProcessingSite(models.Model): class SUBMISSION_STATUS(models.TextChoices): - """Status of Work Specifcation to ATDB""" - - NOT_SUBMITTED = "N", _("not submitted") + """Status of Work Specification""" + # The surls and belonging information are being retrieved + LOADING = "L", _("loading") + # Tasks are ready to submit to ATDB + READY = "R", _("ready") + # Task are still being submitted to ATDB + DEFINING = "D", _("defining") # Tasks are submitted, check ATDB for current status SUBMITTED = "S", _("submitted") - # Task need to be submitted (can change predecessor etc...) - DEFINING = "D", _("defining") # Check log, retry submission ERROR = "E", _("error") @@ -140,14 +142,10 @@ class WorkSpecification(models.Model): selected_workflow_tag = models.CharField(max_length=500, null=True, default="Unknown") # Task ID's that were created in ATDB - related_tasks = ArrayField(models.IntegerField(), null=True, blank=True) + atdb_task_ids = ArrayField(models.IntegerField(), null=True, blank=True) predecessor_specification = models.ForeignKey('self', null=True, on_delete=models.SET_NULL, related_name='successor', blank=True) - # The query for gathering files has been executed - is_ready = models.BooleanField(default=False) - is_defined = models.BooleanField(default=False) - # Should automatically submit the task after defining is_auto_submit = models.BooleanField(default=False) @@ -195,9 +193,12 @@ class WorkSpecification(models.Model): """ return True + def is_new(self) -> bool: + return self.pk is None + def is_editable(self) -> bool: """ Indicates whether this specification is editable (i.e., it has not been submitted yet)""" - return self.submission_status != SUBMISSION_STATUS.SUBMITTED and self.submission_status != SUBMISSION_STATUS.DEFINING + return self.submission_status != SUBMISSION_STATUS.SUBMITTED and self.submission_status != SUBMISSION_STATUS.DEFINING and self.submission_status != SUBMISSION_STATUS.LOADING @property def get_file_info(self): @@ -213,9 +214,11 @@ class WorkSpecification(models.Model): submission_status = models.CharField( max_length=1, choices=SUBMISSION_STATUS.choices, - default=SUBMISSION_STATUS.NOT_SUBMITTED, + default=SUBMISSION_STATUS.LOADING, ) + error_message = models.CharField(max_length=500, null=True, blank=True) + def save( self, force_insert=False, force_update=False, using=None, update_fields=None ): @@ -226,7 +229,7 @@ class WorkSpecification(models.Model): update_fields=update_fields, ) if self.async_task_result is None: - from lofardata.tasks import define_work_specification + from lofardata.task.tasks import define_work_specification res: AsyncResult = define_work_specification.delay(self.pk) self.async_task_result = res.id diff --git a/ldvspec/lofardata/task/__init__.py b/ldvspec/lofardata/task/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/ldvspec/lofardata/task/custom_exceptions.py b/ldvspec/lofardata/task/custom_exceptions.py new file mode 100644 index 0000000000000000000000000000000000000000..57a3c8691a55892b5fbf463ae4a26d11e4841dce --- /dev/null +++ b/ldvspec/lofardata/task/custom_exceptions.py @@ -0,0 +1,43 @@ +class TaskException(BaseException): + default_detail = "" + default_code = "" + + +class UnknownError(TaskException): # When this happens, fix it in the task_helpers class (+ add the test) + default_detail = 'Something has gone wrong but it is unclear what. Please contact the helpdesk.' + default_code = 'Bad request' + + +class RequestNotOk(TaskException): + default_detail = 'You are trying to submit a work specification with invalid data. Have you messed around in the data? If not, please contact team rainbow.' + default_code = 'Unprocessable Entity: invalid data' + + +class WorkerResponseNotOk(TaskException): + default_detail = "ATDB wil not handle this request. Let team rainbow inspect the worker logs." + default_code = 'I am a teapot: ATDB error response''' + + +class WorkSpecificationNoSite(TaskException): + default_detail = 'You are trying to submit a work specification without a target processing site. We cannot send it into outer space but if you did not want that in the first place, please contact team rainbow.' + default_code = 'Bad request: no processing site' + + +class InvalidPredecessor(TaskException): + default_detail = 'You are trying to submit a work specification where the predecessor is faulty or no longer exists. Blame team rainbow for manually hacking around in the database.' + default_code = 'Unprocessable Entity: invalid predecessor' + + +class InvalidLocation(TaskException): + default_detail = 'You are trying to submit a work specification where the location is not Sara, Jeulich or Poznan. This should never happen so there is something wrong with the used LTA data. Please contact team rainbow and/or an operator.' + default_code = 'Unprocessable Entity: invalid processing location' + + +class ATDBKaput(TaskException): + default_detail = "ATDB didn't give a task response while this should have happened. This is a problem with ATDB and not LDV spec." + default_code = 'I am a teapot: ATDB kaput''' + + +class InvalidSurl(TaskException): + default_detail = 'You are trying to submit a work specification with an invalid surl. It should at least consist of a project, a sas_id and a location.' + default_code = 'Unprocessable Entity: invalid data' diff --git a/ldvspec/lofardata/task/session_store.py b/ldvspec/lofardata/task/session_store.py new file mode 100644 index 0000000000000000000000000000000000000000..39105b38f249c2ebb985c383699871c95db2d764 --- /dev/null +++ b/ldvspec/lofardata/task/session_store.py @@ -0,0 +1,13 @@ +import requests + + +class SessionStore: + """requests.Session Singleton""" + + _session = None + + @classmethod + def get_session(cls) -> requests.Session: + if cls._session is None: + cls._session = requests.Session() + return cls._session diff --git a/ldvspec/lofardata/task/task_helpers.py b/ldvspec/lofardata/task/task_helpers.py new file mode 100644 index 0000000000000000000000000000000000000000..ee81a9867a5e4d73d06855f25a0d91422934ed36 --- /dev/null +++ b/ldvspec/lofardata/task/task_helpers.py @@ -0,0 +1,155 @@ +import re +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse + +from lofardata.models import ( + SUBMISSION_STATUS, + ATDBProcessingSite, + WorkSpecification, +) + +from lofardata.task.token_auth import TokenAuth + +from lofardata.task.custom_exceptions import RequestNotOk, InvalidPredecessor, InvalidLocation, WorkSpecificationNoSite, \ + WorkerResponseNotOk, TaskException, ATDBKaput, InvalidSurl + + +def parse_surl(surl: str, work_spec: WorkSpecification) -> dict: + parsed = urlparse(surl) + host = parsed.hostname + path = parsed.path + pattern = r"^.*/projects\/(?P<project>.+)\/(?P<sas_id>\w\d*)\/" + + try: + data = re.match(pattern, path).groupdict() + except Exception as error: + set_errored_work_specification(work_spec, InvalidSurl, error) + raise InvalidSurl + + valid_locations = ['srm.grid.sara.nl', 'lofar-srm.fz-juelich.de', 'lta-head.lofar.psnc.pl'] + if host in valid_locations: + data["location"] = host + return data + else: + set_errored_work_specification(work_spec, InvalidLocation) + raise InvalidLocation('location= {}'.format(host)) + + +def create_atdb_inputs_file(entries, location): + inputs = [ + { + "size": entry["size"], + "surl": entry["surl"], + "type": "File", + "location": location, + } + for entry in entries + ] + return inputs + + +def split_entries_to_batches(surls: List[Any], batch_size: int) -> List[List[Any]]: + """Split the list of entries into batches of at most `batch_size` entries""" + if batch_size == 0: + raise RequestNotOk() + + n_surls = len(surls) + + # NOTE: Think about using file size instead of amount of files + batches: List[List[Any]] = [] + num_batches = n_surls // batch_size + num_batches += 1 if n_surls % batch_size else 0 + + for n in range(num_batches): + batches.append(surls[n * batch_size: (n + 1) * batch_size]) + + return batches + + +def send_to_atdb(payload, sess, site, work_spec): + response = sess.post(site.url + "tasks/", json=payload, auth=TokenAuth(site.access_token)) + if not response.ok: + set_errored_work_specification(work_spec, WorkerResponseNotOk, None) + raise WorkerResponseNotOk + + if work_spec.atdb_task_ids is None: + work_spec.atdb_task_ids = [] + + try: + response_task_id = response.json()["id"] + work_spec.atdb_task_ids.append(response_task_id) + except Exception as error: + set_errored_work_specification(work_spec, ATDBKaput, error) + raise ATDBKaput + + +def create_payload(batch, work_spec, atdb_predecessor_task_id=None) -> dict: + # Parse a single surl for info: + # project, sas_id & location + # This does assume that a task consists of at most 1 project, sas_id and location! #FIXME SDC-905 + if len(batch) < 1: + set_errored_work_specification(work_spec, RequestNotOk) + raise RequestNotOk('Cannot create a payload for an empty batch') + + parsed = parse_surl(batch[0]["surl"], work_spec) + project_id = parsed["project"] + sas_id = parsed["sas_id"] + + payload = { + "project": project_id, + "sas_id": sas_id, + "task_type": "regular", + "filter": f"ldv-spec:{work_spec.pk}", + "purge_policy": work_spec.purge_policy, + "new_status": "defining", + "new_workflow_uri": work_spec.selected_workflow, + "size_to_process": sum([e["size"] for e in batch]), + "inputs": (create_atdb_inputs_file(batch, parsed["location"])), + } + + if atdb_predecessor_task_id: + payload["predecessor"] = atdb_predecessor_task_id + + return payload + + +def get_atdb_predecessor_task_id(work_spec): + # Task ID of the predecessor + atdb_predecessor_task_id: int | None = None + if work_spec.predecessor_specification is not None: + predecessor: WorkSpecification = work_spec.predecessor_specification + # Should only be 1 entry + if predecessor.atdb_task_ids is None or len(predecessor.atdb_task_ids) != 1: + raise InvalidPredecessor() + atdb_predecessor_task_id = predecessor.atdb_task_ids[0] + return atdb_predecessor_task_id + + +def get_processing_site(work_spec): + site: Optional[ATDBProcessingSite] = work_spec.processing_site + if site is None: + set_errored_work_specification(work_spec, WorkSpecificationNoSite, None) + raise WorkSpecificationNoSite + return site + + +def get_surls(work_spec) -> List[dict]: + if work_spec.inputs is None: + set_errored_work_specification(work_spec, RequestNotOk) + raise RequestNotOk("There is no 'inputs' for this work specification with id: {}".format(work_spec.id)) + + inputs: Dict[str, Any] = work_spec.inputs.copy() + try: + entries: List[dict] = inputs.pop("surls") + if len(entries) < 1: + raise RequestNotOk("There are no surls known for this work specification with id: {}".format(work_spec.id)) + return entries + except Exception as err: + set_errored_work_specification(work_spec, RequestNotOk, err) + raise RequestNotOk + + +def set_errored_work_specification(work_spec: WorkSpecification, exception: TaskException, error: BaseException = None): + work_spec.submission_status = SUBMISSION_STATUS.ERROR + work_spec.error_message = exception.default_detail + ":\n" + str(error)[:len(exception.default_detail) - 1] + work_spec.save() diff --git a/ldvspec/lofardata/task/tasks.py b/ldvspec/lofardata/task/tasks.py new file mode 100644 index 0000000000000000000000000000000000000000..2efd5c99404f02ebcdc2b6141727b26154c0a772 --- /dev/null +++ b/ldvspec/lofardata/task/tasks.py @@ -0,0 +1,69 @@ +from lofardata.models import ( + SUBMISSION_STATUS, + ATDBProcessingSite, + DataProduct, + WorkSpecification, +) + +from ldvspec.celery import app + +from lofardata.task.session_store import SessionStore +from lofardata.task.task_helpers import get_processing_site, get_surls, split_entries_to_batches, \ + get_atdb_predecessor_task_id, create_payload, send_to_atdb, set_errored_work_specification + +from lofardata.task.custom_exceptions import UnknownError, TaskException, RequestNotOk + + +@app.task +def define_work_specification(workspecification_id): + specification = WorkSpecification.objects.get(pk=workspecification_id) + filters = specification.filters + + dataproducts = DataProduct.objects.filter(**filters).order_by("surl") + inputs = { + "surls": [ + {"surl": dataproduct.surl, "size": dataproduct.filesize} + for dataproduct in dataproducts + ] + } + if specification.inputs is None: + specification.inputs = inputs + else: + specification.inputs.update(inputs) + specification.submission_status = SUBMISSION_STATUS.READY + specification.save() + + +@app.task +def insert_task_into_atdb(workspecification_id: int): + """This creates the task in ATDB and set's it to defining""" + sess = SessionStore.get_session() + + work_spec: WorkSpecification = WorkSpecification.objects.get( + pk=workspecification_id + ) + + work_spec.submission_status = SUBMISSION_STATUS.DEFINING + work_spec.save() + + try: + site = get_processing_site(work_spec) + surls = get_surls(work_spec) + + if work_spec.batch_size > 0: + batches = split_entries_to_batches(surls, work_spec.batch_size) + else: + batches = [surls] + + atdb_predecessor_task_id = get_atdb_predecessor_task_id(work_spec) + + for batch in batches: + payload = create_payload(batch, work_spec, atdb_predecessor_task_id) + send_to_atdb(payload, sess, site, work_spec) + + work_spec.submission_status = SUBMISSION_STATUS.SUBMITTED + work_spec.save() + except TaskException as task_error: + raise task_error + except Exception as err: # When this happens, fix it in the task_helpers class (+ add the test) + set_errored_work_specification(work_spec, UnknownError, err) diff --git a/ldvspec/lofardata/task/token_auth.py b/ldvspec/lofardata/task/token_auth.py new file mode 100644 index 0000000000000000000000000000000000000000..5febf92b75cee88aea2e213615e5b0f505c59450 --- /dev/null +++ b/ldvspec/lofardata/task/token_auth.py @@ -0,0 +1,15 @@ +import requests +from requests.auth import AuthBase + + +class TokenAuth(AuthBase): + """Basic Token Auth + + Adds a: `Authorization: Token <token>` header""" + + def __init__(self, token: str): + self._token = token + + def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest: + r.headers["Authorization"] = f"Token {self._token}" + return r \ No newline at end of file diff --git a/ldvspec/lofardata/tasks.py b/ldvspec/lofardata/tasks.py deleted file mode 100644 index 01f3e6b94db5c595975e46fa4157d378077cd16d..0000000000000000000000000000000000000000 --- a/ldvspec/lofardata/tasks.py +++ /dev/null @@ -1,278 +0,0 @@ -import re -from typing import Any, Dict, List, Optional -from urllib.parse import urlparse - -import requests -from celery.utils.log import get_task_logger -from lofardata.models import ( - SUBMISSION_STATUS, - ATDBProcessingSite, - DataProduct, - WorkSpecification, -) -from requests.auth import AuthBase -from requests.exceptions import RequestException - -from ldvspec.celery import app - -from lofardata.custom_exceptions import RequestNotOk, WorkSpecificationNoSite, InvalidPredecessor - -logger = get_task_logger(__name__) - - - -class SessionStore: - """requests.Session Singleton""" - - _session = None - - @classmethod - def get_session(cls) -> requests.Session: - if cls._session == None: - cls._session = requests.Session() - return cls._session - - -@app.task -def define_work_specification(workspecification_id): - specification = WorkSpecification.objects.get(pk=workspecification_id) - filters = specification.filters - - dataproducts = DataProduct.objects.filter(**filters).order_by("surl") - - inputs = { - "surls": [ - {"surl": dataproduct.surl, "size": dataproduct.filesize} - for dataproduct in dataproducts - ] - } - if specification.inputs is None: - specification.inputs = inputs - else: - specification.inputs.update(inputs) - specification.is_ready = True - specification.save() - - -def _parse_surl(surl: str) -> dict: - parsed = urlparse(surl) - host = parsed.hostname - path = parsed.path - pattern = r"^.*/projects\/(?P<project>.+)\/(?P<sas_id>\w\d*)\/" - data = re.match(pattern, path).groupdict() - data["location"] = host - return data - - -def _prepare_request_payload( - entries: List[dict], - filter_id: str, - workflow_url: str, - purge_policy: str = "no", - predecessor: int = None, - optional_parameters: Dict[str, Any] = None, -): - # Parse a single surl for info: - # project, sas_id & location - # This does assume that a task consists of at most 1 project, sas_id and location! - parsed = _parse_surl(entries[0]["surl"]) - project_id = parsed["project"] - sas_id = parsed["sas_id"] - - inputs = [ - { - "size": e["size"], - "surl": e["surl"], - "type": "File", - "location": parsed["location"], - } - for e in entries - ] - if optional_parameters: - inputs = {**optional_parameters, "surls": inputs} - - data = { - "project": project_id, - "sas_id": sas_id, - "task_type": "regular", - "filter": filter_id, - "purge_policy": purge_policy, - "new_status": "defining", - "new_workflow_uri": workflow_url, - "size_to_process": sum([e["size"] for e in entries]), - "inputs": inputs, - } - - if predecessor: - data["predecessor"] = predecessor - - return data - - -class TokenAuth(AuthBase): - """Basic Token Auth - - Adds a: `Authorization: Token <token>` header""" - - def __init__(self, token: str): - self._token = token - - def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest: - r.headers["Authorization"] = f"Token {self._token}" - return r - - -def split_entries_to_batches(entries: List[Any], batch_size: int) -> List[List[Any]]: - """Split the list of entries into batches of at most `batch_size` entries""" - n_entries = len(entries) - - # NOTE: Think about using file size instead of amount of files - batches: List[List[Any]] = [] - if batch_size == 0: - batches.append(entries) - elif batch_size > 0: - # Calculate amount of required batches - num_batches = n_entries // batch_size - num_batches += 1 if n_entries % batch_size else 0 - - for n in range(num_batches): - batches.append(entries[n * batch_size: (n + 1) * batch_size]) - - return batches - - -@app.task -def insert_task_into_atdb(workspecification_id: int): - """This creates the task in ATDB and set's it to defining""" - sess = SessionStore.get_session() - - work_spec: WorkSpecification = WorkSpecification.objects.get( - pk=workspecification_id - ) - inputs: Dict[str, Any] = work_spec.inputs.copy() - - try: - entries: List[dict] = inputs.pop("surls") - except IndexError: - logger.error("No surls") - work_spec.submission_status = SUBMISSION_STATUS.ERROR - raise RequestNotOk() - - batches = split_entries_to_batches(entries, work_spec.batch_size) - - site: Optional[ATDBProcessingSite] = work_spec.processing_site - if site is None: - raise WorkSpecificationNoSite() - url = site.url + "tasks/" - - # Task ID of the predecessor - atdb_predecessor_task_id: int | None = None - if work_spec.predecessor_specification is not None: - predecessor: WorkSpecification = work_spec.predecessor_specification - if len(predecessor.related_tasks) != 1: - raise InvalidPredecessor() - # Should only be 1 entry - atdb_predecessor_task_id = predecessor.related_tasks[0] - - try: - for batch in batches: - try: - payload = _prepare_request_payload( - entries=batch, - optional_parameters=inputs, - filter_id=f"ldv-spec:{work_spec.pk}", - workflow_url=work_spec.selected_workflow, - purge_policy=work_spec.purge_policy, - predecessor=atdb_predecessor_task_id, - ) - except AttributeError: - raise RequestNotOk() - - res = sess.post(url, json=payload, auth=TokenAuth(site.access_token)) - - if not res.ok: - logger.error("Request not ok: {}".format(res.text)) - raise RequestNotOk() - - # Store ATDB Task ID in related_tasks - if work_spec.related_tasks is None: - work_spec.related_tasks = [] - - data = res.json() - work_spec.related_tasks.append(data["id"]) - - # All went well - work_spec.submission_status = SUBMISSION_STATUS.DEFINING - if work_spec.is_auto_submit: - set_tasks_defined.delay(workspecification_id) - except (RequestException, RequestNotOk) as err: - work_spec.submission_status = SUBMISSION_STATUS.ERROR - logger.error(err) - finally: - work_spec.save() - -def update_related_tasks( - work_spec: WorkSpecification, - delete: bool, - data: Optional[dict], - on_success_status: SUBMISSION_STATUS, -): - sess = SessionStore.get_session() - - site: Optional[ATDBProcessingSite] = work_spec.processing_site - if site is None: - raise WorkSpecificationNoSite() - url = site.url + "tasks/" - - task_ids: List[int] = work_spec.related_tasks - try: - for task_id in task_ids: - if delete: - res = sess.delete( - url + str(task_id) + "/", auth=TokenAuth(site.access_token) - ) - else: - res = sess.put( - url + str(task_id) + "/", - json=data, - auth=TokenAuth(site.access_token), - ) - - if not res.ok: - raise RequestNotOk() - - # All went well - work_spec.submission_status = on_success_status - if delete: - work_spec.related_tasks = None - except (RequestException, RequestNotOk): - work_spec.submission_status = SUBMISSION_STATUS.ERROR - finally: - work_spec.save() - - -@app.task -def set_tasks_defined(workspecification_id: int): - """This sets tasks to defined so they can be picked up by ATDB services""" - - work_spec: WorkSpecification = WorkSpecification.objects.get( - pk=workspecification_id - ) - - if work_spec.submission_status != SUBMISSION_STATUS.DEFINING: - raise ValueError("Invalid WorkSpecification state") - - update_related_tasks( - work_spec, False, {"new_status": "defined"}, SUBMISSION_STATUS.SUBMITTED - ) - - -@app.task -def delete_tasks_from_atdb(workspecification_id: int): - """Removes related tasks from ATDB (for retrying)""" - - work_spec: WorkSpecification = WorkSpecification.objects.get( - pk=workspecification_id - ) - - update_related_tasks(work_spec, True, None, SUBMISSION_STATUS.NOT_SUBMITTED) diff --git a/ldvspec/lofardata/templates/lofardata/index.html b/ldvspec/lofardata/templates/lofardata/index.html index 7b1b7aebe1f60609e17a7d31100780de1a36d0a0..88896013b0ac87059823b0780c3fcea0fbe4c35c 100644 --- a/ldvspec/lofardata/templates/lofardata/index.html +++ b/ldvspec/lofardata/templates/lofardata/index.html @@ -21,16 +21,16 @@ {% if ungrouped_work_specifications.exists %} <div class="group-wrapper"> - {% include 'lofardata/overview_templates/group_actions.html' with group=None checked="checked" %} + {% include 'lofardata/partial_templates/group_actions.html' with group=None checked="checked" %} <div class="group-content"> <div class="content-inner"> <div class="table"> - {% include 'lofardata/overview_templates/table_header.html' with id="ungrouped" %} + {% include 'lofardata/partial_templates/table_header.html' with id="ungrouped" %} <div class="table__content table__content--scroll table__content--scroll-small"> {% for specification in ungrouped_work_specifications %} - {% include 'lofardata/overview_templates/table_entry.html' with specification=specification id="ungrouped" %} + {% include 'lofardata/partial_templates/table_entry.html' with specification=specification id="ungrouped" %} {% endfor %} </div> </div> @@ -42,14 +42,14 @@ {% for group in object_list %} <div class="group-wrapper"> - {% include 'lofardata/overview_templates/group_actions.html' with group=group checked="unchecked" %} + {% include 'lofardata/partial_templates/group_actions.html' with group=group checked="unchecked" %} <div class="group-content"> <div class="content-inner"> <div class="table"> - {% include 'lofardata/overview_templates/table_header.html' with id=group.name %} + {% include 'lofardata/partial_templates/table_header.html' with id=group.name %} <div class="table__content table__content--scroll table__content--scroll-small"> {% for specification in group.workspecification_set.all %} - {% include 'lofardata/overview_templates/table_entry.html' with specification=specification id=group.name %} + {% include 'lofardata/partial_templates/table_entry.html' with specification=specification id=group.name %} {% endfor %} </div> </div> @@ -58,23 +58,22 @@ </div> {% endfor %} - <script type='text/javascript' - src='https://cdnjs.cloudflare.com/ajax/libs/knockout/3.5.0/knockout-min.js'></script> - <script> - function toggle(source, name) { - const checkboxes = document.getElementsByName(name); - for (let i = 0, n = checkboxes.length; i < n; i++) { - checkboxes[i].checked = source.checked; - const event = new Event('click'); - checkboxes[i].dispatchEvent(event); - } + <script type='text/javascript' src='https://cdnjs.cloudflare.com/ajax/libs/knockout/3.5.0/knockout-min.js'></script> + <script> + function toggle(source, name) { + const checkboxes = document.getElementsByName(name); + for (let i = 0, n = checkboxes.length; i < n; i++) { + checkboxes[i].checked = source.checked; + const event = new Event('click'); + checkboxes[i].dispatchEvent(event); } + } - const csrf_token = '{{ csrf_token }}'; - const delete_multiple_uri = '{% url 'workspecification-delete-multiple' %}'; - const submit_multiple_uri = '{% url 'workspecification-submit-multiple' %}'; - </script> - <script src="{% static 'workflow_actions.js' %}"></script> + const csrf_token = '{{ csrf_token }}'; + const delete_multiple_uri = '{% url 'workspecification-delete-multiple' %}'; + const submit_multiple_uri = '{% url 'workspecification-submit-multiple' %}'; + </script> + <script src="{% static 'workflow_actions.js' %}"></script> {% else %} <div class="flex-wrapper flex-wrapper--centered flex-wrapper--column"> <h1>Welcome to the LDV Specification tool</h1> diff --git a/ldvspec/lofardata/templates/lofardata/partial_templates/badges.html b/ldvspec/lofardata/templates/lofardata/partial_templates/badges.html new file mode 100644 index 0000000000000000000000000000000000000000..dc2f5a214c5c18dcdeb4f5843c1d6a006414104f --- /dev/null +++ b/ldvspec/lofardata/templates/lofardata/partial_templates/badges.html @@ -0,0 +1,22 @@ +{% load define_action %} +{% if status == "L" %} + {% define "secondary" as badgecolor %} + {% define "The work specification is now retrieving the necessary information." as badgeTitle %} +{% elif status == "D" %} + {% define "secondary" as badgecolor %} + {% define "The work specification is now being send to ATDB." as badgeTitle %} +{% elif status == "S" %} + {% define "green" as badgecolor %} + {% define "The work specification has been succcesfully processed in ATDB." as badgeTitle %} +{% elif status == "E" %} + {% define "red" as badgecolor %} + {% define "Retrieving the files to process or submitting the task to ATDB resulted in an error. Please reports this to the support helpdesk or try again." as badgeTitle %} +{% else %} + {% define "primary" as badgecolor %} + {% define "The work specification has been created but has not yet been send to ATDB for processing." as badgeTitle %} +{% endif %} +<div class="badge badge--{{ badgecolor }} margin-top text text--capitalized" + data-test-id="submission-status" + title="{{ badgeTitle }}"> + {{ label }} +</div> \ No newline at end of file diff --git a/ldvspec/lofardata/templates/lofardata/overview_templates/group_actions.html b/ldvspec/lofardata/templates/lofardata/partial_templates/group_actions.html similarity index 100% rename from ldvspec/lofardata/templates/lofardata/overview_templates/group_actions.html rename to ldvspec/lofardata/templates/lofardata/partial_templates/group_actions.html diff --git a/ldvspec/lofardata/templates/lofardata/overview_templates/table_entry.html b/ldvspec/lofardata/templates/lofardata/partial_templates/table_entry.html similarity index 87% rename from ldvspec/lofardata/templates/lofardata/overview_templates/table_entry.html rename to ldvspec/lofardata/templates/lofardata/partial_templates/table_entry.html index 5c1019e743a06b7d8d503d0b52d6da56bc08dd85..a748e66eb6e90a18b74e5c7ae64b35993989fa22 100644 --- a/ldvspec/lofardata/templates/lofardata/overview_templates/table_entry.html +++ b/ldvspec/lofardata/templates/lofardata/partial_templates/table_entry.html @@ -37,27 +37,7 @@ <div class="table__cell table__cell--truncate table__cell--horizontal-center">{{ specification.created_by }}</div> <div class="table__cell table__cell--truncate table__cell--horizontal-center"> - {% if specification.get_submission_status_display == "submitted" %} - {% define "green" as badgecolor %} - {% define "The work specification has been succcesfully processed in ATDB." as badgeTitle %} - {% endif %} - {% if specification.get_submission_status_display == "error" %} - {% define "red" as badgecolor %} - {% define "Retrieving the files to process resulted in an error. Please reports this to the support helpdesk." as badgeTitle %} - {% endif %} - {% if specification.get_submission_status_display == "defining" %} - {% define "secondary" as badgecolor %} - {% define "The work specification has been send to ATDB but has not yet been processed there." as badgeTitle %} - {% endif %} - {% if specification.get_submission_status_display == "undefined" or specification.get_submission_status_display == "not submitted" %} - {% define "primary" as badgecolor %} - {% define "The work specification has been created but has not yet been send to ATDB for processing." as badgeTitle %} - {% endif %} - <div class="badge badge--{{ badgecolor }} margin-top" - title="{{ badgeTitle }}" - data-test-id="submission-status"> - {{ specification.get_submission_status_display }} - </div> + {% include 'lofardata/partial_templates/badges.html' with status=specification.submission_status label=specification.get_submission_status_display %} </div> diff --git a/ldvspec/lofardata/templates/lofardata/overview_templates/table_header.html b/ldvspec/lofardata/templates/lofardata/partial_templates/table_header.html similarity index 100% rename from ldvspec/lofardata/templates/lofardata/overview_templates/table_header.html rename to ldvspec/lofardata/templates/lofardata/partial_templates/table_header.html diff --git a/ldvspec/lofardata/templates/lofardata/workspecification/detail.html b/ldvspec/lofardata/templates/lofardata/workspecification/detail.html index 042de1b513ca81abfb76c68d25385913dafcdbcd..5f90ab1581d7c4c3e29defb8500d151c8e31ab68 100644 --- a/ldvspec/lofardata/templates/lofardata/workspecification/detail.html +++ b/ldvspec/lofardata/templates/lofardata/workspecification/detail.html @@ -18,7 +18,7 @@ {% if not can_be_edited %} <a class="button button--primary button--disabled button--icon-button margin-left" title="Cannot be edited by this user"> - <span class="icon icon--pen"></span> + <span class="icon icon--pen"></span> </a> {% elif not object.is_editable %} <a class="button button--primary button--disabled button--icon-button margin-left" @@ -58,7 +58,7 @@ {% can_be_submitted_by object user as can_be_submitted %} {% if not can_be_submitted %} - <a class="button button--icon-button button--primary button--disabled" + <a class="button button--icon-button button--primary button--disabled" title="Cannot be submitted by this user" href="#"> <span class="icon icon--color-inherit icon--play"></span> @@ -158,35 +158,23 @@ <div class="table__row table__row--dark table__row--padding"> <div class="table__cell table__cell--title">Submission status</div> <div class="table__cell"> - {% if object.get_submission_status_display == "submitted" %} - {% define "green" as badgecolor %} - {% define "The work specification has been succcesfully processed in ATDB." as badgeTitle %} - {% endif %} - {% if object.get_submission_status_display == "error" %} - {% define "red" as badgecolor %} - {% define "Retrieving the files to process resulted in an error. Please reports this to the support helpdesk." as badgeTitle %} - {% endif %} - {% if object.get_submission_status_display == "defining" %} - {% define "secondary" as badgecolor %} - {% define "The work specification has been send to ATDB but has not yet been processed there." as badgeTitle %} - {% endif %} - {% if object.get_submission_status_display == "undefined" or object.get_submission_status_display == "not submitted" %} - {% define "primary" as badgecolor %} - {% define "The work specification has been created but has not yet been send to ATDB for processing." as badgeTitle %} - {% endif %} - <div class="badge badge--{{ badgecolor }} margin-top" data-test-id="submission-status" - title="{{ badgeTitle }}"> - {{ object.get_submission_status_display }} - </div> + {% include '../partial_templates/badges.html' with status=object.submission_status label=object.get_submission_status_display %} </div> </div> + {% if object.error_message %} + <div class="table__row table__row--dark table__row--padding"> + <div class="table__cell table__cell--title">Error details</div> + <div class="table__cell">{{ object.error_message }}</div> + </div> + {% endif %} + <div class="table__row table__row--dark table__row--padding"> <div class="table__cell table__cell--title">Filters</div> <div class="table__cell"> {{ object.filters }}</div> </div> - {% if object.is_ready or object.is_defined %} + {% if object.get_submission_status_display == "ready" or object.get_submission_status_display == "defining" or object.get_submission_status_display == "submitted" %} {% if number_of_files == 0 %} <div class="table__row table__row--dark table__row--padding"> <div class="table__cell table__cell--title">Number of files</div> diff --git a/ldvspec/lofardata/tests/test_atdb_delete.py b/ldvspec/lofardata/tests/test_atdb_delete.py deleted file mode 100644 index 1814d565b317213d66c9e00926de9b5ac84d3cb8..0000000000000000000000000000000000000000 --- a/ldvspec/lofardata/tests/test_atdb_delete.py +++ /dev/null @@ -1,108 +0,0 @@ -import re -from unittest import mock - -import rest_framework.test as rtest -from lofardata.models import SUBMISSION_STATUS, ATDBProcessingSite, WorkSpecification -from lofardata.tasks import SessionStore, delete_tasks_from_atdb -from lofardata.tests.util import MockResponse, mocked_delay - -task_detail_uri = re.compile(r"^.*/atdb/tasks/(?P<task_id>\d+)/") -# This is needed to initialize a session -SessionStore.get_session() - -# ATDB Post Requests -def mocked_requests_delete(*args, **kwargs): - - url = args[0] - - # Broken task - if "500" in url: - return MockResponse(None, 500, "Server Error.") - - if task_detail_uri.match(url): - # TODO: add real ATDB response - return MockResponse(None, 204) - - return MockResponse(None, 404) - - -@mock.patch( - "lofardata.tasks.SessionStore._session.delete", side_effect=mocked_requests_delete -) -class TestATDBDeleteRequest(rtest.APITestCase): - @mock.patch( - "lofardata.tasks.define_work_specification.delay", side_effect=mocked_delay - ) - def setUp(self, mock_delay: mock.MagicMock): - site: ATDBProcessingSite = ATDBProcessingSite.objects.create( - name="Dwingeloo", url="https://example.com/atdb/", access_token="DummyToken" - ) - - # Create Work Specification - ws: WorkSpecification = WorkSpecification.objects.create(processing_site=site) - mock_delay.assert_called_once_with(ws.pk) # not really needed here actually - ws.refresh_from_db() - - # Simulated interaction where the workflow was set - ws.selected_workflow = "https://example.com/atdb/workflow/1/" - ws.save() - self.work_spec: WorkSpecification = ws - - def test_delete(self, mocked_delete: mock.MagicMock): - self.work_spec.related_tasks = [1] # just a single task - self.work_spec.save() - - delete_tasks_from_atdb(self.work_spec.pk) - - mocked_delete.assert_called_once_with( - f"https://example.com/atdb/tasks/{self.work_spec.related_tasks[0]}/", - auth=mock.ANY, - ) - - self.work_spec.refresh_from_db() - self.assertEqual( - self.work_spec.submission_status, - SUBMISSION_STATUS.NOT_SUBMITTED, - "Status should be not submitted after delete", - ) - self.assertIsNone( - self.work_spec.related_tasks, - "Related tasks should be deleted after delete request", - ) - - def test_delete_of_batches(self, mocked_delete: mock.MagicMock): - self.work_spec.related_tasks = [1, 2, 3, 4] - self.work_spec.save() - - delete_tasks_from_atdb(self.work_spec.pk) - - call_list = [ - mock.call(f"https://example.com/atdb/tasks/{task_id}/", auth=mock.ANY) - for task_id in self.work_spec.related_tasks - ] - - mocked_delete.assert_has_calls(call_list, any_order=True) - - self.work_spec.refresh_from_db() - self.assertEqual( - self.work_spec.submission_status, - SUBMISSION_STATUS.NOT_SUBMITTED, - "Status should be not submitted after delete", - ) - self.assertIsNone( - self.work_spec.related_tasks, - "All related tasks should be deleted after delete request", - ) - - def test_delete_of_batches_with_failing(self, _: mock.MagicMock): - self.work_spec.related_tasks = [1, 2, 500, 4] - self.work_spec.save() - - delete_tasks_from_atdb(self.work_spec.pk) - - self.work_spec.refresh_from_db() - self.assertEqual( - self.work_spec.submission_status, - SUBMISSION_STATUS.ERROR, - "Status should be error after delete failed", - ) diff --git a/ldvspec/lofardata/tests/test_atdb_insert.py b/ldvspec/lofardata/tests/test_atdb_insert.py deleted file mode 100644 index 735cdabb5f4ec49e5434f5a215f9c6c008b54f18..0000000000000000000000000000000000000000 --- a/ldvspec/lofardata/tests/test_atdb_insert.py +++ /dev/null @@ -1,329 +0,0 @@ -from unittest import mock -from unittest.mock import Mock - -import rest_framework.test as rtest -from django.contrib.auth import get_user_model -from django.urls import reverse -from lofardata.models import SUBMISSION_STATUS, ATDBProcessingSite, WorkSpecification -from lofardata.tasks import SessionStore, insert_task_into_atdb -from lofardata.tests.util import MockResponse, mocked_delay -from rest_framework import status - -# This is needed to initialize a session -SessionStore.get_session() - - -# ATDB Get Requests -def mocked_requests_get(*args, **kwargs): - if args[0] == "http://someurl.com/test.json": - return MockResponse({"key1": "value1"}, 200) - elif args[0] == "http://someotherurl.com/anothertest.json": - return MockResponse({"key2": "value2"}, 200) - - return MockResponse(None, 404) - - -# ATDB Post Requests -def mocked_requests_post(*args, **kwargs): - - if kwargs["json"]["project"] == "kaput_42": - return MockResponse({"detail": "not ok"}, 500) - - if args[0] == "https://example.com/atdb/tasks/": - return MockResponse({"id": 42}, 201) - - return MockResponse(None, 404) - - -class TestATDBInsertRequest(rtest.APITestCase): - def setUp(self): - self.user = get_user_model().objects.create_superuser("admin") - self.client.force_authenticate(self.user) - - @mock.patch.object(WorkSpecification, 'objects') - def test_ws_submit_request(self, queryset_mock): - """Test submitting a workspecification to ATDB""" - - queryset_mock.exists = Mock(return_value=True) - with mock.patch("lofardata.view_helpers.submit.insert_task_into_atdb.delay") as mocked_delay: - data = {} - res = self.client.post( - reverse("workspecification-submit", kwargs={"pk": '1'}), data=data - ) - - mocked_delay.assert_called_once_with('1') - - # TODO: make a switch (now it always redirects to the front-end) - self.assertEqual( - res.status_code, - status.HTTP_302_FOUND, - "Submitting job failed:\n" + str(res.content), - ) - - -@mock.patch( - "lofardata.tasks.SessionStore._session.post", side_effect=mocked_requests_post -) -class TestATDBInsertTask(rtest.APITestCase): - @mock.patch( - "lofardata.tasks.define_work_specification.delay", side_effect=mocked_delay - ) - def setUp(self, mock_delay: mock.MagicMock): - site: ATDBProcessingSite = ATDBProcessingSite.objects.create( - name="Dwingeloo", url="https://example.com/atdb/", access_token="DummyToken" - ) - - # Create Work Specification - ws: WorkSpecification = WorkSpecification.objects.create(processing_site=site) - mock_delay.assert_called_once_with(ws.pk) # not really needed here actually - ws.refresh_from_db() - - # Simulated interaction where the workflow was set - ws.selected_workflow = "https://example.com/atdb/workflow/1/" - ws.save() - self.work_spec: WorkSpecification = ws - - def test_insert_task_into_atdb( - self, - mock_post: mock.MagicMock, - ): - - self.work_spec.inputs = { - "surls": [ - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - } - ] - } - self.work_spec.save() - - EXAMPLE_TASK = { - "project": "lc0_012", - "sas_id": "161482", - "task_type": "regular", - "filter": f"ldv-spec:{self.work_spec.pk}", - "purge_policy": "no", - "new_status": "defining", - "new_workflow_uri": "https://example.com/atdb/workflow/1/", - "size_to_process": 7395952640, - "inputs": [ - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - "type": "File", - "location": "srm.grid.sara.nl", - } - ], - } - - insert_task_into_atdb(workspecification_id=self.work_spec.pk) - - mock_post.assert_called_once_with( - "https://example.com/atdb/tasks/", json=EXAMPLE_TASK, auth=mock.ANY - ) - - self.work_spec.refresh_from_db() - self.assertEquals( - self.work_spec.submission_status, - SUBMISSION_STATUS.DEFINING, - "Work Spec should be in defining state", - ) - self.assertEqual(len(self.work_spec.related_tasks), 1) # expect 1 task - - def test_batching_of_files(self, mock_post: mock.MagicMock): - """Tests batching of multiple files""" - - self.work_spec.inputs = { - "surls": [ - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - "type": "File", - "location": "srm.grid.sara.nl", - } - ] - * 10 # ten entries - } - - self.work_spec.batch_size = 3 # expect 4 batches - self.work_spec.save() - - insert_task_into_atdb(workspecification_id=self.work_spec.pk) - - self.assertEqual(mock_post.call_count, 4, "Invalid amount of calls to ATDB") - - self.work_spec.refresh_from_db() - self.assertEquals( - self.work_spec.submission_status, - SUBMISSION_STATUS.DEFINING, - "Work Spec should be in defining state", - ) - self.assertEqual(len(self.work_spec.related_tasks), 4) # expect 4 tasks - - def test_batching_of_files_with_options(self, mock_post: mock.MagicMock): - """Tests batching of multiple files""" - - self.work_spec.inputs = { - "optional": True, - "surls": [ - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - "type": "File", - "location": "srm.grid.sara.nl", - } - ] - * 10 # ten entries - } - - self.work_spec.batch_size = 3 # expect 4 batches - self.work_spec.save() - - insert_task_into_atdb(workspecification_id=self.work_spec.pk) - - self.assertEqual(mock_post.call_count, 4, "Invalid amount of calls to ATDB") - - for call_arg in mock_post.call_args_list: - self.assertIn('optional', call_arg.kwargs['json']['inputs']) - self.assertTrue(call_arg.kwargs['json']['inputs']['optional']) - - self.work_spec.refresh_from_db() - self.assertEquals( - self.work_spec.submission_status, - SUBMISSION_STATUS.DEFINING, - "Work Spec should be in defining state", - ) - self.assertEqual(len(self.work_spec.related_tasks), 4) # expect 4 tasks - - def test_batching_of_files_exact_fit(self, mock_post: mock.MagicMock): - """Tests batching of multiple files which fit the bath_size exactly""" - - self.work_spec.inputs = { - "surls": [ - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - "type": "File", - "location": "srm.grid.sara.nl", - } - ] - * 9 - } - - self.work_spec.batch_size = 3 # expect 3 batches - self.work_spec.save() - - insert_task_into_atdb(workspecification_id=self.work_spec.pk) - - self.assertEqual(mock_post.call_count, 3, "Invalid amount of calls to ATDB") - - self.work_spec.refresh_from_db() - self.assertEquals( - self.work_spec.submission_status, - SUBMISSION_STATUS.DEFINING, - "Work Spec should be in defining state", - ) - self.assertEqual(len(self.work_spec.related_tasks), 3) # expect 3 tasks - - def test_atdb_kaput(self, mock_post: mock.MagicMock): - """Test handling of ATDB Failure""" - self.work_spec.inputs = { - "surls": [ - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/kaput_42/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - } - ] - } - self.work_spec.save() - - insert_task_into_atdb(workspecification_id=self.work_spec.pk) - - self.work_spec.refresh_from_db() - self.assertEquals( - self.work_spec.submission_status, - SUBMISSION_STATUS.ERROR, - "Work Spec should be in error state", - ) - - def test_atdb_kaput_batch(self, mock_post: mock.MagicMock): - """Test handling of ATDB Failure""" - self.work_spec.inputs = { - "surls": [ - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - }, - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/kaput_42/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - }, - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - }, - ] - } - self.work_spec.batch_size = 1 # use a small batch size - self.work_spec.save() - - insert_task_into_atdb(workspecification_id=self.work_spec.pk) - - self.work_spec.refresh_from_db() - self.assertEquals( - self.work_spec.submission_status, - SUBMISSION_STATUS.ERROR, - "Work Spec should be in error state", - ) - - def test_auto_submit_after_success(self, mock_post): - """Test whether submission get's triggered""" - - self.work_spec.inputs = { - "surls": [ - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - "type": "File", - "location": "srm.grid.sara.nl", - } - ] - * 10 # ten entries - } - - self.work_spec.batch_size = 3 # expect 4 batches - self.work_spec.is_auto_submit = True - self.work_spec.save() - - with mock.patch("lofardata.tasks.set_tasks_defined.delay") as submit_delay: - insert_task_into_atdb(workspecification_id=self.work_spec.pk) - submit_delay.assert_called_once_with(self.work_spec.pk) - - def test_auto_submit_after_failure(self, mock_post): - """Test whether submission is not done with a failure in a batch""" - - self.work_spec.inputs = { - "surls": [ - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - }, - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/kaput_42/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - }, - { - "size": 7395952640, - "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/161482/L161482_SAP000_SB231_uv.MS_e051c795.tar", - }, - ] - } - - self.work_spec.batch_size = 1 - self.work_spec.is_auto_submit = True - self.work_spec.save() - - with mock.patch("lofardata.tasks.set_tasks_defined.delay") as submit_delay: - insert_task_into_atdb(workspecification_id=self.work_spec.pk) - submit_delay.assert_not_called() diff --git a/ldvspec/lofardata/tests/test_atdb_status_change.py b/ldvspec/lofardata/tests/test_atdb_status_change.py deleted file mode 100644 index 198b4197dde05d82720aa68e6ec17a2d1abdcd25..0000000000000000000000000000000000000000 --- a/ldvspec/lofardata/tests/test_atdb_status_change.py +++ /dev/null @@ -1,111 +0,0 @@ -import re -from unittest import mock - -import rest_framework.test as rtest -from lofardata.models import SUBMISSION_STATUS, ATDBProcessingSite, WorkSpecification -from lofardata.tasks import SessionStore, set_tasks_defined -from lofardata.tests.util import MockResponse, mocked_delay - -task_detail_uri = re.compile(r"^.*/atdb/tasks/(?P<task_id>\d+)/") -# This is needed to initialize a session -SessionStore.get_session() - -# ATDB Post Requests -def mocked_requests_put(*args, **kwargs): - - url = args[0] - - # Broken task - if "500" in url: - return MockResponse(None, 500, "Server Error.") - - if task_detail_uri.match(url): - # TODO: add real ATDB response - return MockResponse({"id": 42}, 200) - - return MockResponse(None, 404) - - -@mock.patch( - "lofardata.tasks.SessionStore._session.put", side_effect=mocked_requests_put -) -class TestATDBTaskStatusChange(rtest.APITestCase): - @mock.patch( - "lofardata.tasks.define_work_specification.delay", side_effect=mocked_delay - ) - def setUp(self, mock_delay: mock.MagicMock): - site: ATDBProcessingSite = ATDBProcessingSite.objects.create( - name="Dwingeloo", url="https://example.com/atdb/", access_token="DummyToken" - ) - - # Create Work Specification - ws: WorkSpecification = WorkSpecification.objects.create(processing_site=site) - mock_delay.assert_called_once_with(ws.pk) # not really needed here actually - ws.refresh_from_db() - - # Simulated interaction where the workflow was set - ws.selected_workflow = "https://example.com/atdb/workflow/1/" - ws.save() - self.work_spec: WorkSpecification = ws - - def test_request_submission(self, mocked_put: mock.MagicMock): - - self.work_spec.related_tasks = [1] # just a single task - self.work_spec.submission_status = SUBMISSION_STATUS.DEFINING - self.work_spec.save() - - expected_payload = {"new_status": "defined"} - - set_tasks_defined(self.work_spec.pk) - - mocked_put.assert_called_once_with( - f"https://example.com/atdb/tasks/{self.work_spec.related_tasks[0]}/", - json=expected_payload, - auth=mock.ANY, - ) - - self.work_spec.refresh_from_db() - self.assertAlmostEqual( - self.work_spec.submission_status, SUBMISSION_STATUS.SUBMITTED - ) - - def test_request_submission_of_batches(self, mocked_put: mock.MagicMock): - - self.work_spec.related_tasks = [1, 2, 3] - self.work_spec.submission_status = SUBMISSION_STATUS.DEFINING - self.work_spec.save() - - expected_payload = {"new_status": "defined"} - - set_tasks_defined(self.work_spec.pk) - - call_list = [ - mock.call( - f"https://example.com/atdb/tasks/{task_id}/", - json=expected_payload, - auth=mock.ANY, - ) - for task_id in self.work_spec.related_tasks - ] - - mocked_put.assert_has_calls(call_list, any_order=True) - - self.work_spec.refresh_from_db() - self.assertAlmostEqual( - self.work_spec.submission_status, SUBMISSION_STATUS.SUBMITTED - ) - - def test_request_submission_of_batches_with_failing_task(self, _: mock.MagicMock): - - self.work_spec.related_tasks = [1, 2, 500, 4] - self.work_spec.submission_status = SUBMISSION_STATUS.DEFINING - self.work_spec.save() - - set_tasks_defined(self.work_spec.pk) - - self.work_spec.refresh_from_db() - self.assertEquals( - self.work_spec.submission_status, - SUBMISSION_STATUS.ERROR, - "Status should be error after put failure", - ) diff --git a/ldvspec/lofardata/tests/test_dataproductinfo.py b/ldvspec/lofardata/tests/test_dataproductinfo.py index 36970725d1a6a3c301abaadce70011bbe5421ed2..102265ba8d0157d0fd6a6fb67c50b61adce7c99e 100644 --- a/ldvspec/lofardata/tests/test_dataproductinfo.py +++ b/ldvspec/lofardata/tests/test_dataproductinfo.py @@ -175,7 +175,7 @@ class RetrieveDataProductInformation(dtest.TestCase): class CountUnspecifiedFiles(dtest.TestCase): @mock.patch( - "lofardata.tasks.define_work_specification.delay", side_effect=mocked_delay + "lofardata.task.tasks.define_work_specification.delay", side_effect=mocked_delay ) def setUp(self, mock_delay: mock.MagicMock): diff --git a/ldvspec/lofardata/tests/test_group_creation.py b/ldvspec/lofardata/tests/test_group_creation.py index 557b15b9d327fcab028215a0578fb84d836b026b..c156d645478dae93f78aed8724549976112a92f9 100644 --- a/ldvspec/lofardata/tests/test_group_creation.py +++ b/ldvspec/lofardata/tests/test_group_creation.py @@ -20,7 +20,7 @@ class WorkSpecificationCreationDatabaseInteraction(dtest.TestCase): self.delay_return_value = types.SimpleNamespace(id='5') - @mock.patch('lofardata.tasks.define_work_specification.delay') + @mock.patch('lofardata.task.tasks.define_work_specification.delay') def test_create_group_with_one_obs_id(self, mock_delay): mock_delay.return_value = self.delay_return_value create_work_specification_for_group(self.group, self.user, "123", 4) @@ -36,7 +36,7 @@ class WorkSpecificationCreationDatabaseInteraction(dtest.TestCase): self.assertTupleEqual(actual_inherited_group_value, (self.site, self.group.selected_workflow, self.group.selected_workflow_tag)) - @mock.patch('lofardata.tasks.define_work_specification.delay') + @mock.patch('lofardata.task.tasks.define_work_specification.delay') def test_create_group_with_multiple_obs_ids(self, mock_delay): mock_delay.return_value = self.delay_return_value create_work_specifications_for_group(self.group, self.user, ["123", "456", " 781 "], 3) @@ -52,7 +52,7 @@ class WorkSpecificationCreationDatabaseInteraction(dtest.TestCase): self.assertTupleEqual(actual_inherited_group_value, (self.site, self.group.selected_workflow, self.group.selected_workflow_tag)) - @mock.patch('lofardata.tasks.define_work_specification.delay') + @mock.patch('lofardata.task.tasks.define_work_specification.delay') def test_create_group_with_filters(self, mock_delay): mock_delay.return_value = self.delay_return_value obs_ids = ['123', '456'] @@ -66,7 +66,7 @@ class WorkSpecificationCreationDatabaseInteraction(dtest.TestCase): # times, without regard to order. self.assertCountEqual(actual_obs_ids, obs_ids) - @mock.patch('lofardata.tasks.define_work_specification.delay') + @mock.patch('lofardata.task.tasks.define_work_specification.delay') def test_create_group_batch_size(self, mock_delay): mock_delay.return_value = self.delay_return_value obs_ids = ['123', '456'] @@ -79,7 +79,7 @@ class WorkSpecificationCreationDatabaseInteraction(dtest.TestCase): @mock.patch("lofardata.view_helpers.specification.set_post_submit_values") - @mock.patch('lofardata.tasks.define_work_specification.delay') + @mock.patch('lofardata.task.tasks.define_work_specification.delay') def test_create_group_with_post_submit_values(self, mock_delay, post_submit_mock): mock_delay.return_value = self.delay_return_value create_work_specifications_for_group(self.group, self.user, ["123"], 1) diff --git a/ldvspec/lofardata/tests/test_parse_surl.py b/ldvspec/lofardata/tests/test_parse_surl.py deleted file mode 100644 index 1c02f58ef6744f0df4749c852aa29a2958f97cba..0000000000000000000000000000000000000000 --- a/ldvspec/lofardata/tests/test_parse_surl.py +++ /dev/null @@ -1,36 +0,0 @@ -import unittest - -from lofardata.tasks import _parse_surl - - -class ParseSurls(unittest.TestCase): - def test_parse_surl_project_anything_then_underscore_then_digits(self): - actual = _parse_surl('srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc4_030/402998/L402998_summaryIS_f5baa34a.tar') - self.assertEqual(actual['project'], 'lc4_030') - - - def test_parse_surl_project_anything_then_digits(self): - actual = _parse_surl('srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/commissioning2012/59765/L59765_SAP002_B000_S0_P000_bf.raw_02d36e06.tar') - self.assertEqual(actual['project'], 'commissioning2012') - - - def test_parse_surl_project_only_digits(self): - actual = _parse_surl( - 'srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/123456/59765/L59765_SAP002_B000_S0_P000_bf.raw_02d36e06.tar') - self.assertEqual(actual['project'], '123456') - - def test_parse_surl_project_anything(self): - actual = _parse_surl('srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/1@!_$52aaP/402998/L402998_summaryIS_f5baa34a.tar') - self.assertEqual(actual['project'], '1@!_$52aaP') - - def test_parse_surl_project_empty(self): - with self.assertRaises(AttributeError): - _parse_surl('srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects//402998/L402998_summaryIS_f5baa34a.tar') - - def test_parse_surl_no_project(self): - with self.assertRaises(AttributeError): - _parse_surl('srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/L402998_summaryIS_f5baa34a.tar') - - def test_parse_surl_no_project_but_something_else(self): - with self.assertRaises(AttributeError): - _parse_surl('srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/software/RUG_build_2015.tar') diff --git a/ldvspec/lofardata/tests/test_queryset.py b/ldvspec/lofardata/tests/test_queryset.py index d1f8ba340c39555da8c1e266e320d502604e5638..12b312456102671e2c707aa5e2dfac7559cba654 100644 --- a/ldvspec/lofardata/tests/test_queryset.py +++ b/ldvspec/lofardata/tests/test_queryset.py @@ -22,9 +22,9 @@ class TestQuerySet(dtest.TestCase): ] } - WorkSpecification.objects.get_or_create(pk=1, submission_status=SUBMISSION_STATUS.NOT_SUBMITTED, + WorkSpecification.objects.get_or_create(pk=1, submission_status=SUBMISSION_STATUS.READY, async_task_result=1) - WorkSpecification.objects.get_or_create(pk=2, submission_status=SUBMISSION_STATUS.NOT_SUBMITTED, + WorkSpecification.objects.get_or_create(pk=2, submission_status=SUBMISSION_STATUS.READY, inputs=input_with_surls, async_task_result=1) WorkSpecification.objects.get_or_create(pk=3, submission_status=SUBMISSION_STATUS.DEFINING, async_task_result=1) WorkSpecification.objects.get_or_create(pk=4, submission_status=SUBMISSION_STATUS.DEFINING, diff --git a/ldvspec/lofardata/tests/test_split_entries.py b/ldvspec/lofardata/tests/test_split_entries.py deleted file mode 100644 index 300db5501e26386839423b9a0815a59877615d29..0000000000000000000000000000000000000000 --- a/ldvspec/lofardata/tests/test_split_entries.py +++ /dev/null @@ -1,27 +0,0 @@ -import unittest - -from lofardata.tasks import split_entries_to_batches - - -class SplitEntries(unittest.TestCase): - def test_no_splitting(self): - res = split_entries_to_batches([*range(10)], 0) - self.assertListEqual(res, [[*range(10)]]) - - def test_splitting_empty_array(self): - res = split_entries_to_batches([], 2) - self.assertListEqual(res, []) - - def test_splitting_bs1(self): - res = split_entries_to_batches([*range(10)], 1) - self.assertEqual(len(res), 10) - # TODO: check contents - - def test_splitting_exact_match(self): - res = split_entries_to_batches([*range(9)], 3) - self.assertEqual(len(res), 3) - - def test_splitting_with_left_over(self): - res = split_entries_to_batches([*range(10)], 3) - self.assertEqual(len(res), 4) - self.assertEqual(len(res[-1]), 1) diff --git a/ldvspec/lofardata/tests/test_task_helpers.py b/ldvspec/lofardata/tests/test_task_helpers.py new file mode 100644 index 0000000000000000000000000000000000000000..36dc1038e5458b86b4111ea397a9ba976bd4d68e --- /dev/null +++ b/ldvspec/lofardata/tests/test_task_helpers.py @@ -0,0 +1,383 @@ +import unittest +from unittest import mock + +import lofardata.task.task_helpers as helpers +from lofardata.task.custom_exceptions import RequestNotOk, InvalidLocation, InvalidPredecessor, WorkSpecificationNoSite, \ + WorkerResponseNotOk, ATDBKaput, InvalidSurl +from lofardata.task.session_store import SessionStore + +from lofardata.models import WorkSpecification, PURGE_POLICY, SUBMISSION_STATUS, ATDBProcessingSite + +from lofardata.tests.util import mocked_delay, SessionMock + + +class ParseSurls(unittest.TestCase): + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def setUp(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + + def test_project_anything_then_underscore_then_digits(self): + actual = helpers.parse_surl( + 'srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc4_030/402998/L402998_summaryIS_f5baa34a.tar', self.work_spec) + self.assertEqual(actual['project'], 'lc4_030') + + def test_project_anything_then_digits(self): + actual = helpers.parse_surl( + 'srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/commissioning2012/59765/L59765_SAP002_B000_S0_P000_bf.raw_02d36e06.tar', self.work_spec) + self.assertEqual(actual['project'], 'commissioning2012') + + def test_project_only_digits(self): + actual = helpers.parse_surl( + 'srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/123456/59765/L59765_SAP002_B000_S0_P000_bf.raw_02d36e06.tar', self.work_spec) + self.assertEqual(actual['project'], '123456') + + def test_project_anything(self): + actual = helpers.parse_surl( + 'srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/1@!_$52aaP/402998/L402998_summaryIS_f5baa34a.tar', self.work_spec) + self.assertEqual(actual['project'], '1@!_$52aaP') + + def test_project_empty(self): + with self.assertRaises(InvalidSurl): + helpers.parse_surl( + 'srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects//402998/L402998_summaryIS_f5baa34a.tar', self.work_spec) + + def test_no_project(self): + with self.assertRaises(InvalidSurl): + helpers.parse_surl( + 'srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/L402998_summaryIS_f5baa34a.tar', self.work_spec) + + def test_no_project_but_something_else(self): + with self.assertRaises(InvalidSurl): + helpers.parse_surl('srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/software/RUG_build_2015.tar', self.work_spec) + + def test_no_valid_location(self): + with self.assertRaises(InvalidLocation): + helpers.parse_surl('srm://data/lofar/ops/projects/lc4_030/402998/L402998_summaryIS_f5baa34a.tar', self.work_spec) + + def test_valid_location_sara(self): + actual = helpers.parse_surl( + 'srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc4_030/402998/L402998_summaryIS_f5baa34a.tar', self.work_spec) + self.assertEqual(actual['location'], 'srm.grid.sara.nl') + + def test_valid_location_jeulich(self): + actual = helpers.parse_surl( + 'srm://lofar-srm.fz-juelich.de:8443/pnfs/fz-juelich.de/data/lofar/ops/projects/lt10_002/682020/L682020_SAP000_B120_S1_P000_bf.h5_091c1e78.tar', self.work_spec) + self.assertEqual(actual['location'], 'lofar-srm.fz-juelich.de') + + def test_valid_location_poznan(self): + actual = helpers.parse_surl( + 'srm://lta-head.lofar.psnc.pl:8443/lofar/ops/projects/lt10_002/698739/L698739_SAP000_B045_S1_P000_bf.h5_4a2eb67a.tar', self.work_spec) + self.assertEqual(actual['location'], 'lta-head.lofar.psnc.pl') + + +class SplitEntries(unittest.TestCase): + def test_no_splitting(self): + with self.assertRaises(RequestNotOk): + helpers.split_entries_to_batches([*range(10)], 0) + + def test_splitting_empty_array(self): + res = helpers.split_entries_to_batches([], 2) + self.assertListEqual(res, []) + + def test_splitting_bs1(self): + res = helpers.split_entries_to_batches([*range(10)], 1) + self.assertEqual(len(res), 10) + # TODO: check contents + + def test_splitting_exact_match(self): + res = helpers.split_entries_to_batches([*range(9)], 3) + self.assertEqual(len(res), 3) + + def test_splitting_with_left_over(self): + res = helpers.split_entries_to_batches([*range(10)], 3) + self.assertEqual(len(res), 4) + self.assertEqual(len(res[-1]), 1) + + +class SendToATDB(unittest.TestCase): + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def setUp(self, _): + self.atdb_site = ATDBProcessingSite(url="http://x-men.com", access_token="wolverine-37") + self.work_spec = WorkSpecification.objects.create() + + @mock.patch.object(SessionStore, 'get_session', return_value=SessionMock()) + def test_send_to_atdb_without_response_task_ids(self, _): + session = SessionStore.get_session() + payload = {"atdb_kaput": True} + self.assertRaises(ATDBKaput, + lambda: helpers.send_to_atdb(payload, session, self.atdb_site, self.work_spec)) + self.work_spec.refresh_from_db() + self.assertEqual(self.work_spec.submission_status, SUBMISSION_STATUS.ERROR) + self.assertEqual(self.work_spec.atdb_task_ids, []) + + @mock.patch.object(SessionStore, 'get_session', return_value=SessionMock()) + def test_send_to_atdb_error(self, _): + session = SessionStore.get_session() + payload = {"inputs": []} + self.assertRaises(WorkerResponseNotOk, + lambda: helpers.send_to_atdb(payload, session, self.atdb_site, self.work_spec)) + self.work_spec.refresh_from_db() + self.assertEqual(self.work_spec.submission_status, SUBMISSION_STATUS.ERROR) + + @mock.patch.object(SessionStore, 'get_session', return_value=SessionMock()) + def test_send_to_atdb_with_task_ids(self, mock_session): + atdb_task_id = 37 + session = SessionStore.get_session() + payload = {"test_arg": atdb_task_id} + helpers.send_to_atdb(payload, session, self.atdb_site, self.work_spec) + self.assertEqual(self.work_spec.atdb_task_ids, [atdb_task_id]) + + @mock.patch.object(SessionStore, 'get_session', return_value=SessionMock()) + def test_send_to_atdb_adding_task_ids(self, mock_session): + atdb_task_id_1 = 73 + self.work_spec.atdb_task_ids = [atdb_task_id_1] + atdb_task_id_2 = 37 + session = SessionStore.get_session() + payload = {"test_arg": atdb_task_id_2} + helpers.send_to_atdb(payload, session, self.atdb_site, self.work_spec) + self.assertEqual(self.work_spec.atdb_task_ids, [atdb_task_id_1, atdb_task_id_2]) + + +class CreatePayload(unittest.TestCase): + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def setUp(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + self.work_spec.purge_policy = PURGE_POLICY.NO + self.work_spec.selected_workflow = "MyCoolWorkflow" + self.work_spec.save() + mock_delay.assert_called_with(self.work_spec.id) + + def test_create_payload_basic(self): + surl = "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/86574/L86574_SAP000_SB139_uv.MS_bc088c70.tar" + size = 97884160 + batch = [{"size": size, "surl": surl}] + + actual = helpers.create_payload(batch, self.work_spec) + expected = { + "project": "lc0_012", + "sas_id": "86574", + "task_type": "regular", + "filter": f"ldv-spec:{self.work_spec.pk}", + "purge_policy": self.work_spec.purge_policy, + "new_status": "defining", + "new_workflow_uri": self.work_spec.selected_workflow, + "size_to_process": size, + "inputs": [{"size": size, "surl": surl, "type": "File", "location": "srm.grid.sara.nl"}], + } + self.assertDictEqual(actual, expected, "Invalid payload for basic testcase") + + @unittest.skip("FIXME: SDC-905") + def test_create_payload_multiple_locations(self): + surl_one = "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/86574/L86574_SAP000_SB139_uv.MS_bc088c70.tar" + size_one = 97884160 + surl_two = "srm://lta-head.lofar.psnc.pl:8443/lofar/ops/projects/lt10_002/698739/L698739_SAP000_B045_S1_P000_bf.h5_4a2eb67a.tar" + size_two = 1234567 + batch = [{"size": size_one, "surl": surl_one}, {"size": size_two, "surl": surl_two}] + + actual = helpers.create_payload(batch, self.work_spec) + expected = { + "project": "lc0_012", + "sas_id": "86574", + "task_type": "regular", + "filter": f"ldv-spec:{self.work_spec.pk}", + "purge_policy": self.work_spec.purge_policy, + "new_status": "defining", + "new_workflow_uri": self.work_spec.selected_workflow, + "size_to_process": size_one + size_two, + "inputs": [{"size": size_one, "surl": surl_one, "type": "File", "location": "srm.grid.sara.nl"}, + {"size": size_two, "surl": surl_two, "type": "File", "location": "lta-head.lofar.psnc.pl"}], + } + self.assertDictEqual(actual, expected, "Invalid payload for multiple locations testcase") + + def test_create_payload_multiple_projects(self): + # Placeholder #FIXME: SDC-905 + pass + + def test_create_payload_multiple_sas_ids(self): + # Placeholder #FIXME: SDC-905 + pass + + def test_create_payload_with_predecessor(self): + surl = "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/86574/L86574_SAP000_SB139_uv.MS_bc088c70.tar" + size = 97884160 + batch = [{"size": size, "surl": surl}] + predecessor = 42 + + actual = helpers.create_payload(batch, self.work_spec, predecessor) + expected = { + "project": "lc0_012", + "sas_id": "86574", + "task_type": "regular", + "filter": f"ldv-spec:{self.work_spec.pk}", + "purge_policy": self.work_spec.purge_policy, + "new_status": "defining", + "new_workflow_uri": self.work_spec.selected_workflow, + "size_to_process": size, + "inputs": [{"size": size, "surl": surl, "type": "File", "location": "srm.grid.sara.nl"}], + "predecessor": predecessor + } + self.assertDictEqual(actual, expected, "Invalid payload with predecessor") + + def test_create_payload_empty_batch(self): + batch = [] + + self.assertRaises(RequestNotOk, lambda: helpers.create_payload(batch, self.work_spec)) + + +class GetATDBPredecessorTaskId(unittest.TestCase): + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_atdb_predecessor_task_id_no_predecessor(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + self.work_spec.save() + + mock_delay.assert_called_with(self.work_spec.id) + + actual = helpers.get_atdb_predecessor_task_id(self.work_spec) + expected = None + self.assertEqual(actual, expected, "Retrieving task ids of not existing predecessor is not None") + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_atdb_predecessor_task_id_basic(self, mock_delay): + self.predecessor = WorkSpecification.objects.create() + self.predecessor.atdb_task_ids = [1] + self.predecessor.save() + mock_delay.assert_called_with(self.predecessor.id) + + self.work_spec = WorkSpecification.objects.create() + self.work_spec.predecessor_specification = self.predecessor + self.work_spec.save() + mock_delay.assert_called_with(self.work_spec.id) + + actual = helpers.get_atdb_predecessor_task_id(self.work_spec) + expected = 1 + self.assertEqual(actual, expected, "Retrieving task id of not existing predecessor is incorrect") + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_atdb_predecessor_task_id_missing_task_ids(self, mock_delay): + self.predecessor = WorkSpecification.objects.create() + self.predecessor.save() + mock_delay.assert_called_with(self.predecessor.id) + + self.work_spec = WorkSpecification.objects.create() + self.work_spec.predecessor_specification = self.predecessor + self.work_spec.save() + mock_delay.assert_called_with(self.work_spec.id) + + self.assertRaises(InvalidPredecessor, lambda: helpers.get_atdb_predecessor_task_id(self.work_spec)) + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_atdb_predecessor_task_multiple_ids(self, mock_delay): + self.predecessor = WorkSpecification.objects.create() + self.predecessor.atdb_task_ids = [1, 2] + self.predecessor.save() + mock_delay.assert_called_with(self.predecessor.id) + + self.work_spec = WorkSpecification.objects.create() + self.work_spec.predecessor_specification = self.predecessor + self.work_spec.save() + + mock_delay.assert_called_with(self.work_spec.id) + mock_delay.assert_called_with(self.work_spec.id) + + self.assertRaises(InvalidPredecessor, lambda: helpers.get_atdb_predecessor_task_id(self.work_spec)) + + +class GetProcessingSite(unittest.TestCase): + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_processing_site_basic(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + self.site = ATDBProcessingSite() + self.site.save() + self.work_spec.processing_site = self.site + self.work_spec.save() + + mock_delay.assert_called_with(self.work_spec.id) + + actual = helpers.get_processing_site(self.work_spec) + expected = self.site + self.assertEqual(actual, expected, "Retrieving basic atdb processing site did not succeed") + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_processing_site_missing_site(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + self.work_spec.save() + mock_delay.assert_called_with(self.work_spec.id) + + self.assertRaises(WorkSpecificationNoSite, lambda: helpers.get_processing_site(self.work_spec)) + + +class GetSurls(unittest.TestCase): + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_surls_single_entry(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + entry = [{"size": 3773, + "surl": 'srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc4_030/402998/L402998_summaryIS_f5baa34a.tar', + "type": "File", "location": "srm.grid.sara.nl"}] + self.work_spec.inputs = {"surls": entry} + self.work_spec.save() + + mock_delay.assert_called_with(self.work_spec.id) + + actual = helpers.get_surls(self.work_spec) + expected = entry + self.assertListEqual(actual, expected, "Retrieving single surl did not succeed") + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_surls_multiple_entries(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + entry_list = [{"size": 3773, + "surl": 'srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc4_030/402998/L402998_summaryIS_f5baa34a.tar', + "type": "File", "location": "srm.grid.sara.nl"}, + {"size": 1234, + "surl": 'srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc4_030/402998/wolverine_is_cool.tar', + "type": "File", "location": "srm.grid.sara.nl"}] + self.work_spec.inputs = {"surls": entry_list} + self.work_spec.save() + + mock_delay.assert_called_with(self.work_spec.id) + + actual = helpers.get_surls(self.work_spec) + expected = entry_list + self.assertListEqual(actual, expected, "Retrieving multiple surls did not succeed") + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_surls_no_inputs(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + self.work_spec.save() + mock_delay.assert_called_with(self.work_spec.id) + + self.assertRaises(RequestNotOk, lambda: helpers.get_surls(self.work_spec)) + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_surls_no_surls(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + self.work_spec.inputs = {"batman": [{"size": 123, "type": "File", "location": "srm.grid.sara.nl"}]} + self.work_spec.save() + mock_delay.assert_called_with(self.work_spec.id) + + self.assertRaises(RequestNotOk, lambda: helpers.get_surls(self.work_spec)) + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def test_get_surls_empty(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + entry = [] + self.work_spec.inputs = {"surls": entry} + self.work_spec.save() + + mock_delay.assert_called_with(self.work_spec.id) + self.assertRaises(RequestNotOk, lambda: helpers.get_surls(self.work_spec)) + +class SetErroredWorkSpecification(unittest.TestCase): + + @mock.patch('lofardata.task.tasks.define_work_specification.delay', side_effect=mocked_delay) + def setUp(self, mock_delay): + self.work_spec = WorkSpecification.objects.create() + self.work_spec.save() + mock_delay.assert_called_with(self.work_spec.id) + + def test_basic_error_case(self): + message = "Foo is Bar" + helpers.set_errored_work_specification(self.work_spec, RequestNotOk, IndexError(message)) + self.assertEqual(self.work_spec.submission_status, SUBMISSION_STATUS.ERROR) + self.assertEqual(self.work_spec.error_message, RequestNotOk.default_detail + ":\n" + message) diff --git a/ldvspec/lofardata/tests/test_tasks.py b/ldvspec/lofardata/tests/test_tasks.py new file mode 100644 index 0000000000000000000000000000000000000000..87ec3ce20ace5367971c3e382604d98cdf5ac328 --- /dev/null +++ b/ldvspec/lofardata/tests/test_tasks.py @@ -0,0 +1,105 @@ +from unittest import mock + +import django.test as dtest +from lofardata.models import SUBMISSION_STATUS, ATDBProcessingSite, WorkSpecification, DataProduct +from lofardata.task.tasks import insert_task_into_atdb, define_work_specification +from lofardata.task.session_store import SessionStore + +from lofardata.tests.util import mocked_delay, SessionMock + +SessionStore.get_session() + + +@mock.patch("lofardata.task.tasks.define_work_specification.delay", side_effect=mocked_delay) +class DefineWorkSpecification(dtest.TestCase): + def setUp(self): + self.site: ATDBProcessingSite = ATDBProcessingSite.objects.create( + name="x-men", url="https://marvel.com/x-men/", access_token="Wolverine" + ) + + self.work_spec: WorkSpecification = WorkSpecification(id=1, processing_site=self.site, + filters={"obs_id": "1337"}) + self.dataproduct = dict(obs_id='1337', oid_source='SAS', dataproduct_source='lofar', + dataproduct_type='observation', + project='LT10_10_Classified', location='lta-head.lofar.psnc.pl', + activity='secret stuff', + surl='srm://lta-head.lofar.psnc.pl:4884/x-men/wolverine.tar', filesize=40, + antenna_set='HBA Dual Inner', instrument_filter='110-190 MHz', dysco_compression=False) + DataProduct.objects.create(**self.dataproduct) + + def test_statuses(self, mock_delay): + self.assertEqual(self.work_spec.submission_status, SUBMISSION_STATUS.LOADING) + self.work_spec.save() + define_work_specification(1) + self.work_spec.refresh_from_db() + self.assertEqual(self.work_spec.submission_status, SUBMISSION_STATUS.READY) + + def test_update_inputs(self, mock_delay): + self.work_spec.save() + define_work_specification(1) + self.work_spec.refresh_from_db() + self.assertEqual(self.work_spec.inputs, + {'surls': [{'size': 40, 'surl': 'srm://lta-head.lofar.psnc.pl:4884/x-men/wolverine.tar'}]}) + + # add another dataproduct for the same filter + self.dataproduct['surl'] = 'srm://lta-head.lofar.psnc.pl:4884/x-men/hulk.tar' + self.dataproduct['filesize'] = 120 + DataProduct.objects.create(**self.dataproduct) + + self.work_spec.save() + define_work_specification(1) + self.work_spec.refresh_from_db() + self.assertEqual(self.work_spec.inputs, + {'surls': [{'size': 120, 'surl': 'srm://lta-head.lofar.psnc.pl:4884/x-men/hulk.tar'}, + {'size': 40, 'surl': 'srm://lta-head.lofar.psnc.pl:4884/x-men/wolverine.tar'}]}) + + +@mock.patch.object(SessionStore, 'get_session', return_value=SessionMock()) +@mock.patch("lofardata.task.tasks.define_work_specification.delay", side_effect=mocked_delay) +@mock.patch("lofardata.task.tasks.insert_task_into_atdb.delay") +class InsertTaskIntoATDB(dtest.TestCase): + @mock.patch("lofardata.task.tasks.define_work_specification.delay", side_effect=mocked_delay) + def setUp(self, mock_delay): + self.site: ATDBProcessingSite = ATDBProcessingSite.objects.create( + name="x-men", url="https://marvel.com/x-men/", access_token="Wolverine" + ) + + test_values = dict(obs_id='1337', oid_source='SAS', dataproduct_source='lofar', + dataproduct_type='observation', + project='lc0_012', + location='lta-head.lofar.psnc.pl', + activity='observation', + surl='srm://lta-head.lofar.psnc.pl:123/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/1337/very_nice.tar', + filesize=40, + antenna_set="HBA Dual Inner", + instrument_filter="110-190 MHz", + dysco_compression=True) + + dp = DataProduct(**test_values) + dp.save() + + self.work_spec: WorkSpecification = WorkSpecification.objects.create(id=1, processing_site=self.site, + filters={"obs_id": "1337"}) + define_work_specification(1) + self.work_spec.refresh_from_db() + + def test_statuses(self, session_mock, mock_delay, mock_insert): + self.assertEqual(self.work_spec.submission_status, SUBMISSION_STATUS.READY) + insert_task_into_atdb(1) + self.work_spec.refresh_from_db() + self.assertEqual(self.work_spec.submission_status, SUBMISSION_STATUS.SUBMITTED) + + def test_with_batch_size(self, session_mock, mock_delay, mock_insert): + self.work_spec.batch_size = 1 + insert_task_into_atdb(1) + self.work_spec.refresh_from_db() + self.assertEqual(self.work_spec.submission_status, SUBMISSION_STATUS.SUBMITTED) + + def test_with_predecessor(self, session_mock, mock_delay, mock_insert): + predecessor: WorkSpecification = WorkSpecification.objects.create(id=2, processing_site=self.site, + filters={"obs_id": "1337"}) + + self.work_spec.predecessor_specification = predecessor + insert_task_into_atdb(1) + self.work_spec.refresh_from_db() + self.assertEqual(self.work_spec.submission_status, SUBMISSION_STATUS.SUBMITTED) diff --git a/ldvspec/lofardata/tests/test_workrequest.py b/ldvspec/lofardata/tests/test_workrequest.py index 89a614415e78484e52cdad2da1cf5db8cfccb618..780087805df88862540194c4d00f4fbfc5754f20 100644 --- a/ldvspec/lofardata/tests/test_workrequest.py +++ b/ldvspec/lofardata/tests/test_workrequest.py @@ -1,11 +1,9 @@ import unittest.mock -import django.test as dtest -import lofardata.tasks as tasks -import rest_framework.status as response_status +import lofardata.task.tasks as tasks import rest_framework.test as rtest from django.contrib.auth import get_user_model -from lofardata.models import DataProduct, WorkSpecification +from lofardata.models import DataProduct, WorkSpecification, SUBMISSION_STATUS # NOTE: Instead of specifying the exact addresses, you could use `reverse` from Django @@ -16,7 +14,7 @@ class TestWorkSpecificationRequest(rtest.APITestCase): self.user = get_user_model().objects.create_superuser('admin') self.client.force_authenticate(self.user) - @unittest.mock.patch('lofardata.tasks.define_work_specification.delay') + @unittest.mock.patch('lofardata.task.tasks.define_work_specification.delay') def test_insert_work_request(self, mocked_task): # Mocking business mocked_task.return_value = unittest.mock.MagicMock() @@ -36,11 +34,11 @@ class TestWorkSpecificationRequest(rtest.APITestCase): self.assertEqual('asyncresultid', inserted_work_specification['async_task_result']) tasks.define_work_specification(inserted_work_specification['id']) work_spec_object = WorkSpecification.objects.get(pk=inserted_work_specification['id']) - self.assertTrue(work_spec_object.is_ready) + self.assertEqual(work_spec_object.submission_status, SUBMISSION_STATUS.READY) self.assertEqual({'surls': [{"size": 123456, "surl": 'srm://lofarlta/mynice_file.tar.gz'}]}, work_spec_object.inputs) - @unittest.mock.patch('lofardata.tasks.define_work_specification.delay') + @unittest.mock.patch('lofardata.task.tasks.define_work_specification.delay') def test_insert_work_request_nested_fields(self, mocked_task): # Mocking business mocked_task.return_value = unittest.mock.MagicMock() @@ -65,6 +63,6 @@ class TestWorkSpecificationRequest(rtest.APITestCase): self.assertEqual('asyncresultid', inserted_work_specification['async_task_result']) tasks.define_work_specification(inserted_work_specification['id']) work_spec_object = WorkSpecification.objects.get(pk=inserted_work_specification['id']) - self.assertTrue(work_spec_object.is_ready) + self.assertEqual(work_spec_object.submission_status, SUBMISSION_STATUS.READY) self.assertEqual({'surls': [{"size": 123456, "surl": 'srm://lofarlta/mynice_file_with_rhythm.tar.gz'}]}, work_spec_object.inputs) diff --git a/ldvspec/lofardata/tests/test_workspecification_editable.py b/ldvspec/lofardata/tests/test_workspecification_editable.py index ef359462724d9b8a768f25859261716cb9ffa656..f6015dd3574520eae667cc9ba9b276a1e75b1154 100644 --- a/ldvspec/lofardata/tests/test_workspecification_editable.py +++ b/ldvspec/lofardata/tests/test_workspecification_editable.py @@ -11,10 +11,14 @@ class WorkSpecificationEditable(unittest.TestCase): specification = WorkSpecification(submission_status=SUBMISSION_STATUS.DEFINING) self.assertFalse(specification.is_editable()) - def test_editable_not_submitted(self): - specification = WorkSpecification(submission_status=SUBMISSION_STATUS.NOT_SUBMITTED) + def test_editable_ready(self): + specification = WorkSpecification(submission_status=SUBMISSION_STATUS.READY) self.assertTrue(specification.is_editable()) def test_editable_error(self): specification = WorkSpecification(submission_status=SUBMISSION_STATUS.ERROR) - self.assertTrue(specification.is_editable()) \ No newline at end of file + self.assertTrue(specification.is_editable()) + + def test_not_editable_loading(self): + specification = WorkSpecification(submission_status=SUBMISSION_STATUS.LOADING) + self.assertFalse(specification.is_editable()) \ No newline at end of file diff --git a/ldvspec/lofardata/tests/util.py b/ldvspec/lofardata/tests/util.py index 7d58fa2a80e15f284390268efff3f66ac165bec6..bd78e314cbdd32ee4369886f7977de369a5eb4e3 100644 --- a/ldvspec/lofardata/tests/util.py +++ b/ldvspec/lofardata/tests/util.py @@ -8,13 +8,39 @@ from lofardata.models import WorkSpecification, AsyncResult def mocked_delay(*args) -> AsyncResult: """Mock the delay used in saving a WorkSpecification""" ws: WorkSpecification = WorkSpecification.objects.get(pk=args[0]) - ws.is_ready = True ws.async_task_result = 42 # prevents infinite recursion in task ws.save() return AsyncResult(id="Test", backend=None) +class SessionMock(): + + # Example get (not yet used) + # def get(self, *args, **kwargs): + # url = args[0] + # + # return MockResponse({}, 200) + + def post(self, *args, **kwargs): + """Mocked ATDB API""" + + data = kwargs.get("json", {}) + + if data.get("test_arg"): + return MockResponse({"id": data["test_arg"]}, 201) + + if data.get("atdb_kaput"): + return MockResponse({}, 201) + + # Tests for Empty Inputs + if len(data.get("inputs", [])) == 0: + return MockResponse({}, 400) + + # Successful creation of task + return MockResponse({"id": 42}, 201) + + class MockResponse: """Mocked requests.Response class @@ -69,7 +95,7 @@ class MockResponse: """ def __init__( - self, json_data: Optional[dict], status_code: int, text: Optional[str] = None + self, json_data: Optional[dict], status_code: int, text: Optional[str] = None ): self.json_data = json_data self.status_code = status_code diff --git a/ldvspec/lofardata/view_helpers/specification.py b/ldvspec/lofardata/view_helpers/specification.py index 06f0f480317cf11f26c3992407656b02aea7b179..60e887182b443da81bb7eeff12719db41aacf533 100644 --- a/ldvspec/lofardata/view_helpers/specification.py +++ b/ldvspec/lofardata/view_helpers/specification.py @@ -7,7 +7,6 @@ from lofardata.models import Group, WorkSpecification def set_post_submit_values(specification, user): specification.async_task_result = None - specification.is_ready = False if specification.created_by is None: specification.created_by = user diff --git a/ldvspec/lofardata/view_helpers/submit.py b/ldvspec/lofardata/view_helpers/submit.py index 7669e801716d1614cad862db9db95fafaa4b6ee2..b092956503671a44bc283315c80e102802b07e0e 100644 --- a/ldvspec/lofardata/view_helpers/submit.py +++ b/ldvspec/lofardata/view_helpers/submit.py @@ -1,12 +1,11 @@ from typing import List -from lofardata.tasks import insert_task_into_atdb -import time +from lofardata.task.tasks import insert_task_into_atdb -def submit_single_to_atdb(workspecification_id: int, delay:int = 2): - submit_list_to_atdb([workspecification_id], delay) -def submit_list_to_atdb(workspecification_ids: List[int], delay:int=2): +def submit_single_to_atdb(workspecification_id: int): + submit_list_to_atdb([workspecification_id]) + + +def submit_list_to_atdb(workspecification_ids: List[int]): for workspecification_id in workspecification_ids: insert_task_into_atdb.delay(workspecification_id) - - time.sleep(delay) \ No newline at end of file diff --git a/ldvspec/lofardata/views/workspecification/template.py b/ldvspec/lofardata/views/workspecification/template.py index eee055cddc18ccc1dfaa251dea0be56970e69252..c1ca46c5bea4a62ab3bf611106004f291cc4262a 100644 --- a/ldvspec/lofardata/views/workspecification/template.py +++ b/ldvspec/lofardata/views/workspecification/template.py @@ -22,7 +22,7 @@ class WorkSpecificationCreateUpdateView(LoginRequiredMixin, CanEditWorkSpecifica @silk_profiler(name="Create/update work specification") def get(self, request, *args, **kwargs): specification = self.get_object() - if specification.is_editable(): + if specification.is_new() or specification.is_editable(): return super().get(request, *args, **kwargs) else: return redirect('specification-detail', self.kwargs["pk"])