models.py

    from django.db import models
    from django.urls import reverse
    from django.utils import timezone
    from datetime import datetime, timedelta
    from django.core.exceptions import MultipleObjectsReturned
    from django.conf import settings
    import json
    import logging
    
    from .services import calculated_qualities as qualities
    from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN
    
    logger = logging.getLogger(__name__)
    
    # constants
    datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'
    
    AGGREGATION_STRATEGY_CHOICES = (
        (AggregationStrategy.NONE.value, AggregationStrategy.NONE.value),
        (AggregationStrategy.WAIT_FOR_SUMMARY_TASK.value, AggregationStrategy.WAIT_FOR_SUMMARY_TASK.value),
        (AggregationStrategy.COLLECT_H5.value, AggregationStrategy.COLLECT_H5.value),
    )
    
    class Workflow(models.Model):
        description = models.CharField(max_length=500, blank=True, null=True)
        tag = models.CharField(max_length=30, blank=True, null=True)
        workflow_uri = models.CharField(unique=True, max_length=30, blank=True, null=True)
        repository = models.CharField(max_length=100, blank=True, null=True)
        commit_id = models.CharField(max_length=100, blank=True, null=True)
        path = models.CharField(max_length=100, blank=True, null=True)
        oi_size_fraction = models.FloatField(blank=True, null=True)
        meta_scheduling = models.JSONField(null=True, blank=True)
        default_parameters = models.JSONField(null=True, blank=True)
        prefetch = models.BooleanField(null=True, default=True)
    
        # this is a fixed list of options, because when an option is added it also requires changes in the aggregator service
        aggregation_strategy = models.CharField(max_length=50, choices = AGGREGATION_STRATEGY_CHOICES, default=AggregationStrategy.NONE.value)
        aggregation_script = models.CharField(max_length=100, blank=True, null=True)
        quality_thresholds = models.CharField(max_length=50, blank=True, null=True)
    
        def __str__(self):
            return str(self.id) + ' - ' + str(self.workflow_uri)
    
    
    def associate_task_with_activity(task):
    
        if not task.activity:
    
            try:
                activity = Activity.objects.get(sas_id=task.sas_id)
            except:
                # no activity exists yet, create it
                logger.info(f'create new activity for sas_id {task.sas_id}')
    
                activity = Activity(sas_id=task.sas_id,
                                    project=task.project,
                                    filter=task.filter)
                activity.save()
    
            task.activity = activity
    
        return task.activity
    
    class Activity(models.Model):
        """
        Aggregation per SAS_ID level
        """
    
        sas_id = models.CharField(db_index=True, verbose_name="SAS_ID", max_length=15, blank=True, null=True)
        workflow_id = models.IntegerField(null=True)
        project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
        filter = models.CharField(max_length=30, blank=True, null=True)
    
        status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
        calculated_quality = models.CharField(max_length=10, blank=True, null=True)
    
        # this is the JSON blob that is filled in by the ldv_archiver during the ingest process
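        # illustrative shape, as assumed by the 'has_archived' property below
        # (placeholder value, not real data): {"sas_id_archived": "<output SAS_ID at the LTA>"}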
        archive = models.JSONField(null=True, blank=True)
    
        # set by update_activity, used by Validation Page
        is_verified  = models.BooleanField(default=False)
    
        # flag set by ATDB to indicate that all tasks of this Activity have been processed
        is_processed = models.BooleanField(default=False)
    
        # flag set (and used) by the aggregator service, so that it doesn't do double work
        is_aggregated = models.BooleanField(default=False)
        # storage_location for aggregation files
        storage_location = models.CharField(max_length=250, blank=True, null=True, default=None)
    
        finished_fraction = models.FloatField(blank=True, null=True)
        ingested_fraction = models.FloatField(blank=True, null=True)
        total_size = models.FloatField(blank=True, null=True)
        remaining = models.FloatField(blank=True, null=True)
    
        ingestq_status = models.JSONField(null=True, blank=True)
    
        @property
        def has_archived(self):
            """
            check if any task belonging to this sas_id already has an output SAS_ID at the LTA
            """
            try:
                if self.archive['sas_id_archived']:
                    return self.archive['sas_id_archived']
    
            except:
                return None
    
        def __str__(self):
            return str(self.sas_id)
    
        def create_storage_location(self, task):
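            # derives the aggregation directory from the configured executor workdir;
            # illustrative example (placeholder values, not from a real configuration):
            #   workdir '/project/ldv/run/' and sas_id 12345 -> '/project/ldv/aggregate/12345'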
            workdir = Configuration.objects.get(key='executor:workdir').value
            self.storage_location = workdir.replace('run', 'aggregate') + str(self.sas_id)
    
    
    def check_if_summary(task):
        """
        check if this task is a summary task
        for backward compatiblity reasons this is done very ugly, by looking if certain filenames contain the substring 'summary'
        """
        # look for the new 'is_summary' flag
        try:
            tars = task.outputs['summary']
    
            for tar in tars:
                if tar['is_summary']:
                    # a summary tarball was found, this task is a summary task
                    logger.info(f'task {task.id} with workflow {task.workflow} is a summary task')
                    return True
            return False
    
        except:
            # no 'is_summary' field found, but then this is probably an older task.
            # for backward compatibility, ignore the error and continue with the old filename-based method
            pass
        
        # look in the outputs.tar_archive
        try:
            tars = task.outputs['tar_archive']
            for tar in tars:
                if 'summary' in tar['basename']:
                    # a summary tarball was found, this task is a summary task
                    logger.info(f'task {task.id} with workflow {task.workflow} is a summary task')
                    return True
        except:
            # no 'tar_archive' was found
            return False
    
        return False
    
    class Task(models.Model):
    
        # Task control properties
        task_type = models.CharField(max_length=20, default="regular")
        is_summary = models.BooleanField(default=False)
        is_aggregated = models.BooleanField(default=False)
        filter = models.CharField(max_length=30, blank=True, null=True)
        environment = models.CharField(max_length=255, blank=True, null=True)
        new_status = models.CharField(max_length=50, default="defining", null=True)
        status = models.CharField(db_index=True, default="unknown", max_length=50,blank=True, null=True)
        quality = models.CharField(max_length=10,blank=True, null=True)
        calculated_qualities = models.JSONField(null=True, blank=True)
        resume = models.BooleanField(verbose_name="Resume", default=True)
        creationTime = models.DateTimeField(verbose_name="CreationTime",default=timezone.now, blank=True)
    
        priority = models.IntegerField(default=100, null=True)
        purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
        cleanup_policy = models.CharField(max_length=30, blank=True, null=True)
        stage_request_id = models.IntegerField(null=True)
    
        # LOFAR properties
        project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
        sas_id = models.CharField(verbose_name="SAS_ID",max_length=15, blank=True, null=True)
        inputs = models.JSONField(null=True, blank=True)
        outputs = models.JSONField(null=True, blank=True)
        metrics = models.JSONField(null=True, blank=True)
        remarks = models.JSONField(null=True, blank=True)
        meta_scheduling = models.JSONField(null=True, blank=True)
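        # filled in by the archiving/ingest services; the archive-related properties below
        # assume keys such as 'sas_id_archived' and 'path_to_lta' (other fields may be present)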
        archive = models.JSONField(null=True, blank=True)
    
        size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
        size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
        total_processing_time = models.IntegerField(default=0, null=True, blank=True)
    
        # relationships
        workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.CASCADE, null=True, blank=True)
        predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)
        joined_output_task = models.ForeignKey('self', related_name='joined_input_tasks', on_delete=models.SET_NULL, null=True, blank=True)
    
        # pipeline or observation
        activity = models.ForeignKey(Activity, related_name='tasks', on_delete=models.SET_NULL, null=True, blank=True)
    
        def __str__(self):
            return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)
    
        def handle_aggregation(self):
            """
            depending on the aggregation_strategy for this task, different functionality is executed
            """
    
            try:
                # for requantisation pipeline: HOLD summary tasks
                if (self.workflow.aggregation_strategy == AggregationStrategy.WAIT_FOR_SUMMARY_TASK.value):
    
                    if (self.activity.status != State.AGGREGATED.value):
                        # if so, temporarily put it on hold so that the ancillary service can pick it up
                        if (self.is_summary and not self.activity.is_aggregated):
                            self.resume = False
    
    
                # for image_compression_pipeline: ...
    
                if (self.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value):
    
                    if not self.is_aggregated:
                        # set the task to AGGREGATE,
                        # to prevent the datamanager from picking it up,
                        # and to trigger aggregator service to pick it up,
                        # to copy its H5 files to the activity.storage_location on spider
    
                        # the aggregator will then return the task to PROCESSED with 'is_aggregated = true',
                        # so that it won't be picked up again.
    
                        # TODO: only activate when the aggregator service actually picks this up
                        self.new_status = State.AGGREGATE.value
                        #pass
    
    
            except Exception as error:
                # this should not happen in normal operation,
                # but tasks can be inserted directly in an advanced status without going through
                # the proper steps (like in tests). If that happens, just log the error and continue.
                logger.error(error)
    
        def save(self, *args, **kwargs):
    
            # make sure that every task has an activity (also for backward compatibility)
            associate_task_with_activity(self)
    
            # calculate the qualities for this task
            if (self.status != State.STORED.value) and (self.new_status == State.STORED.value):
    
                # read the quality_thresholds from the Configuration table
                try:
                    quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)
                except:
                    quality_thresholds = {
                        "moderate": 20,
                        "poor": 50,
                        "overall_poor": 50,
                        "overall_good": 90,
                    }
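                # note: the 'quality_thresholds' Configuration value is expected to be a JSON string
                # with the same keys as the fallback above, e.g. (illustrative):
                #   '{"moderate": 20, "poor": 50, "overall_poor": 50, "overall_good": 90}'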
    
                tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id)
                self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)
    
            # when a task goes to PROCESSED... handle the (potential) aggregation functionality
            if (self.status != State.PROCESSED.value) and (self.new_status == State.PROCESSED.value):
                self.handle_aggregation()
    
            if self.status in ACTIVITY_RESET_STATUSSEN:
                self.is_aggregated = False
    
            # check in the outputs if this task should be considered to be a summary task
            self.is_summary = check_if_summary(self)
    
    
            # remark:
            # a post_save signal is triggered, which in turn executes the activities_handler.update_activity() function
            # to update the associated 'activity' with relevant aggregated information
            # see documentation: https://support.astron.nl/confluence/display/SDCP/Activities
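            # a minimal sketch of such a receiver (illustrative only; the actual wiring
            # lives in the application's signal/handler modules):
            #
            #   @receiver(post_save, sender=Task)
            #   def task_post_save_handler(sender, instance, **kwargs):
            #       activities_handler.update_activity(instance)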
    
            super(Task, self).save(*args, **kwargs)
    
    
        # this translates a view-name (from urls.py) back to a url, to avoid hardcoded urls in the html templates
        # bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
        # good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
        def get_absolute_url(self):
            return reverse('task-detail-view-api', kwargs={'pk': self.pk})
    
        def get_jobs_statusses(self):
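            # returns a histogram of job statuses for this task,
            # e.g. (illustrative values) {'completed': 3, 'running': 1}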
            statusses = {}
    
            # check the statusses of all the jobs of this id
            jobs = Job.objects.filter(task_id=self.id)
            for job in jobs:
    
                try:
                    key = job.metadata['status'].lower()
                    # count occurrences per status; start at 0 for keys that haven't been seen yet
                    statusses[key] = statusses.get(key, 0) + 1
    
                except:
                    pass
    
            return statusses
    
    
        @property
        def predecessor_status(self):
            try:
                return self.predecessor.status
            except:
                return "no_predecessor"
    
        @property
        def has_quality(self):
            try:
                quality = self.outputs['quality']
                return True
            except:
                return False
    
        @property
        def has_plots(self):
            try:
                quality = self.outputs['quality']
                plots = quality['plots']
                if len(plots) > 0:
                    return True
                else:
                    return False
    
            except:
                return False
    
        @property
        def has_summary(self):
            try:
                summary = self.outputs['quality']['summary']
                return True
            except:
                return False
    
    
        @property
        def quality_json(self):
            try:
                return self.outputs['quality']
            except:
                return None
    
        @property
        def get_quality_remarks(self):
            try:
                return self.remarks['quality']
            except:
                return None
    
        @property
        def get_quality_remarks_taskid(self):
            try:
                return self.remarks['quality_taskid']
            except:
                return None
    
        @property
        def get_quality_remarks_sasid(self):
            try:
                return self.remarks['quality_sasid']
            except:
                return None
    
    
        @property
        def sas_id_archived(self):
            """
            check if this task already has an output SAS_ID at the LTA
            """
            try:
                return self.archive['sas_id_archived']
            except:
                return None
    
        @property
        def path_to_lta(self):
            """
            return the 'path_to_lta' of this task (or None if that fails)
            """
            try:
                return self.archive['path_to_lta']
            except:
                return None
    
        @property
        def task_type_join(self):
            try:
                # find out if this is a join/joined type of task, or just a regular task
                if self.joined_output_task:
                    # this task has a designated output task, so it is an input task (join)
                    return 'join'
    
                joined_input_tasks = self.joined_input_tasks.all()
                if joined_input_tasks.count()>0:
                    # this task has input tasks, so it is an output task (joined)
                    return 'joined'
    
                return 'regular'
            except:
                # 'the show must go on', don't crash if anything goes wrong, just show it as 'regular'
                return 'regular'
    
        @property
        def joined_status(self):
            # if a task has joined_input_tasks, then check their status
            try:
                joined_input_tasks = self.joined_input_tasks.all()
                status = None
                for task in joined_input_tasks:
                    if status and task.status != status:
                        return None
                    status = task.status
    
                # all statusses are the same, return it
                return status
            except Exception as e:
                print(e)
    
            return "unknown"
    
        @property
        def stageit_url(self):
            stage_request_id = self.stage_request_id
            if stage_request_id:
                stager_api = Configuration.objects.get(key='stager:api').value
                url = f"{stager_api}/requests/{self.stage_request_id}/"
                return url
            else:
                return None
    
        # NV: this shows the latest status change, but because it is a derived property it cannot be sorted.
        # This functionality was not requested, and to avoid additional requests about 'sort' functionality
        # it is currently commented out. Could be of use later though, so I leave it in for now.
        # @property
        # def latest_change(self):
        #     qs = Status.objects.filter(task__id=self.id).order_by('-timestamp')
        #     if len(qs) > 0:
        #         status = qs[0]
        #         timestamp = status.timestamp
        #
        #     return timestamp
    
    
    class LogEntry(models.Model):
        cpu_cycles = models.IntegerField(null=True,blank=True)
        wall_clock_time = models.IntegerField(null=True,blank=True)
        url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
        service = models.CharField(max_length=30, blank=True, null=True)
        step_name = models.CharField(max_length=30, blank=True, null=True)
        timestamp = models.DateTimeField(blank=True, null=True)
        status = models.CharField(max_length=50,default="defined", blank=True, null=True)
        description = models.CharField(max_length=255, blank=True, null=True)
        size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
        # relationships
        task = models.ForeignKey(Task, related_name='log_entries', on_delete=models.CASCADE, null=False)
    
        def __str__(self):
            return str(self.id)+ ' - '+ str(self.task)+' - '+self.status +' ('+self.step_name+')'
    
        def save(self, *args, **kwargs):
            #override save to check how long this task has been in this state.
            try:
    
                # if this logentry already has a wall_clock_time, then don't change it.
                if not self.wall_clock_time:
    
                    # gather all entries for this task, to discover the previous one.
                    entries_for_this_task = LogEntry.objects.filter(task=self.task).order_by("-timestamp")
                    # remove entries from the list, including 'this', so that the latest remaining is the previous entry.
                    for entry in entries_for_this_task:
                        entries_for_this_task = entries_for_this_task.exclude(id=entry.id)
                        if entry == self:
                            break
    
                    try:
                        latest_entry_before_self = entries_for_this_task.latest('timestamp')
                        previous_timestamp = latest_entry_before_self.timestamp
                    except:
                        # if no previous entry exists, then use the timestamp from the task.
                        previous_timestamp = self.task.creationTime
    
                    dt = (self.timestamp - previous_timestamp).seconds
                    #message = "logentry for task "+str(self.task.id)+", to "+self.status + " took " + str(dt) + " seconds"
                    #logger.info(message)
    
                    self.wall_clock_time = dt
            except Exception as e:
                print(e)
    
            finally:
                # finally save the LogEntry itself as well
                super(LogEntry, self).save(*args, **kwargs)
    
    
    
    class Status(models.Model):
        name = models.CharField(max_length=50, default="unknown")
        timestamp = models.DateTimeField(default=timezone.now, blank=True)
    
        # relationships
        task = models.ForeignKey(Task, related_name='status_history', on_delete=models.CASCADE, null=False)
    
        # the representation of the value in the REST API
        def __str__(self):
            formatedDate = self.timestamp.strftime(datetime_format_string)
            return str(self.name)+' ('+str(formatedDate)+')'
    
    
    class Configuration(models.Model):
        filter = models.CharField(max_length=30, blank=True, null=True)
        key = models.CharField(max_length=50)
        # large value needed to hold the dcache token
        value = models.CharField(max_length=5120)
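        # keys referenced elsewhere in this module include, for example,
        # 'executor:workdir', 'stager:api' and 'quality_thresholds'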
    
        # the representation of the value in the REST API
        def __str__(self):
            return str(self.key)
    
    
    class Job(models.Model):
        type = models.CharField(db_index=True, max_length=30, default=None,null=True, blank=True)
        task_id = models.IntegerField(null=True, blank=True)
        job_id = models.IntegerField(null=True, blank=True)
        timestamp = models.DateTimeField(default=timezone.now, blank=True)
        metadata = models.JSONField(null=True, blank=True)
    
        # the representation of the value in the REST API
        def __str__(self):
            return 'task_id:'+str(self.task_id)+', job_id:'+str(self.job_id)
    
        @property
        def webdav_url(self):
            try:
                path = self.metadata['stdout_path']
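                # illustrative transformation (placeholder path, not a real value):
                #   '/project/ldv/run/<batch>/<job>/stdout.txt'
                #   -> 'https://public.spider.surfsara.nl/project/ldv/run/<batch>/<job>'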
    
                # form the webdav url
                s = path.rsplit('/', 1)
                l = s[0].split('/run')
                webdav_url = "https://public.spider.surfsara.nl/project/ldv/run" + l[1]
    
                return webdav_url
            except:
                return "N/A"
    
    
    class PostProcessingRule(models.Model):
        aggregation_key = models.CharField(db_index=True, max_length=20, default=None,null=True, blank=True)
        trigger_status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
    
        # relationships
        workflow_to_process = models.ForeignKey(Workflow, related_name='to_process', on_delete=models.SET_NULL, null=True, blank=True)
        workflow_to_apply = models.ForeignKey(Workflow, related_name='to_apply',on_delete=models.SET_NULL, null=True, blank=True)
    
        # the representation of the value in the REST API
        def __str__(self):
            return str(self.aggregation_key)+' - '+str(self.trigger_status)
    
    
    class LatestMonitor(models.Model):
        name = models.CharField(max_length=50, default="unknown")
        type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
        timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
        hostname = models.CharField(max_length=50, default="unknown")
        process_id = models.IntegerField(null=True, blank=True)
        description = models.CharField(max_length=255, blank=True, null=True)
        status = models.CharField(max_length=50, default="ok", null=True)
        metadata = models.JSONField(null=True, blank=True)
    
        @property
        def enabled(self):
            try:
                enabled = self.metadata['enabled']
                return enabled
            except:
                # only an explicit metadata['enabled'] = False will register as disabled,
                # to make sure that services are enabled by default
                return "True"
    
        # the representation of the value in the REST API
        def __str__(self):
            return str(self.name) + ' - ('+ self.hostname+') - '+str(self.timestamp) + ' - ' + self.status
    
    
    def purge_old_records():
        current_time = timezone.now()  # change this
        try:
            time_threshold = current_time - timedelta(hours=settings.MAX_MONITORING_HISTORY_HOURS)
            records_to_delete = Monitor.objects.filter(timestamp__lt=time_threshold).delete()
        except Exception as e:
            # if MAX_MONITORING_HISTORY_HOURS is not set, then do nothing and continue
            pass
    
    
    class Monitor(models.Model):
        name = models.CharField(max_length=50, default="unknown")
        type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
        timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
        hostname = models.CharField(max_length=50, default="unknown")
        process_id = models.IntegerField(null=True, blank=True)
        description = models.CharField(max_length=255, blank=True, null=True)
        status = models.CharField(max_length=50, default="ok", null=True)
        metadata = models.JSONField(null=True, blank=True)
    
        def save(self, *args, **kwargs):
            # check if this combination of service name + hostname already exists
            # in the LatestMonitor, and update if it is newer.
            try:
                latestMonitor = LatestMonitor.objects.get(name=self.name,hostname=self.hostname)
    
            except MultipleObjectsReturned as e:
                # it has happened that latest_monitoring objects failed to delete, 
                # which leads to a growing database and failure of the 'Hold' button on the Monitoring screen.
                LatestMonitor.objects.all().delete()
            except:
                # if something else is wrong, just continue, so that a new entry can be created
                pass
    
            try:
                # carry over the metadata, if possible
                latest_metadata = latestMonitor.metadata
                latestMonitor.delete()
    
            except Exception as error:
                print(error)
                
    
            # (re)create the LatestMonitor entry for this combination of name and hostname
            metadata = self.metadata
            try:
                if latest_metadata:
                    metadata = latest_metadata
            except:
                pass
    
            latestMonitor = LatestMonitor(
                name=self.name,
                type=self.type,
                timestamp=self.timestamp,
                hostname = self.hostname,
                process_id = self.process_id,
                description = self.description,
                status = self.status,
                metadata = metadata
            )
            latestMonitor.save()
    
            # finally save the Monitor info itself also
            super(Monitor, self).save(*args, **kwargs)
    
            # and purge the monitoring table to its max
            purge_old_records()
    
        # the representation of the value in the REST API
        def __str__(self):
            return str(self.name) + ' - ('+ self.hostname+') - '+str(self.timestamp) + ' - ' + self.status