import logging
from django.conf import settings
from django.db.models import Sum
from .common import State, AggregationStrategy, VERIFIED_STATUSSES, PROCESSED_STATUSSES, INGEST_FRACTION_STATUSSES, \
UPDATE_ARCHIVE_STATUSSES, ACTIVITY_RESET_STATUSSEN
from taskdatabase.models import Task, Activity, Configuration
status_list = settings.STATUSSES_WITH_DATA
logger = logging.getLogger(__name__)
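
# The functions in this module keep the Activity (SAS_ID level) in sync with its Tasks.
# update_activity() at the bottom is the entry point: it runs when a task changes status
# and dispatches to the update_* helpers below.
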
def calculate_ingested_fraction(this_task):
"""
This 'property' of a task returns the fraction of queued/ingested tasks per SAS_ID
and a list of statusses of other tasks belonging to the same SAS_ID.
It is implemented as 'property', because then it can be used in html pages like this:
<td>{{ task.sasid_ingested_fraction.status }}</td>
<td>{{ task.sasid_ingested_fraction.completion }}%</td>
A selection of statusses are considered 'queued', and another selection is considered 'ingested'.
The division of those 2 are returned as a 'completed %'.
A limited list of statusses for the other tasks that belong to this SAS_ID is also returned.
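
    Example of the returned structure (illustrative values, not from a real SAS_ID):
        {'status': {'scrubbed': 2, 'archived': 6}, 'completion': 75}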
"""
result = {}
statusses = {'scrubbed': 0, 'pre_archiving': 0, 'pre_archived': 0,'archiving': 0, 'archived': 0,
'finishing': 0, 'finished': 0, 'suspended': 0, 'discarded': 0,
'pre_archived_failed': 0, 'archived_failed': 0, 'finished_failed': 0}
tasks = Task.objects.filter(sas_id=this_task.sas_id)
for task in tasks:
try:
statusses[task.status] += 1
        except KeyError:
            # this status is not one of the statusses tracked above, skip it
            pass
    incomplete = (statusses['scrubbed'] + statusses['pre_archiving'] + statusses['pre_archived'] +
                  statusses['archiving'] + statusses['finishing'] + statusses['suspended'] +
                  statusses['pre_archived_failed'] + statusses['archived_failed'] + statusses['finished_failed'])
    complete = statusses['archived'] + statusses['finished']
    total = incomplete + complete
    # guard against a division by zero when none of the tasks is in one of the counted statusses
    completion = round(complete / total * 100) if total > 0 else 0
non_zero_statusses = {key: value for key, value in statusses.items() if value != 0}
result['status'] = non_zero_statusses
result['completion'] = completion
return result
def calculate_finished_fraction(this_task):
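    """
    Calculate, per SAS_ID, which fraction (by size) of the tasks has reached the FINISHED status.
    Returns a dict with 'fraction' (a percentage, or -1 when no sizes are known),
    'total_size' and 'remaining' (both in the same unit as Task.size_to_process).
    """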
size_archived = 0
size_remaining = 0
total_size = 0
tasks = Task.objects.filter(sas_id=this_task.sas_id)
for task in tasks:
if task.status == State.FINISHED.value:
size_archived += task.size_to_process
else:
size_remaining += task.size_to_process
total_size += task.size_to_process
result = {}
try:
result['fraction'] = round((size_archived / (size_remaining + size_archived)) * 100)
    except ZeroDivisionError:
        # no sizes known (yet) for this SAS_ID, signal that with -1
        result['fraction'] = -1
result['total_size'] = total_size
result['remaining'] = size_remaining
return result
def update_ingest_fractions(task):
"""
    The IngestQ page shows the fraction of completed/archived tasks per SAS_ID.
    This function recalculates that fraction whenever a relevant status transition has occurred.
"""
logger.info(f'- update_ingest_fractions')
activity = task.activity
result = calculate_ingested_fraction(task)
activity.ingested_fraction = result['completion']
activity.ingestq_status = result['status']
activity.save()
def update_archive_info(task):
"""
    The Finished page shows some information about the archiving/ingest results,
    like the new SAS_ID that the resulting output of the pipeline got in the LTA.
    This function retrieves that information from the tasks and stores it in the Activity
    when a relevant status transition has occurred.
"""
logger.info(f'- update_archive_info')
activity = task.activity
for t in Task.objects.filter(sas_id=task.sas_id):
try:
if t.archive['sas_id_archived']:
activity.archive = t.archive
break
        except (TypeError, KeyError):
            # this task has no archive information (yet), skip it
            pass
activity.save()
def update_finished_fraction(task):
"""
    The Failures page shows the failed tasks,
    but also the fraction of the tasks that were successfully ingested for the same SAS_ID (activity).
    This function recalculates that fraction whenever a relevant status transition has occurred.
"""
logger.info(f'- update_finished_fraction')
activity = task.activity
result = calculate_finished_fraction(task)
activity.finished_fraction = result['fraction']
activity.total_size = result['total_size']
activity.remaining = result['remaining']
activity.save()
def update_nr_of_dps(task):
"""
    (re)count the nr_of_dps of all the tasks of this SAS_ID and store the sum in Activity.nr_of_dps
"""
activity = task.activity
nr_of_dps = 0
for t in Task.objects.filter(sas_id=task.sas_id,task_type='regular'):
try:
if t.nr_of_dps > 0:
nr_of_dps += t.nr_of_dps
        except TypeError:
            # nr_of_dps is not set (None) for this task, skip it
            pass
# faster, but the unit tests can't handle it
#nr_of_dps = Task.objects.filter(sas_id=task.sas_id, task_type='regular').aggregate(Sum('nr_of_dps'))['nr_of_dps__sum']
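    # a possible variant (a sketch, not verified against these unit tests) that also lets the
    # database do the summation but falls back to 0 when all nr_of_dps values are NULL:
    #   from django.db.models.functions import Coalesce
    #   nr_of_dps = Task.objects.filter(sas_id=task.sas_id, task_type='regular') \
    #                           .aggregate(total=Coalesce(Sum('nr_of_dps'), 0))['total']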
activity.nr_of_dps = nr_of_dps
logger.info(f'- update_nr_of_dps: {activity.nr_of_dps}')
activity.save()
def reset_activity(task):
"""
    When a task is recycled back to DEFINED or FETCHED, the activity as a whole is no longer 'processed' or 'aggregated'.
    Reset those fields accordingly.
"""
try:
logger.info(f'- reset activity')
activity = task.activity
activity.status = task.status
activity.is_processed = False
activity.is_aggregated = False
activity.save()
    except Exception:
        # only bother with it if the task actually has an activity attached,
        # which is always... except in some simpler unittests
        pass
def create_aggregation_task(task):
"""
create a new aggregation task based on this task
"""
aggregation_task = Task(
task_type='aggregation',
filter=task.filter,
project=task.project,
sas_id=task.sas_id,
workflow=task.workflow,
status=State.IDLE.value,
new_status=State.IDLE.value,
        service_filter=task.service_filter,
activity=task.activity)
aggregation_task.save()
return aggregation_task
def update_processed_and_aggregate(task):
"""
Check if the whole SAS_ID (activity) is processed.
Currently this is used to set the activity.aggregate status, which triggers the aggregator service.
"""
logger.info(f'- update_processed')
activity = task.activity
# this complicated looking piece of code checks if ALL tasks of the activity are processed,
# if yes... set activity.status = AGGREGATE
# this needs to be done for all aggregation_strategies,
# because it signals the moment where all the input data for aggregation is (or has been) available.
# For some strategies something extra needs to be done...
current_is_processed = activity.is_processed
activity.is_processed = True
non_discarded_found = False
for t in Task.objects.filter(sas_id=task.sas_id,task_type='regular'):
if t.status not in PROCESSED_STATUSSES:
activity.is_processed = False
break
# at least one of the tasks should NOT be in discarded,
# otherwise a fully discarded SAS_ID will also register as 'is_processed' and ready to 'AGGREGATE'
if t.status != State.DISCARDED.value:
non_discarded_found = True
if (task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value):
# check if there is already a storage_location, if not, add it... unless the value is 'unknown'
if not activity.storage_location or activity.storage_location == 'unknown':
# for this aggregation_strategy, the activity storage_location is the workdir of the aggregation task
activity.create_storage_location()
logger.info(f'- created storage_location: {activity.storage_location}')
# check if the activity is ready to collect H5 data
if (not activity.is_aggregated and
activity.status != State.COLLECTING_DATA.value and
activity.status != State.AGGREGATE.value):
if not ('fail' in activity.status):
# create a new 'aggregate_task' that is used to collect the aggregated output
# this has to be done only once, so this is a good place to do it.
if Task.objects.filter(sas_id=task.sas_id,task_type='aggregation').exclude(status__icontains='discard').count()==0:
                    aggregation_task = create_aggregation_task(task)
                    logger.info(f'- created aggregation task: {aggregation_task.id}')
# this means that its tasks know that they should copy their H5 files to the storage_location
# (but the tasks cannot do that, the aggregator service does)
logger.info(f'- activity.is_aggregated: {activity.is_aggregated}...')
logger.info(f'- activity.status from {activity.status} to {State.COLLECTING_DATA.value}...')
activity.status = State.COLLECTING_DATA.value
activity.save()
# only save when changed
if activity.is_processed != current_is_processed:
# if the whole activity has become 'processed',
# and there is an aggregation strategy defined
# and this activity is not yet 'aggregated'
# then set the status of this activity to 'AGGREGATE'
logger.info(f'- activity => is_processed: {activity.is_processed}...')
if (task.workflow.aggregation_strategy == AggregationStrategy.WAIT_FOR_SUMMARY_TASK.value):
# check if there is a summary_task... if not, define this current task as summary_task and put it on hold
if Task.objects.filter(sas_id=task.sas_id,is_summary=True).count()==0:
                task.is_summary = True
                task.resume = False
task.save()
if (task.workflow.aggregation_strategy != AggregationStrategy.NONE.value):
logger.info(f'- non_discarded_found: {non_discarded_found}...')
            if activity.is_processed and non_discarded_found:
                if not activity.is_aggregated:
logger.info(f'- activity.is_aggregated: {activity.is_aggregated}...')
logger.info(f'- activity.status from {activity.status} to {State.AGGREGATE.value}...')
activity.status = State.AGGREGATE.value
activity.save()
def update_is_verified(task):
"""
The Validation page shows Activities (SAS_ID's) that are ready to be validated, by giving them a quality.
All tasks belonging to the SAS_ID must be 'verified' for that.
If that is the case, the activity itself gets the 'is_verified' flag.
This is the function that checks the verified state and updates the activity.is_verified flag.
"""
logger.info(f'- update_verified')
activity = task.activity
# check if all tasks of this SAS_ID have a status that is considered 'verified'
# this is used for the Validation Page
current_is_verified = activity.is_verified
activity.is_verified = True
for t in Task.objects.filter(sas_id=task.sas_id):
if t.status not in VERIFIED_STATUSSES:
activity.is_verified = False
break
# only save when changed
if activity.is_verified != current_is_verified:
activity.save()
def update_changed_fields(task):
"""
    It shouldn't happen, but technically it is possible that users change the filter (or the workflow)
    after specification. Keep the activity in sync with the task in that case.
"""
logger.info(f'- update_changed_fields')
activity = task.activity
if activity.filter != task.filter:
activity.filter = task.filter
activity.save()
try:
if activity.workflow_id != task.workflow.id:
activity.workflow_id = task.workflow.id
activity.save()
    except Exception:
        # continue, a workflow is not needed initially
        logger.error('no workflow found for this task')
def update_service_filter(task):
"""
    This should happen only once and ensures that the service_filter is also set at SAS_ID level.
    It is a safety net for tasks that are created without inputs and without a service_filter;
    they can then fall back to the service_filter of the activity
    (this can happen through the API, although there is no indication that it happens yet).
"""
activity = task.activity
if not activity.service_filter:
logger.info(f'- update_service_filter {activity} => {task.service_filter}')
if task.service_filter:
activity.service_filter = task.service_filter
activity.save()
def update_discard(task):
"""
    When all tasks of this sas_id are discarded, reset the activity,
    because the most probable use case is that it will be used again by a new batch of tasks.
"""
activity = task.activity
for t in Task.objects.filter(sas_id=task.sas_id,task_type='regular'):
if t.status not in [State.DISCARD.value,State.DISCARDED.value]:
return
# everything is DISCARDED
activity.project = None
activity.filter = None
activity.status = "reset"
activity.is_verified = False
activity.is_processed = False
activity.is_aggregated = False
activity.service_filter = None
activity.save()
def update_activity(task):
"""
The activity (SAS_ID level) is updated when a task changes status.
Depending on the type of status change, certain calculations and updates are performed.
Doing this on status change, instead of on-the-fly when a user enters a page, balances the load
and improves overall performance
- to 'ARCHIVING, ARCHIVED, FINISHED' : check for incoming/existing 'archive' json from archiver
- to SCRUBBED .. FINISHED : calculate ingested_fraction
- to _FAILED : calculate finished_fraction
- to (processed_statusses) : check if all tasks are processed, set Activity to is_processed and AGGREGATE
- always : determine if a task is in a 'verified' status
"""
logger.info(f'update_activity for task {task.id} with sas_id {task.sas_id} and status {task.status}')
# depending on the status transition, perform the appropriate action
# calculate the fraction and list of statusses of ingested tasks of this SAS_ID
if task.status in INGEST_FRACTION_STATUSSES:
update_ingest_fractions(task)
    # check if any task of this activity already has LTA information. If so, copy it to the activity level
if task.status in UPDATE_ARCHIVE_STATUSSES:
update_archive_info(task)
if task.status in ACTIVITY_RESET_STATUSSEN:
reset_activity(task)
# calculate the finished fraction for failed tasks
if State.FAILED.value in task.status:
update_finished_fraction(task)
if task.status in [State.PROCESSED.value, State.VALIDATED.value]:
update_nr_of_dps(task)
# check if all tasks of this SAS_ID have a status that is considered 'processed'
# this is used as a trigger for the ancillary service
if task.status in PROCESSED_STATUSSES:
update_processed_and_aggregate(task)
# check if all tasks for this activity are in a verified status.
update_is_verified(task)
# check if users have changed certain fields after the specification step
update_changed_fields(task)
# check if this activity has a 'service_filter', if not, use the one from the task
update_service_filter(task)
# if all tasks of this activity are discarded, then reset the activity
update_discard(task)