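"""
Django model for an ATDB processing Task and its helper functions.
A Task represents one workflow run on the data of one SAS_ID; tasks with the
same SAS_ID are grouped into an Activity (observation or pipeline).
"""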
from django.db import models
from django.urls import reverse
from django.utils import timezone
from datetime import datetime, timedelta
from django.conf import settings
import json
import logging
from ..services import calculated_qualities as qualities
from ..services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN
from ..services.specification.input_validation import validate_inputs, recalculate_size
from .workflow import *
from .specification import *
from .activity import *
from .job import *
logger = logging.getLogger(__name__)
def associate_task_with_activity(task):
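    """
    Return the Activity with the same sas_id as this task, creating one if needed.
    If the task's sas_id has changed, the task is re-associated with the proper activity.
    """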
# check if the task.sas_id is (still) the same as the sas_id of the associated activity
# if not, then the sas_id was changed, and another activity should be associated with the task.
if task.activity and task.sas_id != task.activity.sas_id:
task.activity = None
if not task.activity:
try:
activity = Activity.objects.get(sas_id=task.sas_id)
        except Activity.DoesNotExist:
# no activity exists yet, create it
logger.info(f'create new activity for sas_id {task.sas_id}')
activity = Activity(sas_id=task.sas_id,
project=task.project,
filter=task.filter)
activity.save()
task.activity = activity
return task.activity
def check_if_summary(task):
"""
check if this task is a summary task
- for backward compatiblity reasons this is done very ugly, by looking if certain filenames contain the substring 'summary'
- look for the new 'is_summary' flag
- and don't set it to False if some other mechanism already defined this task as is_summary
"""
if task.is_summary:
return True
try:
tars = task.outputs['summary']
for tar in tars:
if tar['is_summary']:
# a summary tarball was found, this task is a summary task
logger.info(f'task {task.id} with workflow {task.workflow} is a summary task')
return True
return False
    except (KeyError, TypeError):
        # no 'is_summary' field found; this is probably an older task.
        # for backward compatibility, ignore the error and continue with the old filename-based method
        pass
# look in the outputs.tar_archive
try:
tars = task.outputs['tar_archive']
for tar in tars:
if 'summary' in tar['basename']:
# a summary tarball was found, this task is a summary task
logger.info(f'task {task.id} with workflow {task.workflow} is a summary task')
return True
    except (KeyError, TypeError):
        # no 'tar_archive' was found
        return False
return False
class Task(models.Model):
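    """
    A unit of work in ATDB: the data of one SAS_ID, processed by a workflow.
    Tasks with the same SAS_ID are grouped into an Activity (pipeline or observation).
    """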
# Task control properties
task_type = models.CharField(max_length=20, default="regular")
is_summary = models.BooleanField(default=False)
is_aggregated = models.BooleanField(default=False)
filter = models.CharField(max_length=30, blank=True, null=True)
environment = models.CharField(max_length=255, blank=True, null=True)
new_status = models.CharField(max_length=50, default="defining", null=True)
status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
quality = models.CharField(max_length=10, blank=True, null=True)
calculated_qualities = models.JSONField(null=True, blank=True)
resume = models.BooleanField(verbose_name="Resume", default=True)
creationTime = models.DateTimeField(verbose_name="CreationTime", default=timezone.now, blank=True)
priority = models.IntegerField(default=100, null=True)
purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
cleanup_policy = models.CharField(max_length=30, blank=True, null=True)
stage_request_id = models.IntegerField(null=True)
# LOFAR properties
project = models.CharField(max_length=30, blank=True, null=True, default="unknown")
sas_id = models.CharField(verbose_name="SAS_ID", max_length=15, blank=True, null=True)
inputs = models.JSONField(null=True, blank=True)
outputs = models.JSONField(null=True, blank=True)
metrics = models.JSONField(null=True, blank=True)
remarks = models.JSONField(null=True, blank=True)
meta_scheduling = models.JSONField(null=True, blank=True)
archive = models.JSONField(null=True, blank=True)
size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
total_processing_time = models.IntegerField(default=0, null=True, blank=True)
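    # number of output dataproducts (the length of outputs['ingest']), maintained by count_dataproducts()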
nr_of_dps = models.IntegerField(null=True)
# for parallel processing:
# atdb_services that are started with the --service_host_name=<host_name> flag will only pick up tasks and activities
# with a corresponding service_host_name.
service_filter = models.CharField(max_length=50, blank=True, null=True)
ingest_location = models.CharField(max_length=50, blank=True, null=True)
# relationships
workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.CASCADE, null=True, blank=True)
predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)
joined_output_task = models.ForeignKey('self', related_name='joined_input_tasks', on_delete=models.SET_NULL,
null=True, blank=True)
# pipeline or observation
activity = models.ForeignKey(Activity, related_name='tasks', on_delete=models.SET_NULL, null=True, blank=True)
# specification
# note: the tasks populated by this specification object will be deleted when the specification is deleted
specification = models.ForeignKey(Specification, related_name='tasks', on_delete=models.CASCADE, null=True,
blank=True)
def __str__(self):
return str(self.id)
#return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)
def count_dataproducts(self):
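        """
        Count the number of output dataproducts (the 'ingest' list in outputs) into nr_of_dps.
        """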
try:
self.nr_of_dps = len(self.outputs['ingest'])
        except (KeyError, TypeError):
            # the 'ingest' section is missing or malformed
            self.nr_of_dps = 0
def handle_aggregation(self):
"""
    depending on the workflow.aggregation_strategy of this task, different functionality is executed
"""
try:
# for requantisation pipeline: HOLD summary tasks
if (self.workflow.aggregation_strategy == AggregationStrategy.WAIT_FOR_SUMMARY_TASK.value):
if (self.activity.status != State.AGGREGATED.value):
                    # temporarily put it on hold so that the ancillary service can pick it up
if (self.is_summary and not self.activity.is_aggregated):
self.resume = False
# for image_compression_pipeline: ...
if (self.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value):
if not self.is_aggregated:
# set the task to AGGREGATE,
# to prevent the datamanager from picking it up,
# and to trigger aggregator service to pick it up,
# to copy its H5 files to the activity.storage_location on spider
                    # the aggregator then returns the task to PROCESSED with 'is_aggregated = true',
# so that it won't be picked up again.
self.new_status = State.AGGREGATE.value
except Exception as error:
            # this should not happen in normal operation,
            # but tasks can be inserted directly in an advanced status without going through
            # the proper steps (like in tests). If that happens, just log the error and continue.
logger.error(error)
def save(self, *args, **kwargs):
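        """
        Save the task, applying status-transition side effects first:
        associate an activity, handle aggregation when the task becomes PROCESSED,
        count the dataproducts, calculate qualities when the task becomes STORED,
        and backfill the service_filter.
        """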
logger.info(f"task.save({self}),{args},{kwargs}")
# make sure that every task has an activity (also for backward compatibility)
associate_task_with_activity(self)
# when a task becomes DEFINING or DEFINED some validation needs to be done.
# NV 10 april 2025:
# TODO: this functionality is very slow, and can be done at a better moment as part of the specification validation
# temporarily switched off here. Is this causing bug MAM-52?
#if self.new_status in [State.DEFINING.value, State.DEFINED.value]:
# validate_inputs(self)
# when a task goes to PROCESSED... handle the (potential) aggregation functionality
        if self.status != State.PROCESSED.value and self.new_status == State.PROCESSED.value:
self.handle_aggregation()
        # count the dataproducts after processing, and again just before the ingest in case something was changed
if self.new_status in [State.PROCESSED.value, State.VALIDATED.value]:
self.count_dataproducts()
# calculate the qualities for this task
        if self.status != State.STORED.value and self.new_status == State.STORED.value:
# read the quality_thresholds from the Configuration table
try:
quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)
            except (Configuration.DoesNotExist, json.JSONDecodeError):
quality_thresholds = {
"moderate": 20,
"poor": 50,
"overall_poor": 50,
"overall_good": 90,
}
tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id)
self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)
if self.new_status in ACTIVITY_RESET_STATUSSEN:
self.is_aggregated = False
        # check in the outputs if this task should be considered to be a summary task
self.is_summary = check_if_summary(self)
# check if the task has a 'service_filter', if not... add it
if not self.service_filter:
            # start from the activity's service_filter, then prefer a file location from the inputs section
location = self.activity.service_filter
try:
if self.inputs and isinstance(self.inputs[0], dict):
location = self.inputs[0].get("location")
            except (TypeError, KeyError, IndexError):
                # no location found: the inputs section is empty, missing, or malformed,
                # or some other unthought of natural disaster prevents retrieving that value.
                # in that case keep the activity.service_filter, which is currently always correct.
                pass
self.service_filter = location
# remark:
# a post_save signal is triggered, which in turn executes the activities_handler.update_activity() function
# to update the associated 'activity' with relevant aggregated information
# see documentation: https://support.astron.nl/confluence/display/SDCP/Activities
super(Task, self).save(*args, **kwargs)
    # this translates a view-name (from urls.py) back to a URL, to avoid hardcoded URLs in the html templates
# bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
# good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
def get_absolute_url(self):
return reverse('task-detail-view-api', kwargs={'pk': self.pk})
def get_jobs_statusses(self):
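        """
        Count the jobs of this task per status (lowercased), returned as a dict.
        Illustrative result (actual keys depend on the job metadata):
            {'running': 2, 'completed': 5}
        """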
        statusses = {}
        # count the statusses of all the jobs of this task
        jobs = Job.objects.filter(task_id=self.id)
        for job in jobs:
            try:
                key = job.metadata['status'].lower()
                # increment the count for this status, starting at 0 for a new key
                statusses[key] = statusses.get(key, 0) + 1
            except (KeyError, TypeError, AttributeError):
                # the job has no metadata, or no (string) 'status' field in it
                pass
        return statusses
@property
def predecessor_status(self):
        try:
            return self.predecessor.status
        except AttributeError:
            # no predecessor is set
            return "no_predecessor"
@property
def has_quality(self):
        try:
            return 'quality' in self.outputs
        except TypeError:
            # outputs is None
            return False
@property
def has_plots(self):
        try:
            plots = self.outputs['quality']['plots']
            return len(plots) > 0
        except (KeyError, TypeError):
            # no quality/plots section in the outputs
            return False
@property
def has_summary(self):
try:
summary = self.outputs['quality']['summary']
return True
        except (KeyError, TypeError):
if self.task_type == 'aggregation':
# whether it has or hasn't, return as true,
# to prevent aggregation tasks blocking the SUM button on the validation page
# is this dirty? yes it is.
return True
else:
return False
@property
def quality_json(self):
try:
return self.outputs['quality']
        except (KeyError, TypeError):
return None
@property
def get_quality_remarks_sasid(self):
try:
return self.remarks['quality_sasid']
        except (KeyError, TypeError):
return None
@property
def sas_id_archived(self):
"""
check if this task already has an output SAS_ID at the LTA
"""
try:
return self.activity.archive['sas_id_archived']
        except (KeyError, TypeError, AttributeError):
return None
@property
def path_to_lta(self):
"""
return the 'path_to_lta' of this task (or None if that fails)
"""
try:
return self.activity.archive['path_to_lta']
        except (KeyError, TypeError, AttributeError):
return None
@property
def task_type_join(self):
try:
# find out if this is a join/joined type of task, or just a regular task
if self.joined_output_task:
                # this task has a designated output task, so it is an input task (join)
return 'join'
joined_input_tasks = self.joined_input_tasks.all()
if joined_input_tasks.count() > 0:
# this task has input tasks, so it is an output task (joined)
return 'joined'
return 'regular'
        except Exception:
# 'the show must go on', don't crash if anything goes wrong, just show it as 'regular'
return 'regular'
@property
def joined_status(self):
# if a task has joined_input_tasks, then check their status
try:
joined_input_tasks = self.joined_input_tasks.all()
status = None
for task in joined_input_tasks:
if status and task.status != status:
return None
status = task.status
            # all statuses are the same, return it
            return status
        except Exception as e:
            logger.error(e)
            return "unknown"
@property
def stageit_url(self):
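        """
        Return the stager API URL for this task's stage request, built from
        the 'stager:api' Configuration value, or None if there is no stage_request_id.
        """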
stage_request_id = self.stage_request_id
if stage_request_id:
stager_api = Configuration.objects.get(key='stager:api').value
url = f"{stager_api}/requests/{self.stage_request_id}/"
return url
else:
return None
# @property
# def storage_location(self):
# # roughly determine the storage location by looking at the 1st input
# try:
# location = self.inputs[0]['location']
# except:
# location = '-'
# return location
@property
    def activity_service_filter(self):
        # the activity can be None (on_delete=SET_NULL), so guard against that
        return self.activity.service_filter if self.activity else None
# NV: this shows the latest status change, but because it is a derived property it cannot be sorted.
    # This functionality was not requested, and to avoid additional requests about 'sort' functionality
# it is currently commented out. Could be of use later though, so I leave it in for now.
# @property
# def latest_change(self):
# qs = Status.objects.filter(task__id=self.id).order_by('-timestamp')
# if len(qs) > 0:
# status = qs[0]
# timestamp = status.timestamp
#
# return timestamp