Skip to content
Snippets Groups Projects
Select Git revision
  • 11a839134d7c2323e37885b0864c35437db7f0d0
  • main default protected
  • hookup-to-django
  • small-test-to-test-pipeline
  • connect-to-adex_cache_fastapi-database
  • improve_alta_algorithm
6 results

convert_alta_to_adex_cache.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    models.py 15.30 KiB
    from django.db import models
    from django.urls import reverse
    from django.utils import timezone
    from django.utils.timezone import datetime, timedelta
    from django.conf import settings
    
    
    import logging
    logger = logging.getLogger(__name__)
    
    # constants
    # strftime pattern used by Status.__str__ to render a timestamp
    datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'
    # task statuses that Task.sasid_is_verified accepts as 'verified'
    verified_statusses = ['stored','validated','scrubbed','archived','finished']
    
    class Workflow(models.Model):
        """Database model describing a workflow.

        Tasks and post-processing rules in this module refer to a workflow
        through foreign keys.
        """

        description = models.CharField(max_length=500, blank=True, null=True)
        workflow_uri = models.CharField(unique=True, max_length=30, blank=True, null=True)
        repository = models.CharField(max_length=100, blank=True, null=True)
        commit_id = models.CharField(max_length=15, blank=True, null=True)
        path = models.CharField(max_length=100, blank=True, null=True)
        oi_size_fraction = models.FloatField(blank=True, null=True)
        meta_scheduling = models.JSONField(null=True, blank=True)
        default_parameters = models.JSONField(null=True, blank=True)

        def __str__(self):
            # a workflow is represented by its database id only
            return f"{self.id}"
    
    
    # convert the quality information from the JSONField into an easily parsable list for the template
    def convert_quality_to_list_for_template(task):
        """Return the quality values of a task as a list of strings.

        The values are read from task.quality_json in the order
        'uv-coverage', 'sensitivity', 'observing-conditions'. Collection stops
        silently at the first value that cannot be read, so the result may be
        shorter than three items (or empty).
        """
        quality_values = []

        try:
            for key in ('uv-coverage', 'sensitivity', 'observing-conditions'):
                quality_values.append(str(task.quality_json[key]))
        except Exception:
            # best effort: keep whatever was collected before the failure
            pass

        return quality_values
    
    
    def convert_quality_to_shortlist_for_template(task):
        """Return the short list of quality values of a task as strings.

        Reads 'uv-coverage', 'sensitivity' and 'observing-conditions' (in that
        order) from task.quality_json. As soon as one of them cannot be read,
        the values gathered so far are returned.
        """
        wanted_keys = ['uv-coverage', 'sensitivity', 'observing-conditions']
        shortlist = []

        for key in wanted_keys:
            try:
                value = str(task.quality_json[key])
            except Exception:
                # stop at the first unreadable value, keep the earlier ones
                break
            shortlist.append(value)

        return shortlist
    
    def convert_summary_to_list_for_template(task):
        """Return the summary of a task as a list for the template.

        The 'summary' entry of task.quality_json is looked up, but nothing is
        converted from it yet: the returned list is always empty. A failing
        lookup is ignored.
        """
        summary_items = []

        try:
            # the looked-up value is not used (yet)
            task.quality_json['summary']
        except Exception:
            pass

        return summary_items
    
    
    class Task(models.Model):
        """Database model for a single task.

        Holds the task control properties, the LOFAR related properties (as
        plain and JSON fields), size/time bookkeeping and the relations to the
        workflow and the predecessor task. The properties below are mostly
        defensive accessors into the JSON fields: they return a fallback value
        instead of raising when the JSON structure is absent or shaped
        differently.
        """

        # Task control properties
        task_type = models.CharField(max_length=20, default="regular")
        filter = models.CharField(max_length=30, blank=True, null=True)
        #environment = models.JSONField(null=True, blank=True)
        environment = models.CharField(max_length=255, blank=True, null=True)
        new_status = models.CharField(max_length=50, default="defining", null=True)
        status = models.CharField(db_index=True, default="unknown", max_length=50,blank=True, null=True)
        quality = models.CharField(max_length=10,blank=True, null=True)

        resume = models.BooleanField(verbose_name="Resume", default=True)
        creationTime = models.DateTimeField(verbose_name="CreationTime",default=datetime.utcnow, blank=True)

        priority = models.IntegerField(default=100, null=True)
        purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
        stage_request_id = models.IntegerField(null=True)

        # LOFAR properties
        project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
        sas_id = models.CharField(verbose_name="SAS_ID",max_length=15, blank=True, null=True)
        inputs = models.JSONField(null=True, blank=True)
        outputs = models.JSONField(null=True, blank=True)
        metrics = models.JSONField(null=True, blank=True)
        remarks = models.JSONField(null=True, blank=True)
        meta_scheduling = models.JSONField(null=True, blank=True)
        archive = models.JSONField(null=True, blank=True)

        size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
        size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
        total_processing_time = models.IntegerField(default=0, null=True, blank=True)

        # relationships
        workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.SET_NULL, null=True, blank=True)
        predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)

        def __str__(self):
            # formatting (instead of '+') also copes with a task_type of None
            return f"{self.id} - ({self.task_type}) - {self.sas_id}"

        def save(self, *args, **kwargs):
            """Save the task, putting it 'on hold' when it moves to 'stored'."""
            # nv:1mar2023, temporary hack, set tasks 'on hold' as soon they get to 'stored'
            # (users forget to do that manually, causing unwanted ingests)
            if self.status != 'stored' and self.new_status == 'stored':
                self.resume = False

            super().save(*args, **kwargs)

        # this translates a view-name (from urls.py) back to a url, to avoid hardcoded url's in the html templates
        # bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
        # good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
        def get_absolute_url(self):
            return reverse('task-detail-view-api', kwargs={'pk': self.pk})

        @property
        def predecessor_status(self):
            """Status of the predecessor task, or "no_predecessor" if unavailable."""
            try:
                return self.predecessor.status
            except Exception:
                # no predecessor (None) or the related task could not be fetched
                return "no_predecessor"

        @property
        def has_quality(self):
            """True when outputs contains a top-level 'quality' entry."""
            # todo: check if there is a 'quality' structure in the 'task.outputs' at another level?
            # (e.g. outputs[0]['quality']; currently only the top level is inspected)
            try:
                self.outputs['quality']
            except (KeyError, TypeError):
                # KeyError: no such key; TypeError: outputs is None or not a dict
                return False
            return True

        @property
        def has_plots(self):
            """True when outputs['quality']['plots'] exists and is not empty."""
            try:
                plots = self.outputs['quality']['plots']
                return len(plots) > 0
            except (KeyError, TypeError):
                return False

        @property
        def has_summary(self):
            """True when outputs['quality'] contains a 'summary' entry."""
            try:
                self.outputs['quality']['summary']
            except (KeyError, TypeError):
                return False
            return True

        @property
        def quality_json(self):
            """The top-level 'quality' structure of outputs, or None if absent."""
            # todo: check if there is a 'quality' structure in the 'task.outputs' at another level?
            # (e.g. outputs[0]['quality']; currently only the top level is inspected)
            try:
                return self.outputs['quality']
            except (KeyError, TypeError):
                return None

        @property
        def get_quality_remarks(self):
            """remarks['quality'], or None if absent."""
            try:
                return self.remarks['quality']
            except (KeyError, TypeError):
                return None

        @property
        def get_quality_remarks_taskid(self):
            """remarks['quality_taskid'], or None if absent."""
            try:
                return self.remarks['quality_taskid']
            except (KeyError, TypeError):
                return None

        @property
        def get_quality_remarks_sasid(self):
            """remarks['quality_sasid'], or None if absent."""
            try:
                return self.remarks['quality_sasid']
            except (KeyError, TypeError):
                return None

        @property
        def quality_as_list(self):
            """Quality values as a list for the template, or None on failure."""
            try:
                return convert_quality_to_list_for_template(self)
            except Exception:
                return None

        @property
        def quality_as_shortlist(self):
            """Short list of quality values for the template, or None on failure."""
            try:
                return convert_quality_to_shortlist_for_template(self)
            except Exception:
                return None

        @property
        def summary_as_list(self):
            """Summary as a list for the template, or None on failure."""
            try:
                return convert_summary_to_list_for_template(self)
            except Exception:
                return None

        @property
        def sas_id_archived(self):
            """archive['sas_id_archived'], or None if absent."""
            try:
                return self.archive['sas_id_archived']
            except (KeyError, TypeError):
                return None

        @property
        def path_to_lta(self):
            """archive['path_to_lta'], or None if absent."""
            try:
                return self.archive['path_to_lta']
            except (KeyError, TypeError):
                return None

        @property
        def sasid_is_verified(self):
            """True when every task with this sas_id has a verified status."""
            # only the status column is needed, so don't load the full rows
            statuses = Task.objects.filter(sas_id=self.sas_id).values_list('status', flat=True)
            return all(status in verified_statusses for status in statuses)
    
    
    class LogEntry(models.Model):
        """Log record of one step/status of a task.

        On save, wall_clock_time is filled in (when it is not set yet) with the
        number of seconds between this entry and the preceding entry of the
        same task, or the creation time of the task when there is no preceding
        entry.
        """

        cpu_cycles = models.IntegerField(null=True,blank=True)
        wall_clock_time = models.IntegerField(null=True,blank=True)
        url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
        service = models.CharField(max_length=30, blank=True, null=True)
        step_name = models.CharField(max_length=30, blank=True, null=True)
        timestamp = models.DateTimeField(blank=True, null=True)
        status = models.CharField(max_length=50,default="defined", blank=True, null=True)
        description = models.CharField(max_length=100, blank=True, null=True)
        size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
        # relationships
        task = models.ForeignKey(Task, related_name='log_entries', on_delete=models.CASCADE, null=False)

        def __str__(self):
            # status and step_name are nullable; formatting avoids a TypeError on None
            return f"{self.id} - {self.task} - {self.status} ({self.step_name})"

        def _previous_timestamp(self):
            """Return the timestamp of the entry preceding this one for the same task.

            Falls back to the creation time of the task when no earlier entry exists.
            """
            earlier_entries = LogEntry.objects.filter(task=self.task, timestamp__lte=self.timestamp)
            if self.pk is not None:
                # an already stored entry must not be its own predecessor
                earlier_entries = earlier_entries.exclude(pk=self.pk)

            try:
                return earlier_entries.latest('timestamp').timestamp
            except LogEntry.DoesNotExist:
                # if no previous entry exists, then use the timestamp from the task.
                return self.task.creationTime

        def save(self, *args, **kwargs):
            """Save the entry, first deriving wall_clock_time when it is not set.

            A failure while deriving wall_clock_time is logged and never prevents
            the entry itself from being saved.
            """
            try:
                # if this logentry already has a wall_clock_time, then don't change it.
                if not self.wall_clock_time:
                    elapsed = self.timestamp - self._previous_timestamp()
                    # total_seconds() also counts whole days, which '.seconds' drops
                    self.wall_clock_time = int(elapsed.total_seconds())
                    logger.info(
                        "logentry for task %s, to %s took %s seconds",
                        self.task.id, self.status, self.wall_clock_time,
                    )
            except Exception:
                logger.exception("could not determine wall_clock_time for logentry")
            finally:
                # finally save the Monitor info itself also
                super().save(*args, **kwargs)
    
    
    
    class Status(models.Model):
        """One named status of a task at a given timestamp (status history)."""

        name = models.CharField(max_length=50, default="unknown")
        timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)

        # relationships
        task = models.ForeignKey(Task, related_name='status_history', on_delete=models.CASCADE, null=False)

        # the representation of the value in the REST API
        def __str__(self):
            # "<name> (<timestamp rendered with datetime_format_string>)"
            rendered_timestamp = self.timestamp.strftime(datetime_format_string)
            return f"{self.name} ({rendered_timestamp})"
    
    
    class Configuration(models.Model):
        """A key/value configuration record with an optional filter."""

        filter = models.CharField(max_length=30, blank=True, null=True)
        key = models.CharField(max_length=50)
        # large value needed to hold the dcache token
        value = models.CharField(max_length=5120)

        # the representation of the value in the REST API
        def __str__(self):
            # only the key is shown, not the (possibly large) value
            return f"{self.key}"
    
    
    class Job(models.Model):
        """A job record holding a task id, a job id and free-form metadata."""

        type = models.CharField(db_index=True, max_length=20, default=None,null=True, blank=True)
        task_id = models.IntegerField(null=True, blank=True)
        job_id = models.IntegerField(null=True, blank=True)
        metadata = models.JSONField(null=True, blank=True)

        # the representation of the value in the REST API
        def __str__(self):
            return f"task_id:{self.task_id}, job_id:{self.job_id}"
    
    
    class PostProcessingRule(models.Model):
        """A rule that relates a workflow to process to a workflow to apply.

        The rule carries an aggregation key and a trigger status.
        """

        aggregation_key = models.CharField(db_index=True, max_length=20, default=None,null=True, blank=True)
        trigger_status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)

        # relationships
        workflow_to_process = models.ForeignKey(Workflow, related_name='to_process', on_delete=models.SET_NULL, null=True, blank=True)
        workflow_to_apply = models.ForeignKey(Workflow, related_name='to_apply',on_delete=models.SET_NULL, null=True, blank=True)

        # the representation of the value in the REST API
        def __str__(self):
            return f"{self.aggregation_key} - {self.trigger_status}"
    
    
    class LatestMonitor(models.Model):
        """The most recent monitoring record per service name and hostname.

        Records are (re)created by Monitor.save().
        """

        name = models.CharField(max_length=50, default="unknown")
        type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
        timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
        hostname = models.CharField(max_length=50, default="unknown")
        process_id = models.IntegerField(null=True, blank=True)
        description = models.CharField(max_length=255, blank=True, null=True)
        status = models.CharField(max_length=50, default="ok", null=True)
        metadata = models.JSONField(null=True, blank=True)

        @property
        def enabled(self):
            """Return metadata['enabled'], or the string "True" when it is absent."""
            try:
                return self.metadata['enabled']
            except (KeyError, TypeError):
                # only when metadata['enabled']=False' this will be register as false.
                # to make sure that services are enabled by default
                # NOTE(review): the fallback is the string "True", not a boolean;
                # kept as is because callers may compare against it.
                return "True"

        # the representation of the value in the REST API
        def __str__(self):
            # status is nullable; formatting avoids a TypeError on None
            return f"{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}"
    
    
    def purge_old_records():
        """Delete Monitor records older than settings.MAX_MONITORING_HISTORY_HOURS.

        Does nothing when the setting is absent. This function never raises:
        any failure while purging is logged, so that the caller (Monitor.save)
        is not affected by it.
        """
        try:
            max_history_hours = getattr(settings, 'MAX_MONITORING_HISTORY_HOURS', None)
            if max_history_hours is None:
                # if MAX_MONITORING_HISTORY_HOURS is not set, then do nothing and continue
                return

            time_threshold = timezone.now() - timedelta(hours=max_history_hours)
            Monitor.objects.filter(timestamp__lt=time_threshold).delete()
        except Exception:
            # purging is best effort, but a failure should not go unnoticed
            logger.exception("failed to purge old Monitor records")
    
    
    class Monitor(models.Model):
        """A monitoring record of a service on a host.

        Saving a Monitor also replaces the LatestMonitor record(s) for the same
        name and hostname, and afterwards purges old Monitor records.
        """

        name = models.CharField(max_length=50, default="unknown")
        type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
        timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
        hostname = models.CharField(max_length=50, default="unknown")
        process_id = models.IntegerField(null=True, blank=True)
        description = models.CharField(max_length=255, blank=True, null=True)
        status = models.CharField(max_length=50, default="ok", null=True)
        metadata = models.JSONField(null=True, blank=True)

        def save(self, *args, **kwargs):
            """Save the record and refresh the matching LatestMonitor record.

            Existing LatestMonitor metadata (when not empty) is carried over to
            the new LatestMonitor record in preference to the metadata of this
            Monitor.
            """
            # look up the LatestMonitor record(s) for this service name + hostname;
            # a filter (instead of get) also copes with accidental duplicates
            previous = LatestMonitor.objects.filter(name=self.name, hostname=self.hostname)
            most_recent = previous.order_by('-timestamp').first()

            # carry over the metadata, if possible
            carried_metadata = most_recent.metadata if most_recent is not None else None
            previous.delete()

            latest_monitor = LatestMonitor(
                name=self.name,
                type=self.type,
                timestamp=self.timestamp,
                hostname=self.hostname,
                process_id=self.process_id,
                description=self.description,
                status=self.status,
                # empty/absent carried metadata falls back to this record's metadata
                metadata=carried_metadata or self.metadata,
            )
            latest_monitor.save()

            # finally save the Monitor info itself also
            super().save(*args, **kwargs)

            # and purge the monitoring table to its max
            purge_old_records()

        # the representation of the value in the REST API
        def __str__(self):
            # status is nullable; formatting avoids a TypeError on None
            return f"{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}"