import logging

from taskdatabase.models import Task, Activity

from .common import State, verified_statusses, processed_statusses

logger = logging.getLogger(__name__)


def calculate_ingested_fraction(this_task):
    """
    This 'property' of a task returns the fraction of queued/ingested tasks
    per SAS_ID and a list of statusses of other tasks belonging to the same SAS_ID.
    It is implemented as 'property', because then it can be used in html pages like this:

        <td>{{ task.sasid_ingested_fraction.status }}</td>
        <td>{{ task.sasid_ingested_fraction.completion }}%</td>

    A selection of statusses are considered 'queued', and another selection is
    considered 'ingested'. The division of those 2 are returned as a 'completed %'.
    A limited list of statusses for the other tasks that belong to this SAS_ID
    is also returned.

    :param this_task: a Task; all tasks sharing its sas_id are inspected.
    :return: dict with 'status' (non-zero status counts) and 'completion' (int %,
             0 when no task of this SAS_ID is in any counted status).
    """
    # Counters for every status that participates in the ingest pipeline.
    # Statuses outside this dict (and 'discarded') do not contribute to the fraction.
    statusses = {'scrubbed': 0, 'pre_archiving': 0, 'pre_archived': 0,
                 'archiving': 0, 'archived': 0, 'finishing': 0, 'finished': 0,
                 'suspended': 0, 'discarded': 0,
                 'pre_archived_failed': 0, 'archived_failed': 0, 'finished_failed': 0}

    for task in Task.objects.filter(sas_id=this_task.sas_id):
        try:
            statusses[task.status] += 1
        except KeyError:
            # tasks in a status we do not track are simply not counted
            pass

    incomplete = (statusses['scrubbed'] + statusses['pre_archiving']
                  + statusses['pre_archived'] + statusses['archiving']
                  + statusses['finishing'] + statusses['suspended']
                  + statusses['pre_archived_failed'] + statusses['archived_failed']
                  + statusses['finished_failed'])
    complete = statusses['archived'] + statusses['finished']

    # Guard against a SAS_ID with no tasks in any counted status
    # (previously this raised ZeroDivisionError).
    total = incomplete + complete
    completion = round(complete / total * 100) if total else 0

    result = {}
    result['status'] = {key: value for key, value in statusses.items() if value != 0}
    result['completion'] = completion
    return result


def calculate_finished_fraction(this_task):
    """
    Return how much of the data of this task's SAS_ID is finished.

    :param this_task: a Task; all tasks sharing its sas_id are inspected.
    :return: dict with 'fraction' (int % of size finished, -1 when it cannot be
             computed), 'total_size' and 'remaining' (summed size_to_process).
    """
    size_archived = 0
    size_remaining = 0
    total_size = 0

    for task in Task.objects.filter(sas_id=this_task.sas_id):
        if task.status == State.FINISHED.value:
            size_archived += task.size_to_process
        else:
            size_remaining += task.size_to_process
        total_size += task.size_to_process

    result = {}
    try:
        result['fraction'] = round((size_archived / (size_remaining + size_archived)) * 100)
    except (ZeroDivisionError, TypeError):
        # no tasks / zero total size, or a size_to_process that is None:
        # -1 is the sentinel for "fraction unknown"
        result['fraction'] = -1
    result['total_size'] = total_size
    result['remaining'] = size_remaining
    return result


def update_activity(task):
    """
    The activity (SAS_ID level) is updated when a task changes status.
    Depending on the type of status change, certain calculations and updates are performed.
    Doing this on status change, instead of on-the-fly when a user enters a page,
    balances the load and improves overall performance.

    - to 'ARCHIVING, ARCHIVED, FINISHED' : check for incoming/existing 'archive' json from archiver
    - to STORED                          : calculate quality
    - to SCRUBBED, ARCHIVING, ARCHIVED, FINISHED : calculate ingested_fraction
    - to _FAILED                         : calculate finished_fraction
    - to STORED, PROCESSED, DISCARDED    : check if all tasks are processed,
                                           set Activity to is_processed and AGGREGATE
    - always                             : determine if a task is in a 'verified' status
    """
    logger.info(f'update_activity for task {task.id} with sas_id {task.sas_id} and status {task.status}')
    activity = task.activity

    # depending on the status transition, perform calculations
    if task.status == State.STORED.value:
        logger.info('- calculate_qualities')

        # quality is calculated per task and per sas_id, reget the quality per sas_id
        try:
            activity.calculated_quality = task.calculated_qualities['per_sasid']
            activity.save()
        except (TypeError, KeyError):
            # best effort: no qualities recorded (yet) for this task
            pass

    # calculate the fraction and list of statusses of ingested tasks of this SAS_ID
    if task.status in [State.SCRUBBED.value, State.PRE_ARCHIVING.value, State.PRE_ARCHIVED.value,
                       State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHING.value]:
        logger.info('- calculate_ingested_fraction')
        result = calculate_ingested_fraction(task)
        activity.ingested_fraction = result['completion']
        activity.ingestq_status = result['status']
        activity.save()

    # check of any task of this activity already has LTA information.
    # If so, copy to the activity level
    if task.status in [State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHED.value]:
        logger.info('- add archive json')
        for t in Task.objects.filter(sas_id=task.sas_id):
            try:
                if t.archive['sas_id_archived']:
                    activity.archive = t.archive
                    break
            except (TypeError, KeyError):
                # this task has no (usable) archive json; keep looking
                pass
        activity.save()

    # calculate the finished fraction, this is only used on the Failures page
    # (substring check deliberately matches all '*_failed' statuses)
    if State.FAILED.value in task.status:
        logger.info('- calculate_finished_fraction')
        result = calculate_finished_fraction(task)
        activity.finished_fraction = result['fraction']
        activity.total_size = result['total_size']
        activity.remaining = result['remaining']
        activity.save()

    # check if all tasks of this SAS_ID have a status that is considered 'processed'
    # this is used as a trigger for the ancillary service
    if task.status in processed_statusses:
        current_is_processed = activity.is_processed

        activity.is_processed = True
        non_discarded_found = False
        for t in Task.objects.filter(sas_id=task.sas_id):
            if t.status not in processed_statusses:
                activity.is_processed = False
                break

            # at least one of the tasks should NOT be in discarded,
            # otherwise a fully discarded SAS_ID will also register as
            # 'is_processed' and ready to 'AGGREGATE'
            if t.status != State.DISCARDED.value:
                non_discarded_found = True

        # only save when changed
        if activity.is_processed != current_is_processed:
            # if the whole activity has become processed,
            # then set the status of this activity to 'AGGREGATE'
            # (was `&`; logical `and` is what is meant here)
            if activity.is_processed and non_discarded_found:
                activity.status = State.AGGREGATE.value
            activity.save()

    # check if all tasks of this SAS_ID have a status that is considered 'verified'
    # this is used for the Validation Page
    current_is_verified = activity.is_verified

    activity.is_verified = True
    for t in Task.objects.filter(sas_id=task.sas_id):
        if t.status not in verified_statusses:
            activity.is_verified = False
            break

    # only save when changed
    if activity.is_verified != current_is_verified:
        activity.save()

    # keep the activity-level filter in sync with the task
    if activity.filter != task.filter:
        activity.filter = task.filter
        activity.save()

    try:
        if activity.workflow_id != task.workflow.id:
            activity.workflow_id = task.workflow.id
            activity.save()
    except Exception:
        # continue, workflow is not initially needed
        logger.error('no workflow found for this task')