# (repository web-view residue: "Newer" / "Older" navigation labels; not part of the module)
from django.db import models
from django.urls import reverse

# (blame-view residue: "Nico Vermaas committed"; not part of the module)
from django.utils import timezone
from django.utils.timezone import datetime, timedelta
from django.conf import settings
from .services import calculated_qualities as qualities
from .services.common import State, verified_statusses
logger = logging.getLogger(__name__)
# constants
datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'
# NOTE(review): the enclosing class header is not visible in this view. The statement
# that follows these fields formats `self.workflow_uri`, and other models point a
# ForeignKey at `Workflow`, so these are presumably the Workflow model fields - confirm.
description = models.CharField(max_length=500, blank=True, null=True)
tag = models.CharField(max_length=30, blank=True, null=True)
# declared unique=True, so at most one record per workflow_uri value
workflow_uri = models.CharField(unique=True, max_length=30, blank=True, null=True)
repository = models.CharField(max_length=100, blank=True, null=True)
commit_id = models.CharField(max_length=100, blank=True, null=True)
path = models.CharField(max_length=100, blank=True, null=True)
oi_size_fraction = models.FloatField(blank=True, null=True)
# free-form JSON blobs
meta_scheduling = models.JSONField(null=True, blank=True)
default_parameters = models.JSONField(null=True, blank=True)
# nullable boolean that defaults to True
prefetch = models.BooleanField(null=True, default=True)
return str(self.id) + ' - ' + str(self.workflow_uri)
# convert the quality information from the JSONfield into a easy parsable list for the template
def convert_quality_to_list_for_template(task):
    """Return the three quality metrics of a task as a list of display strings.

    The values are read from ``task.quality_json`` in the fixed order
    'uv-coverage', 'sensitivity', 'observing-conditions'.

    :param task: object exposing a ``quality_json`` mapping
    :return: list of exactly three strings; a metric that cannot be read
             (missing key, missing/None ``quality_json``) is shown as "-"
    """
    metric_keys = ('uv-coverage', 'sensitivity', 'observing-conditions')
    values = []
    for key in metric_keys:
        try:
            values.append(str(task.quality_json[key]))
        except (AttributeError, KeyError, TypeError, IndexError):
            # keep the position of the metric in the list, show a placeholder
            values.append("-")
    return values
def convert_quality_to_shortlist_for_template(task):
    """Return the quality metrics of a task as a list of strings, without placeholders.

    The values are read from ``task.quality_json`` in the order 'uv-coverage',
    'sensitivity', 'observing-conditions'. Reading stops at the first metric
    that cannot be read; the metrics collected up to that point are returned.

    :param task: object exposing a ``quality_json`` mapping
    :return: list of zero to three strings
    """
    metric_keys = ('uv-coverage', 'sensitivity', 'observing-conditions')
    values = []
    try:
        for key in metric_keys:
            values.append(str(task.quality_json[key]))
    except (AttributeError, KeyError, TypeError, IndexError):
        # incomplete quality information: return what was collected so far
        pass
    return values
def convert_summary_to_list_for_template(task):
    """Look up the 'summary' entry of ``task.quality_json`` and return a list.

    The looked-up value is not used, so the returned list is always empty;
    any ``Exception`` raised by the lookup is ignored.
    """
    entries = []
    try:
        task.quality_json['summary']
    except Exception:
        # a missing or unreadable summary is not an error here
        pass
    return entries
def associate_task_with_activity(task):
    """Make sure ``task.activity`` is set and return it.

    When the task has no activity yet, an existing Activity with the same
    ``sas_id`` is reused; if none exists a new one is created (and saved) from
    the task's sas_id, project, workflow id and filter. The task itself is not
    saved here, only its ``activity`` attribute is assigned.

    :param task: the task to link
    :return: the activity associated with the task
    """
    if not task.activity:
        # filter().first() yields None when nothing matches and does not fail when
        # several activities share the sas_id, so no blanket 'except' is needed
        activity = Activity.objects.filter(sas_id=task.sas_id).first()
        if activity is None:
            # no activity exists yet, create it
            logger.info(f'create new activity for sas_id {task.sas_id}')
            activity = Activity(
                sas_id=task.sas_id,
                project=task.project,
                workflow_id=task.workflow.id,
                filter=task.filter,
            )
            activity.save()
        task.activity = activity
    return task.activity
"""
# NOTE(review): the enclosing class header is not visible in this view.
# associate_task_with_activity() builds Activity(sas_id=..., project=..., filter=...),
# which matches these fields, so this is presumably the Activity model - confirm.
sas_id = models.CharField(db_index=True, verbose_name="SAS_ID", max_length=15, blank=True, null=True)
project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
filter = models.CharField(max_length=30, blank=True, null=True)
status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
calculated_quality = models.CharField(max_length=10, blank=True, null=True)
# this is the JSON blob that is filled in by the ldv_archiver during the ingest process
# (the has_archived property reads its 'sas_id_archived' key)
archive = models.JSONField(null=True, blank=True)
is_verified = models.BooleanField(default=False)
# NOTE(review): the next four look like aggregated values over the tasks of this sas_id
# (names mirror the keys returned by Task.sasid_finished_fraction) - confirm where they are filled.
finished_fraction = models.FloatField(blank=True, null=True)
ingested_fraction = models.FloatField(blank=True, null=True)
total_size = models.FloatField(blank=True, null=True)
remaining = models.FloatField(blank=True, null=True)
ingestq_status = models.JSONField(null=True, blank=True)
@property
def has_archived(self):
    """Return the archived SAS_ID stored in this record's ``archive`` blob.

    :return: the value under ``archive['sas_id_archived']`` when it is present
             and truthy, otherwise None (also when ``archive`` is empty)
    """
    try:
        archived_sas_id = self.archive['sas_id_archived']
    except (KeyError, TypeError):
        # key missing, or 'archive' is None / not a mapping
        return None
    return archived_sas_id or None
def __str__(self):
    """Represent the record by its sas_id."""
    return f"{self.sas_id}"
# Task control properties

# (blame-view residue: "Nico Vermaas committed"; not part of the module)
# NOTE(review): the enclosing class header is not visible in this view; the methods
# further down call super(Task, self) and Task.objects, so these are presumably the
# Task model fields - confirm.
task_type = models.CharField(max_length=20, default="regular")
filter = models.CharField(max_length=30, blank=True, null=True)
environment = models.CharField(max_length=255, blank=True, null=True)
# save() compares new_status with status to detect the transitions to SCRUBBED and STORED
new_status = models.CharField(max_length=50, default="defining", null=True)
status = models.CharField(db_index=True, default="unknown", max_length=50,blank=True, null=True)
quality = models.CharField(max_length=10,blank=True, null=True)
# filled by save() with the result of qualities.calculate_qualities() on the transition to STORED
calculated_qualities = models.JSONField(null=True, blank=True)
resume = models.BooleanField(verbose_name="Resume", default=True)
creationTime = models.DateTimeField(verbose_name="CreationTime",default=timezone.now, blank=True)
priority = models.IntegerField(default=100, null=True)
purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
cleanup_policy = models.CharField(max_length=30, blank=True, null=True)
# LOFAR properties
project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
sas_id = models.CharField(verbose_name="SAS_ID",max_length=15, blank=True, null=True)
inputs = models.JSONField(null=True, blank=True)
# has_plots / has_summary read outputs['quality']
outputs = models.JSONField(null=True, blank=True)
# the get_quality_remarks_* properties read the 'quality_taskid' / 'quality_sasid' keys
remarks = models.JSONField(null=True, blank=True)
meta_scheduling = models.JSONField(null=True, blank=True)
# sas_id_archived / path_to_lta read the 'sas_id_archived' / 'path_to_lta' keys
archive = models.JSONField(null=True, blank=True)
# nullable: code that sums these sizes has to cope with None
size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
total_processing_time = models.IntegerField(default=0, null=True, blank=True)
# relationships
workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.CASCADE, null=True, blank=True)
# self-references: 'successors' and 'joined_input_tasks' are the reverse accessors
predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)
joined_output_task = models.ForeignKey('self', related_name='joined_input_tasks', on_delete=models.SET_NULL, null=True, blank=True)
activity = models.ForeignKey(Activity, related_name='tasks', on_delete=models.SET_NULL, null=True, blank=True)
return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)
def save(self, *args, **kwargs):
# Override of save(): reacts to the status transitions to SCRUBBED and STORED,
# makes sure the task is linked to an activity, then delegates to the parent save().
# nv:1mar2023, temporary hack, set tasks 'on hold' as soon they get to 'scrubbed'
# (users forget to do that manually, causing unwanted ingests)
# NOTE(review): '&' is the bitwise operator; it behaves like 'and' here only because
# both operands are plain booleans.
if (self.status != State.SCRUBBED.value) & (self.new_status == State.SCRUBBED.value):
# NOTE(review): no statement belonging to the 'if' above is visible in this view
# (the next statement is another 'if'); the comment suggests the task is put
# 'on hold' at this point - confirm against the repository.
# nv:19jun2023, calculate the qualities for this task
if (self.status != State.STORED.value) & (self.new_status == State.STORED.value):
# read the quality_thresholds from the Configuration table
# NOTE(review): 'json' is used here but its import is not visible in this view.
try:
quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)
except:
# fallback thresholds, used when the configuration record cannot be read or parsed
quality_thresholds = {
"moderate": 20,
"poor": 50,
"overall_poor": 50,
"overall_good": 90,
}
# the qualities are calculated over all tasks that share this task's sas_id
tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id)
self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)
# make sure that every task has an activity (backward compatibility)
# TODO: uncomment to enable SDC-1188 functionality for deploy STEP 2
associate_task_with_activity(self)
# remark:
# a post_save signal is triggered by this save()
# to update the associated 'activity' with relevant aggregated information
super(Task, self).save(*args, **kwargs)
# this translates a view-name (from urls.py) back to a url, to avoid hardcoded url's in the html templates
# bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
# good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
def get_absolute_url(self):
    """Return the url of the 'task-detail-view-api' view for this record's primary key."""
    url_kwargs = {'pk': self.pk}
    return reverse('task-detail-view-api', kwargs=url_kwargs)
def get_jobs_statusses(self):
    """Count the jobs of this task per job status.

    The status of a job is taken from ``job.metadata['status']`` and lower-cased.
    Jobs whose metadata has no usable 'status' entry are skipped.

    :return: dict mapping status (lower case) to the number of jobs in that status
    """
    statusses = {}
    # check the statusses of all the jobs of this id
    for job in Job.objects.filter(task_id=self.id):
        try:
            key = job.metadata['status'].lower()
        except (KeyError, TypeError, AttributeError):
            # no metadata, no 'status' key, or a status that is not a string
            continue
        statusses[key] = statusses.get(key, 0) + 1
    return statusses
@property
def predecessor_status(self):
    """Return the status of the predecessor task, or "no_predecessor" when it cannot be read."""
    try:
        return self.predecessor.status
    except Exception:
        # typically: predecessor is None, so there is no status to read
        return "no_predecessor"
@property
def has_plots(self):
    """Tell whether ``outputs['quality']['plots']`` exists and is non-empty.

    :return: True when at least one plot is listed, False otherwise (also when
             the outputs carry no quality or plots information)
    """
    try:
        plots = self.outputs['quality']['plots']
        return len(plots) > 0
    except (KeyError, TypeError):
        # missing keys, outputs is None, or 'plots' has no length
        return False
@property
def has_summary(self):
    """Tell whether ``outputs['quality']['summary']`` can be read.

    :return: True when the entry exists (whatever its value), False otherwise
    """
    try:
        self.outputs['quality']['summary']
    except (KeyError, TypeError):
        # missing keys, or outputs / quality is not a mapping
        return False
    return True
try:
return self.remarks['quality']
except:
return None

# (blame-view residue: "Nico Vermaas committed"; not part of the module)
@property
def get_quality_remarks_taskid(self):
    """Return ``remarks['quality_taskid']``, or None when it is not available."""
    try:
        return self.remarks['quality_taskid']
    except (KeyError, TypeError):
        # key missing, or remarks is None / not a mapping
        return None
@property
def get_quality_remarks_sasid(self):
    """Return ``remarks['quality_sasid']``, or None when it is not available."""
    try:
        return self.remarks['quality_sasid']
    except (KeyError, TypeError):
        # key missing, or remarks is None / not a mapping
        return None
@property
def quality_as_list(self):
    """Return the result of convert_quality_to_list_for_template() for this task.

    :return: the converted list, or None when the conversion raises
    """
    try:
        return convert_quality_to_list_for_template(self)
    except Exception:
        # never let a formatting problem propagate to the caller
        return None
@property
def quality_as_shortlist(self):
    """Return the result of convert_quality_to_shortlist_for_template() for this task.

    :return: the converted list, or None when the conversion raises
    """
    try:
        return convert_quality_to_shortlist_for_template(self)
    except Exception:
        # never let a formatting problem propagate to the caller
        return None
@property
def summary_as_list(self):
    """Return the result of convert_summary_to_list_for_template() for this task.

    :return: the converted list, or None when the conversion raises
    """
    try:
        return convert_summary_to_list_for_template(self)
    except Exception:
        # never let a formatting problem propagate to the caller
        return None
@property
def sas_id_archived(self):
    """
    check if this task already has an output SAS_ID at the LTA

    :return: the value of ``archive['sas_id_archived']``, or None when it is not available
    """
    try:
        return self.archive['sas_id_archived']
    except (KeyError, TypeError):
        # key missing, or archive is None / not a mapping
        return None
# keep the old mechanism in comments to test/evaluate
# TODO: remove when it is no longer used by the GUI
# --- <CUT> ---
@property
def sas_id_has_archived(self):
    """
    check if any task belonging to this sas_id already has an output SAS_ID at the LTA

    :return: the first truthy ``archive['sas_id_archived']`` found among the tasks
             with the same sas_id, or None when there is none (or the lookup fails)
    """
    try:
        for task in Task.objects.filter(sas_id=self.sas_id):
            try:
                archived_sas_id = task.archive['sas_id_archived']
            except (KeyError, TypeError):
                # this task has no (usable) archive information, try the next one
                continue
            if archived_sas_id:
                return archived_sas_id
    except Exception:
        # best effort: a failing query must not break the caller
        return None
    return None
# --- </CUT> ---
@property
def path_to_lta(self):
    """
    return the 'path_to_lta' of this task (or None if that fails)
    """
    try:
        return self.archive['path_to_lta']
    except (KeyError, TypeError):
        # key missing, or archive is None / not a mapping
        return None
# (web-view residue: gutter line numbers 372-462 appeared here without any accompanying code)
# keep the old mechanism in comments to test/evaluate, remove later when it works
# TODO: remove when it is no longer used by the GUI
# ---- <CUT> ----
@property
def sasid_path_to_lta(self):
    """
    check if any task belonging to this sas_id already has a 'path_to_lta' setting

    :return: the first truthy ``archive['path_to_lta']`` found among the tasks with
             the same sas_id, or None when there is none (or the lookup fails)
    """
    try:
        for task in Task.objects.filter(sas_id=self.sas_id):
            try:
                lta_path = task.archive['path_to_lta']
            except (KeyError, TypeError):
                # if 'path_to_lta' is not found, or 'archive' is empty, continue to the next task
                continue
            if lta_path:
                return lta_path
    except Exception:
        # best effort: a failing query must not break the caller
        return None
    return None
@property
def sasid_is_verified(self):
    """Tell whether every task with this task's sas_id has a status listed in verified_statusses."""
    same_sasid_tasks = Task.objects.filter(sas_id=self.sas_id)
    return all(candidate.status in verified_statusses for candidate in same_sasid_tasks)
@property
def sasid_finished_fraction(self):
    """Summarize how much of the data of this sas_id belongs to finished tasks.

    All tasks with the same sas_id are taken into account; ``size_to_process``
    of a task in status FINISHED counts as archived, all other sizes count as
    remaining. An empty (None) ``size_to_process`` counts as 0.

    :return: dict with
             'fraction'   - percentage (rounded) of the archived size, -1 when the total size is 0
             'total_size' - sum of all sizes
             'remaining'  - sum of the sizes of the tasks that are not finished
    """
    size_archived = 0
    size_remaining = 0
    for task in Task.objects.filter(sas_id=self.sas_id):
        # size_to_process is a nullable field, treat 'unknown' as 0
        size = task.size_to_process or 0
        if task.status == State.FINISHED.value:
            size_archived += size
        else:
            size_remaining += size

    total_size = size_archived + size_remaining
    if total_size:
        fraction = round((size_archived / total_size) * 100)
    else:
        # nothing to process: the fraction is undefined
        fraction = -1

    return {
        'fraction': fraction,
        'total_size': total_size,
        'remaining': size_remaining,
    }
@property
def sasid_ingested_fraction(self):
    """
    This 'property' of a task returns the fraction of queued/ingested tasks per SAS_ID
    and a list of statusses of other tasks belonging to the same SAS_ID.
    It is implemented as 'property', because then it can be used in html pages like this:
    <td>{{ task.sasid_ingested_fraction.status }}</td>
    <td>{{ task.sasid_ingested_fraction.completion }}%</td>
    A selection of statusses are considered 'queued', and another selection is considered 'ingested'.
    The division of those 2 are returned as a 'completed %'.
    A limited list of statusses for the other tasks that belong to this SAS_ID is also returned.

    :return: dict with
             'status'     - the counted statusses that occur at least once, with their count
             'completion' - percentage (rounded) of complete tasks, or -1 when no task
                            of this SAS_ID is in a 'queued' or 'ingested' status
    """
    statusses = {'scrubbed': 0, 'archiving': 0, 'archived': 0, 'finishing': 0, 'finished': 0,
                 'suspended': 0, 'discarded': 0, 'archived_failed': 0, 'finished_failed': 0}
    incomplete_statusses = ('scrubbed', 'archiving', 'finishing',
                            'suspended', 'archived_failed', 'finished_failed')
    complete_statusses = ('archived', 'finished')

    for task in Task.objects.filter(sas_id=self.sas_id):
        # only the statusses listed above are counted, all others are ignored
        if task.status in statusses:
            statusses[task.status] += 1

    incomplete = sum(statusses[name] for name in incomplete_statusses)
    complete = sum(statusses[name] for name in complete_statusses)
    total = incomplete + complete
    if total:
        completion = round(complete / total * 100)
    else:
        # no relevant tasks: avoid a division by zero, report 'undefined' like
        # sasid_finished_fraction does
        completion = -1

    return {
        'status': {key: value for key, value in statusses.items() if value != 0},
        'completion': completion,
    }
# ---- </CUT> ----
@property
def task_type_join(self):
    """Classify this task as 'join', 'joined' or 'regular'.

    :return: 'join'    when the task has a joined_output_task (it is an input task),
             'joined'  when the task has joined_input_tasks (it is an output task),
             'regular' otherwise, and also when the check itself fails
    """
    try:
        if self.joined_output_task:
            # this tasks has a designated output task, so it is an input task (join)
            return 'join'
        if self.joined_input_tasks.exists():
            # this task has input tasks, so it is an output task (joined)
            return 'joined'
        return 'regular'
    except Exception:
        # 'the show must go on', don't crash if anything goes wrong, just show it as 'regular'
        return 'regular'
@property
def joined_status(self):
    """Return the common status of the joined input tasks of this task.

    :return: the status shared by the input tasks, None when their statusses differ
             or when there are no input tasks, "unknown" when the check fails
    """
    # if a task has joined_input_tasks, then check their status
    try:
        common_status = None
        for task in self.joined_input_tasks.all():
            if common_status and task.status != common_status:
                return None
            common_status = task.status
        # all statusses are the same, return it
        return common_status
    except Exception:
        # report through the module logger instead of stdout
        logger.exception('could not determine the joined status')
        return "unknown"
@property
def stageit_url(self):
    """Return the url of the stage request of this task at the stager api.

    The base url is read from the Configuration record with key 'stager:api'.

    :return: "<stager api>/requests/<stage_request_id>/", or None when the
             task has no stage_request_id
    """
    if not self.stage_request_id:
        return None
    api_root = Configuration.objects.get(key='stager:api').value
    return f"{api_root}/requests/{self.stage_request_id}/"
# NV: this shows the latest status change, but because it is a derived property it cannot be sorted.
# This functionality was not requested, and to avoid additional requests about 'sort' functionalty
# it is currently commented out. Could be of use later though, so I leave it in for now.
# @property
# def latest_change(self):
# qs = Status.objects.filter(task__id=self.id).order_by('-timestamp')
# if len(qs) > 0:
# status = qs[0]
# timestamp = status.timestamp
#
# return timestamp
class LogEntry(models.Model):
    """A log record of one processing step of a task.

    On save the ``wall_clock_time`` is filled in automatically (unless it is
    already set) with the number of seconds between this entry and the entry
    that precedes it for the same task.
    """
    cpu_cycles = models.IntegerField(null=True,blank=True)
    wall_clock_time = models.IntegerField(null=True,blank=True)
    url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
    service = models.CharField(max_length=30, blank=True, null=True)
    step_name = models.CharField(max_length=30, blank=True, null=True)
    timestamp = models.DateTimeField(blank=True, null=True)
    status = models.CharField(max_length=50,default="defined", blank=True, null=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='log_entries', on_delete=models.CASCADE, null=False)

    def __str__(self):
        # status and step_name are nullable; formatting them avoids a TypeError on None
        return f"{self.id} - {self.task} - {self.status} ({self.step_name})"

    def _previous_timestamp(self):
        """Return the timestamp that marks the start of the period covered by this entry."""
        # gather all entries for this task (newest first), to discover the previous one.
        entries_for_this_task = LogEntry.objects.filter(task=self.task).order_by("-timestamp")

        # skip entries from the newest down to and including 'this',
        # so that the latest remaining is the previous entry.
        skipped_ids = []
        for entry in entries_for_this_task:
            skipped_ids.append(entry.id)
            if entry == self:
                break

        try:
            return entries_for_this_task.exclude(id__in=skipped_ids).latest('timestamp').timestamp
        except LogEntry.DoesNotExist:
            # if no previous entry exists, then use the timestamp from the task.
            return self.task.creationTime

    def save(self, *args, **kwargs):
        """Save the entry, first filling in ``wall_clock_time`` when it is not set.

        A failure while calculating the wall clock time is logged and does not
        prevent the entry from being saved.
        """
        try:
            # if this logentry already has a wall_clock_time, then don't change it.
            if not self.wall_clock_time:
                elapsed = self.timestamp - self._previous_timestamp()
                # total_seconds() also covers periods longer than a day,
                # timedelta.seconds would silently drop the days
                self.wall_clock_time = int(elapsed.total_seconds())
        except Exception:
            logger.exception('could not calculate the wall_clock_time of a logentry')
        finally:
            # finally save the LogEntry itself also
            super(LogEntry, self).save(*args, **kwargs)
class Status(models.Model):
    """A named status of a task with the timestamp at which it was recorded."""
    name = models.CharField(max_length=50, default="unknown")
    timestamp = models.DateTimeField(default=timezone.now, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='status_history', on_delete=models.CASCADE, null=False)

    # the representation of the value in the REST API
    def __str__(self):
        # "<name> (<timestamp formatted with datetime_format_string>)"
        stamp = self.timestamp.strftime(datetime_format_string)
        return f"{self.name} ({stamp})"
filter = models.CharField(max_length=30, blank=True, null=True)
# large value needed to hold the dcache token
value = models.CharField(max_length=5120)
# the representation of the value in the REST API
def __str__(self):
    """Represent the record by its key."""
    return f"{self.key}"
class Job(models.Model):
    """A job that belongs to a task; task and job are referenced by their integer ids."""
    type = models.CharField(db_index=True, max_length=20, default=None,null=True, blank=True)
    task_id = models.IntegerField(null=True, blank=True)
    job_id = models.IntegerField(null=True, blank=True)
    timestamp = models.DateTimeField(default=timezone.now, blank=True)
    # free-form JSON; the code in this module reads its 'status' and 'stdout_path' keys
    metadata = models.JSONField(null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return f"task_id:{self.task_id}, job_id:{self.job_id}"

    @property
    def webdav_url(self):
        """Return the webdav url of the directory that holds the stdout file of this job.

        The directory of ``metadata['stdout_path']`` is taken, and the part that
        follows its first '/run' (up to a next '/run', if any) is appended to the
        public webdav location of the run directories.

        :return: the url, or "N/A" when the metadata has no usable 'stdout_path'
        """
        try:
            stdout_path = self.metadata['stdout_path']
            # strip the file name, keep the directory
            directory = stdout_path.rsplit('/', 1)[0]
            run_suffix = directory.split('/run')[1]
        except (KeyError, TypeError, AttributeError, IndexError):
            # no metadata, no 'stdout_path', not a string, or no '/run' in the path
            return "N/A"
        # form the webdav url
        return "https://public.spider.surfsara.nl/project/ldv/run" + run_suffix
class PostProcessingRule(models.Model):
    """Links a workflow 'to process' to a workflow 'to apply', with an aggregation key and a trigger status."""
    aggregation_key = models.CharField(db_index=True, max_length=20, default=None,null=True, blank=True)
    trigger_status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)

    # relationships
    workflow_to_process = models.ForeignKey(Workflow, related_name='to_process', on_delete=models.SET_NULL, null=True, blank=True)
    workflow_to_apply = models.ForeignKey(Workflow, related_name='to_apply',on_delete=models.SET_NULL, null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        # "<aggregation_key> - <trigger_status>"
        return f"{self.aggregation_key} - {self.trigger_status}"
class LatestMonitor(models.Model):
    """The most recent monitoring record per service (name) and hostname.

    Monitor.save() replaces the record for its name/hostname combination
    every time a new Monitor record is saved.
    """
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    # NOTE(review): datetime.utcnow yields a naive datetime, while the other models in
    # this module default to timezone.now - confirm which one is intended before changing.
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    @property
    def enabled(self):
        """Return ``metadata['enabled']``, or the string "True" when it is not available."""
        try:
            return self.metadata['enabled']
        except (KeyError, TypeError):
            # only when metadata['enabled']=False' this will be register as false.
            # to make sure that services are enabled by default
            return "True"

    # the representation of the value in the REST API
    def __str__(self):
        # status is nullable; formatting it avoids a TypeError on None
        return f"{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}"

# (blame-view residue: "Nico Vermaas committed"; not part of the module)
def purge_old_records():
    """Delete the Monitor records that are older than the configured history.

    The maximum age is read from ``settings.MAX_MONITORING_HISTORY_HOURS``;
    when that setting is absent nothing is deleted. Purging is best effort:
    a failure is logged and not raised.
    """
    try:
        max_history_hours = getattr(settings, 'MAX_MONITORING_HISTORY_HOURS', None)
        if max_history_hours is None:
            # if MAX_MONITORING_HISTORY_HOURS is not set, then do nothing and continue
            return
        time_threshold = timezone.now() - timedelta(hours=max_history_hours)
        Monitor.objects.filter(timestamp__lt=time_threshold).delete()
    except Exception:
        # purging must never break the caller, but a failure should not go unnoticed
        logger.exception('could not purge old monitor records')
class Monitor(models.Model):
    """A monitoring record as reported by a service.

    Saving a Monitor record also refreshes the LatestMonitor record for the
    same name/hostname and purges old Monitor records.
    """
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    # NOTE(review): datetime.utcnow yields a naive datetime, while purge_old_records()
    # compares this field with timezone.now() - confirm which one is intended.
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    def save(self, *args, **kwargs):
        """Save this record and replace the LatestMonitor record of its name/hostname.

        The metadata of the replaced LatestMonitor record is carried over to the
        new one when it is not empty; otherwise the metadata of this record is used.
        """
        # check if this combination of service name + hostname already exists
        # in the LatestMonitor, and replace it.
        previous_records = LatestMonitor.objects.filter(name=self.name, hostname=self.hostname)
        previous = previous_records.first()

        # carry over the metadata, if possible
        latest_metadata = previous.metadata if previous is not None else None
        previous_records.delete()

        metadata = latest_metadata if latest_metadata else self.metadata

        latestMonitor = LatestMonitor(
            name=self.name,
            type=self.type,
            timestamp=self.timestamp,
            hostname=self.hostname,
            process_id=self.process_id,
            description=self.description,
            status=self.status,
            metadata=metadata,
        )
        latestMonitor.save()

        # finally save the Monitor info itself also
        super(Monitor, self).save(*args, **kwargs)

        # and purge the monitoring table to its max
        purge_old_records()

    # the representation of the value in the REST API
    def __str__(self):
        # status is nullable; formatting it avoids a TypeError on None
        return f"{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}"