from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.timezone import datetime, timedelta
from django.conf import settings
import logging

logger = logging.getLogger(__name__)

# constants
datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'

# keys read from a task's quality structure, in the order the templates receive them
_QUALITY_KEYS = ('uv-coverage', 'sensitivity', 'observing-conditions')


class Workflow(models.Model):
    """Database model holding the description and location of a workflow."""

    description = models.CharField(max_length=500, blank=True, null=True)
    workflow_uri = models.CharField(unique=True, max_length=30, blank=True, null=True)
    repository = models.CharField(max_length=100, blank=True, null=True)
    commit_id = models.CharField(max_length=15, blank=True, null=True)
    path = models.CharField(max_length=100, blank=True, null=True)
    oi_size_fraction = models.FloatField(blank=True, null=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    default_parameters = models.JSONField(null=True, blank=True)

    def __str__(self):
        """Return the primary key as a string."""
        return str(self.id)


def _collect_quality_values(task):
    """Return the stringified quality values of ``task`` listed in ``_QUALITY_KEYS``.

    Collection stops at the first lookup that fails (missing key, no quality
    structure at all, ...); the values gathered up to that point are returned,
    so the result holds between zero and three strings.
    """
    values = []
    try:
        quality = task.quality_json
        for key in _QUALITY_KEYS:
            values.append(str(quality[key]))
    except Exception:
        # best effort: the templates simply render whatever could be collected
        pass
    return values


# convert the quality information from the JSONfield into an easily parsable list for the template
def convert_quality_to_list_for_template(task):
    """Return the quality values of ``task`` as a list of strings for the template.

    The list contains, in order, 'uv-coverage', 'sensitivity' and
    'observing-conditions'; it is truncated at the first value that cannot be
    read and is empty when the task has no quality information.
    """
    return _collect_quality_values(task)


def convert_quality_to_shortlist_for_template(task):
    """Return the short quality list of ``task`` for the template.

    Currently yields the same values as ``convert_quality_to_list_for_template``.
    """
    return _collect_quality_values(task)


def convert_summary_to_list_for_template(task):
    """Return the quality summary of ``task`` as a list for the template.

    NOTE: the summary is looked up but not converted yet, so the result is
    always an empty list.
    """
    result = []
    try:
        summary = task.quality_json['summary']  # looked up, but not used yet
    except Exception:
        pass
    return result


class Task(models.Model):
    """Database model of a task, including its status, data products and quality information."""

    # Task control properties
    task_type = models.CharField(max_length=20, default="regular")
    filter = models.CharField(max_length=30, blank=True, null=True)
    # environment = models.JSONField(null=True, blank=True)
    environment = models.CharField(max_length=255, blank=True, null=True)
    new_status = models.CharField(max_length=50, default="defining", null=True)
    status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
    quality = models.CharField(max_length=10, blank=True, null=True)

    resume = models.BooleanField(verbose_name="Resume", default=True)
    creationTime = models.DateTimeField(verbose_name="CreationTime", default=datetime.utcnow, blank=True)

    priority = models.IntegerField(default=100, null=True)
    purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
    stage_request_id = models.IntegerField(null=True)

    # LOFAR properties
    project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
    sas_id = models.CharField(verbose_name="SAS_ID", max_length=15, blank=True, null=True)
    inputs = models.JSONField(null=True, blank=True)
    outputs = models.JSONField(null=True, blank=True)
    metrics = models.JSONField(null=True, blank=True)
    remarks = models.JSONField(null=True, blank=True)
    meta_scheduling = models.JSONField(null=True, blank=True)

    size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    total_processing_time = models.IntegerField(default=0, null=True, blank=True)

    # relationships
    workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.SET_NULL, null=True, blank=True)
    predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)

    def __str__(self):
        """Return '<id> - (<task_type>) - <sas_id>'."""
        return f"{self.id} - ({self.task_type}) - {self.sas_id}"

    # this translates a view-name (from urls.py) back to a url, to avoid hardcoded url's in the html templates
    # bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
    # good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
    def get_absolute_url(self):
        """Return the url of the 'task-detail-view-api' view for this task."""
        return reverse('task-detail-view-api', kwargs={'pk': self.pk})

    @property
    def predecessor_status(self):
        """Return the status of the predecessor, or "no_predecessor" if it cannot be read."""
        try:
            return self.predecessor.status
        except Exception:
            return "no_predecessor"

    @property
    def has_quality(self):
        """Return True when ``outputs`` has a top-level 'quality' entry, otherwise False."""
        # todo: check if there is a 'quality' structure in the 'task.outputs' at another level?
        # (the original code contained an unreachable draft that looked at self.outputs[0]['quality'])
        try:
            self.outputs['quality']
        except Exception:
            return False
        return True

    @property
    def has_plots(self):
        """Return True when ``outputs['quality']['plots']`` is present and not empty."""
        try:
            plots = self.outputs['quality']['plots']
            return len(plots) > 0
        except Exception:
            return False

    @property
    def has_summary(self):
        """Return True when ``outputs['quality']`` has a 'summary' entry, otherwise False."""
        try:
            self.outputs['quality']['summary']
        except Exception:
            return False
        return True

    @property
    def quality_json(self):
        """Return ``outputs['quality']``, or None when it cannot be read."""
        # todo: check if there is a 'quality' structure in the 'task.outputs' at another level?
        # (the original code contained an unreachable draft that returned self.outputs[0]['quality'])
        try:
            return self.outputs['quality']
        except Exception:
            return None

    @property
    def get_quality_remarks(self):
        """Return ``remarks['quality']``, or None when it cannot be read."""
        try:
            return self.remarks['quality']
        except Exception:
            return None

    @property
    def get_quality_remarks_taskid(self):
        """Return ``remarks['quality_taskid']``, or None when it cannot be read."""
        try:
            return self.remarks['quality_taskid']
        except Exception:
            return None

    @property
    def get_quality_remarks_sasid(self):
        """Return ``remarks['quality_sasid']``, or None when it cannot be read."""
        try:
            return self.remarks['quality_sasid']
        except Exception:
            return None

    @property
    def quality_as_list(self):
        """Return the quality values as a list of strings, or None on an unexpected error."""
        try:
            return convert_quality_to_list_for_template(self)
        except Exception:
            return None

    @property
    def quality_as_shortlist(self):
        """Return the short quality list, or None on an unexpected error."""
        try:
            return convert_quality_to_shortlist_for_template(self)
        except Exception:
            return None

    @property
    def summary_as_list(self):
        """Return the quality summary as a list, or None on an unexpected error."""
        try:
            return convert_summary_to_list_for_template(self)
        except Exception:
            return None

    @property
    def sas_id_archived(self):
        """Return a placeholder archived SAS_ID, or None when ``sas_id`` is not an integer."""
        try:
            # --- temporary hack, test data ---
            return int(self.sas_id) + 123445
            # ---------------------------------
            # TODO: the original code had an unreachable
            # `return self.archive['sas_id_archived']` after the hack above;
            # presumably that is the intended implementation - confirm.
        except Exception:
            return None

    def lta_object_id(self):
        """Return ``archive['lta_object_id']``, or None when it cannot be read.

        NOTE(review): no ``archive`` attribute is defined on this model in this
        file, so this looks like it returns None until one is added - confirm.
        """
        try:
            return self.archive['lta_object_id']
        except Exception:
            return None

    def url_in_lta(self):
        """Return a url built from the 'lta:url' configuration value, or None on failure."""
        try:
            lta_url = Configuration.objects.get(key='lta:url').value
            url = lta_url + "Lofar?project=ALL&mode=query_result_page"

            # --- temporary hack, test data ---
            url += "&product=PulsarPipeline&pipeline_object_id=ED0BBBFC49D81C3DE053164A17ACBF9C"
            # ---------------------------------

            return url
        except Exception:
            return None


class LogEntry(models.Model):
    """Database model of a single log record of a task."""

    cpu_cycles = models.IntegerField(null=True, blank=True)
    wall_clock_time = models.IntegerField(null=True, blank=True)
    url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
    service = models.CharField(max_length=30, blank=True, null=True)
    step_name = models.CharField(max_length=30, blank=True, null=True)
    timestamp = models.DateTimeField(blank=True, null=True)
    status = models.CharField(max_length=50, default="defined", blank=True, null=True)
    description = models.CharField(max_length=100, blank=True, null=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='log_entries', on_delete=models.CASCADE, null=False)

    def __str__(self):
        """Return '<id> - <task> - <status> (<step_name>)'.

        ``status`` and ``step_name`` are nullable, so they are formatted
        rather than concatenated to avoid a TypeError on None.
        """
        return f"{self.id} - {self.task} - {self.status} ({self.step_name})"

    def save(self, *args, **kwargs):
        """Fill in ``wall_clock_time`` when it is not set yet, then save the entry.

        The wall clock time is the number of whole seconds between the
        timestamp of the previous log entry of the same task (or the creation
        time of the task when there is no previous entry) and the timestamp of
        this entry. Any failure while calculating it is logged; the entry is
        saved regardless.
        """
        # override save to check how long this task has been in this state.
        try:
            # if this logentry already has a wall_clock_time, then don't change it.
            if not self.wall_clock_time:

                # gather all entries for this task, to discover the previous one.
                entries_for_this_task = LogEntry.objects.filter(task=self.task).order_by("-timestamp")

                # remove entries from the list, including 'this', so that the latest remaining is the previous entry.
                # NOTE(review): for an entry that has not been saved before (pk is None)
                # no stored row seems to compare equal to 'self', so every entry would be
                # excluded and the task creation time is used below - confirm this is intended.
                for entry in entries_for_this_task:
                    entries_for_this_task = entries_for_this_task.exclude(id=entry.id)
                    if entry == self:
                        break

                try:
                    latest_entry_before_self = entries_for_this_task.latest('timestamp')
                    previous_timestamp = latest_entry_before_self.timestamp
                except Exception:
                    # if no previous entry exists, then use the timestamp from the task.
                    previous_timestamp = self.task.creationTime

                # total_seconds() covers the full interval; timedelta.seconds only holds
                # the part below one day and wraps around for negative intervals.
                dt = int((self.timestamp - previous_timestamp).total_seconds())
                self.wall_clock_time = dt

                # lazy formatting, so that a status of None cannot break the calculation
                logger.info("logentry for task %s, to %s took %s seconds", self.task.id, self.status, dt)

        except Exception:
            logger.exception("could not determine the wall_clock_time of a logentry")

        finally:
            # finally save the LogEntry itself also
            super().save(*args, **kwargs)


class Status(models.Model):
    """Database model of one entry in the status history of a task."""

    name = models.CharField(max_length=50, default="unknown")
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='status_history', on_delete=models.CASCADE, null=False)

    # the representation of the value in the REST API
    def __str__(self):
        """Return '<name> (<timestamp>)', the timestamp formatted with ``datetime_format_string``."""
        formatted_date = self.timestamp.strftime(datetime_format_string)
        return f"{self.name} ({formatted_date})"


class Configuration(models.Model):
    """Database model of a key/value configuration record."""

    filter = models.CharField(max_length=30, blank=True, null=True)
    key = models.CharField(max_length=50)
    # large value needed to hold the dcache token
    value = models.CharField(max_length=5120)

    # the representation of the value in the REST API
    def __str__(self):
        """Return the key as a string."""
        return str(self.key)


class Job(models.Model):
    """Database model of a job, referring to a task by its integer id."""

    type = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    task_id = models.IntegerField(null=True, blank=True)
    job_id = models.IntegerField(null=True, blank=True)
    metadata = models.JSONField(null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        """Return 'task_id:<task_id>, job_id:<job_id>'."""
        return f"task_id:{self.task_id}, job_id:{self.job_id}"


class PostProcessingRule(models.Model):
    """Database model linking a trigger status and aggregation key to two workflows."""

    aggregation_key = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    trigger_status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)

    # relationships
    workflow_to_process = models.ForeignKey(Workflow, related_name='to_process', on_delete=models.SET_NULL, null=True, blank=True)
    workflow_to_apply = models.ForeignKey(Workflow, related_name='to_apply', on_delete=models.SET_NULL, null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        """Return '<aggregation_key> - <trigger_status>'."""
        return f"{self.aggregation_key} - {self.trigger_status}"


class LatestMonitor(models.Model):
    """Database model holding the most recent Monitor record per name and hostname.

    The records are written by ``Monitor.save``.
    """

    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    @property
    def enabled(self):
        """Return ``metadata['enabled']``, or the string "True" when it cannot be read."""
        try:
            return self.metadata['enabled']
        except Exception:
            # only when metadata['enabled']=False this will register as false,
            # to make sure that services are enabled by default
            return "True"

    # the representation of the value in the REST API
    def __str__(self):
        """Return '<name> - (<hostname>) - <timestamp> - <status>'.

        ``status`` is nullable, so it is formatted rather than concatenated.
        """
        return f"{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}"


def purge_old_records():
    """Delete the Monitor records older than ``settings.MAX_MONITORING_HISTORY_HOURS`` hours.

    Does nothing when the setting is not defined. This is best effort: any
    error is logged and not propagated to the caller.
    """
    try:
        max_history_hours = getattr(settings, 'MAX_MONITORING_HISTORY_HOURS', None)
        if max_history_hours is None:
            # if MAX_MONITORING_HISTORY_HOURS is not set, then do nothing and continue
            return

        # TODO (from the original author): "change this"
        current_time = timezone.now()
        time_threshold = current_time - timedelta(hours=max_history_hours)
        Monitor.objects.filter(timestamp__lt=time_threshold).delete()
    except Exception:
        logger.exception("could not purge old monitor records")


class Monitor(models.Model):
    """Database model of a monitoring record of a service on a host."""

    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    def save(self, *args, **kwargs):
        """Save this record and replace the matching LatestMonitor record.

        The existing LatestMonitor for the same name and hostname (if any) is
        deleted and a new one is created from this record; non-empty metadata
        of the deleted record takes precedence over the metadata of this
        record. Afterwards the Monitor table is purged of old records.
        """
        # check if this combination of service name + hostname already exists
        # in the LatestMonitor, and replace it.
        latest_metadata = None
        try:
            previous = LatestMonitor.objects.get(name=self.name, hostname=self.hostname)
            # carry over the metadata, if possible
            latest_metadata = previous.metadata
            previous.delete()
        except Exception:
            # this combination of name and hostname didn't yet exist
            # (or could not be read); a new record is created below.
            pass

        metadata = latest_metadata if latest_metadata else self.metadata

        latest_monitor = LatestMonitor(
            name=self.name,
            type=self.type,
            timestamp=self.timestamp,
            hostname=self.hostname,
            process_id=self.process_id,
            description=self.description,
            status=self.status,
            metadata=metadata,
        )
        latest_monitor.save()

        # finally save the Monitor info itself also
        super().save(*args, **kwargs)

        # and purge the monitoring table to its max
        purge_old_records()

    # the representation of the value in the REST API
    def __str__(self):
        """Return '<name> - (<hostname>) - <timestamp> - <status>'.

        ``status`` is nullable, so it is formatted rather than concatenated.
        """
        return f"{self.name} - ({self.hostname}) - {self.timestamp} - {self.status}"