Skip to content
Snippets Groups Projects
Commit 198dc76f authored by Nico Vermaas's avatar Nico Vermaas
Browse files

create aggregation task

parent 563b529e
No related branches found
No related tags found
1 merge request!361SDC-1423 use aggregation strategy
Pipeline #89608 failed
...@@ -8,7 +8,7 @@ import json ...@@ -8,7 +8,7 @@ import json
import logging import logging
from .services import calculated_qualities as qualities from .services import calculated_qualities as qualities
from .services.common import State, AggregationStrategy from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -224,8 +224,8 @@ class Task(models.Model): ...@@ -224,8 +224,8 @@ class Task(models.Model):
# so that it won't be picked up again. # so that it won't be picked up again.
# TODO: only activate when the aggregator service actually picks this up # TODO: only activate when the aggregator service actually picks this up
#self.new_status = State.AGGREGATE.value self.new_status = State.AGGREGATE.value
pass #pass
except Exception as error: except Exception as error:
...@@ -260,6 +260,9 @@ class Task(models.Model): ...@@ -260,6 +260,9 @@ class Task(models.Model):
if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value): if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value):
self.handle_aggregation() self.handle_aggregation()
if self.status in ACTIVITY_RESET_STATUSSEN:
self.is_aggregated = False
# check in the outputs if this task should be considered to be summary task # check in the outputs if this task should be considered to be summary task
self.is_summary = check_if_summary(self) self.is_summary = check_if_summary(self)
......
...@@ -45,7 +45,7 @@ class TaskWriteSerializer(serializers.ModelSerializer): ...@@ -45,7 +45,7 @@ class TaskWriteSerializer(serializers.ModelSerializer):
model = Task model = Task
fields = ('id','task_type','filter','predecessor','successors', fields = ('id','task_type','filter','predecessor','successors',
'joined_output_task', 'joined_output_task',
'project','sas_id','priority','purge_policy','cleanup_policy','resume', 'project','sas_id','priority','purge_policy','cleanup_policy','resume','is_aggregated',
'new_workflow_id','new_workflow_uri','workflow', 'new_workflow_id','new_workflow_uri','workflow',
'stage_request_id', 'stage_request_id',
'status','new_status','quality','calculated_qualities', 'status','new_status','quality','calculated_qualities',
......
...@@ -178,7 +178,7 @@ def update_processed_and_aggregate(task): ...@@ -178,7 +178,7 @@ def update_processed_and_aggregate(task):
current_is_processed = activity.is_processed current_is_processed = activity.is_processed
activity.is_processed = True activity.is_processed = True
non_discarded_found = False non_discarded_found = False
for t in Task.objects.filter(sas_id=task.sas_id): for t in Task.objects.filter(sas_id=task.sas_id,task_type='regular'):
if t.status not in PROCESSED_STATUSSES: if t.status not in PROCESSED_STATUSSES:
activity.is_processed = False activity.is_processed = False
break break
...@@ -202,7 +202,9 @@ def update_processed_and_aggregate(task): ...@@ -202,7 +202,9 @@ def update_processed_and_aggregate(task):
if (task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value): if (task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value):
# check if the activity is ready to collect H5 data # check if the activity is ready to collect H5 data
if (not activity.is_aggregated and activity.status != State.COLLECTING_DATA.value): if (not activity.is_aggregated and
activity.status != State.COLLECTING_DATA.value and
activity.status != State.AGGREGATE.value):
# check if there is already a storage_location, if not, add it. # check if there is already a storage_location, if not, add it.
if not activity.storage_location: if not activity.storage_location:
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
{% include 'taskdatabase/pagination.html' %} {% include 'taskdatabase/pagination.html' %}
</div> </div>
</div> </div>
<p class="footer"> Version 29 Jul 2024 <p class="footer"> Version 30 Jul 2024
</div> </div>
{% include 'taskdatabase/refresh.html' %} {% include 'taskdatabase/refresh.html' %}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment