from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.timezone import datetime, timedelta
from django.conf import settings
import json
import logging
from enum import Enum
from .services import calculated_qualities as qualities

logger = logging.getLogger(__name__)


# constants
class State(Enum):
    """Task status values that the code in this module refers to by name."""
    DEFINED = "defined"
    STAGED = "staged"
    FETCHED = "fetched"
    PROCESSED = "processed"
    STORED = 'stored'
    VALIDATED = "validated"
    SCRUBBED = "scrubbed"
    ARCHIVED = "archived"
    FINISHED = "finished"
    SUSPENDED = "suspended"
    DISCARDED = "discarded"
    FAILED = "failed"


# format used by Status.__str__ to render its timestamp
datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'

# a sas_id counts as 'verified' when all of its tasks have one of these statusses
# (see Task.sasid_is_verified)
verified_statusses = ['stored', 'validated', 'scrubbed', 'archived', 'finished', 'suspended', 'discarded']

# keys of the quality JSON that are shown in the templates, in display order
_QUALITY_KEYS = ('uv-coverage', 'sensitivity', 'observing-conditions')


class Workflow(models.Model):
    """A workflow that tasks (and post processing rules) refer to."""
    description = models.CharField(max_length=500, blank=True, null=True)
    tag = models.CharField(max_length=30, blank=True, null=True)
    workflow_uri = models.CharField(unique=True, max_length=30, blank=True, null=True)
    repository = models.CharField(max_length=100, blank=True, null=True)
    commit_id = models.CharField(max_length=100, blank=True, null=True)
    path = models.CharField(max_length=100, blank=True, null=True)
    oi_size_fraction = models.FloatField(blank=True, null=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    default_parameters = models.JSONField(null=True, blank=True)
    prefetch = models.BooleanField(null=True, default=True)

    def __str__(self):
        return f"{self.id} - {self.workflow_uri}"


def _quality_values_for_template(task):
    """Collect the string values of the quality keys of a task, in display order.

    Best effort: collecting stops at the first key that cannot be read, and the
    values gathered up to that point are returned (possibly an empty list).
    """
    values = []
    try:
        for key in _QUALITY_KEYS:
            values.append(str(task.quality_json[key]))
    except Exception:
        # missing key, or no quality information at all: return what we have
        pass
    return values


# convert the quality information from the JSONfield into a easy parsable list for the template
def convert_quality_to_list_for_template(task):
    """Return the quality values of `task` as a list of strings (see _QUALITY_KEYS)."""
    return _quality_values_for_template(task)


def convert_quality_to_shortlist_for_template(task):
    """Return the quality values of `task` as a list of strings.

    Currently yields the same values as convert_quality_to_list_for_template.
    """
    return _quality_values_for_template(task)


def convert_summary_to_list_for_template(task):
    """Return the quality summary of `task` as a list for the template.

    NOTE(review): the summary is not converted yet; this has always returned
    an empty list, whether or not the task has a summary.
    """
    return []


class Task(models.Model):
    """A unit of work; tasks that belong together share the same sas_id."""

    # Task control properties
    task_type = models.CharField(max_length=20, default="regular")
    filter = models.CharField(max_length=30, blank=True, null=True)
    environment = models.CharField(max_length=255, blank=True, null=True)
    new_status = models.CharField(max_length=50, default="defining", null=True)
    status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
    quality = models.CharField(max_length=10, blank=True, null=True)
    calculated_qualities = models.JSONField(null=True, blank=True)
    resume = models.BooleanField(verbose_name="Resume", default=True)
    creationTime = models.DateTimeField(verbose_name="CreationTime", default=timezone.now, blank=True)

    priority = models.IntegerField(default=100, null=True)
    purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
    cleanup_policy = models.CharField(max_length=30, blank=True, null=True)
    stage_request_id = models.IntegerField(null=True)

    # LOFAR properties
    project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
    sas_id = models.CharField(verbose_name="SAS_ID", max_length=15, blank=True, null=True)
    inputs = models.JSONField(null=True, blank=True)
    outputs = models.JSONField(null=True, blank=True)
    metrics = models.JSONField(null=True, blank=True)
    remarks = models.JSONField(null=True, blank=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    archive = models.JSONField(null=True, blank=True)

    size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    total_processing_time = models.IntegerField(default=0, null=True, blank=True)

    # relationships
    workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.CASCADE, null=True, blank=True)
    predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)
    joined_output_task = models.ForeignKey('self', related_name='joined_input_tasks', on_delete=models.SET_NULL, null=True, blank=True)

    def __str__(self):
        return f"{self.id} - ({self.task_type}) - {self.sas_id}"

    def save(self, *args, **kwargs):
        """Apply the status-transition side effects, then save the task."""
        # nv:1mar2023, temporary hack, set tasks 'on hold' as soon they get to 'scrubbed'
        # (users forget to do that manually, causing unwanted ingests)
        if self.status != State.SCRUBBED.value and self.new_status == State.SCRUBBED.value:
            self.resume = False

        # nv:19jun2023, calculate the qualities for this task
        if self.status != State.STORED.value and self.new_status == State.STORED.value:

            # read the quality_thresholds from the Configuration table
            try:
                quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)
            except Exception:
                # no (valid) configuration available: fall back to built-in thresholds
                quality_thresholds = {
                    "moderate": 20,
                    "poor": 50,
                    "overall_poor": 50,
                    "overall_good": 90,
                }

            tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id)
            self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)

        super().save(*args, **kwargs)

    # this translates a view-name (from urls.py) back to a url, to avoid hardcoded url's in the html templates
    # bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
    # good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
    def get_absolute_url(self):
        return reverse('task-detail-view-api', kwargs={'pk': self.pk})

    def get_jobs_statusses(self):
        """Count the jobs of this task per (lower-cased) metadata['status'].

        Jobs without a readable status are skipped.
        """
        statusses = {}

        # check the statusses of all the jobs of this id
        for job in Job.objects.filter(task_id=self.id):
            try:
                key = job.metadata['status'].lower()
            except (KeyError, TypeError, AttributeError):
                # no metadata, no 'status' key, or a status that is not a string
                continue
            statusses[key] = statusses.get(key, 0) + 1

        return statusses

    @property
    def predecessor_status(self):
        """Status of the predecessor task, or "no_predecessor" when there is none."""
        try:
            return self.predecessor.status
        except Exception:
            return "no_predecessor"

    @property
    def has_quality(self):
        """True when outputs contains a 'quality' entry."""
        try:
            self.outputs['quality']
        except (KeyError, TypeError):
            return False
        return True

    @property
    def has_plots(self):
        """True when outputs['quality']['plots'] exists and is not empty."""
        try:
            return len(self.outputs['quality']['plots']) > 0
        except (KeyError, TypeError):
            return False

    @property
    def has_summary(self):
        """True when outputs['quality'] contains a 'summary' entry."""
        try:
            self.outputs['quality']['summary']
        except (KeyError, TypeError):
            return False
        return True

    @property
    def quality_json(self):
        """The 'quality' entry of outputs, or None when it is not available."""
        try:
            return self.outputs['quality']
        except (KeyError, TypeError):
            return None

    @property
    def get_quality_remarks(self):
        """The 'quality' entry of remarks, or None when it is not available."""
        try:
            return self.remarks['quality']
        except (KeyError, TypeError):
            return None

    @property
    def get_quality_remarks_taskid(self):
        """The 'quality_taskid' entry of remarks, or None when it is not available."""
        try:
            return self.remarks['quality_taskid']
        except (KeyError, TypeError):
            return None

    @property
    def get_quality_remarks_sasid(self):
        """The 'quality_sasid' entry of remarks, or None when it is not available."""
        try:
            return self.remarks['quality_sasid']
        except (KeyError, TypeError):
            return None

    @property
    def quality_as_list(self):
        try:
            return convert_quality_to_list_for_template(self)
        except Exception:
            return None

    @property
    def quality_as_shortlist(self):
        try:
            return convert_quality_to_shortlist_for_template(self)
        except Exception:
            return None

    @property
    def summary_as_list(self):
        try:
            return convert_summary_to_list_for_template(self)
        except Exception:
            return None

    @property
    def sas_id_archived(self):
        """The 'sas_id_archived' entry of archive, or None when it is not available."""
        try:
            return self.archive['sas_id_archived']
        except (KeyError, TypeError):
            return None

    @property
    def path_to_lta(self):
        """The 'path_to_lta' entry of archive, or None when it is not available."""
        try:
            return self.archive['path_to_lta']
        except (KeyError, TypeError):
            return None

    @property
    def sasid_is_verified(self):
        """True when every task with this sas_id has a status in verified_statusses."""
        # only the status column is needed, no need to load complete tasks
        statusses = Task.objects.filter(sas_id=self.sas_id).values_list('status', flat=True)
        return all(status in verified_statusses for status in statusses)

    @property
    def sasid_finished_fraction(self):
        """Summarize how much of the data of this sas_id is in 'finished' tasks.

        Returns a dict with:
          'fraction'   : rounded percentage of size_to_process that belongs to
                         finished tasks, or -1 when the total size is 0
          'total_size' : summed size_to_process of all tasks of this sas_id
          'remaining'  : summed size_to_process of the tasks that are not finished
        """
        size_archived = 0
        size_remaining = 0

        rows = Task.objects.filter(sas_id=self.sas_id).values_list('status', 'size_to_process')
        for status, size_to_process in rows:
            # size_to_process is nullable, count a missing size as 0
            size = size_to_process or 0
            if status == State.FINISHED.value:
                size_archived += size
            else:
                size_remaining += size

        total_size = size_archived + size_remaining

        finished = {}
        if total_size:
            finished['fraction'] = round((size_archived / total_size) * 100)
        else:
            # nothing to process at all, a fraction cannot be calculated
            finished['fraction'] = -1

        finished['total_size'] = total_size
        finished['remaining'] = size_remaining

        return finished

    @property
    def joined_status(self):
        """Common status of the joined_input_tasks.

        Returns None when there are no joined input tasks or when their
        statusses differ, and "unknown" when the tasks could not be read.
        """
        # if a task has joined_input_tasks, then check their status
        try:
            status = None
            for task in self.joined_input_tasks.all():
                if status and task.status != status:
                    return None
                status = task.status

            # all statusses are the same, return it
            return status
        except Exception as e:
            logger.warning("could not determine joined_status of task %s: %s", self.id, e)

        return "unknown"

    @property
    def stageit_url(self):
        """Url of the stage request of this task, or None without a stage_request_id.

        The base url is read from the Configuration record with key 'stager:api'.
        """
        if not self.stage_request_id:
            return None

        stager_api = Configuration.objects.get(key='stager:api').value
        return f"{stager_api}/requests/{self.stage_request_id}/"

    # NV: this shows the latest status change, but because it is a derived property it cannot be sorted.
    # This functionality was not requested, and to avoid additional requests about 'sort' functionalty
    # it is currently commented out. Could be of use later though, so I leave it in for now.
#    @property
#    def latest_change(self):
#        qs = Status.objects.filter(task__id=self.id).order_by('-timestamp')
#        if len(qs) > 0:
#            status = qs[0]
#            timestamp = status.timestamp
#
#        return timestamp


class LogEntry(models.Model):
    """A log record of a processing step of a task."""
    cpu_cycles = models.IntegerField(null=True, blank=True)
    wall_clock_time = models.IntegerField(null=True, blank=True)
    url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
    service = models.CharField(max_length=30, blank=True, null=True)
    step_name = models.CharField(max_length=30, blank=True, null=True)
    timestamp = models.DateTimeField(blank=True, null=True)
    status = models.CharField(max_length=50, default="defined", blank=True, null=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='log_entries', on_delete=models.CASCADE, null=False)

    def __str__(self):
        # status and step_name are nullable, so format instead of concatenate
        return f"{self.id} - {self.task} - {self.status} ({self.step_name})"

    def save(self, *args, **kwargs):
        """Fill in wall_clock_time (in seconds) when it is not given, then save.

        The duration is measured from the previous log entry of the task, or
        from the creationTime of the task when there is no previous entry.
        The entry is always saved, also when the duration cannot be determined.
        """
        # override save to check how long this task has been in this state.
        try:

            # if this logentry already has a wall_clock_time, then don't change it.
            if not self.wall_clock_time:

                # gather all entries for this task, to discover the previous one.
                entries_for_this_task = LogEntry.objects.filter(task=self.task).order_by("-timestamp")

                # remove entries from the list, including 'this', so that the latest remaining is the previous entry.
                # NOTE(review): for an entry without a primary key (not saved before) the
                # comparison with 'self' presumably never matches, in which case every
                # existing entry is removed and the creationTime of the task is used - confirm.
                ids_to_remove = []
                for entry in entries_for_this_task:
                    ids_to_remove.append(entry.id)
                    if entry == self:
                        break
                earlier_entries = entries_for_this_task.exclude(id__in=ids_to_remove)

                try:
                    previous_timestamp = earlier_entries.latest('timestamp').timestamp
                except Exception:
                    # if no previous entry exists, then use the timestamp from the task.
                    previous_timestamp = self.task.creationTime

                # total_seconds() also counts whole days, '.seconds' would drop them
                dt = int((self.timestamp - previous_timestamp).total_seconds())
                self.wall_clock_time = dt

                logger.info("logentry for task %s, to %s took %s seconds", self.task_id, self.status, dt)

        except Exception as e:
            # best effort: a missing duration must not prevent the entry from being saved
            logger.warning("could not determine wall_clock_time for logentry of task %s: %s", self.task_id, e)

        finally:
            # finally save the Monitor info itself also
            super().save(*args, **kwargs)


class Status(models.Model):
    """A status that a task had at a certain moment."""
    name = models.CharField(max_length=50, default="unknown")
    timestamp = models.DateTimeField(default=timezone.now, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='status_history', on_delete=models.CASCADE, null=False)

    # the representation of the value in the REST API
    def __str__(self):
        formatted_date = self.timestamp.strftime(datetime_format_string)
        return f"{self.name} ({formatted_date})"


class Configuration(models.Model):
    """A key/value configuration record."""
    filter = models.CharField(max_length=30, blank=True, null=True)
    key = models.CharField(max_length=50)
    # large value needed to hold the dcache token
    value = models.CharField(max_length=5120)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.key)


class Job(models.Model):
    """A job that is linked to a task through the plain integer task_id."""
    type = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    task_id = models.IntegerField(null=True, blank=True)
    job_id = models.IntegerField(null=True, blank=True)
    timestamp = models.DateTimeField(default=timezone.now, blank=True)
    metadata = models.JSONField(null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return f"task_id:{self.task_id}, job_id:{self.job_id}"

    @property
    def webdav_url(self):
        """Webdav url of the directory that holds metadata['stdout_path'].

        Returns "N/A" when the path is missing or does not contain '/run'.
        """
        try:
            path = self.metadata['stdout_path']

            # form the webdav url
            directory = path.rsplit('/', 1)[0]
            after_run = directory.split('/run')[1]
            return "https://public.spider.surfsara.nl/project/ldv/run" + after_run
        except (KeyError, TypeError, AttributeError, IndexError):
            return "N/A"


class PostProcessingRule(models.Model):
    """Couples a trigger status and aggregation key to two workflows."""
    aggregation_key = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    trigger_status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)

    # relationships
    workflow_to_process = models.ForeignKey(Workflow, related_name='to_process', on_delete=models.SET_NULL, null=True, blank=True)
    workflow_to_apply = models.ForeignKey(Workflow, related_name='to_apply', on_delete=models.SET_NULL, null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return f"{self.aggregation_key} - {self.trigger_status}"


class LatestMonitor(models.Model):
    """The most recent Monitor record per combination of name and hostname.

    Maintained by Monitor.save().
    """
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    # NOTE(review): datetime.utcnow gives a naive datetime, while Status and Job
    # use timezone.now as default - confirm whether that difference is intended.
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    @property
    def enabled(self):
        """The 'enabled' entry of metadata, or the string "True" when it is absent."""
        try:
            return self.metadata['enabled']
        except (KeyError, TypeError):
            # only when metadata['enabled']=False' this will be register as false.
            # to make sure that services are enabled by default
            return "True"

    # the representation of the value in the REST API
    def __str__(self):
        # status is nullable, so format instead of concatenate
        return f"{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}"


def purge_old_records():
    """Delete Monitor records older than settings.MAX_MONITORING_HISTORY_HOURS.

    Does nothing when the setting is absent. Failures are logged and not
    raised, purging is a best effort clean-up.
    """
    max_history_hours = getattr(settings, 'MAX_MONITORING_HISTORY_HOURS', None)
    if max_history_hours is None:
        # if MAX_MONITORING_HISTORY_HOURS is not set, then do nothing and continue
        return

    current_time = timezone.now()  # change this

    try:
        time_threshold = current_time - timedelta(hours=max_history_hours)
        Monitor.objects.filter(timestamp__lt=time_threshold).delete()
    except Exception:
        logger.exception("could not purge old Monitor records")


class Monitor(models.Model):
    """A monitoring record of a service; every save also refreshes LatestMonitor."""
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    # NOTE(review): datetime.utcnow gives a naive datetime, while Status and Job
    # use timezone.now as default - confirm whether that difference is intended.
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    def save(self, *args, **kwargs):
        """Replace the LatestMonitor of this name + hostname, save, then purge old records.

        Non-empty metadata of the replaced LatestMonitor is carried over to the
        new one; otherwise the metadata of this Monitor is used.
        """
        # check if this combination of service name + hostname already exists
        # in the LatestMonitor, and update if it is newer.
        latest_metadata = None
        try:
            previous_latest = LatestMonitor.objects.get(name=self.name, hostname=self.hostname)
            # carry over the metadata, if possible
            latest_metadata = previous_latest.metadata
            previous_latest.delete()
        except Exception:
            # this combination of name and hostname didn't yet exist, create it.
            pass

        metadata = latest_metadata if latest_metadata else self.metadata

        latestMonitor = LatestMonitor(
            name=self.name,
            type=self.type,
            timestamp=self.timestamp,
            hostname=self.hostname,
            process_id=self.process_id,
            description=self.description,
            status=self.status,
            metadata=metadata,
        )
        latestMonitor.save()

        # finally save the Monitor info itself also
        super().save(*args, **kwargs)

        # and purge the monitoring table to its max
        purge_old_records()

    # the representation of the value in the REST API
    def __str__(self):
        # status is nullable, so format instead of concatenate
        return f"{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}"