Skip to content
Snippets Groups Projects

Sdc 1607 bugfix discard repurpose aggregation

Merged Nico Vermaas requested to merge SDC-1607-bugfix-discard-repurpose-aggregation into master
4 files
+ 31
7
Compare changes
  • Side-by-side
  • Inline
Files
4
import logging;
import logging;
from django.conf import settings
from django.conf import settings
from django.db.models import Sum
from .common import State, AggregationStrategy, VERIFIED_STATUSSES, PROCESSED_STATUSSES, INGEST_FRACTION_STATUSSES, \
from .common import State, AggregationStrategy, VERIFIED_STATUSSES, PROCESSED_STATUSSES, INGEST_FRACTION_STATUSSES, \
UPDATE_ARCHIVE_STATUSSES, ACTIVITY_RESET_STATUSSEN
UPDATE_ARCHIVE_STATUSSES, ACTIVITY_RESET_STATUSSEN
from taskdatabase.models import Task, Activity, Configuration
from taskdatabase.models import Task, Activity, Configuration
@@ -228,7 +228,7 @@ def update_processed_and_aggregate(task):
@@ -228,7 +228,7 @@ def update_processed_and_aggregate(task):
if not ('fail' in activity.status):
if not ('fail' in activity.status):
# create a new 'aggregate_task' that is used to collect the aggregated output
# create a new 'aggregate_task' that is used to collect the aggregated output
# this has to be done only once, so this is a good place to do it.
# this has to be done only once, so this is a good place to do it.
if Task.objects.filter(sas_id=task.sas_id,task_type='aggregation').count()==0:
if Task.objects.filter(sas_id=task.sas_id,task_type='aggregation').exclude(status__icontains='discard').count()==0:
create_aggregation_task(task)
create_aggregation_task(task)
logger.info(f'- created aggregation task: {task.id}')
logger.info(f'- created aggregation task: {task.id}')
@@ -329,6 +329,27 @@ def update_service_filter(task):
@@ -329,6 +329,27 @@ def update_service_filter(task):
activity.service_filter = task.service_filter
activity.service_filter = task.service_filter
activity.save()
activity.save()
 
def update_discard(task):
 
"""
 
when all tasks of this sas_id are discarded, then reset the activity
 
because the most probably usecase is that it will be used again by a new batch of tasks
 
 
"""
 
activity = task.activity
 
 
for t in Task.objects.filter(sas_id=task.sas_id,task_type='regular'):
 
if t.status not in [State.DISCARD.value,State.DISCARDED.value]:
 
return
 
 
# everything is DISCARDED
 
activity.project = None
 
activity.filter = None
 
activity.status = "reset"
 
activity.is_verified = False
 
activity.is_processed = False
 
activity.is_aggregated = False
 
activity.service_filter = None
 
activity.save()
def update_activity(task):
def update_activity(task):
"""
"""
@@ -378,4 +399,7 @@ def update_activity(task):
@@ -378,4 +399,7 @@ def update_activity(task):
update_changed_fields(task)
update_changed_fields(task)
# check if this activity has a 'service_filter', if not, use the one from the task
# check if this activity has a 'service_filter', if not, use the one from the task
update_service_filter(task)
update_service_filter(task)
\ No newline at end of file
 
# if all tasks of this activity are discarded, then reset the activity
 
update_discard(task)
 
\ No newline at end of file
Loading