Skip to content
Snippets Groups Projects
Commit aa271d12 authored by Nico Vermaas's avatar Nico Vermaas
Browse files

connect predecessor - successors

parent ad862dca
No related branches found
No related tags found
1 merge request!394MAM-44 build specification functionality
Pipeline #112999 failed
......@@ -69,11 +69,17 @@ class Specification(models.Model):
"self",
null=True,
on_delete=models.SET_NULL,
related_name="successor",
related_name="successors",
blank=True,
)
def __str__(self):
return str(self.id)
def save(self, *args, **kwargs):
user = kwargs.pop('user', None)
if user:
self.created_by = user
# do the specification thing!
update_specification(self)
super(Specification, self).save(*args, **kwargs)
\ No newline at end of file
......@@ -140,7 +140,8 @@ class Task(models.Model):
blank=True)
def __str__(self):
return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)
return str(self.id)
#return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)
def count_dataproducts(self):
try:
......
......@@ -8,6 +8,11 @@ class SpecificationReadSerializer(serializers.ModelSerializer):
required=False,
)
successors = serializers.StringRelatedField(
many=True,
required=False,
)
class Meta:
model = Specification
fields = "__all__"
......@@ -25,26 +30,50 @@ class SpecificationWriteSerializer(serializers.ModelSerializer):
def create(self, validated_data):
specification = Specification.objects.create(**validated_data)
instance = Specification.objects.create(**validated_data)
# if a specification workflow is already attached, like selected from the REST API, then use it.
# another use-case is that a 'workflow_uri' is posted by an external service, if that is the case, use that.
if not specification.workflow:
if not instance.workflow:
try:
workflow_uri = self.initial_data['workflow_uri']
if workflow_uri:
try:
workflow = Workflow.objects.get(workflow_uri=workflow_uri)
specification.workflow = workflow
instance.workflow = workflow
except:
specification.status = "error"
specification.message = "workflow does not exist"
instance.status = "error"
instance.message = "workflow does not exist"
except:
specification.status = "error"
specification.message = "invalid specification"
instance.status = "error"
instance.message = "invalid specification"
specification.save()
return specification
# if a predecessor specification is already linked, then use it.
# otherwise construct the link using 'predecessor_id', at least when that is available (not required).
if not instance.predecessor:
try:
if 'predecessor_id' in self.initial_data:
predecessor_id = self.initial_data['predecessor_id']
try:
predecessor_specification = Specification.objects.get(id=predecessor_id)
instance.predecessor = predecessor_specification
except:
instance.status = "error"
instance.message = "predecessor specification does not exist"
except:
instance.status = "error"
instance.message = "invalid specification"
# if create_by is not filled in, then fill in the user that creates this specification
# note: when this is automated, then that user will probably be the 'admin' user who's token is used
created_by = instance.created_by
if not created_by:
user = self.context['request'].user
# save the new specification
instance.save(user=user)
return instance
class ActivitySerializer(serializers.ModelSerializer):
......
......@@ -7,18 +7,29 @@ from ..common import SpecificationStatus
logger = logging.getLogger(__name__)
def get_predecessor_task_id(specification):
# Task ID of the predecessor
predecessor_task_id = None
def get_predecessor_task(specification):
# get the task_id of the first (only) task of the predecessor specification
from taskdatabase.models import Task # preventing circular imports
logger.info(f'get_predecessor_task {specification.id}')
predecessor_task = None
if specification.predecessor is not None:
predecessor_specification = specification.predecessor
# find the first task that was created for the predecessor_specification
# Should only be 1 entry
if predecessor_specification.task_ids is None or len(predecessor_specification.task_ids) != 1:
raise InvalidPredecessor()
predecessor_task_id = predecessor_specification.task_ids[0]
return predecessor_task_id
predecessor_tasks = Task.objects.filter(specification=predecessor_specification)
predecessor_task = predecessor_tasks[0]
if len(predecessor_tasks) != 1:
specification.status = "error"
specification.message = "predecessor specification has multiple tasks, only 1 allowed"
return None
return predecessor_task
def create_batches(surls, batch_size):
......@@ -78,7 +89,7 @@ def parse_surl(surl, specification):
return data
def create_payload(batch, specification, predecessor_task_id=None):
def create_payload(batch, specification, predecessor_task=None):
"""
create the json payload for a task to be created
(copied from ldvspec, and adapted)
......@@ -98,14 +109,13 @@ def create_payload(batch, specification, predecessor_task_id=None):
"purge_policy": specification.purge_policy,
"status": specification.initial_task_status,
"new_status": specification.initial_task_status,
# "new_workflow_uri": specification.workflow.workflow_uri,
"workflow": specification.workflow,
"size_to_process": sum([e["size"] for e in batch]),
"inputs": inputs,
}
if predecessor_task_id:
payload["predecessor"] = predecessor_task_id
if predecessor_task:
payload["predecessor"] = predecessor_task
return payload
......@@ -124,10 +134,10 @@ def define_tasks(specification):
surls = specification.inputs["surls"]
batches = create_batches(surls, specification.batch_size)
predecessor_task_id = get_predecessor_task_id(specification)
predecessor_task = get_predecessor_task(specification)
for batch in batches:
payload = create_payload(batch, specification, predecessor_task_id)
payload = create_payload(batch, specification, predecessor_task)
# create a new task with this payload... save to ensure that it gets a primary key
task = Task(**payload)
......@@ -136,10 +146,10 @@ def define_tasks(specification):
task.specification = specification
task.save()
specification.status = SpecificationStatus.DEFINED.value
specification.status = "defined"
except Exception as error:
specification.status = SpecificationStatus.ERROR.value
specification.status = "error"
specification.message = "incorrect inputs"
specification.save()
......@@ -155,5 +165,5 @@ def undefine_tasks(specification):
tasks_to_delete = Task.objects.filter(specification=specification)
tasks_to_delete.delete()
specification.status = SpecificationStatus.CLAIMED.value
specification.status = "claimed"
specification.save()
from enum import Enum
import logging
from django.contrib.auth.models import User
from .define_tasks import *
from .input_validation import *
from ..common import SpecificationStatus
......
......@@ -31,7 +31,7 @@
{% include 'taskdatabase/pagination.html' %}
</div>
</div>
<p class="footer"> Version 3 Apr 2025</p>
<p class="footer"> Version 4 Apr 2025</p>
</div>
{% include 'taskdatabase/refresh.html' %}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment