# activities_handler.py

import logging
from django.conf import settings
from .common import State, AggregationStrategy, VERIFIED_STATUSSES, PROCESSED_STATUSSES, INGEST_FRACTION_STATUSSES, \
UPDATE_ARCHIVE_STATUSSES, ACTIVITY_RESET_STATUSSEN
from taskdatabase.models import Task, Activity, Configuration
status_list = settings.STATUSSES_WITH_DATA
logger = logging.getLogger(__name__)
def calculate_ingested_fraction(this_task):
"""
This 'property' of a task returns the fraction of queued/ingested tasks per SAS_ID
and a list of statusses of other tasks belonging to the same SAS_ID.
It is implemented as 'property', because then it can be used in html pages like this:
<td>{{ task.sasid_ingested_fraction.status }}</td>
<td>{{ task.sasid_ingested_fraction.completion }}%</td>
A selection of statusses are considered 'queued', and another selection is considered 'ingested'.
The division of those 2 are returned as a 'completed %'.
A limited list of statusses for the other tasks that belong to this SAS_ID is also returned.
"""
result = {}
    statusses = {'scrubbed': 0, 'pre_archiving': 0, 'pre_archived': 0, 'archiving': 0, 'archived': 0,
                 'finishing': 0, 'finished': 0, 'suspended': 0, 'discarded': 0,
                 'pre_archived_failed': 0, 'archived_failed': 0, 'finished_failed': 0}
tasks = Task.objects.filter(sas_id=this_task.sas_id)
for task in tasks:
        try:
            statusses[task.status] += 1
        except KeyError:
            # statuses that are not in the table above are not counted
            pass
    incomplete = (statusses['scrubbed'] + statusses['pre_archiving'] + statusses['pre_archived'] +
                  statusses['archiving'] + statusses['finishing'] + statusses['suspended'] +
                  statusses['pre_archived_failed'] + statusses['archived_failed'] + statusses['finished_failed'])
    complete = statusses['archived'] + statusses['finished']
    try:
        completion = round(complete / (incomplete + complete) * 100)
    except ZeroDivisionError:
        # none of the tasks is in a counted status (e.g. all discarded)
        completion = 0
non_zero_statusses = {key: value for key, value in statusses.items() if value != 0}
result['status'] = non_zero_statusses
result['completion'] = completion
return result
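
# Illustrative example of the result (hypothetical numbers, not taken from a real SAS_ID):
# for an activity with 3 'archived', 1 'finished' and 1 'scrubbed' task,
# complete = 4 and incomplete = 1, so calculate_ingested_fraction() returns:
#
#   {'status': {'archived': 3, 'finished': 1, 'scrubbed': 1}, 'completion': 80}
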
def calculate_finished_fraction(this_task):
size_archived = 0
size_remaining = 0
total_size = 0
tasks = Task.objects.filter(sas_id=this_task.sas_id)
for task in tasks:
if task.status == State.FINISHED.value:
size_archived += task.size_to_process
else:
size_remaining += task.size_to_process
total_size += task.size_to_process
result = {}
    try:
        result['fraction'] = round((size_archived / (size_remaining + size_archived)) * 100)
    except ZeroDivisionError:
        # no sizes known (yet) for this SAS_ID
        result['fraction'] = -1
result['total_size'] = total_size
result['remaining'] = size_remaining
return result
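
# Illustrative example (hypothetical sizes, in whatever unit Task.size_to_process uses):
# for a SAS_ID with one finished task of size 75 and one failed task of size 25,
# calculate_finished_fraction() returns:
#
#   {'fraction': 75, 'total_size': 100, 'remaining': 25}
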
def update_ingest_fractions(task):
"""
The IngestQ page shows the fraction of completed/archived tasks per SAS_ID
    This is the function that calculates that fraction when a relevant status transition occurs.
    """
    logger.info('- update_ingest_fractions')
activity = task.activity
result = calculate_ingested_fraction(task)
activity.ingested_fraction = result['completion']
activity.ingestq_status = result['status']
activity.save()
def update_archive_info(task):
"""
The Finished page shows some information about the archiving/ingest results,
like the new SAS_ID that the resulting output of the pipeline got in the LTA
    This function retrieves that information from the tasks and stores it in the Activity
    when a relevant status transition occurs.
    """
    logger.info('- update_archive_info')
activity = task.activity
for t in Task.objects.filter(sas_id=task.sas_id):
        try:
            if t.archive['sas_id_archived']:
                activity.archive = t.archive
                break
        except (TypeError, KeyError):
            # this task has no 'archive' json (yet), or it lacks the 'sas_id_archived' key
            pass
activity.save()
def update_finished_fraction(task):
"""
The Failures page shows the failed tasks,
    but also the fraction of the tasks that were successfully ingested for the same SAS_ID (activity).
    This is the function that calculates that fraction when a relevant status transition occurs.
    """
    logger.info('- update_finished_fraction')
activity = task.activity
result = calculate_finished_fraction(task)
activity.finished_fraction = result['fraction']
activity.total_size = result['total_size']
activity.remaining = result['remaining']
activity.save()
def reset_activity(task):
"""
    When a task is recycled back to DEFINED or FETCHED, the activity as a whole is no longer
    'processed' or 'aggregated'. Reset those fields accordingly.
"""
    try:
        logger.info('- reset_activity')
        activity = task.activity
        activity.status = task.status
        activity.is_processed = False
        activity.is_aggregated = False
        activity.save()
    except AttributeError:
        # only bother with it if the task actually has an activity attached,
        # which is always... except in some simpler unittests (task.activity is then None)
        pass
def create_aggregation_task(task):
"""
    Create a new aggregation task based on this task.
"""
aggregation_task = Task(
task_type='aggregation',
filter=task.filter,
project=task.project,
sas_id=task.sas_id,
workflow=task.workflow,
status=State.IDLE.value,
new_status=State.IDLE.value,
activity=task.activity)
aggregation_task.save()
return aggregation_task
def update_processed_and_aggregate(task):
"""
Check if the whole SAS_ID (activity) is processed.
Currently this is used to set the activity.aggregate status, which triggers the aggregator service.
"""
    logger.info('- update_processed_and_aggregate')
    activity = task.activity
    # this complicated-looking piece of code checks if ALL tasks of the activity are processed,
# if yes... set activity.status = AGGREGATE
# this needs to be done for all aggregation_strategies,
# because it signals the moment where all the input data for aggregation is (or has been) available.
# For some strategies something extra needs to be done...
current_is_processed = activity.is_processed
activity.is_processed = True
non_discarded_found = False
    for t in Task.objects.filter(sas_id=task.sas_id, task_type='regular'):
if t.status not in PROCESSED_STATUSSES:
activity.is_processed = False
break
# at least one of the tasks should NOT be in discarded,
# otherwise a fully discarded SAS_ID will also register as 'is_processed' and ready to 'AGGREGATE'
if t.status != State.DISCARDED.value:
non_discarded_found = True
# only save when changed
if activity.is_processed != current_is_processed:
# if the whole activity has become processed, then set the status of this activity to 'AGGREGATE'
# unless it was already aggregated
        if activity.is_processed and non_discarded_found:
            if not activity.is_aggregated:
                activity.status = State.AGGREGATE.value
activity.save()
    if task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value:
# check if the activity is ready to collect H5 data
if (not activity.is_aggregated and
activity.status != State.COLLECTING_DATA.value and
activity.status != State.AGGREGATE.value):
# create a new 'aggregate_task' that is used to collect the aggregated output
# this has to be done only once, so this is a good place to do it.
            aggregation_tasks = Task.objects.filter(sas_id=task.sas_id, task_type='aggregation')
            if not aggregation_tasks.exists():
                aggregation_task = create_aggregation_task(task)
            else:
                aggregation_task = aggregation_tasks[0]
# check if there is already a storage_location, if not, add it.
if not activity.storage_location:
# for this aggregation_strategy, the activity storage_location is the workdir of the aggregation task
activity.create_storage_location()
# this means that its tasks know that they should copy their H5 files to the storage_location
# (but the tasks cannot do that, the aggregator service does)
activity.status = State.COLLECTING_DATA.value
activity.save()
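
# Summary of the COLLECT_H5 flow implemented above (derived from the code, for reference):
#
#   first 'processed' task of the SAS_ID  -> aggregation task + storage_location created,
#                                            activity.status = COLLECTING_DATA
#   all regular tasks 'processed'         -> activity.status = AGGREGATE
#
# from there the activity is expected to be picked up by the aggregator service.
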
def update_is_verified(task):
"""
    The Validation page shows Activities (SAS_IDs) that are ready to be validated, by giving them a quality.
All tasks belonging to the SAS_ID must be 'verified' for that.
If that is the case, the activity itself gets the 'is_verified' flag.
This is the function that checks the verified state and updates the activity.is_verified flag.
"""
    logger.info('- update_is_verified')
activity = task.activity
# check if all tasks of this SAS_ID have a status that is considered 'verified'
# this is used for the Validation Page
current_is_verified = activity.is_verified
activity.is_verified = True
for t in Task.objects.filter(sas_id=task.sas_id):
if t.status not in VERIFIED_STATUSSES:
activity.is_verified = False
break
# only save when changed
if activity.is_verified != current_is_verified:
activity.save()
def update_changed_fields(task):
"""
    It shouldn't happen, but technically it is possible that users change the filter
    or workflow after specification; keep the activity in sync with the task.
    """
    logger.info('- update_changed_fields')
activity = task.activity
if activity.filter != task.filter:
activity.filter = task.filter
activity.save()
    try:
        if activity.workflow_id != task.workflow.id:
            activity.workflow_id = task.workflow.id
            activity.save()
    except AttributeError:
        # continue, a workflow is not needed initially (task.workflow may still be None)
        logger.error('no workflow found for this task')
def update_activity(task):
"""
The activity (SAS_ID level) is updated when a task changes status.
Depending on the type of status change, certain calculations and updates are performed.
Doing this on status change, instead of on-the-fly when a user enters a page, balances the load
and improves overall performance
- to 'ARCHIVING, ARCHIVED, FINISHED' : check for incoming/existing 'archive' json from archiver
- to SCRUBBED .. FINISHED : calculate ingested_fraction
- to _FAILED : calculate finished_fraction
- to (processed_statusses) : check if all tasks are processed, set Activity to is_processed and AGGREGATE
- always : determine if a task is in a 'verified' status
"""
logger.info(f'update_activity for task {task.id} with sas_id {task.sas_id} and status {task.status}')
# depending on the status transition, perform the appropriate action
    # calculate the fraction and list of statuses of ingested tasks of this SAS_ID
if task.status in INGEST_FRACTION_STATUSSES:
update_ingest_fractions(task)
    # check if any task of this activity already has LTA information; if so, copy it to the activity level
if task.status in UPDATE_ARCHIVE_STATUSSES:
update_archive_info(task)
if task.status in ACTIVITY_RESET_STATUSSEN:
reset_activity(task)
    # calculate the finished fraction for failed tasks
    # (substring match, so that compound statuses like 'archived_failed' also count)
    if State.FAILED.value in task.status:
update_finished_fraction(task)
# check if all tasks of this SAS_ID have a status that is considered 'processed'
# this is used as a trigger for the ancillary service
if task.status in PROCESSED_STATUSSES:
update_processed_and_aggregate(task)
# check if all tasks for this activity are in a verified status.
update_is_verified(task)
# check if users have changed certain fields after the specification step
update_changed_fields(task)
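
# Minimal usage sketch (hypothetical; this module does not show the caller).
# A plausible wiring is a Django post_save signal on Task, assuming update_activity()
# is meant to run on every status change:
#
#   from django.db.models.signals import post_save
#   from django.dispatch import receiver
#
#   @receiver(post_save, sender=Task)
#   def on_task_saved(sender, instance, **kwargs):
#       update_activity(instance)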