Commit 68a52c1f authored by Fanna Lautenbach

Merge branch 'pair-prog-session-change-task-logic' into 'main'

Task logic refactoring

See merge request !82
parents 370e2ef0 1d20fdc6
Pipeline #45272 passed
Showing changed files with 374 additions and 306 deletions
@@ -50,8 +50,8 @@ docker-build:
       fi
     - echo "Build using $CI_REGISTRY_IMAGE/base:$HASH"
     # Try pulling the existing image for layer reuse; || true to ignore if it does not exist
-    - docker pull $CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG || true
-    - docker build --cache-from $CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG --build-arg BASE_IMAGE="$CI_REGISTRY_IMAGE/base:$HASH" --pull -t "$CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG" ldvspec
+    #- docker build --cache-from $CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG --build-arg BASE_IMAGE="$CI_REGISTRY_IMAGE/base:$HASH" --pull -t "$CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG" ldvspec
+    - docker build --pull -t "$CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG" ldvspec
     - docker push "$CI_REGISTRY_IMAGE:$DOCKER_IMAGE_TAG"

 integration-test:
...
@@ -168,8 +168,6 @@ Vary: Accept
     "inputs": null,
     "selected_workflow": "imaging_compress_pipeline_v02",
     "related_tasks": null,
-    "is_ready": false,
-    "is_defined": false,
     "async_task_result": "99622e7b-71f0-4f05-826d-23c13846642d",
     "created_by": 1,
     "processing_site": null
...
@@ -2,7 +2,6 @@
 Library    OperatingSystem
 Library    SeleniumLibrary
 Resource    resource/keywords.robot
-Resource    resource/variables.robot

 Suite Setup    Setup ATDB and LDVSPEC
 # Keyword used for suite setup, local to this test suite
@@ -26,6 +25,7 @@ Setup ATDB and LDVSPEC
 Test create and submit a work specification
     [Tags]    Workspecification
+    Login to LDV Spec Admin Interface
     ${id}=    Add Work Specification    5
     Submit Work Specification    ${id}
...
@@ -25,7 +25,7 @@ services:
     restart: always

   atdb-backend:
-    container_name: atdb.backend
+    container_name: atdb.backend  # Since there is URL validation in ATDB, it needs to contain a dot
     image: git.astron.nl:5000/astron-sdc/atdb-ldv:latest
     environment:
       DATABASE_HOST: atdb-db
...
@@ -62,6 +62,15 @@ Submit Work Specification
     Submit Work Specification    1
     [Teardown]    Close Browser

+Check if Task Exists in ATDB
+    [Tags]    Keyword
+    [Documentation]    ***Description***
+    ...    Prerequisite: submission of work specification with ID 1 succeeded
+    Login To ATDB Admin Interface
+    Check if Task Exists in ATDB    ldv-spec:1
+    [Teardown]    Close Browser
+
 Add Group
     [Tags]    Keyword
     Login to LDV Spec Admin Interface
...
@@ -35,8 +35,8 @@ Submit Work Specification
     Go To    ${LDVSPEC}
     Click Button    //*[@data-test-id="submit-to-atdb-${specification_id}"]
     Go To    ${LDVSPEC}specification/${specification_id}/
-    Element Should Not Contain    //*[@data-test-id="submission-status"]    error
-    Wait Until Element Contains    //*[@data-test-id="submission-status"]    defining    timeout=30s
+    Element Should Not Contain    //*[@data-test-id="submission-status"]    Error
+    Wait Until Element Contains    //*[@data-test-id="submission-status"]    Submitted    timeout=30s

 Add Work Specification
     [Arguments]    ${obs_id}
...
-# Build arg to select the correct base image
-ARG BASE_IMAGE=git.astron.nl:5000/astron-sdc/ldv-specification/base:latest
-FROM ${BASE_IMAGE}
+FROM python:3.10
+ENV PYTHONUNBUFFERED 1
+
+# Main dependencies
+RUN apt-get update && apt-get install -y postgresql-client build-essential

 # Make sure we are in the source directory
 WORKDIR /src
...
@@ -42,7 +42,8 @@ services:
   ldv-specification-background:
     container_name: ldv-specification-background
-    image: git.astron.nl:5000/astron-sdc/ldv-specification:${LDVSPEC_VERSION:-latest}
+    # image: git.astron.nl:5000/astron-sdc/ldv-specification:${LDVSPEC_VERSION:-latest}
+    image: ldv-specification-background
     build:
       context: ..
       dockerfile: ./Dockerfile
...
@@ -25,3 +25,5 @@ CACHES = {
         'LOCATION': f'{os.environ.get("CACHE_HOST_SERVER", "localhost")}:{os.environ.get("CACHE_HOST_PORT", "11211")}',
     }
 }
+
+SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'http')
\ No newline at end of file
@@ -35,6 +35,6 @@ class DataProductFilterSet(DynamicFilterSet):

 def get_submittable_workspecifications(workspecfications: QuerySet) -> List[WorkSpecification]:
     workspecifications_with_surls = workspecfications.filter(inputs__surls__0__isnull=False)
     workspecifications_with_submittable_status = workspecifications_with_surls.filter(
-        submission_status__in=[SUBMISSION_STATUS.NOT_SUBMITTED, SUBMISSION_STATUS.ERROR])
+        submission_status__in=[SUBMISSION_STATUS.READY, SUBMISSION_STATUS.ERROR])
     workspecifications_with_surl_data = [x for x in workspecifications_with_submittable_status if x.get_file_info[0] != 0]
     return workspecifications_with_surl_data
\ No newline at end of file
New file (Django migration for the lofardata app; all lines added):

# Generated by Django 4.1.5 on 2023-02-03 10:39
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('lofardata', '0020_alter_workspecification_predecessor_set_null'),
    ]

    operations = [
        migrations.RenameField(
            model_name='workspecification',
            old_name='related_tasks',
            new_name='atdb_task_ids',
        ),
        migrations.RemoveField(
            model_name='workspecification',
            name='is_defined',
        ),
        migrations.RemoveField(
            model_name='workspecification',
            name='is_ready',
        ),
        migrations.AddField(
            model_name='workspecification',
            name='error_message',
            field=models.CharField(blank=True, max_length=500, null=True),
        ),
        migrations.AlterField(
            model_name='workspecification',
            name='submission_status',
            field=models.CharField(choices=[('L', 'loading'), ('R', 'ready'), ('D', 'defining'), ('S', 'submitted'), ('E', 'error')], default='L', max_length=1),
        ),
    ]
@@ -92,13 +92,15 @@ class ATDBProcessingSite(models.Model):

 class SUBMISSION_STATUS(models.TextChoices):
-    """Status of Work Specifcation to ATDB"""
-
-    NOT_SUBMITTED = "N", _("not submitted")
-
+    """Status of Work Specification"""
+    # The surls and belonging information are being retrieved
+    LOADING = "L", _("loading")
+    # Tasks are ready to submit to ATDB
+    READY = "R", _("ready")
+    # Tasks are still being submitted to ATDB
+    DEFINING = "D", _("defining")
     # Tasks are submitted, check ATDB for current status
     SUBMITTED = "S", _("submitted")
-    # Task need to be submitted (can change predecessor etc...)
-    DEFINING = "D", _("defining")
     # Check log, retry submission
     ERROR = "E", _("error")
@@ -140,14 +142,10 @@ class WorkSpecification(models.Model):
     selected_workflow_tag = models.CharField(max_length=500, null=True, default="Unknown")

     # Task ID's that were created in ATDB
-    related_tasks = ArrayField(models.IntegerField(), null=True, blank=True)
+    atdb_task_ids = ArrayField(models.IntegerField(), null=True, blank=True)

     predecessor_specification = models.ForeignKey('self', null=True, on_delete=models.SET_NULL,
                                                   related_name='successor', blank=True)

-    # The query for gathering files has been executed
-    is_ready = models.BooleanField(default=False)
-    is_defined = models.BooleanField(default=False)
-
     # Should automatically submit the task after defining
     is_auto_submit = models.BooleanField(default=False)
@@ -195,9 +193,12 @@ class WorkSpecification(models.Model):
         """
         return True

+    def is_new(self) -> bool:
+        return self.pk is None
+
     def is_editable(self) -> bool:
         """ Indicates whether this specification is editable (i.e., it has not been submitted yet)"""
-        return self.submission_status != SUBMISSION_STATUS.SUBMITTED and self.submission_status != SUBMISSION_STATUS.DEFINING
+        return self.submission_status != SUBMISSION_STATUS.SUBMITTED and self.submission_status != SUBMISSION_STATUS.DEFINING and self.submission_status != SUBMISSION_STATUS.LOADING

     @property
     def get_file_info(self):
@@ -213,9 +214,11 @@ class WorkSpecification(models.Model):
     submission_status = models.CharField(
         max_length=1,
         choices=SUBMISSION_STATUS.choices,
-        default=SUBMISSION_STATUS.NOT_SUBMITTED,
+        default=SUBMISSION_STATUS.LOADING,
     )

+    error_message = models.CharField(max_length=500, null=True, blank=True)
+
     def save(
         self, force_insert=False, force_update=False, using=None, update_fields=None
     ):
@@ -226,7 +229,7 @@ class WorkSpecification(models.Model):
             update_fields=update_fields,
         )
         if self.async_task_result is None:
-            from lofardata.tasks import define_work_specification
+            from lofardata.task.tasks import define_work_specification
             res: AsyncResult = define_work_specification.delay(self.pk)
             self.async_task_result = res.id
...
-from rest_framework.exceptions import APIException
+class TaskException(BaseException):
+    default_detail = ""
+    default_code = ""


-class RequestNotOk(APIException):
-    status_code = 422
+class UnknownError(TaskException):  # When this happens, fix it in the task_helpers class (+ add the test)
+    default_detail = 'Something has gone wrong but it is unclear what. Please contact the helpdesk.'
+    default_code = 'Bad request'
+
+
+class RequestNotOk(TaskException):
     default_detail = 'You are trying to submit a work specification with invalid data. Have you messed around in the data? If not, please contact team rainbow.'
     default_code = 'Unprocessable Entity: invalid data'


-class WorkSpecificationNoSite(APIException):
-    status_code = 422
+class WorkerResponseNotOk(TaskException):
+    default_detail = "ATDB will not handle this request. Let team rainbow inspect the worker logs."
+    default_code = 'I am a teapot: ATDB error response'
+
+
+class WorkSpecificationNoSite(TaskException):
     default_detail = 'You are trying to submit a work specification without a target processing site. We cannot send it into outer space but if you did not want that in the first place, please contact team rainbow.'
-    default_code = 'Unprocessable Entity: no site'
+    default_code = 'Bad request: no processing site'


-class InvalidPredecessor(APIException):
-    status_code = 422
+class InvalidPredecessor(TaskException):
     default_detail = 'You are trying to submit a work specification where the predecessor is faulty or no longer exists. Blame team rainbow for manually hacking around in the database.'
     default_code = 'Unprocessable Entity: invalid predecessor'
+
+
+class InvalidLocation(TaskException):
+    default_detail = 'You are trying to submit a work specification where the location is not Sara, Juelich or Poznan. This should never happen so there is something wrong with the used LTA data. Please contact team rainbow and/or an operator.'
+    default_code = 'Unprocessable Entity: invalid processing location'
+
+
+class ATDBKaput(TaskException):
+    default_detail = "ATDB didn't give a task response while this should have happened. This is a problem with ATDB and not LDV spec."
+    default_code = 'I am a teapot: ATDB kaput'
+
+
+class InvalidSurl(TaskException):
+    default_detail = 'You are trying to submit a work specification with an invalid surl. It should at least consist of a project, a sas_id and a location.'
+    default_code = 'Unprocessable Entity: invalid data'
New file: lofardata/task/session_store.py (all lines added)

import requests


class SessionStore:
    """requests.Session Singleton"""

    _session = None

    @classmethod
    def get_session(cls) -> requests.Session:
        if cls._session is None:
            cls._session = requests.Session()
        return cls._session
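
A short usage sketch of the new singleton (illustrative only): every caller gets the same pooled requests.Session, so repeated posts to ATDB reuse connections.

from lofardata.task.session_store import SessionStore

sess = SessionStore.get_session()
assert sess is SessionStore.get_session()  # the same Session instance on every call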
New file: lofardata/task/task_helpers.py (all lines added)

import re
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

from lofardata.models import (
    SUBMISSION_STATUS,
    ATDBProcessingSite,
    WorkSpecification,
)
from lofardata.task.token_auth import TokenAuth
from lofardata.task.custom_exceptions import RequestNotOk, InvalidPredecessor, InvalidLocation, WorkSpecificationNoSite, \
    WorkerResponseNotOk, TaskException, ATDBKaput, InvalidSurl


def parse_surl(surl: str, work_spec: WorkSpecification) -> dict:
    parsed = urlparse(surl)
    host = parsed.hostname
    path = parsed.path
    pattern = r"^.*/projects\/(?P<project>.+)\/(?P<sas_id>\w\d*)\/"
    try:
        data = re.match(pattern, path).groupdict()
    except Exception as error:
        set_errored_work_specification(work_spec, InvalidSurl, error)
        raise InvalidSurl
    valid_locations = ['srm.grid.sara.nl', 'lofar-srm.fz-juelich.de', 'lta-head.lofar.psnc.pl']
    if host in valid_locations:
        data["location"] = host
        return data
    else:
        set_errored_work_specification(work_spec, InvalidLocation)
        raise InvalidLocation('location= {}'.format(host))
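
For orientation, a sketch of what parse_surl extracts; the surl below is an invented example following the .../projects/<project>/<sas_id>/... layout the regex expects:

surl = "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc0_012/123456/L123456_SB000.tar"
parse_surl(surl, work_spec)
# -> {"project": "lc0_012", "sas_id": "123456", "location": "srm.grid.sara.nl"}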
def create_atdb_inputs_file(entries, location):
    inputs = [
        {
            "size": entry["size"],
            "surl": entry["surl"],
            "type": "File",
            "location": location,
        }
        for entry in entries
    ]
    return inputs


def split_entries_to_batches(surls: List[Any], batch_size: int) -> List[List[Any]]:
    """Split the list of entries into batches of at most `batch_size` entries"""
    if batch_size == 0:
        raise RequestNotOk()

    n_surls = len(surls)

    # NOTE: Think about using file size instead of amount of files
    batches: List[List[Any]] = []
    num_batches = n_surls // batch_size
    num_batches += 1 if n_surls % batch_size else 0

    for n in range(num_batches):
        batches.append(surls[n * batch_size: (n + 1) * batch_size])

    return batches
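
A worked example of the batching arithmetic (invented values): five entries with batch_size 2 yield ceil(5/2) = 3 batches.

split_entries_to_batches(["s1", "s2", "s3", "s4", "s5"], 2)
# -> [["s1", "s2"], ["s3", "s4"], ["s5"]]
# batch_size == 0 raises RequestNotOk; insert_task_into_atdb only calls this when batch_size > 0.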
def send_to_atdb(payload, sess, site, work_spec):
    response = sess.post(site.url + "tasks/", json=payload, auth=TokenAuth(site.access_token))

    if not response.ok:
        set_errored_work_specification(work_spec, WorkerResponseNotOk, None)
        raise WorkerResponseNotOk

    if work_spec.atdb_task_ids is None:
        work_spec.atdb_task_ids = []
    try:
        response_task_id = response.json()["id"]
        work_spec.atdb_task_ids.append(response_task_id)
    except Exception as error:
        set_errored_work_specification(work_spec, ATDBKaput, error)
        raise ATDBKaput


def create_payload(batch, work_spec, atdb_predecessor_task_id=None) -> dict:
    # Parse a single surl for info: project, sas_id & location
    # This does assume that a task consists of at most 1 project, sas_id and location!  # FIXME SDC-905
    if len(batch) < 1:
        set_errored_work_specification(work_spec, RequestNotOk)
        raise RequestNotOk('Cannot create a payload for an empty batch')

    parsed = parse_surl(batch[0]["surl"], work_spec)
    project_id = parsed["project"]
    sas_id = parsed["sas_id"]

    payload = {
        "project": project_id,
        "sas_id": sas_id,
        "task_type": "regular",
        "filter": f"ldv-spec:{work_spec.pk}",
        "purge_policy": work_spec.purge_policy,
        "new_status": "defining",
        "new_workflow_uri": work_spec.selected_workflow,
        "size_to_process": sum([e["size"] for e in batch]),
        "inputs": create_atdb_inputs_file(batch, parsed["location"]),
    }
    if atdb_predecessor_task_id:
        payload["predecessor"] = atdb_predecessor_task_id
    return payload
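
For illustration, a payload for a one-file batch might look as follows (all values invented, field shapes taken from the code above):

payload = {
    "project": "lc0_012",
    "sas_id": "123456",
    "task_type": "regular",
    "filter": "ldv-spec:42",  # ldv-spec:<WorkSpecification.pk>
    "purge_policy": "no",
    "new_status": "defining",
    "new_workflow_uri": "imaging_compress_pipeline_v02",
    "size_to_process": 1073741824,
    "inputs": [{"size": 1073741824, "surl": "srm://...", "type": "File", "location": "srm.grid.sara.nl"}],
    "predecessor": 1337,  # only set when the specification has a predecessor task
}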
def get_atdb_predecessor_task_id(work_spec):
    # Task ID of the predecessor
    atdb_predecessor_task_id: int | None = None
    if work_spec.predecessor_specification is not None:
        predecessor: WorkSpecification = work_spec.predecessor_specification
        # Should only be 1 entry
        if predecessor.atdb_task_ids is None or len(predecessor.atdb_task_ids) != 1:
            raise InvalidPredecessor()
        atdb_predecessor_task_id = predecessor.atdb_task_ids[0]
    return atdb_predecessor_task_id


def get_processing_site(work_spec):
    site: Optional[ATDBProcessingSite] = work_spec.processing_site
    if site is None:
        set_errored_work_specification(work_spec, WorkSpecificationNoSite, None)
        raise WorkSpecificationNoSite
    return site


def get_surls(work_spec) -> List[dict]:
    if work_spec.inputs is None:
        set_errored_work_specification(work_spec, RequestNotOk)
        raise RequestNotOk("There is no 'inputs' for this work specification with id: {}".format(work_spec.id))
    inputs: Dict[str, Any] = work_spec.inputs.copy()
    try:
        entries: List[dict] = inputs.pop("surls")
        if len(entries) < 1:
            raise RequestNotOk("There are no surls known for this work specification with id: {}".format(work_spec.id))
        return entries
    except Exception as err:
        set_errored_work_specification(work_spec, RequestNotOk, err)
        raise RequestNotOk


def set_errored_work_specification(work_spec: WorkSpecification, exception: TaskException, error: BaseException = None):
    work_spec.submission_status = SUBMISSION_STATUS.ERROR
    work_spec.error_message = exception.default_detail + ":\n" + str(error)[:len(exception.default_detail) - 1]
    work_spec.save()
New file: lofardata/task/tasks.py (all lines added)

from lofardata.models import (
    SUBMISSION_STATUS,
    ATDBProcessingSite,
    DataProduct,
    WorkSpecification,
)

from ldvspec.celery import app
from lofardata.task.session_store import SessionStore
from lofardata.task.task_helpers import get_processing_site, get_surls, split_entries_to_batches, \
    get_atdb_predecessor_task_id, create_payload, send_to_atdb, set_errored_work_specification
from lofardata.task.custom_exceptions import UnknownError, TaskException, RequestNotOk


@app.task
def define_work_specification(workspecification_id):
    specification = WorkSpecification.objects.get(pk=workspecification_id)
    filters = specification.filters
    dataproducts = DataProduct.objects.filter(**filters).order_by("surl")

    inputs = {
        "surls": [
            {"surl": dataproduct.surl, "size": dataproduct.filesize}
            for dataproduct in dataproducts
        ]
    }
    if specification.inputs is None:
        specification.inputs = inputs
    else:
        specification.inputs.update(inputs)
    specification.submission_status = SUBMISSION_STATUS.READY
    specification.save()


@app.task
def insert_task_into_atdb(workspecification_id: int):
    """This creates the task in ATDB and sets it to defining"""
    sess = SessionStore.get_session()
    work_spec: WorkSpecification = WorkSpecification.objects.get(
        pk=workspecification_id
    )
    work_spec.submission_status = SUBMISSION_STATUS.DEFINING
    work_spec.save()
    try:
        site = get_processing_site(work_spec)
        surls = get_surls(work_spec)
        if work_spec.batch_size > 0:
            batches = split_entries_to_batches(surls, work_spec.batch_size)
        else:
            batches = [surls]

        atdb_predecessor_task_id = get_atdb_predecessor_task_id(work_spec)

        for batch in batches:
            payload = create_payload(batch, work_spec, atdb_predecessor_task_id)
            send_to_atdb(payload, sess, site, work_spec)

        work_spec.submission_status = SUBMISSION_STATUS.SUBMITTED
        work_spec.save()
    except TaskException as task_error:
        raise task_error
    except Exception as err:  # When this happens, fix it in the task_helpers class (+ add the test)
        set_errored_work_specification(work_spec, UnknownError, err)
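
As a hedged sketch of how these tasks chain together end to end (the concrete field values below are assumptions, not part of the commit):

spec = WorkSpecification.objects.create(filters={"obs_id": 5}, processing_site=site)
# save() queues define_work_specification: status goes LOADING -> READY once the surls are stored
insert_task_into_atdb.delay(spec.pk)
# status goes READY -> DEFINING, one ATDB task is POSTed per batch, then -> SUBMITTED,
# or -> ERROR with error_message filled by set_errored_work_specification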
New file: lofardata/task/token_auth.py (all lines added)

import requests
from requests.auth import AuthBase


class TokenAuth(AuthBase):
    """Basic Token Auth

    Adds a: `Authorization: Token <token>` header"""

    def __init__(self, token: str):
        self._token = token

    def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
        r.headers["Authorization"] = f"Token {self._token}"
        return r
\ No newline at end of file
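
Usage sketch: the auth object plugs straight into requests, exactly as send_to_atdb does above; the URL and token are placeholders.

import requests
from lofardata.task.token_auth import TokenAuth

response = requests.post("https://atdb.example/atdb/tasks/", json={}, auth=TokenAuth("secret-token"))
# the request is sent with the header: Authorization: Token secret-token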
Deleted file: lofardata/tasks.py (all lines removed; superseded by the lofardata/task package above)

import re
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import requests
from celery.utils.log import get_task_logger
from lofardata.models import (
    SUBMISSION_STATUS,
    ATDBProcessingSite,
    DataProduct,
    WorkSpecification,
)
from requests.auth import AuthBase
from requests.exceptions import RequestException

from ldvspec.celery import app
from lofardata.custom_exceptions import RequestNotOk, WorkSpecificationNoSite, InvalidPredecessor

logger = get_task_logger(__name__)


class SessionStore:
    """requests.Session Singleton"""

    _session = None

    @classmethod
    def get_session(cls) -> requests.Session:
        if cls._session == None:
            cls._session = requests.Session()
        return cls._session


@app.task
def define_work_specification(workspecification_id):
    specification = WorkSpecification.objects.get(pk=workspecification_id)
    filters = specification.filters
    dataproducts = DataProduct.objects.filter(**filters).order_by("surl")

    inputs = {
        "surls": [
            {"surl": dataproduct.surl, "size": dataproduct.filesize}
            for dataproduct in dataproducts
        ]
    }
    if specification.inputs is None:
        specification.inputs = inputs
    else:
        specification.inputs.update(inputs)
    specification.is_ready = True
    specification.save()


def _parse_surl(surl: str) -> dict:
    parsed = urlparse(surl)
    host = parsed.hostname
    path = parsed.path
    pattern = r"^.*/projects\/(?P<project>.+)\/(?P<sas_id>\w\d*)\/"
    data = re.match(pattern, path).groupdict()
    data["location"] = host
    return data


def _prepare_request_payload(
    entries: List[dict],
    filter_id: str,
    workflow_url: str,
    purge_policy: str = "no",
    predecessor: int = None,
    optional_parameters: Dict[str, Any] = None,
):
    # Parse a single surl for info: project, sas_id & location
    # This does assume that a task consists of at most 1 project, sas_id and location!
    parsed = _parse_surl(entries[0]["surl"])
    project_id = parsed["project"]
    sas_id = parsed["sas_id"]

    inputs = [
        {
            "size": e["size"],
            "surl": e["surl"],
            "type": "File",
            "location": parsed["location"],
        }
        for e in entries
    ]
    if optional_parameters:
        inputs = {**optional_parameters, "surls": inputs}

    data = {
        "project": project_id,
        "sas_id": sas_id,
        "task_type": "regular",
        "filter": filter_id,
        "purge_policy": purge_policy,
        "new_status": "defining",
        "new_workflow_uri": workflow_url,
        "size_to_process": sum([e["size"] for e in entries]),
        "inputs": inputs,
    }
    if predecessor:
        data["predecessor"] = predecessor
    return data


class TokenAuth(AuthBase):
    """Basic Token Auth

    Adds a: `Authorization: Token <token>` header"""

    def __init__(self, token: str):
        self._token = token

    def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
        r.headers["Authorization"] = f"Token {self._token}"
        return r


def split_entries_to_batches(entries: List[Any], batch_size: int) -> List[List[Any]]:
    """Split the list of entries into batches of at most `batch_size` entries"""
    n_entries = len(entries)

    # NOTE: Think about using file size instead of amount of files
    batches: List[List[Any]] = []
    if batch_size == 0:
        batches.append(entries)
    elif batch_size > 0:
        # Calculate amount of required batches
        num_batches = n_entries // batch_size
        num_batches += 1 if n_entries % batch_size else 0

        for n in range(num_batches):
            batches.append(entries[n * batch_size: (n + 1) * batch_size])

    return batches


@app.task
def insert_task_into_atdb(workspecification_id: int):
    """This creates the task in ATDB and set's it to defining"""
    sess = SessionStore.get_session()

    work_spec: WorkSpecification = WorkSpecification.objects.get(
        pk=workspecification_id
    )

    inputs: Dict[str, Any] = work_spec.inputs.copy()

    try:
        entries: List[dict] = inputs.pop("surls")
    except IndexError:
        logger.error("No surls")
        work_spec.submission_status = SUBMISSION_STATUS.ERROR
        raise RequestNotOk()

    batches = split_entries_to_batches(entries, work_spec.batch_size)

    site: Optional[ATDBProcessingSite] = work_spec.processing_site
    if site is None:
        raise WorkSpecificationNoSite()
    url = site.url + "tasks/"

    # Task ID of the predecessor
    atdb_predecessor_task_id: int | None = None
    if work_spec.predecessor_specification is not None:
        predecessor: WorkSpecification = work_spec.predecessor_specification
        if len(predecessor.related_tasks) != 1:
            raise InvalidPredecessor()
        # Should only be 1 entry
        atdb_predecessor_task_id = predecessor.related_tasks[0]

    try:
        for batch in batches:
            try:
                payload = _prepare_request_payload(
                    entries=batch,
                    optional_parameters=inputs,
                    filter_id=f"ldv-spec:{work_spec.pk}",
                    workflow_url=work_spec.selected_workflow,
                    purge_policy=work_spec.purge_policy,
                    predecessor=atdb_predecessor_task_id,
                )
            except AttributeError:
                raise RequestNotOk()

            res = sess.post(url, json=payload, auth=TokenAuth(site.access_token))
            if not res.ok:
                logger.error("Request not ok: {}".format(res.text))
                raise RequestNotOk()

            # Store ATDB Task ID in related_tasks
            if work_spec.related_tasks is None:
                work_spec.related_tasks = []
            data = res.json()
            work_spec.related_tasks.append(data["id"])

        # All went well
        work_spec.submission_status = SUBMISSION_STATUS.DEFINING
        if work_spec.is_auto_submit:
            set_tasks_defined.delay(workspecification_id)
    except (RequestException, RequestNotOk) as err:
        work_spec.submission_status = SUBMISSION_STATUS.ERROR
        logger.error(err)
    finally:
        work_spec.save()


def update_related_tasks(
    work_spec: WorkSpecification,
    delete: bool,
    data: Optional[dict],
    on_success_status: SUBMISSION_STATUS,
):
    sess = SessionStore.get_session()

    site: Optional[ATDBProcessingSite] = work_spec.processing_site
    if site is None:
        raise WorkSpecificationNoSite()
    url = site.url + "tasks/"

    task_ids: List[int] = work_spec.related_tasks

    try:
        for task_id in task_ids:
            if delete:
                res = sess.delete(
                    url + str(task_id) + "/", auth=TokenAuth(site.access_token)
                )
            else:
                res = sess.put(
                    url + str(task_id) + "/",
                    json=data,
                    auth=TokenAuth(site.access_token),
                )
            if not res.ok:
                raise RequestNotOk()
        # All went well
        work_spec.submission_status = on_success_status
        if delete:
            work_spec.related_tasks = None
    except (RequestException, RequestNotOk):
        work_spec.submission_status = SUBMISSION_STATUS.ERROR
    finally:
        work_spec.save()


@app.task
def set_tasks_defined(workspecification_id: int):
    """This sets tasks to defined so they can be picked up by ATDB services"""
    work_spec: WorkSpecification = WorkSpecification.objects.get(
        pk=workspecification_id
    )
    if work_spec.submission_status != SUBMISSION_STATUS.DEFINING:
        raise ValueError("Invalid WorkSpecification state")
    update_related_tasks(
        work_spec, False, {"new_status": "defined"}, SUBMISSION_STATUS.SUBMITTED
    )


@app.task
def delete_tasks_from_atdb(workspecification_id: int):
    """Removes related tasks from ATDB (for retrying)"""
    work_spec: WorkSpecification = WorkSpecification.objects.get(
        pk=workspecification_id
    )
    update_related_tasks(work_spec, True, None, SUBMISSION_STATUS.NOT_SUBMITTED)