from django.db import models
from django.urls import reverse

from django.utils import timezone
from datetime import datetime, timedelta
from django.conf import settings

import json
import logging
logger = logging.getLogger(__name__)
# constants
datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'
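
# Example: datetime(2023, 6, 19, 14, 30, 0).strftime(datetime_format_string)
# yields '2023-06-19T14:30:00Z'; the literal 'Z' assumes timestamps are kept in UTC.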

verified_statusses = ['stored', 'validated', 'scrubbed', 'archived', 'finished', 'suspended', 'discarded']


class Workflow(models.Model):
    description = models.CharField(max_length=500, blank=True, null=True)
    tag = models.CharField(max_length=30, blank=True, null=True)
    workflow_uri = models.CharField(unique=True, max_length=30, blank=True, null=True)
    repository = models.CharField(max_length=100, blank=True, null=True)
    commit_id = models.CharField(max_length=100, blank=True, null=True)
    path = models.CharField(max_length=100, blank=True, null=True)
    oi_size_fraction = models.FloatField(blank=True, null=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    default_parameters = models.JSONField(null=True, blank=True)
    prefetch = models.BooleanField(null=True, default=True)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.id) + ' - ' + str(self.workflow_uri)


# convert the quality information from the JSONField into an easily parsable list for the template
def convert_quality_to_list_for_template(task):
    result = []
    try:
        result.append(str(task.quality_json['uv-coverage']))
        result.append(str(task.quality_json['sensitivity']))
        result.append(str(task.quality_json['observing-conditions']))
    except Exception:
        # tasks without quality information yield an empty list
        pass
    return result
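
# Usage sketch (hypothetical quality_json contents, keys as used above):
#   task.quality_json = {'uv-coverage': 'good', 'sensitivity': '1.2 mJy/beam', 'observing-conditions': 'fair'}
#   convert_quality_to_list_for_template(task)  # -> ['good', '1.2 mJy/beam', 'fair']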


def convert_quality_to_shortlist_for_template(task):
    result = []
    try:
        result.append(str(task.quality_json['uv-coverage']))
        result.append(str(task.quality_json['sensitivity']))
        result.append(str(task.quality_json['observing-conditions']))
    except Exception:
        pass
    return result


def convert_summary_to_list_for_template(task):
    result = []
    try:
        summary = task.quality_json['summary']
        # the summary is a dict of records; render each record as a string
        for key in summary:
            result.append(str(summary[key]))
    except Exception:
        pass
    return result


def calculate_qualities(task):
    """
    Calculate the quality for this task, and also the overall quality
    for all the combined tasks of this sas_id.
    """

    # read the quality_thresholds from the Configuration table
    try:
        quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)
    except Exception:
        # fall back to the defaults when no configuration is found
        quality_thresholds = {
            "moderate": 20,
            "poor": 50,
            "overall_poor": 50,
            "overall_good": 90,
        }

    def calculate_quality_task(task):
        """
        Calculate the quality of this task based on rfi_percent values.
        The threshold values are read from a configuration json blob.
        Using this algorithm from SDCO:
            rfi_i < 20%         is good
            20% <= rfi_i <= 50% is moderate
            rfi_i > 50%         is poor
        except when rfi_percent = 0 (no verdict is given then)
        """
        try:
            summary = task.quality_json["summary"]
            for key in summary:
                record = summary[key]
                rfi_percent = int(record['rfi_percent'])
                if rfi_percent > 0:
                    quality = "good"
                    if rfi_percent >= quality_thresholds['moderate']:
                        quality = "moderate"
                    if rfi_percent > quality_thresholds['poor']:
                        quality = "poor"
                    return quality
        except Exception as error:
            logger.info(error)
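
    # Worked example with the default thresholds (moderate=20, poor=50):
    #   rfi_percent = 10 -> "good"      (0 < 10 < 20)
    #   rfi_percent = 35 -> "moderate"  (20 <= 35 <= 50)
    #   rfi_percent = 55 -> "poor"      (55 > 50)
    #   rfi_percent = 0  -> no verdict; the next summary record is examined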
"""
calculate the overall quality per sas_id, based on other tasks with the same sas_id
The threshold values are written from a configuration json blob
Using this algorithm from SDCO:
if more then 90 % of all files have a good quality then the dataset has good condition.
If more then 50 % of all files have a poor quality then the dataset is poor
otherwise is moderate.
"""
        try:
            # gather the results of all the calculated_quality values for this sas_id
            qualities = {'poor': 0, 'moderate': 0, 'good': 0}

            for task in Task.objects.filter(sas_id=unsaved_task.sas_id):
                # because this all happens in the overridden 'Task.save', the actual saving has not yet occurred,
                # so use the calculated quality from the unsaved task instead
                if task.id == unsaved_task.id:
                    t = unsaved_task
                else:
                    t = task

                try:
                    key = t.calculated_qualities['per_task']
                    qualities[key] = qualities[key] + 1
                except Exception:
                    # ignore the tasks that have no calculated quality (they are probably not 'stored')
                    pass

            total = qualities['poor'] + qualities['moderate'] + qualities['good']
            percentage_poor = (qualities['poor'] / total) * 100
            percentage_good = (qualities['good'] / total) * 100

            # default to 'moderate'; override when the 'poor' or 'good' thresholds are reached
            quality_sasid = 'moderate'
            if percentage_poor >= quality_thresholds['overall_poor']:
                quality_sasid = 'poor'
            if percentage_good >= quality_thresholds['overall_good']:
                quality_sasid = 'good'

            return quality_sasid

        except Exception as error:
            logger.info(error)
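
    # Worked example with the default thresholds (overall_poor=50, overall_good=90):
    #   10 stored tasks, 9 'good' and 1 'poor' -> percentage_good = 90 >= 90 -> 'good'
    #   10 stored tasks, 6 'poor'              -> percentage_poor = 60 >= 50 -> 'poor'
    #   anything in between                    -> 'moderate'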

    # --- main function body ---
    # calculate the quality for this task
    calculated_quality_task = calculate_quality_task(task)

    # store the result in task.calculated_qualities (not yet saved in the database)
    d = task.calculated_qualities
    if not d:
        d = {}
    d['per_task'] = calculated_quality_task
    task.calculated_qualities = d

    # update the overall quality of all tasks for this sas_id
    calculated_quality_sasid = calculate_quality_sasid(task)

    # store the result in task.calculated_qualities (not yet saved in the database)
    d['per_sasid'] = calculated_quality_sasid

    return task.calculated_qualities


class Task(models.Model):
    # Task control properties
    task_type = models.CharField(max_length=20, default="regular")
    filter = models.CharField(max_length=30, blank=True, null=True)
    environment = models.CharField(max_length=255, blank=True, null=True)
    new_status = models.CharField(max_length=50, default="defining", null=True)
    status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
    quality = models.CharField(max_length=10, blank=True, null=True)
    calculated_qualities = models.JSONField(null=True, blank=True)
    resume = models.BooleanField(verbose_name="Resume", default=True)
    creationTime = models.DateTimeField(verbose_name="CreationTime", default=datetime.utcnow, blank=True)
    priority = models.IntegerField(default=100, null=True)
    purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
    cleanup_policy = models.CharField(max_length=30, blank=True, null=True)

    # LOFAR properties
    project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
    sas_id = models.CharField(verbose_name="SAS_ID", max_length=15, blank=True, null=True)
    inputs = models.JSONField(null=True, blank=True)
    outputs = models.JSONField(null=True, blank=True)
    remarks = models.JSONField(null=True, blank=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    archive = models.JSONField(null=True, blank=True)
    size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    total_processing_time = models.IntegerField(default=0, null=True, blank=True)

    # relationships
    workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.CASCADE, null=True, blank=True)
    predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)

    def save(self, *args, **kwargs):
        # nv:1mar2023, temporary hack, set tasks 'on hold' as soon as they get to 'scrubbed'
        # (users forget to do that manually, causing unwanted ingests)
        if (self.status != 'scrubbed') and (self.new_status == 'scrubbed'):
            # 'on hold' is assumed to mean clearing the resume flag (see the comment above)
            self.resume = False

        # nv:19jun2023, calculate the qualities for this task
        if (self.status != 'stored') and (self.new_status == 'stored'):
            self.calculated_qualities = calculate_qualities(self)

        super(Task, self).save(*args, **kwargs)

    # this translates a view-name (from urls.py) back to a URL, to avoid hardcoded URLs in the html templates
    # bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
    # good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
    def get_absolute_url(self):
        return reverse('task-detail-view-api', kwargs={'pk': self.pk})
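
    # urls.py sketch (hypothetical view class; the route name is taken from the reverse() call above):
    #   path('tasks/<int:pk>/', TaskDetailView.as_view(), name='task-detail-view-api')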

    @property
    def predecessor_status(self):
        try:
            return self.predecessor.status
        except Exception:
            return "no_predecessor"

    @property
    def has_plots(self):
        try:
            plots = self.outputs['quality']['plots']
            return len(plots) > 0
        except Exception:
            return False

    @property
    def has_summary(self):
        try:
            # accessing the key raises an exception when the summary is absent
            _ = self.outputs['quality']['summary']
            return True
        except Exception:
            return False

    @property
    def get_quality_remarks(self):
        try:
            return self.remarks['quality']
        except Exception:
            return None

    @property
    def get_quality_remarks_taskid(self):
        try:
            return self.remarks['quality_taskid']
        except Exception:
            return None

    @property
    def get_quality_remarks_sasid(self):
        try:
            return self.remarks['quality_sasid']
        except Exception:
            return None

    @property
    def quality_as_list(self):
        try:
            return convert_quality_to_list_for_template(self)
        except Exception:
            return None

    @property
    def quality_as_shortlist(self):
        try:
            return convert_quality_to_shortlist_for_template(self)
        except Exception:
            return None

    @property
    def summary_as_list(self):
        try:
            return convert_summary_to_list_for_template(self)
        except Exception:
            return None

    @property
    def sas_id_archived(self):
        try:
            return self.archive['sas_id_archived']
        except Exception:
            return None

    @property
    def sasid_is_verified(self):
        # a sas_id counts as verified when every task with that sas_id
        # has reached one of the verified statusses
        for task in Task.objects.filter(sas_id=self.sas_id):
            if task.status not in verified_statusses:
                return False
        return True
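
    # Example: with verified_statusses as defined at the top of this module, a sas_id
    # whose tasks are all 'stored' or 'archived' is verified; if any task is still in
    # an earlier status, the whole sas_id is not.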

    # NV: this shows the latest status change, but because it is a derived property it cannot be sorted.
    # This functionality was not requested, and to avoid additional requests about 'sort' functionality
    # it is currently commented out. It could be of use later though, so I leave it in for now.
    # @property
    # def latest_change(self):
    #     qs = Status.objects.filter(task__id=self.id).order_by('-timestamp')
    #     if len(qs) > 0:
    #         status = qs[0]
    #         timestamp = status.timestamp
    #
    #     return timestamp


class LogEntry(models.Model):
    cpu_cycles = models.IntegerField(null=True, blank=True)
    wall_clock_time = models.IntegerField(null=True, blank=True)
    url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
    service = models.CharField(max_length=30, blank=True, null=True)
    step_name = models.CharField(max_length=30, blank=True, null=True)
    timestamp = models.DateTimeField(blank=True, null=True)
    status = models.CharField(max_length=50, default="defined", blank=True, null=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='log_entries', on_delete=models.CASCADE, null=False)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.id) + ' - ' + str(self.task) + ' - ' + self.status + ' (' + self.step_name + ')'

    def save(self, *args, **kwargs):
        # override save to check how long this task has been in this state
        try:
            # if this logentry already has a wall_clock_time, then don't change it
            if not self.wall_clock_time:
                # gather all entries for this task, to discover the previous one
                entries_for_this_task = LogEntry.objects.filter(task=self.task).order_by("-timestamp")

                # remove entries from the list, up to and including 'this',
                # so that the latest remaining one is the previous entry
                for entry in entries_for_this_task:
                    entries_for_this_task = entries_for_this_task.exclude(id=entry.id)
                    if entry == self:
                        break

                try:
                    latest_entry_before_self = entries_for_this_task.latest('timestamp')
                    previous_timestamp = latest_entry_before_self.timestamp
                except Exception:
                    # if no previous entry exists, then use the timestamp from the task
                    previous_timestamp = self.task.creationTime

                # use total_seconds() so that intervals longer than a day are not truncated
                dt = int((self.timestamp - previous_timestamp).total_seconds())

                message = "logentry for task " + str(self.task.id) + ", to " + self.status + " took " + str(dt) + " seconds"
                logger.info(message)
                self.wall_clock_time = dt

        except Exception as e:
            logger.error(e)
        finally:
            # finally save the LogEntry itself as well
            super(LogEntry, self).save(*args, **kwargs)
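
    # Timing sketch (hypothetical values): if the previous log entry of a task has
    # timestamp 12:00:00 and a new entry arrives with timestamp 12:05:30, save() stores
    # wall_clock_time = 330 seconds before handing over to the regular model save.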


class Status(models.Model):
    name = models.CharField(max_length=50, default="unknown")
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='status_history', on_delete=models.CASCADE, null=False)

    # the representation of the value in the REST API
    def __str__(self):
        formatted_date = self.timestamp.strftime(datetime_format_string)
        return str(self.name) + ' (' + str(formatted_date) + ')'


class Configuration(models.Model):
    filter = models.CharField(max_length=30, blank=True, null=True)
    # a 'key' field is assumed here; __str__ below and the lookup in calculate_qualities() rely on it
    key = models.CharField(max_length=50, blank=True, null=True)
    # large value needed to hold the dcache token
    value = models.CharField(max_length=5120)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.key)
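
# Configuration sketch: calculate_qualities() expects a record like
#   key   = 'quality_thresholds'
#   value = '{"moderate": 20, "poor": 50, "overall_poor": 50, "overall_good": 90}'
# i.e. value holds a JSON blob that json.loads() can parse.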


class Job(models.Model):
    type = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    task_id = models.IntegerField(null=True, blank=True)
    job_id = models.IntegerField(null=True, blank=True)
    metadata = models.JSONField(null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return 'task_id:' + str(self.task_id) + ', job_id:' + str(self.job_id)


class PostProcessingRule(models.Model):
    aggregation_key = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    trigger_status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)

    # relationships
    workflow_to_process = models.ForeignKey(Workflow, related_name='to_process', on_delete=models.SET_NULL, null=True, blank=True)
    workflow_to_apply = models.ForeignKey(Workflow, related_name='to_apply', on_delete=models.SET_NULL, null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.aggregation_key) + ' - ' + str(self.trigger_status)


class LatestMonitor(models.Model):
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    @property
    def enabled(self):
        try:
            return self.metadata['enabled']
        except Exception:
            # only an explicit metadata['enabled'] = False registers as disabled;
            # defaulting to "True" makes sure that services are enabled by default
            return "True"

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.name) + ' - (' + self.hostname + ') - ' + str(self.timestamp) + ' - ' + self.status


def purge_old_records():
    current_time = timezone.now()
    try:
        time_threshold = current_time - timedelta(hours=settings.MAX_MONITORING_HISTORY_HOURS)
        Monitor.objects.filter(timestamp__lt=time_threshold).delete()
    except Exception:
        # if MAX_MONITORING_HISTORY_HOURS is not set, then do nothing and continue
        pass
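
# Settings sketch: e.g. MAX_MONITORING_HISTORY_HOURS = 72 in settings.py keeps three
# days of Monitor records; purge_old_records() silently skips purging when the setting
# is absent.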


class Monitor(models.Model):
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    def save(self, *args, **kwargs):
        # check if this combination of service name + hostname already exists
        # in the LatestMonitor, and replace it with this newer record
        latest_metadata = None
        try:
            latestMonitor = LatestMonitor.objects.get(name=self.name, hostname=self.hostname)
            # carry over the metadata, if possible
            latest_metadata = latestMonitor.metadata
            latestMonitor.delete()
        except Exception:
            # this combination of name and hostname didn't exist yet; it is created below
            pass

        # carry over the previous metadata if there was any, otherwise use this record's own
        metadata = self.metadata
        if latest_metadata:
            metadata = latest_metadata

        latestMonitor = LatestMonitor(
            name=self.name,
            type=self.type,
            timestamp=self.timestamp,
            hostname=self.hostname,
            process_id=self.process_id,
            description=self.description,
            status=self.status,
            metadata=metadata,
        )
        latestMonitor.save()

        # finally save the Monitor info itself also
        super(Monitor, self).save(*args, **kwargs)

        # and purge the monitoring table to its max
        purge_old_records()
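
    # Upsert sketch (hypothetical values): two heartbeats of the same service,
    #   Monitor(name='scrubber', hostname='node1', status='ok').save()
    #   Monitor(name='scrubber', hostname='node1', status='warning').save()
    # leave two Monitor rows but only one LatestMonitor row, holding the latest status.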

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.name) + ' - (' + self.hostname + ') - ' + str(self.timestamp) + ' - ' + self.status