""" File name: algorithms.py Author: Nico Vermaas - Astron Date created: 2019-04-04 Description: Business logic for ATDB. These functions are called from the views (views.py). """ from django.db.models import Q,Sum import logging from .common import timeit from ..models import Task, LogEntry, Workflow from django.conf import settings DATE_FORMAT = "%Y-%m-%d" TIME_FORMAT = "%Y-%m-%d %H:%M:%SZ" DJANGO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" logger = logging.getLogger(__name__) @timeit def get_size(status_list): """ aggregate the sizes of all task with a status in the list :param status_list: list of statuses to consider for the aggregation :return: summed sizes """ logger.info("get_size("+str(status_list)+")") field = 'size_to_process' query = field + '__sum' tasks = Task.objects.filter(status__in=status_list) sum_value = tasks.aggregate(Sum(field))[query] if sum_value == None: sum_value = 0.0 return sum_value def convert_logentries_to_html(log_entries): results = "" try: results += "<th>service</th><th>status</th><th>timestamp</th><th>cpu_cycles</th><th>wall_clock_time</th><th>logfile</th>" results += "<tbody>" for log in log_entries: line = "<tr><td><b>" + log.step_name + '</b></td>' line +='<td class="' + log.status + '" >' + log.status + "</td>" line += "<td>" + str(log.timestamp.strftime("%m-%d-%Y, %H:%M:%S")) + "</td>" line += "<td>" + str(log.cpu_cycles) + "</td>" line += "<td>" + str(log.wall_clock_time) + "</td>" if log.url_to_log_file!=None: link = "<a href=" + '"' + str(log.url_to_log_file) + '" target="_blank">' + "logfile" + "</a>" else: link = "-" line += "<td>" + link + "</td>" results += line results += "</tbody>" except: results = "<tr><td>no data</td></tr>" return results def convert_list_of_dicts_to_html(my_list): results = "" try: for my_dict in my_list: # iterate through the dict of key/values for key,value in my_dict.items(): try: if "://" in value: link = "<a href=" + '"' + value + '">' + key +"</a>" value = link except: pass line = "<tr><td><b>" + str(key) + "</b></td><td>" + str(value) + "</td></tr>" results = results + line except: results = "<tr><td>no data</td></tr>" return results def convert_config_to_html(querylist): results = "" try: for record in querylist: # iterate through the dict of key/values key = record.key value = record.value try: if "://" in value: link = "<a href=" + '"' + value + '">' + key +"</a>" value = link except: pass line = "<tr><td><b>" + str(key) + "</b></td><td>" + str(value) + "</td></tr>" results = results + line except: results = "<tr><td>no data</td></tr>" return results # aggregate information from the tasks table per workflow per status def aggregate_resources_tasks(): workflow_results = [] # get all active tasks active_tasks = Task.objects.filter(status__in=settings.ACTIVE_STATUSSES) # active_tasks_count = active_tasks.count() # retrieve all unique workflows active_workflows = active_tasks.values('workflow').distinct() # iterate through the filters and accumulate logentries for w in active_workflows: workflow_result = {} # extract the workflow object (cheap) workflow = Workflow.objects.get(id = w['workflow']) # get the numbers for this workflow # all tasks for this workflow for the 'grand total' tasks_per_workflow = Task.objects.filter(workflow=workflow) nr_of_tasks_per_workflow = tasks_per_workflow.count() sum_size_to_process = tasks_per_workflow.aggregate(Sum('size_to_process')) workflow_result['size_to_process'] = sum_size_to_process['size_to_process__sum'] sum_size_processed = tasks_per_workflow.aggregate(Sum('size_processed')) 
# aggregate information from the tasks table per workflow per status
def aggregate_resources_tasks():
    workflow_results = []

    # get all active tasks
    active_tasks = Task.objects.filter(status__in=settings.ACTIVE_STATUSSES)

    # retrieve all unique workflows
    active_workflows = active_tasks.values('workflow').distinct()

    # iterate through the workflows and accumulate the numbers per workflow
    for w in active_workflows:
        workflow_result = {}

        # extract the workflow object (cheap)
        workflow = Workflow.objects.get(id=w['workflow'])

        # all tasks for this workflow, for the 'grand total'
        tasks_per_workflow = Task.objects.filter(workflow=workflow)
        nr_of_tasks_per_workflow = tasks_per_workflow.count()

        sum_size_to_process = tasks_per_workflow.aggregate(Sum('size_to_process'))
        workflow_result['size_to_process'] = sum_size_to_process['size_to_process__sum']

        sum_size_processed = tasks_per_workflow.aggregate(Sum('size_processed'))
        workflow_result['size_processed'] = sum_size_processed['size_processed__sum']

        # all the active tasks
        active_tasks_per_workflow = tasks_per_workflow.filter(status__in=settings.ACTIVE_STATUSSES)
        nr_of_active_tasks_per_workflow = active_tasks_per_workflow.count()

        # split per status, to see the progress
        nr_per_status = {}
        for status in settings.ALL_STATUSSES:
            nr_per_status[status] = Task.objects.filter(workflow=workflow, status=status).count()
        nr_per_status['active'] = nr_of_active_tasks_per_workflow
        nr_per_status['total'] = nr_of_tasks_per_workflow

        # store the results in a dict
        workflow_result['id'] = workflow.id
        workflow_result['name'] = workflow.workflow_uri
        workflow_result['nr_of_tasks'] = nr_of_tasks_per_workflow
        workflow_result['nr_of_active_tasks'] = nr_of_active_tasks_per_workflow
        workflow_result['nr_of_tasks_per_status'] = nr_per_status

        workflow_results.append(workflow_result)

    return workflow_results


# aggregate information from the logentries table per workflow per status
def aggregate_resources_logs():
    records = []

    # get all active tasks
    active_tasks = Task.objects.filter(status__in=settings.ACTIVE_STATUSSES)

    # retrieve all unique workflows
    active_workflows = active_tasks.values('workflow').distinct()

    # iterate through the workflows and accumulate logentries
    for w in active_workflows:
        # extract the workflow object (cheap)
        workflow = Workflow.objects.get(id=w['workflow'])

        # aggregate logentries per status for all active statuses (expensive)
        for status in settings.ACTIVE_STATUSSES:
            record = {}
            record['name'] = str(workflow.id) + ' - ' + workflow.workflow_uri
            record['status'] = status

            logs = LogEntry.objects.filter(status=status)\
                .filter(task__status__in=settings.ACTIVE_STATUSSES)\
                .filter(task__workflow=workflow)

            sum_cpu_cycles = logs.aggregate(Sum('cpu_cycles'))
            record['cpu_cycles'] = sum_cpu_cycles['cpu_cycles__sum']

            wall_clock_time = logs.aggregate(Sum('wall_clock_time'))
            record['wall_clock_time'] = wall_clock_time['wall_clock_time__sum']

            records.append(record)

    return records

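# Illustrative sketch (not part of ATDB itself): how the per-status records returned by
# aggregate_resources_logs() could be rolled up into one total per workflow. The helper
# name is hypothetical; the 'name' and 'cpu_cycles' keys match the dicts built above.
def example_total_cpu_cycles_per_workflow(log_records):
    totals = {}
    for record in log_records:
        # aggregate(Sum(...)) returns None when no logentries match, so treat None as 0
        cycles = record['cpu_cycles'] or 0
        totals[record['name']] = totals.get(record['name'], 0) + cycles
    return totals
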
url = "https://" + request.get_host() + '/atdb/tasks' + query link = '<a href="' + url + '" target="_blank">' + str(count) + "</a>" except: pass return link def human_readable(size_in_bytes): for count in ['Bytes', 'KB', 'MB', 'GB', 'TB']: if size_in_bytes > -1024.0 and size_in_bytes < 1024.0: return "%3.1f %s" % (size_in_bytes, count) size_in_bytes /= 1024.0 return "%3.1f %s" % (size_in_bytes, 'PB') def construct_tasks_per_workflow_html(request, workflow_results): # --- Progress of tasks per active workflow --- workflow_results = aggregate_resources_tasks() results_tasks = "<p>Progress of tasks per (active) workflow</p>" header = "<th>Workflow</th>" for status in settings.ALL_STATUSSES: header += "<th>" + status + "</th>" results_tasks += header + '<th class="active">active</th><th>total</th><th>to process</th><th>processed</th>' for workflow_result in workflow_results: d = workflow_result['nr_of_tasks_per_status'] #values = "<td><b>" + str(workflow_result['id'])+" - "+workflow_result['name'] + "</b></td>" values = "<td><b>" + str(workflow_result['id']) + "</b></td>" for key in d: percentage = round(int(d[key]) / int(workflow_result['nr_of_tasks']) * 100) # distinguish active statusses style = "" if key in settings.ACTIVE_STATUSSES or key=='active': style = "active" # bonus: add a query link link = construct_link(request, key, workflow_result['id'], d[key]) values += "<td class=" + style + ">" + str(percentage) + "% (" + link + ")</td>" #values += "<td class="+style+">" + str(percentage) + "% ("+str(d[key])+")</td>" #values += "<td>" + str(d[key]) + "</td>" # add sizes values += "<td>" + str(human_readable(workflow_result['size_to_process'])) + "</td>" try: percentage = round(int(workflow_result['size_processed']) / int(workflow_result['size_to_process']) * 100) except: percentage = 0 values += "<td>" + str(human_readable(workflow_result['size_processed'])) + " ("+ str(percentage) + "%) </td>" results_tasks += "<tr>" + values + "</tr>" results_tasks = "<tbody>" + results_tasks + "</tbody>" return results_tasks def construct_logs_per_workflow_html(log_records): results_logs = "" for record in log_records: # distinguish active statusses style = "" if record['status'] in settings.ACTIVE_STATUSSES: style = "active" line = "<tr><td><b>" + record['name'] + "</b></td>" \ '<td class="' + style + '" >' + record['status'] + \ "</td><td>" + str(record['cpu_cycles']) + \ "</td><td>" + str(record['wall_clock_time']) + "</td><tr>" results_logs += line return results_logs def construct_dashboard_html(request): # --- Progress of tasks per active workflow --- workflow_results = aggregate_resources_tasks() results_tasks = construct_tasks_per_workflow_html(request, workflow_results) # --- logentries --- log_records = aggregate_resources_logs() results_logs = construct_logs_per_workflow_html(log_records) return results_tasks,results_logs