# models.py - Django model definitions
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.timezone import datetime, timedelta
from django.conf import settings
import json
import logging
logger = logging.getLogger(__name__)

# constants
# strftime layout used when a Status timestamp is rendered as text
datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'
# task statuses that count as 'verified' when all the tasks of a sas_id are checked
verified_statusses = ['stored', 'validated', 'scrubbed', 'archived', 'finished', 'suspended', 'discarded']

class Workflow(models.Model):
    """
    A workflow that tasks can refer to (see ``Task.workflow``, reverse name ``tasks``).

    All fields are optional; ``workflow_uri`` must be unique when it is given.
    """
    description = models.CharField(max_length=500, blank=True, null=True)
    tag = models.CharField(max_length=30, blank=True, null=True)
    workflow_uri = models.CharField(unique=True, max_length=30, blank=True, null=True)
    repository = models.CharField(max_length=100, blank=True, null=True)
    commit_id = models.CharField(max_length=100, blank=True, null=True)
    path = models.CharField(max_length=100, blank=True, null=True)
    oi_size_fraction = models.FloatField(blank=True, null=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    default_parameters = models.JSONField(null=True, blank=True)
    prefetch = models.BooleanField(null=True, default=True)

    def __str__(self):
        """Return '<id> - <workflow_uri>'."""
        return f'{self.id} - {self.workflow_uri}'


# convert the quality information from the JSONField into an easily parsable list for the template
def convert_quality_to_list_for_template(task):
    """
    Return quality values of ``task`` as a list of strings.

    The values are read from ``task.quality_json`` in this order:
    'uv-coverage', 'sensitivity', 'observing-conditions'.

    Reading stops at the first value that cannot be retrieved, so the result
    holds the values collected up to that point (possibly none at all).

    :param task: object that offers a ``quality_json`` attribute
    :return: list of strings
    """
    values = []

    try:
        for key in ('uv-coverage', 'sensitivity', 'observing-conditions'):
            values.append(str(task.quality_json[key]))
    except Exception:
        # best effort: missing quality information must not break the caller,
        # whatever has been collected so far is returned.
        pass

    return values


# convert the quality information from the JSONField into a short list for the template
def convert_quality_to_shortlist_for_template(task):
    """
    Return the short list of quality values of ``task`` as strings.

    The values are read from ``task.quality_json`` in this order:
    'uv-coverage', 'sensitivity', 'observing-conditions'.

    Reading stops at the first value that cannot be retrieved, so the result
    holds the values collected up to that point (possibly none at all).

    :param task: object that offers a ``quality_json`` attribute
    :return: list of strings
    """
    values = []

    try:
        for key in ('uv-coverage', 'sensitivity', 'observing-conditions'):
            values.append(str(task.quality_json[key]))
    except Exception:
        # best effort: missing quality information must not break the caller,
        # whatever has been collected so far is returned.
        pass

    return values

# convert the quality summary from the JSONField into a list for the template
def convert_summary_to_list_for_template(task):
    """
    Return the quality summary of ``task`` as a list for the template.

    NOTE(review): no conversion is implemented; the previous code only looked
    up ``task.quality_json['summary']`` and discarded it, so the result has
    always been an empty list. That behaviour is kept as is.

    :param task: object that offers a ``quality_json`` attribute (currently unused)
    :return: an empty list
    """
    return []


def calculate_qualities(task):
    """
    calculate the quality for this task, but also the quality for all the combined tasks of this sas_id

    The results are put in ``task.calculated_qualities`` under the keys
    'per_task' and 'per_sasid'. Nothing is saved to the database here.

    :param task: the task that is about to be saved
    :return: the dict that was also assigned to ``task.calculated_qualities``
    """

    # read the quality_thresholds from the Configuration table,
    # and fall back to built-in values when they cannot be read or parsed.
    try:
        quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)
    except Exception:
        quality_thresholds = {
            "moderate": 20,
            "poor": 50,
            "overall_poor": 50,
            "overall_good": 90,
        }

    def calculate_quality_task(task):
        """
        calculate the quality of this task based on rfi_percent values
        The threshold values are read from a configuration json blob

        Using this algorithm from SDCO (with the default thresholds):
                0 < rfi_percent < 20 is good
                20 <= rfi_percent <= 50 is moderate
                rfi_percent > 50 is poor
                records with rfi_percent <= 0 are skipped

        :return: 'good', 'moderate', 'poor', or None when nothing could be determined
        """
        try:
            summary = task.quality_json["summary"]
            quality = None

            # NOTE(review): every record overwrites the result, so the last record
            # with rfi_percent > 0 decides the outcome -- confirm that this is intended.
            for record in summary.values():
                rfi_percent = int(record['rfi_percent'])
                if rfi_percent > 0:
                    quality = "good"
                    if rfi_percent >= quality_thresholds['moderate']:
                        quality = "moderate"
                    if rfi_percent > quality_thresholds['poor']:
                        quality = "poor"
            return quality

        except Exception as error:
            # no (usable) summary available, the task gets no quality
            logger.info(error)
            return None

    def calculate_quality_sasid(unsaved_task):
        """
        calculate the overall quality per sas_id, based on other tasks with the same sas_id
        The threshold values are read from a configuration json blob

        Using this algorithm from SDCO (with the default thresholds):
             if at least 90 % of all files have a good quality then the dataset has good condition.
             If at least 50 % of all files have a poor quality then the dataset is poor
             otherwise is moderate.

        :return: 'good', 'moderate', 'poor', or None when nothing could be determined
        """
        try:
            # gather the results of all the calculated_quality values for this sas_id
            qualities = {'poor': 0, 'moderate': 0, 'good': 0}

            for stored_task in Task.objects.filter(sas_id=unsaved_task.sas_id):

                # because this all happens in the overridden 'Task.save', the actual saving has not yet occurred
                # So use the calculated quality from the unsaved task instead.
                if stored_task.id == unsaved_task.id:
                    t = unsaved_task
                else:
                    t = stored_task

                try:
                    key = t.calculated_qualities['per_task']
                    qualities[key] = qualities[key] + 1
                except (KeyError, TypeError):
                    # ignore the tasks that have no calculated quality (they are probably not 'stored').
                    pass

            quality_sasid = None
            total = qualities['poor'] + qualities['moderate'] + qualities['good']

            # without any counted task there is nothing to divide by, the result stays None
            if total > 0:
                percentage_poor = (qualities['poor'] / total) * 100
                percentage_good = (qualities['good'] / total) * 100
                quality_sasid = "moderate"

                if percentage_poor >= quality_thresholds['overall_poor']:
                    quality_sasid = 'poor'
                if percentage_good >= quality_thresholds['overall_good']:
                    quality_sasid = 'good'

            return quality_sasid

        except Exception as error:
            logger.info(error)
            return None

    # --- main function body ---
    # calculate the quality for this task
    calculated_quality_task = calculate_quality_task(task)

    # store the result in task.calculated_qualities (not yet saved in the database).
    # This must happen before the sas_id calculation, because that reads 'per_task' from this task.
    d = task.calculated_qualities
    if not d:
        d = {}
    d['per_task'] = calculated_quality_task
    task.calculated_qualities = d

    # update the overall quality of all tasks for this sas_id
    calculated_quality_sasid = calculate_quality_sasid(task)

    # store the result in task.calculated_qualities (not yet saved in the database)
    d['per_sasid'] = calculated_quality_sasid

    return d
class Task(models.Model):
    """
    A task, optionally linked to a Workflow and to a predecessor task.

    Besides the database fields this model offers helpers that read values from
    the ``outputs``, ``remarks`` and ``archive`` JSON fields. These helpers do not
    raise when a value is not available; they return None (or False for ``has_*``).
    """
    task_type = models.CharField(max_length=20, default="regular")
    filter = models.CharField(max_length=30, blank=True, null=True)
    environment = models.CharField(max_length=255, blank=True, null=True)
    new_status = models.CharField(max_length=50, default="defining", null=True)
    status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
    quality = models.CharField(max_length=10, blank=True, null=True)
    calculated_qualities = models.JSONField(null=True, blank=True)
    resume = models.BooleanField(verbose_name="Resume", default=True)
    creationTime = models.DateTimeField(verbose_name="CreationTime", default=datetime.utcnow, blank=True)
    priority = models.IntegerField(default=100, null=True)
    purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
    cleanup_policy = models.CharField(max_length=30, blank=True, null=True)
    stage_request_id = models.IntegerField(null=True)

    # LOFAR properties
    project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
    sas_id = models.CharField(verbose_name="SAS_ID", max_length=15, blank=True, null=True)
    inputs = models.JSONField(null=True, blank=True)
    outputs = models.JSONField(null=True, blank=True)
    metrics = models.JSONField(null=True, blank=True)
    remarks = models.JSONField(null=True, blank=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    archive = models.JSONField(null=True, blank=True)

    size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    total_processing_time = models.IntegerField(default=0, null=True, blank=True)

    # relationships
    workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.CASCADE, null=True, blank=True)
    predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)

    def __str__(self):
        """Return '<id> - (<task_type>) - <sas_id>'."""
        return f'{self.id} - ({self.task_type}) - {self.sas_id}'

    def save(self, *args, **kwargs):
        """Apply the status dependent side effects and then save the task."""
        # nv:1mar2023, temporary hack, set tasks 'on hold' as soon they get to 'scrubbed'
        # (users forget to do that manually, causing unwanted ingests)
        if self.status != 'scrubbed' and self.new_status == 'scrubbed':
            # NOTE(review): the statement under this condition was missing in the reviewed text.
            # Clearing 'resume' is assumed to be what puts a task 'on hold' -- confirm.
            self.resume = False

        # nv:19jun2023, calculate the qualities for this task
        if self.status != 'stored' and self.new_status == 'stored':
            self.calculated_qualities = calculate_qualities(self)

        super().save(*args, **kwargs)

    # this translates a view-name (from urls.py) back to a url, to avoid hardcoded url's in the html templates
    # bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
    # good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
    def get_absolute_url(self):
        return reverse('task-detail-view-api', kwargs={'pk': self.pk})

    @property
    def predecessor_status(self):
        """Status of the predecessor task."""
        # NOTE(review): the body of this property was missing in the reviewed text.
        # Returning the predecessor's status, and the "no_predecessor" value when
        # there is none, is an assumption -- confirm against the original file.
        try:
            return self.predecessor.status
        except AttributeError:
            return "no_predecessor"

    @property
    def has_quality(self):
        """True when ``outputs`` contains a 'quality' entry."""
        try:
            self.outputs['quality']
            return True
        except (KeyError, TypeError):
            return False

    @property
    def has_plots(self):
        """True when ``outputs['quality']['plots']`` exists and is not empty."""
        try:
            plots = self.outputs['quality']['plots']
            return len(plots) > 0
        except (KeyError, TypeError):
            return False

    @property
    def has_summary(self):
        """True when ``outputs['quality']`` contains a 'summary' entry."""
        try:
            self.outputs['quality']['summary']
            return True
        except (KeyError, TypeError):
            return False

    @property
    def quality_json(self):
        """The 'quality' entry of ``outputs``, or None when it is not available."""
        try:
            return self.outputs['quality']
        except (KeyError, TypeError):
            return None

    # NOTE(review): unlike the neighbouring accessors this one is not a property in the
    # reviewed text; a decorator may have been lost there. It is left a plain method.
    def get_quality_remarks(self):
        """The 'quality' entry of ``remarks``, or None when it is not available."""
        try:
            return self.remarks['quality']
        except (KeyError, TypeError):
            return None

    @property
    def get_quality_remarks_taskid(self):
        """The 'quality_taskid' entry of ``remarks``, or None when it is not available."""
        try:
            return self.remarks['quality_taskid']
        except (KeyError, TypeError):
            return None

    @property
    def get_quality_remarks_sasid(self):
        """The 'quality_sasid' entry of ``remarks``, or None when it is not available."""
        try:
            return self.remarks['quality_sasid']
        except (KeyError, TypeError):
            return None

    @property
    def quality_as_list(self):
        """Result of convert_quality_to_list_for_template, or None when that fails."""
        try:
            return convert_quality_to_list_for_template(self)
        except Exception:
            return None

    @property
    def quality_as_shortlist(self):
        """Result of convert_quality_to_shortlist_for_template, or None when that fails."""
        try:
            return convert_quality_to_shortlist_for_template(self)
        except Exception:
            return None

    @property
    def summary_as_list(self):
        """Result of convert_summary_to_list_for_template, or None when that fails."""
        try:
            return convert_summary_to_list_for_template(self)
        except Exception:
            return None

    @property
    def sas_id_archived(self):
        """The 'sas_id_archived' entry of ``archive``, or None when it is not available."""
        try:
            return self.archive['sas_id_archived']
        except (KeyError, TypeError):
            return None

    # NOTE(review): not a property in the reviewed text (a decorator may have been lost there);
    # it is left a plain method.
    def path_to_lta(self):
        """The 'path_to_lta' entry of ``archive``, or None when it is not available."""
        try:
            return self.archive['path_to_lta']
        except (KeyError, TypeError):
            return None

    def sasid_is_verified(self):
        """True when every task with the same sas_id has a status listed in ``verified_statusses``."""
        # only the status column is needed for this check
        statuses = Task.objects.filter(sas_id=self.sas_id).values_list('status', flat=True)
        return all(status in verified_statusses for status in statuses)

    # NV: this shows the latest status change, but because it is a derived property it cannot be sorted.
    # This functionality was not requested, and to avoid additional requests about 'sort' functionalty
    # it is currently commented out. Could be of use later though, so I leave it in for now.
    # @property
    # def latest_change(self):
    #     qs = Status.objects.filter(task__id=self.id).order_by('-timestamp')
    #     if len(qs) > 0:
    #         status = qs[0]
    #         timestamp = status.timestamp
    #
    #     return timestamp

class LogEntry(models.Model):
    """
    A log entry for a task (reverse name ``log_entries`` on Task).

    When an entry is saved without a ``wall_clock_time``, that value is derived
    from the time that passed since the previous entry of the same task.
    """
    cpu_cycles = models.IntegerField(null=True, blank=True)
    wall_clock_time = models.IntegerField(null=True, blank=True)
    url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
    service = models.CharField(max_length=30, blank=True, null=True)
    step_name = models.CharField(max_length=30, blank=True, null=True)
    timestamp = models.DateTimeField(blank=True, null=True)
    status = models.CharField(max_length=50, default="defined", blank=True, null=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='log_entries', on_delete=models.CASCADE, null=False)

    def __str__(self):
        """Return '<id> - <task> - <status> (<step_name>)'; empty (None) values are shown as 'None'."""
        return f'{self.id} - {self.task} - {self.status} ({self.step_name})'

    def _previous_timestamp(self):
        """Return the timestamp of the latest earlier entry of this task, or else the task's creationTime."""
        earlier_entries = LogEntry.objects.filter(task=self.task, timestamp__lt=self.timestamp)
        if self.pk is not None:
            # never compare an already stored entry with itself
            earlier_entries = earlier_entries.exclude(pk=self.pk)

        previous_entry = earlier_entries.order_by('-timestamp').first()
        if previous_entry is not None:
            return previous_entry.timestamp

        # if no previous entry exists, then use the timestamp from the task.
        return self.task.creationTime

    def save(self, *args, **kwargs):
        """
        Override save to check how long this task has been in this state.

        The entry is always saved, also when the duration could not be determined.
        """
        try:
            # if this logentry already has a wall_clock_time, then don't change it.
            # Without a timestamp there is nothing to calculate.
            if not self.wall_clock_time and self.timestamp is not None:
                elapsed = self.timestamp - self._previous_timestamp()

                # total_seconds() also counts whole days, which '.seconds' leaves out
                dt = int(elapsed.total_seconds())
                logger.info("logentry for task %s, to %s took %s seconds", self.task.id, self.status, dt)

                self.wall_clock_time = dt
        except Exception:
            # best effort: a failing calculation must not prevent the entry from being stored
            logger.exception("could not determine the wall_clock_time of a logentry")
        finally:
            # finally save the LogEntry itself also
            super().save(*args, **kwargs)


# a named status of a task with the time it was recorded (reverse name 'status_history' on Task)
class Status(models.Model):
    """A status name with a timestamp, linked to a task."""
    name = models.CharField(max_length=50, default="unknown")
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    task = models.ForeignKey(Task, related_name='status_history', on_delete=models.CASCADE, null=False)

    # the representation of the value in the REST API
    def __str__(self):
        """Return '<name> (<timestamp>)', the timestamp formatted with ``datetime_format_string``."""
        formatted_date = self.timestamp.strftime(datetime_format_string)
        return f'{self.name} ({formatted_date})'

class Configuration(models.Model):
    """A key/value configuration record, with an optional filter."""
    filter = models.CharField(max_length=30, blank=True, null=True)
    key = models.CharField(max_length=50)
    # large value needed to hold the dcache token
    value = models.CharField(max_length=5120)

    # the representation of the value in the REST API
    def __str__(self):
        """Return the key."""
        return str(self.key)


class Job(models.Model):
    """A job record that holds a task id, a job id and free-form metadata."""
    type = models.CharField(
        max_length=20,
        db_index=True,
        default=None,
        blank=True,
        null=True,
    )
    task_id = models.IntegerField(blank=True, null=True)
    job_id = models.IntegerField(blank=True, null=True)
    metadata = models.JSONField(blank=True, null=True)

    def __str__(self):
        """Text shown for this record in the REST API."""
        return f'task_id:{self.task_id}, job_id:{self.job_id}'


class PostProcessingRule(models.Model):
    """A rule that couples an aggregation key and a trigger status to two workflows."""
    aggregation_key = models.CharField(
        max_length=20,
        db_index=True,
        default=None,
        blank=True,
        null=True,
    )
    trigger_status = models.CharField(
        max_length=50,
        db_index=True,
        default="unknown",
        blank=True,
        null=True,
    )

    # relationships
    workflow_to_process = models.ForeignKey(
        Workflow,
        related_name='to_process',
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
    )
    workflow_to_apply = models.ForeignKey(
        Workflow,
        related_name='to_apply',
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
    )

    def __str__(self):
        """Text shown for this record in the REST API."""
        return f'{self.aggregation_key} - {self.trigger_status}'


class LatestMonitor(models.Model):
    """The latest monitoring record of a service; it has the same fields as Monitor."""
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    @property
    def enabled(self):
        """
        The 'enabled' entry of ``metadata``.

        When there is no such entry the string "True" is returned (a string, not a boolean).
        """
        try:
            return self.metadata['enabled']
        except (KeyError, TypeError):
            # only when metadata['enabled']=False this will register as false,
            # to make sure that services are enabled by default
            return "True"

    # the representation of the value in the REST API
    def __str__(self):
        """Return '<name> - (<hostname>) - <timestamp> - <status>'; an empty status is shown as 'None'."""
        return f'{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}'
def purge_old_records():
    """
    Delete the Monitor records that are older than settings.MAX_MONITORING_HISTORY_HOURS.

    Does nothing when that setting is not available. A failure during the purge
    is logged and not raised, because this is called at the end of Monitor.save.
    """
    try:
        max_history_hours = getattr(settings, 'MAX_MONITORING_HISTORY_HOURS', None)
        if max_history_hours is None:
            # if MAX_MONITORING_HISTORY_HOURS is not set, then do nothing and continue
            return

        # NOTE(review): the original author marked this use of timezone.now() with
        # 'change this', without stating the reason.
        time_threshold = timezone.now() - timedelta(hours=max_history_hours)
        Monitor.objects.filter(timestamp__lt=time_threshold).delete()
    except Exception:
        logger.exception("could not purge the old monitoring records")


class Monitor(models.Model):
    """
    A monitoring record of a service.

    Saving a Monitor also replaces the LatestMonitor record with the same
    name and hostname, and purges old Monitor records.
    """
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    def save(self, *args, **kwargs):
        """Replace the matching LatestMonitor record, save this record and purge old records."""
        # check if this combination of service name + hostname already exists in the LatestMonitor.
        # A filter is used, so that accidental duplicates are removed as well.
        previous_records = LatestMonitor.objects.filter(name=self.name, hostname=self.hostname)
        previous = previous_records.first()

        # carry over the metadata, if possible
        latest_metadata = previous.metadata if previous is not None else None
        previous_records.delete()

        # existing (non-empty) metadata wins over the metadata of this record
        metadata = latest_metadata or self.metadata

        # NOTE(review): the end of this statement was missing in the reviewed text;
        # saving the new LatestMonitor here is assumed -- confirm.
        latestMonitor = LatestMonitor(
            name=self.name,
            type=self.type,
            timestamp=self.timestamp,
            hostname=self.hostname,
            process_id=self.process_id,
            description=self.description,
            status=self.status,
            metadata=metadata,
        )
        latestMonitor.save()

        # finally save the Monitor info itself also
        super().save(*args, **kwargs)

        # and purge the monitoring table to its max
        purge_old_records()

    # the representation of the value in the REST API
    def __str__(self):
        """Return '<name> - (<hostname>) - <timestamp> - <status>'; an empty status is shown as 'None'."""
        return f'{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}'