from django.db import models
from django.urls import reverse
from django.utils import timezone
from datetime import datetime, timedelta
from django.conf import settings

import json
import logging

from .services import calculated_qualities as qualities
from .services.common import State, verified_statusses

logger = logging.getLogger(__name__)

# constants
datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'


class Workflow(models.Model):
    description = models.CharField(max_length=500, blank=True, null=True)
    tag = models.CharField(max_length=30, blank=True, null=True)
    workflow_uri = models.CharField(unique=True, max_length=30, blank=True, null=True)
    repository = models.CharField(max_length=100, blank=True, null=True)
    commit_id = models.CharField(max_length=100, blank=True, null=True)
    path = models.CharField(max_length=100, blank=True, null=True)
    oi_size_fraction = models.FloatField(blank=True, null=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    default_parameters = models.JSONField(null=True, blank=True)
    prefetch = models.BooleanField(null=True, default=True)

    def __str__(self):
        return str(self.id) + ' - ' + str(self.workflow_uri)


# convert the quality information from the JSONField into an easily parsable list for the template
def convert_quality_to_list_for_template(task):
    result = []
    try:
        result.append(str(task.quality_json['uv-coverage']))
    except Exception:
        result.append("-")
    try:
        result.append(str(task.quality_json['sensitivity']))
    except Exception:
        result.append("-")
    try:
        result.append(str(task.quality_json['observing-conditions']))
    except Exception:
        result.append("-")
    return result


def convert_quality_to_shortlist_for_template(task):
    result = []
    try:
        result.append(str(task.quality_json['uv-coverage']))
        result.append(str(task.quality_json['sensitivity']))
        result.append(str(task.quality_json['observing-conditions']))
    except Exception:
        # if a key is missing, return the (possibly partial) list as-is
        pass
    return result


def convert_summary_to_list_for_template(task):
    # note: the summary is only looked up here, nothing is appended yet,
    # so the template currently receives an empty list.
    result = []
    try:
        summary = task.quality_json['summary']
    except Exception:
        pass
    return result


def associate_task_with_activity(task):
    if not task.activity:
        try:
            activity = Activity.objects.get(sas_id=task.sas_id)
        except Exception:
            # no activity exists yet, create it
            logger.info(f'create new activity for sas_id {task.sas_id}')
            activity = Activity(sas_id=task.sas_id,
                                project=task.project,
                                workflow_id=task.workflow.id,
                                filter=task.filter)
            activity.save()
        task.activity = activity
    return task.activity
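# For reference, the helpers above only rely on the following keys being present
# in task.quality_json (an illustrative sketch, not a schema definition; the
# values shown are hypothetical):
#
#   {
#       "uv-coverage": 8,
#       "sensitivity": 7,
#       "observing-conditions": 9,
#       "summary": {...}
#   }
#
# A missing key is rendered as "-" in the long list, and simply omitted from the
# short list.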
class Activity(models.Model):
    """
    Aggregation per SAS_ID level
    """
    sas_id = models.CharField(db_index=True, verbose_name="SAS_ID", max_length=15, blank=True, null=True)
    workflow_id = models.IntegerField(null=True)
    project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
    filter = models.CharField(max_length=30, blank=True, null=True)
    status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
    calculated_quality = models.CharField(max_length=10, blank=True, null=True)

    # this is the JSON blob that is filled in by the ldv_archiver during the ingest process
    archive = models.JSONField(null=True, blank=True)

    is_verified = models.BooleanField(default=False)
    finished_fraction = models.FloatField(blank=True, null=True)
    ingested_fraction = models.FloatField(blank=True, null=True)
    total_size = models.FloatField(blank=True, null=True)
    remaining = models.FloatField(blank=True, null=True)
    ingestq_status = models.JSONField(null=True, blank=True)

    @property
    def has_archived(self):
        """
        check if any task belonging to this sas_id already has an output SAS_ID at the LTA
        """
        try:
            if self.archive['sas_id_archived']:
                return self.archive['sas_id_archived']
        except Exception:
            return None

    def __str__(self):
        return str(self.sas_id)


class Task(models.Model):
    # Task control properties
    task_type = models.CharField(max_length=20, default="regular")
    filter = models.CharField(max_length=30, blank=True, null=True)
    environment = models.CharField(max_length=255, blank=True, null=True)
    new_status = models.CharField(max_length=50, default="defining", null=True)
    status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)
    quality = models.CharField(max_length=10, blank=True, null=True)
    calculated_qualities = models.JSONField(null=True, blank=True)
    resume = models.BooleanField(verbose_name="Resume", default=True)
    creationTime = models.DateTimeField(verbose_name="CreationTime", default=timezone.now, blank=True)
    priority = models.IntegerField(default=100, null=True)
    purge_policy = models.CharField(max_length=5, default="no", blank=True, null=True)
    cleanup_policy = models.CharField(max_length=30, blank=True, null=True)
    stage_request_id = models.IntegerField(null=True)

    # LOFAR properties
    project = models.CharField(max_length=100, blank=True, null=True, default="unknown")
    sas_id = models.CharField(verbose_name="SAS_ID", max_length=15, blank=True, null=True)
    inputs = models.JSONField(null=True, blank=True)
    outputs = models.JSONField(null=True, blank=True)
    metrics = models.JSONField(null=True, blank=True)
    remarks = models.JSONField(null=True, blank=True)
    meta_scheduling = models.JSONField(null=True, blank=True)
    archive = models.JSONField(null=True, blank=True)
    size_to_process = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)
    total_processing_time = models.IntegerField(default=0, null=True, blank=True)

    # relationships
    workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.CASCADE, null=True, blank=True)
    predecessor = models.ForeignKey('self', related_name='successors', on_delete=models.SET_NULL, null=True, blank=True)
    joined_output_task = models.ForeignKey('self', related_name='joined_input_tasks', on_delete=models.SET_NULL, null=True, blank=True)

    # pipeline or observation
    activity = models.ForeignKey(Activity, related_name='tasks', on_delete=models.SET_NULL, null=True, blank=True)

    def __str__(self):
        return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)

    def save(self, *args, **kwargs):
        # nv:1mar2023, temporary hack, set tasks 'on hold' as soon as they get to 'scrubbed'
        # (users forget to do that manually, causing unwanted ingests)
        if (self.status != State.SCRUBBED.value) and (self.new_status == State.SCRUBBED.value):
            self.resume = False

        # nv:19jun2023, calculate the qualities for this task
        if (self.status != State.STORED.value) and (self.new_status == State.STORED.value):
            # read the quality_thresholds from the Configuration table
            try:
                quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)
            except Exception:
                quality_thresholds = {
                    "moderate": 20,
                    "poor": 50,
                    "overall_poor": 50,
                    "overall_good": 90,
                }

            tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id)
            self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)

        # make sure that every task has an activity (backward compatibility)
        # TODO: uncomment to enable SDC-1188 functionality for deploy STEP 2
        # associate_task_with_activity(self)

        # remark:
        # a post_save signal is triggered by this save()
        # to update the associated 'activity' with relevant aggregated information
        super(Task, self).save(*args, **kwargs)
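    # For reference: the 'quality_thresholds' Configuration value is parsed as JSON.
    # Based on the fallback defaults above, a stored value would look like:
    #
    #   {"moderate": 20, "poor": 50, "overall_poor": 50, "overall_good": 90}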
    # this translates a view-name (from urls.py) back to a url, to avoid hardcoded urls in the html templates
    # bad : <td><a href="/atdb/tasks/{{ task.id }}/" target="_blank">{{ task.taskID }} </a> </td>
    # good: <td><a href="{{ task.get_absolute_url }}" target="_blank">{{ task.taskID }} </a> </td>
    def get_absolute_url(self):
        return reverse('task-detail-view-api', kwargs={'pk': self.pk})

    def get_jobs_statusses(self):
        # count the statuses of all the jobs of this task
        statusses = {}
        jobs = Job.objects.filter(task_id=self.id)
        for job in jobs:
            try:
                key = job.metadata['status'].lower()
                # start a new count at 0 if the key doesn't exist yet, then increment
                statusses[key] = statusses.get(key, 0) + 1
            except Exception:
                pass
        return statusses

    @property
    def predecessor_status(self):
        try:
            return self.predecessor.status
        except Exception:
            return "no_predecessor"

    @property
    def has_quality(self):
        try:
            _ = self.outputs['quality']
            return True
        except Exception:
            return False

    @property
    def has_plots(self):
        try:
            plots = self.outputs['quality']['plots']
            return len(plots) > 0
        except Exception:
            return False

    @property
    def has_summary(self):
        try:
            _ = self.outputs['quality']['summary']
            return True
        except Exception:
            return False

    @property
    def quality_json(self):
        try:
            return self.outputs['quality']
        except Exception:
            return None

    @property
    def get_quality_remarks(self):
        try:
            return self.remarks['quality']
        except Exception:
            return None

    @property
    def get_quality_remarks_taskid(self):
        try:
            return self.remarks['quality_taskid']
        except Exception:
            return None

    @property
    def get_quality_remarks_sasid(self):
        try:
            return self.remarks['quality_sasid']
        except Exception:
            return None

    @property
    def quality_as_list(self):
        try:
            return convert_quality_to_list_for_template(self)
        except Exception:
            return None

    @property
    def quality_as_shortlist(self):
        try:
            return convert_quality_to_shortlist_for_template(self)
        except Exception:
            return None

    @property
    def summary_as_list(self):
        try:
            return convert_summary_to_list_for_template(self)
        except Exception:
            return None

    @property
    def sas_id_archived(self):
        """
        check if this task already has an output SAS_ID at the LTA
        """
        try:
            return self.archive['sas_id_archived']
        except Exception:
            return None

    # keep the old mechanism in comments to test/evaluate
    # TODO: remove when it is no longer used by the GUI
    # --- <CUT> ---
    @property
    def sas_id_has_archived(self):
        """
        check if any task belonging to this sas_id already has an output SAS_ID at the LTA
        """
        try:
            for task in Task.objects.filter(sas_id=self.sas_id):
                try:
                    if task.archive['sas_id_archived']:
                        return task.archive['sas_id_archived']
                except Exception:
                    pass
        except Exception:
            return None
    # --- </CUT> ---

    @property
    def path_to_lta(self):
        """
        return the 'path_to_lta' of this task (or None if that fails)
        """
        try:
            return self.archive['path_to_lta']
        except Exception:
            return None
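    # For reference: the 'archive' JSON blob (filled in by the ldv_archiver) is only
    # accessed through the keys used above; an illustrative (hypothetical) value:
    #
    #   {"sas_id_archived": "654321", "path_to_lta": "<path at the LTA>"}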
    # keep the old mechanism in comments to test/evaluate, remove later when it works
    # TODO: remove when it is no longer used by the GUI
    # ---- <CUT> ----
    @property
    def sasid_path_to_lta(self):
        """
        check if any task belonging to this sas_id already has a 'path_to_lta' setting
        """
        try:
            for task in Task.objects.filter(sas_id=self.sas_id):
                try:
                    if task.archive['path_to_lta']:
                        return task.archive['path_to_lta']
                except Exception:
                    # if 'path_to_lta' is not found, or 'archive' is empty, continue to the next task
                    pass
        except Exception:
            return None

    @property
    def sasid_is_verified(self):
        for task in Task.objects.filter(sas_id=self.sas_id):
            if task.status not in verified_statusses:
                return False
        return True

    @property
    def sasid_finished_fraction(self):
        size_archived = 0
        size_remaining = 0
        total_size = 0
        tasks = Task.objects.filter(sas_id=self.sas_id)

        for task in tasks:
            if task.status == State.FINISHED.value:
                size_archived = size_archived + task.size_to_process
            else:
                size_remaining = size_remaining + task.size_to_process
            total_size = total_size + task.size_to_process

        finished = {}
        try:
            finished['fraction'] = round((size_archived / (size_remaining + size_archived)) * 100)
        except Exception:
            finished['fraction'] = -1

        finished['total_size'] = total_size
        finished['remaining'] = size_remaining
        return finished

    @property
    def sasid_ingested_fraction(self):
        """
        This 'property' of a task returns the fraction of queued/ingested tasks per SAS_ID,
        and a list of statuses of other tasks belonging to the same SAS_ID.
        It is implemented as a 'property', because then it can be used in html pages like this:
            <td>{{ task.sasid_ingested_fraction.status }}</td>
            <td>{{ task.sasid_ingested_fraction.completion }}%</td>
        A selection of statuses is considered 'queued', and another selection is considered
        'ingested'. The ratio of those two is returned as a 'completion %'.
        A limited list of statuses for the other tasks that belong to this SAS_ID is also returned.
        """
        result = {}
        statusses = {'scrubbed': 0, 'archiving': 0, 'archived': 0, 'finishing': 0, 'finished': 0,
                     'suspended': 0, 'discarded': 0, 'archived_failed': 0, 'finished_failed': 0}

        tasks = Task.objects.filter(sas_id=self.sas_id)
        for task in tasks:
            try:
                statusses[task.status] += 1
            except Exception:
                pass

        incomplete = int(statusses['scrubbed']) + int(statusses['archiving']) + int(statusses['finishing']) + \
                     int(statusses['suspended']) + int(statusses['archived_failed']) + int(statusses['finished_failed'])
        complete = int(statusses['archived']) + int(statusses['finished'])
        try:
            completion = round(complete / (incomplete + complete) * 100)
        except ZeroDivisionError:
            # no tasks in a counted status (e.g. all 'discarded'); avoid crashing the template
            completion = 0

        non_zero_statusses = {key: value for key, value in statusses.items() if value != 0}

        result['status'] = non_zero_statusses
        result['completion'] = completion
        return result
    # ---- </CUT> ----
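    # Worked example for sasid_ingested_fraction (illustrative numbers): with 3 tasks
    # 'archived', 1 task 'finished' and 1 task 'scrubbed', complete = 4 and
    # incomplete = 1, so completion = round(4 / (1 + 4) * 100) = 80 (%).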
    @property
    def task_type_join(self):
        try:
            # find out if this is a join/joined type of task, or just a regular task
            if self.joined_output_task:
                # this task has a designated output task, so it is an input task (join)
                return 'join'

            joined_input_tasks = self.joined_input_tasks.all()
            if joined_input_tasks.count() > 0:
                # this task has input tasks, so it is an output task (joined)
                return 'joined'

            return 'regular'
        except Exception:
            # 'the show must go on', don't crash if anything goes wrong, just show it as 'regular'
            return 'regular'

    @property
    def joined_status(self):
        # if a task has joined_input_tasks, then check their status
        try:
            joined_input_tasks = self.joined_input_tasks.all()
            status = None
            for task in joined_input_tasks:
                if status and task.status != status:
                    return None
                status = task.status

            # all statuses are the same, return it
            return status
        except Exception as e:
            logger.error(e)
            return "unknown"

    @property
    def stageit_url(self):
        stage_request_id = self.stage_request_id
        if stage_request_id:
            stager_api = Configuration.objects.get(key='stager:api').value
            url = f"{stager_api}/requests/{self.stage_request_id}/"
            return url
        else:
            return None

    # NV: this shows the latest status change, but because it is a derived property it cannot be sorted.
    # This functionality was not requested, and to avoid additional requests about 'sort' functionality
    # it is currently commented out. Could be of use later though, so I leave it in for now.
    # @property
    # def latest_change(self):
    #     qs = Status.objects.filter(task__id=self.id).order_by('-timestamp')
    #     if len(qs) > 0:
    #         status = qs[0]
    #         timestamp = status.timestamp
    #
    #     return timestamp


class LogEntry(models.Model):
    cpu_cycles = models.IntegerField(null=True, blank=True)
    wall_clock_time = models.IntegerField(null=True, blank=True)
    url_to_log_file = models.CharField(max_length=100, blank=True, null=True)
    service = models.CharField(max_length=30, blank=True, null=True)
    step_name = models.CharField(max_length=30, blank=True, null=True)
    timestamp = models.DateTimeField(blank=True, null=True)
    status = models.CharField(max_length=50, default="defined", blank=True, null=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    size_processed = models.PositiveBigIntegerField(default=0, null=True, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='log_entries', on_delete=models.CASCADE, null=False)

    def __str__(self):
        return str(self.id) + ' - ' + str(self.task) + ' - ' + str(self.status) + ' (' + str(self.step_name) + ')'

    def save(self, *args, **kwargs):
        # override save to check how long this task has been in this state.
        try:
            # if this logentry already has a wall_clock_time, then don't change it.
            if not self.wall_clock_time:
                # gather all entries for this task, to discover the previous one.
                entries_for_this_task = LogEntry.objects.filter(task=self.task).order_by("-timestamp")

                # remove entries from the list, up to and including 'this',
                # so that the latest remaining one is the previous entry.
                for entry in entries_for_this_task:
                    entries_for_this_task = entries_for_this_task.exclude(id=entry.id)
                    if entry == self:
                        break

                try:
                    latest_entry_before_self = entries_for_this_task.latest('timestamp')
                    previous_timestamp = latest_entry_before_self.timestamp
                except Exception:
                    # if no previous entry exists, then use the timestamp from the task.
                    previous_timestamp = self.task.creationTime

                # use total_seconds() so that gaps longer than a day are not truncated
                dt = int((self.timestamp - previous_timestamp).total_seconds())
                # message = "logentry for task " + str(self.task.id) + ", to " + self.status + " took " + str(dt) + " seconds"
                # logger.info(message)
                self.wall_clock_time = dt

        except Exception as e:
            logger.error(e)

        finally:
            # finally save the LogEntry itself as well
            super(LogEntry, self).save(*args, **kwargs)
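# Example (illustrative): if a task's previous log entry was written at 12:00:00
# and this entry at 12:05:30, wall_clock_time is stored as 330 seconds. For the
# first entry of a task, the task's creationTime serves as the reference point.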
class Status(models.Model):
    name = models.CharField(max_length=50, default="unknown")
    timestamp = models.DateTimeField(default=timezone.now, blank=True)

    # relationships
    task = models.ForeignKey(Task, related_name='status_history', on_delete=models.CASCADE, null=False)

    # the representation of the value in the REST API
    def __str__(self):
        formatted_date = self.timestamp.strftime(datetime_format_string)
        return str(self.name) + ' (' + str(formatted_date) + ')'


class Configuration(models.Model):
    filter = models.CharField(max_length=30, blank=True, null=True)
    key = models.CharField(max_length=50)
    # large value needed to hold the dcache token
    value = models.CharField(max_length=5120)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.key)


class Job(models.Model):
    type = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    task_id = models.IntegerField(null=True, blank=True)
    job_id = models.IntegerField(null=True, blank=True)
    timestamp = models.DateTimeField(default=timezone.now, blank=True)
    metadata = models.JSONField(null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return 'task_id:' + str(self.task_id) + ', job_id:' + str(self.job_id)

    @property
    def webdav_url(self):
        try:
            path = self.metadata['stdout_path']

            # form the webdav url: take the directory of the stdout file,
            # and replace everything up to and including '/run' with the public webdav root
            s = path.rsplit('/', 1)
            l = s[0].split('/run')
            webdav_url = "https://public.spider.surfsara.nl/project/ldv/run" + l[1]
            return webdav_url
        except Exception:
            return "N/A"
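# Worked example for webdav_url (hypothetical path): with
# metadata['stdout_path'] = '/project/ldv/run/2023/batch_1/job.stdout',
# the directory part '/project/ldv/run/2023/batch_1' is split on '/run',
# yielding 'https://public.spider.surfsara.nl/project/ldv/run/2023/batch_1'.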
class PostProcessingRule(models.Model):
    aggregation_key = models.CharField(db_index=True, max_length=20, default=None, null=True, blank=True)
    trigger_status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True)

    # relationships
    workflow_to_process = models.ForeignKey(Workflow, related_name='to_process', on_delete=models.SET_NULL, null=True, blank=True)
    workflow_to_apply = models.ForeignKey(Workflow, related_name='to_apply', on_delete=models.SET_NULL, null=True, blank=True)

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.aggregation_key) + ' - ' + str(self.trigger_status)


class LatestMonitor(models.Model):
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    @property
    def enabled(self):
        try:
            enabled = self.metadata['enabled']
            return enabled
        except Exception:
            # only an explicit metadata['enabled'] = False registers as disabled;
            # fall back to True to make sure that services are enabled by default
            return True

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.name) + ' - (' + self.hostname + ') - ' + str(self.timestamp) + ' - ' + self.status


def purge_old_records():
    current_time = timezone.now()  # change this
    try:
        time_threshold = current_time - timedelta(hours=settings.MAX_MONITORING_HISTORY_HOURS)
        Monitor.objects.filter(timestamp__lt=time_threshold).delete()
    except Exception:
        # if MAX_MONITORING_HISTORY_HOURS is not set, then do nothing and continue
        pass


class Monitor(models.Model):
    name = models.CharField(max_length=50, default="unknown")
    type = models.CharField(max_length=20, default="ldv-service", null=True, blank=True)
    timestamp = models.DateTimeField(default=datetime.utcnow, blank=True)
    hostname = models.CharField(max_length=50, default="unknown")
    process_id = models.IntegerField(null=True, blank=True)
    description = models.CharField(max_length=255, blank=True, null=True)
    status = models.CharField(max_length=50, default="ok", null=True)
    metadata = models.JSONField(null=True, blank=True)

    def save(self, *args, **kwargs):
        # check if this combination of service name + hostname already exists
        # in the LatestMonitor, and replace it with this newer record.
        metadata = self.metadata
        try:
            latestMonitor = LatestMonitor.objects.get(name=self.name, hostname=self.hostname)
            # carry over the existing metadata, if any
            if latestMonitor.metadata:
                metadata = latestMonitor.metadata
            latestMonitor.delete()
        except Exception:
            # this combination of name and hostname didn't yet exist; it is created below.
            pass

        latestMonitor = LatestMonitor(
            name=self.name,
            type=self.type,
            timestamp=self.timestamp,
            hostname=self.hostname,
            process_id=self.process_id,
            description=self.description,
            status=self.status,
            metadata=metadata
        )
        latestMonitor.save()

        # finally save the Monitor info itself also
        super(Monitor, self).save(*args, **kwargs)

        # and purge the monitoring table to its max
        purge_old_records()

    # the representation of the value in the REST API
    def __str__(self):
        return str(self.name) + ' - (' + self.hostname + ') - ' + str(self.timestamp) + ' - ' + self.status
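# Usage sketch (illustrative values): each heartbeat saved as a Monitor record also
# upserts the matching (name, hostname) row in LatestMonitor and prunes Monitor
# records older than settings.MAX_MONITORING_HISTORY_HOURS:
#
#   Monitor(name='ingest-service', hostname='node-1', status='ok').save()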