from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.conf import settings
from datetime import timedelta

import json
import logging

from .services import calculated_qualities as qualities
from .services.common import State

logger = logging.getLogger(__name__)

# constants
datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'


class Workflow(models.Model):
    description = models.CharField(max_length=500, blank=True, null=True)
    tag = models.CharField(max_length=30, blank=True, null=True)
    workflow_uri = models.CharField(unique=True, max_length=30, blank=True, null=True)
    repository = models.CharField(max_length=100, blank=True, null=True)
    commit_id = models.CharField(max_length=100, blank=True, null=True)
    path = models.CharField(max_length=100, blank=True, null=True)
    oi_size_fraction = models.FloatField(blank=True, null=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    default_parameters = models.JSONField(null=True, blank=True)
    prefetch = models.BooleanField(null=True, default=True)

    def __str__(self):
        return str(self.id) + ' - ' + str(self.workflow_uri)


def convert_quality_to_list_for_template(task):
    """convert the quality information from the JSONField into an easily parsable list for the template"""
    quality_list = []
    for key in ('uv-coverage', 'sensitivity', 'observing-conditions'):
        try:
            quality_list.append(str(task.quality_json[key]))
        except (KeyError, TypeError):
            # show missing values as a dash
            quality_list.append("-")
    return quality_list


def convert_quality_to_shortlist_for_template(task):
    """like the list version above, but stop at the first missing value instead of padding with dashes"""
    quality_list = []
    try:
        quality_list.append(str(task.quality_json['uv-coverage']))
        quality_list.append(str(task.quality_json['sensitivity']))
        quality_list.append(str(task.quality_json['observing-conditions']))
    except (KeyError, TypeError):
        pass
    return quality_list


def convert_summary_to_list_for_template(task):
    """convert the summary information from the JSONField into a list for the template.
    Note: the summary is currently only looked up, not unpacked; the returned list is always empty."""
    summary_list = []
    try:
        _ = task.quality_json['summary']
    except (KeyError, TypeError):
        pass
    return summary_list


def associate_task_with_activity(task):
    if not task.activity:
        try:
            activity = Activity.objects.get(sas_id=task.sas_id)
        except Activity.DoesNotExist:
            # no activity exists yet, create it
            logger.info(f'create new activity for sas_id {task.sas_id}')
            activity = Activity(sas_id=task.sas_id, project=task.project, filter=task.filter)
            activity.save()
        task.activity = activity
    return task.activity
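# Illustration only (assumed shape, inferred from the lookups above): the convert_*
# helpers expect Task.quality_json (i.e. task.outputs['quality']) to look roughly like
#
#   {
#       "uv-coverage": 6,
#       "sensitivity": 2,
#       "observing-conditions": 5,
#       "summary": {...},
#       "plots": [...]
#   }
#
# The exact keys and value types are determined by the services that fill 'outputs';
# none of them are enforced by the model itself.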
class Activity(models.Model):
    """
    Aggregation per SAS_ID level
    """
    sas_id = models.CharField(db_index=True, verbose_name="SAS_ID", max_length=15, blank=True, null=True)
    workflow_id = models.IntegerField(null=True)
    project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
    filter = models.CharField(max_length=30, blank=True, null=True)
    status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
    calculated_quality = models.CharField(max_length=10, blank=True, null=True)

    # this is the JSON blob that is filled in by the ldv_archiver during the ingest process
    archive = models.JSONField(null=True, blank=True)

    # set by update_activity, used by the Validation page
    is_verified = models.BooleanField(default=False)

    # TODO: flag set by the 'validate' step in ATDB, used by the combine service
    is_validated = models.BooleanField(default=False)

    # TODO: flag set (and used) by the combine service, so that it doesn't do double work
    is_combined = models.BooleanField(default=False)

    finished_fraction = models.FloatField(blank=True, null=True)
    ingested_fraction = models.FloatField(blank=True, null=True)
    total_size = models.FloatField(blank=True, null=True)
    remaining = models.FloatField(blank=True, null=True)
    ingestq_status = models.JSONField(null=True, blank=True)

    @property
    def has_archived(self):
        """
        check if any task belonging to this sas_id already has an output SAS_ID at the LTA
        """
        try:
            if self.archive['sas_id_archived']:
                return self.archive['sas_id_archived']
        except (KeyError, TypeError):
            return None

    def __str__(self):
        return str(self.sas_id)


def check_if_summary(task):
    """
    Check if this task is a summary task.
    For backward compatibility reasons this is done rather crudely,
    by looking whether certain filenames contain the substring 'summary'.
    """
    # look in the outputs.tar_archive
    try:
        tars = task.outputs['tar_archive']
        for tar in tars:
            if 'summary' in tar['basename']:
                # a summary tarball was found, this task is a summary task
                return True
    except (KeyError, TypeError):
        # no 'tar_archive' was found
        return False

    return False
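# Illustration only (assumed shape, inferred from check_if_summary): 'outputs' is
# expected to contain a list of tarball descriptors such as
#
#   {"tar_archive": [{"basename": "L123456_summaryIS.tar", ...}, ...]}
#
# and any basename containing the substring 'summary' marks the task as a summary task.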
class Task(models.Model):
    # Task control properties
    task_type = models.CharField(max_length=20, default="regular")
    is_summary = models.BooleanField(default=False)
    filter = models.CharField(max_length=30, blank=True, null=True)
    environment = models.CharField(max_length=255, blank=True, null=True)
    new_status = models.CharField(max_length=50, default="defining", null=True)
    status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
    quality = models.CharField(max_length=10, blank=True, null=True)
    calculated_qualities = models.JSONField(null=True, blank=True)
    resume = models.BooleanField(verbose_name="Resume", default=True)
    creationTime = models.DateTimeField(verbose_name="CreationTime", default=timezone.now, blank=True)
    priority = models.IntegerField(default=100, null=True)
    purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
    cleanup_policy = models.CharField(max_length=30, blank=True, null=True)
    stage_request_id = models.IntegerField(null=True)

    # LOFAR properties
    project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
    sas_id = models.CharField(verbose_name="SAS_ID", max_length=15, blank=True, null=True)
    inputs = models.JSONField(null=True, blank=True)
    outputs = models.JSONField(null=True, blank=True)
    metrics = models.JSONField(null=True, blank=True)
    remarks = models.JSONField(null=True, blank=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    archive = models.JSONField(null=True, blank=True)
    size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    total_processing_time = models.IntegerField(default=0, null=True, blank=True)

    # relationships
    workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.CASCADE, null=True, blank=True)
    predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)
    joined_output_task = models.ForeignKey('self', related_name='joined_input_tasks', on_delete=models.SET_NULL, null=True, blank=True)

    # pipeline or observation
    activity = models.ForeignKey(Activity, related_name='tasks', on_delete=models.SET_NULL, null=True, blank=True)

    def __str__(self):
        return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)

    def save(self, *args, **kwargs):
        # nv:1mar2023, temporary hack, set tasks 'on hold' as soon as they get to 'scrubbed'
        # (users forget to do that manually, causing unwanted ingests)
        if self.status != State.SCRUBBED.value and self.new_status == State.SCRUBBED.value:
            self.resume = False

        # nv:19jun2023, calculate the qualities for this task when it becomes 'stored'
        if self.status != State.STORED.value and self.new_status == State.STORED.value:
            # read the quality_thresholds from the Configuration table
            try:
                quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)
            except Exception:
                # fall back to defaults when the configuration is missing or unparsable
                quality_thresholds = {
                    "moderate": 20,
                    "poor": 50,
                    "overall_poor": 50,
                    "overall_good": 90,
                }

            tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id)
            self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)

            # nv:20feb2024, check if this task is a summary task
            self.is_summary = check_if_summary(self)

        # nv:20feb2024, same check, but for backward compatibility reasons.
        # For tasks that are already beyond PROCESSED, but not yet ingested.
        if self.status != State.VALIDATED.value and self.new_status == State.VALIDATED.value:
            self.is_summary = check_if_summary(self)

        # make sure that every task has an activity (also for backward compatibility)
        associate_task_with_activity(self)

        # remark:
        # a post_save signal is triggered by this save()
        # to update the associated 'activity' with relevant aggregated information
        super(Task, self).save(*args, **kwargs)

    # this translates a view-name (from urls.py) back to a url, to avoid hardcoded urls in the html templates
    # bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
    # good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
    def get_absolute_url(self):
        return reverse('task-detail-view-api', kwargs={'pk': self.pk})

    def get_jobs_statusses(self):
        # count the statusses of all the jobs of this task
        statusses = {}
        jobs = Job.objects.filter(task_id=self.id)
        for job in jobs:
            try:
                key = job.metadata['status'].lower()
                statusses[key] = statusses.get(key, 0) + 1
            except (KeyError, TypeError, AttributeError):
                pass
        return statusses

    @property
    def predecessor_status(self):
        try:
            return self.predecessor.status
        except AttributeError:
            return "no_predecessor"

    @property
    def has_quality(self):
        try:
            _ = self.outputs['quality']
            return True
        except (KeyError, TypeError):
            return False

    @property
    def has_plots(self):
        try:
            plots = self.outputs['quality']['plots']
            return len(plots) > 0
        except (KeyError, TypeError):
            return False

    @property
    def has_summary(self):
        try:
            _ = self.outputs['quality']['summary']
            return True
        except (KeyError, TypeError):
            return False

    @property
    def quality_json(self):
        try:
            return self.outputs['quality']
        except (KeyError, TypeError):
            return None

    @property
    def get_quality_remarks(self):
        try:
            return self.remarks['quality']
        except (KeyError, TypeError):
            return None

    @property
    def get_quality_remarks_taskid(self):
        try:
            return self.remarks['quality_taskid']
        except (KeyError, TypeError):
            return None

    @property
    def get_quality_remarks_sasid(self):
        try:
            return self.remarks['quality_sasid']
        except (KeyError, TypeError):
            return None

    @property
    def quality_as_list(self):
        try:
            return convert_quality_to_list_for_template(self)
        except Exception:
            return None

    @property
    def quality_as_shortlist(self):
        try:
            return convert_quality_to_shortlist_for_template(self)
        except Exception:
            return None

    @property
    def summary_as_list(self):
        try:
            return convert_summary_to_list_for_template(self)
        except Exception:
            return None

    @property
    def sas_id_archived(self):
        """check if this task already has an output SAS_ID at the LTA"""
        try:
            return self.archive['sas_id_archived']
        except (KeyError, TypeError):
            return None

    @property
    def path_to_lta(self):
        """return the 'path_to_lta' of this task (or None if that fails)"""
        try:
            return self.archive['path_to_lta']
        except (KeyError, TypeError):
            return None

    @property
    def task_type_join(self):
        try:
            # find out if this is a join/joined type of task, or just a regular task
            if self.joined_output_task:
                # this task has a designated output task, so it is an input task (join)
                return 'join'

            joined_input_tasks = self.joined_input_tasks.all()
            if joined_input_tasks.count() > 0:
                # this task has input tasks, so it is an output task (joined)
                return 'joined'

            return 'regular'
        except Exception:
            # 'the show must go on', don't crash if anything goes wrong, just show it as 'regular'
            return 'regular'

    @property
    def joined_status(self):
        # if a task has joined_input_tasks, then check their status
        try:
            joined_input_tasks = self.joined_input_tasks.all()
            status = None
            for task in joined_input_tasks:
                if status and task.status != status:
                    return None
                status = task.status

            # all statusses are the same, return it
            return status
        except Exception as e:
            logger.error(e)
            return "unknown"

    @property
    def stageit_url(self):
        if self.stage_request_id:
            stager_api = Configuration.objects.get(key='stager:api').value
            return f"{stager_api}/requests/{self.stage_request_id}/"
        return None

    # NV: this shows the latest status change, but because it is a derived property it cannot be sorted.
    # This functionality was not requested, and to avoid additional requests about 'sort' functionality
    # it is currently commented out. Could be of use later though, so I leave it in for now.
    # @property
    # def latest_change(self):
    #     qs = Status.objects.filter(task__id=self.id).order_by('-timestamp')
    #     if len(qs) > 0:
    #         status = qs[0]
    #         timestamp = status.timestamp
    #
    #     return timestamp
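# Illustration only: Task.save() and Task.stageit_url read their settings from the
# Configuration table. Example rows (the values are assumptions, not actual config):
#
#   key='quality_thresholds'  value='{"moderate": 20, "poor": 50, "overall_poor": 50, "overall_good": 90}'
#   key='stager:api'          value='https://stager.example.org/api'
#
# When 'quality_thresholds' is missing or unparsable, Task.save() falls back to the
# defaults shown above.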
class LogEntry(models.Model):
    cpu_cycles = models.IntegerField(null=True, blank=True)
    wall_clock_time = models.IntegerField(null=True, blank=True)
    url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
    service = models.CharField(max_length=30, blank=True, null=True)
    step_name = models.CharField(max_length=30, blank=True, null=True)
    timestamp = models.DateTimeField(blank=True, null=True)
    status = models.CharField(max_length=50, default="defined", blank=True, null=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='log_entries', on_delete=models.CASCADE, null=False)

    def __str__(self):
        return str(self.id) + ' - ' + str(self.task) + ' - ' + self.status + ' (' + self.step_name + ')'

    def save(self, *args, **kwargs):
        # override save to check how long this task has been in this state.
        try:
            # if this logentry already has a wall_clock_time, then don't change it.
            if not self.wall_clock_time:
                # gather all entries for this task, to discover the previous one.
                entries_for_this_task = LogEntry.objects.filter(task=self.task).order_by("-timestamp")

                # remove entries from the list, up to and including 'this',
                # so that the latest remaining entry is the previous one.
                for entry in entries_for_this_task:
                    entries_for_this_task = entries_for_this_task.exclude(id=entry.id)
                    if entry == self:
                        break

                try:
                    latest_entry_before_self = entries_for_this_task.latest('timestamp')
                    previous_timestamp = latest_entry_before_self.timestamp
                except LogEntry.DoesNotExist:
                    # if no previous entry exists, then use the timestamp from the task.
                    previous_timestamp = self.task.creationTime

                # use total_seconds() so that durations longer than a day are not truncated
                dt = int((self.timestamp - previous_timestamp).total_seconds())
                # message = "logentry for task " + str(self.task.id) + ", to " + self.status + " took " + str(dt) + " seconds"
                # logger.info(message)
                self.wall_clock_time = dt
        except Exception as e:
            logger.error(e)
        finally:
            # finally, save the LogEntry itself
            super(LogEntry, self).save(*args, **kwargs)
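# Worked example (hypothetical timestamps): for a task with log entries at
# 10:00:00 ('fetching'), 10:05:00 ('processing') and 10:20:00 ('processed'),
# saving the 'processed' entry sets its wall_clock_time to the gap since the
# previous entry: 15 * 60 = 900 seconds. The very first entry falls back to the
# task's creationTime as its reference point.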
class Status(models.Model):
    name = models.CharField(max_length=50, default="unknown")
    timestamp = models.DateTimeField(default=timezone.now, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='status_history', on_delete=models.CASCADE, null=False)

    # the representation of the value in the REST API
    def __str__(self):
        formatted_date = self.timestamp.strftime(datetime_format_string)
        return str(self.name) + ' (' + str(formatted_date) + ')'


class Configuration(models.Model):
    filter = models.CharField(max_length=30, blank=True, null=True)
    key = models.CharField(max_length=50)
    # large value needed to hold the dcache token
    value = models.CharField(max_length=5120)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.key)


class Job(models.Model):
    type = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    task_id = models.IntegerField(null=True, blank=True)
    job_id = models.IntegerField(null=True, blank=True)
    timestamp = models.DateTimeField(default=timezone.now, blank=True)
    metadata = models.JSONField(null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return 'task_id:' + str(self.task_id) + ', job_id:' + str(self.job_id)

    @property
    def webdav_url(self):
        try:
            path = self.metadata['stdout_path']

            # form the webdav url: take the directory part of the path,
            # and keep the segment that follows '/run'
            directory = path.rsplit('/', 1)[0]
            suffix = directory.split('/run')[1]
            return "https://public.spider.surfsara.nl/project/ldv/run" + suffix
        except Exception:
            return "N/A"


class PostProcessingRule(models.Model):
    aggregation_key = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    trigger_status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)

    # relationships
    workflow_to_process = models.ForeignKey(Workflow, related_name='to_process', on_delete=models.SET_NULL, null=True, blank=True)
    workflow_to_apply = models.ForeignKey(Workflow, related_name='to_apply', on_delete=models.SET_NULL, null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.aggregation_key) + ' - ' + str(self.trigger_status)


class LatestMonitor(models.Model):
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=timezone.now, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    @property
    def enabled(self):
        try:
            return self.metadata['enabled']
        except (KeyError, TypeError):
            # only an explicit metadata['enabled']=False registers as disabled;
            # this makes sure that services are enabled by default
            return "True"

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.name) + ' - (' + self.hostname + ') - ' + str(self.timestamp) + ' - ' + self.status
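# Illustration only (assumed metadata shape): a service can be switched off by
# storing {"enabled": false} in LatestMonitor.metadata; any record without that
# key reports enabled as "True", so services are enabled by default.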
def purge_old_records():
    current_time = timezone.now()  # change this
    try:
        time_threshold = current_time - timedelta(hours=settings.MAX_MONITORING_HISTORY_HOURS)
        Monitor.objects.filter(timestamp__lt=time_threshold).delete()
    except Exception:
        # if MAX_MONITORING_HISTORY_HOURS is not set, then do nothing and continue
        pass


class Monitor(models.Model):
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=timezone.now, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    def save(self, *args, **kwargs):
        # check if this combination of service name + hostname already exists
        # in the LatestMonitor, and replace it with this newer record.
        metadata = self.metadata
        try:
            latest_monitor = LatestMonitor.objects.get(name=self.name, hostname=self.hostname)
            # carry over the existing metadata, if possible
            if latest_monitor.metadata:
                metadata = latest_monitor.metadata
            latest_monitor.delete()
        except LatestMonitor.DoesNotExist:
            # this combination of name and hostname didn't yet exist; it is created below.
            pass

        latest_monitor = LatestMonitor(
            name=self.name,
            type=self.type,
            timestamp=self.timestamp,
            hostname=self.hostname,
            process_id=self.process_id,
            description=self.description,
            status=self.status,
            metadata=metadata,
        )
        latest_monitor.save()

        # finally save the Monitor info itself also
        super(Monitor, self).save(*args, **kwargs)

        # and purge the monitoring table to its max history
        purge_old_records()

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.name) + ' - (' + self.hostname + ') - ' + str(self.timestamp) + ' - ' + self.status
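# Minimal usage sketch (hypothetical values): a service posts a heartbeat by
# creating a Monitor record; Monitor.save() then refreshes the matching
# LatestMonitor row (carrying over any existing metadata, such as the 'enabled'
# flag) and purges history older than settings.MAX_MONITORING_HISTORY_HOURS.
#
#   Monitor(name='ldv-ingest', hostname='worker-01', process_id=1234,
#           status='ok', description='heartbeat').save()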