"""
File name: algorithms.py
Author: Nico Vermaas - Astron
Date created: 2019-04-04
Description: Business logic for ATDB. These functions are called from the views (views.py).
"""
import logging

from django.conf import settings
from django.db.models import Q, Sum

from .common import timeit
from ..models import Task, LogEntry, Workflow

DATE_FORMAT = "%Y-%m-%d"
TIME_FORMAT = "%Y-%m-%d %H:%M:%SZ"
DJANGO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
logger = logging.getLogger(__name__)

@timeit
def get_size(status_list):
    """
    Aggregate the sizes of all tasks with a status in the given list.
    :param status_list: list of statuses to consider for the aggregation
    :return: summed sizes (0.0 if there are no matching tasks)
    """
    logger.info("get_size(%s)", status_list)
    field = 'size_to_process'
    query = field + '__sum'
    tasks = Task.objects.filter(status__in=status_list)
    sum_value = tasks.aggregate(Sum(field))[query]
    if sum_value is None:
        sum_value = 0.0
    return sum_value
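
# Illustrative usage (assuming ACTIVE_STATUSSES is configured in settings, as it is
# elsewhere in this module): the summed size of everything still being processed.
#   active_size = get_size(settings.ACTIVE_STATUSSES)
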
def convert_logentries_to_html(log_entries):
    results = ""
    try:
        results += "<tr><th>service</th><th>status</th><th>timestamp</th><th>cpu_cycles</th><th>wall_clock_time</th><th>logfile</th></tr>"
        results += "<tbody>"
        for log in log_entries:
            line = "<tr><td><b>" + log.step_name + "</b></td>"
            line += '<td class="' + log.status + '">' + log.status + "</td>"
            line += "<td>" + str(log.timestamp.strftime("%m-%d-%Y, %H:%M:%S")) + "</td>"
            line += "<td>" + str(log.cpu_cycles) + "</td>"
            line += "<td>" + str(log.wall_clock_time) + "</td>"
            if log.url_to_log_file is not None:
                link = '<a href="' + str(log.url_to_log_file) + '" target="_blank">logfile</a>'
            else:
                link = "-"
            line += "<td>" + link + "</td></tr>"
            results += line
        results += "</tbody>"
    except Exception:
        results = "<tr><td>no data</td></tr>"
    return results

def convert_list_of_dicts_to_html(my_list):
    results = ""
    try:
        for my_dict in my_list:
            # iterate through the dict of key/values
            for key, value in my_dict.items():
                try:
                    # render values that look like a URL as a clickable link
                    if "://" in value:
                        link = '<a href="' + value + '">' + key + "</a>"
                        value = link
                except Exception:
                    pass
                line = "<tr><td><b>" + str(key) + "</b></td><td>" + str(value) + "</td></tr>"
                results = results + line
    except Exception:
        results = "<tr><td>no data</td></tr>"
    return results

def convert_config_to_html(querylist):
    results = ""
    try:
        for record in querylist:
            # iterate through the key/value pairs of the configuration records
            key = record.key
            value = record.value
            try:
                # render values that look like a URL as a clickable link
                if "://" in value:
                    link = '<a href="' + value + '">' + key + "</a>"
                    value = link
            except Exception:
                pass
            line = "<tr><td><b>" + str(key) + "</b></td><td>" + str(value) + "</td></tr>"
            results = results + line
    except Exception:
        results = "<tr><td>no data</td></tr>"
    return results

# aggregate information from the tasks table per workflow per status
def aggregate_resources_tasks():
    workflow_results = []

    # get all active tasks
    active_tasks = Task.objects.filter(status__in=settings.ACTIVE_STATUSSES)
    # active_tasks_count = active_tasks.count()

    # retrieve all unique workflows that have active tasks
    active_workflows = active_tasks.values('workflow').distinct()

    # iterate through the workflows and accumulate task counts and sizes
    for w in active_workflows:
        workflow_result = {}

        # extract the workflow object (cheap)
        workflow = Workflow.objects.get(id=w['workflow'])

        # get the numbers for this workflow
        # all tasks for this workflow for the 'grand total'
        tasks_per_workflow = Task.objects.filter(workflow=workflow)
        nr_of_tasks_per_workflow = tasks_per_workflow.count()

        sum_size_to_process = tasks_per_workflow.aggregate(Sum('size_to_process'))
        workflow_result['size_to_process'] = sum_size_to_process['size_to_process__sum']

        sum_size_processed = tasks_per_workflow.aggregate(Sum('size_processed'))
        workflow_result['size_processed'] = sum_size_processed['size_processed__sum']

        # all the active tasks
        active_tasks_per_workflow = tasks_per_workflow.filter(status__in=settings.ACTIVE_STATUSSES)
        nr_of_active_tasks_per_workflow = active_tasks_per_workflow.count()

        # split per status, to see the progress
        nr_per_status = {}
        for status in settings.ALL_STATUSSES:
            nr_for_this_status = Task.objects.filter(workflow=workflow, status=status).count()
            nr_per_status[status] = nr_for_this_status

        nr_per_status['active'] = nr_of_active_tasks_per_workflow
        nr_per_status['total'] = nr_of_tasks_per_workflow

        # store the results in a dict
        workflow_result['id'] = workflow.id
        workflow_result['name'] = workflow.workflow_uri
        workflow_result['nr_of_tasks'] = nr_of_tasks_per_workflow
        workflow_result['nr_of_active_tasks'] = nr_of_active_tasks_per_workflow
        workflow_result['nr_of_tasks_per_status'] = nr_per_status

        workflow_results.append(workflow_result)

    return workflow_results
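
# Illustrative shape of one entry in the list returned by aggregate_resources_tasks()
# (field names taken from the code above, all values are made up):
#   {'id': 5, 'name': '<workflow_uri>',
#    'nr_of_tasks': 120, 'nr_of_active_tasks': 30,
#    'size_to_process': 1099511627776, 'size_processed': 549755813888,
#    'nr_of_tasks_per_status': {'<status>': <count>, ..., 'active': 30, 'total': 120}}
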
# aggregate information from the logentries table per workflow per status
def aggregate_resources_logs():
    records = []

    # get all active tasks
    active_tasks = Task.objects.filter(status__in=settings.ACTIVE_STATUSSES)

    # retrieve all unique workflows that have active tasks
    active_workflows = active_tasks.values('workflow').distinct()

    # iterate through the workflows and accumulate logentries
    for w in active_workflows:
        # extract the workflow object (cheap)
        workflow = Workflow.objects.get(id=w['workflow'])

        # aggregate logentries per step for all active statuses
        for status in settings.ACTIVE_STATUSSES:
            record = {}
            record['name'] = str(workflow.id) + ' - ' + workflow.workflow_uri
            record['status'] = status

            # aggregate logentries per step for all active statuses (expensive)
            logs = LogEntry.objects.filter(status=status)\
                .filter(task__status__in=settings.ACTIVE_STATUSSES)\
                .filter(task__workflow=workflow)

            sum_cpu_cycles = logs.aggregate(Sum('cpu_cycles'))
            record['cpu_cycles'] = sum_cpu_cycles['cpu_cycles__sum']

            wall_clock_time = logs.aggregate(Sum('wall_clock_time'))
            record['wall_clock_time'] = wall_clock_time['wall_clock_time__sum']

            records.append(record)

    return records
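
# Illustrative shape of one record returned by aggregate_resources_logs()
# (field names taken from the code above, all values are made up):
#   {'name': '5 - <workflow_uri>', 'status': '<active status>',
#    'cpu_cycles': 123456, 'wall_clock_time': 7890}
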
def construct_link(request, status, workflow_id, count):
    link = str(count)
    try:
        query = "?status=" + status + "&workflow__id=" + str(workflow_id)
        if settings.DEV:
            url = request.build_absolute_uri('/atdb/tasks') + query
        else:
            # Unclear why 'build_absolute_uri' doesn't return 'https' in production.
            # Probably because the https is handled fully outside the container by Traefik
            # and ATDB is not aware of that.
            url = "https://" + request.get_host() + '/atdb/tasks' + query

        link = '<a href="' + url + '" target="_blank">' + str(count) + "</a>"
    except Exception:
        pass
    return link
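
# Illustrative result with settings.DEV disabled (hypothetical host, status and counts):
#   construct_link(request, 'processed', 5, 12)
#   -> '<a href="https://atdb.example.org/atdb/tasks?status=processed&workflow__id=5" target="_blank">12</a>'
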
def human_readable(size_in_bytes):
    # scale down by factors of 1024 until the value fits in the current unit
    for count in ['Bytes', 'KB', 'MB', 'GB', 'TB']:
        if -1024.0 < size_in_bytes < 1024.0:
            return "%3.1f %s" % (size_in_bytes, count)
        size_in_bytes /= 1024.0
    return "%3.1f %s" % (size_in_bytes, 'PB')
def construct_tasks_per_workflow_html(request, workflow_results):
    # --- Progress of tasks per active workflow ---
    # 'workflow_results' is the output of aggregate_resources_tasks(), provided by the caller
    results_tasks = "<p>Progress of tasks per (active) workflow</p>"

    header = "<th>Workflow</th>"
    for status in settings.ALL_STATUSSES:
        header += "<th>" + status + "</th>"
    results_tasks += "<tr>" + header + '<th class="active">active</th><th>total</th><th>to process</th><th>processed</th></tr>'

    for workflow_result in workflow_results:
        d = workflow_result['nr_of_tasks_per_status']
        #values = "<td><b>" + str(workflow_result['id'])+" - "+workflow_result['name'] + "</b></td>"
        values = "<td><b>" + str(workflow_result['id']) + "</b></td>"

        for key in d:
            percentage = round(int(d[key]) / int(workflow_result['nr_of_tasks']) * 100)

            # distinguish active statuses
            style = ""
            if key in settings.ACTIVE_STATUSSES or key == 'active':
                style = "active"

            # bonus: add a query link
            link = construct_link(request, key, workflow_result['id'], d[key])
            values += '<td class="' + style + '">' + str(percentage) + "% (" + link + ")</td>"
            #values += "<td class="+style+">" + str(percentage) + "% ("+str(d[key])+")</td>"
            #values += "<td>" + str(d[key]) + "</td>"

        # add sizes
        values += "<td>" + str(human_readable(workflow_result['size_to_process'])) + "</td>"
        try:
            percentage = round(int(workflow_result['size_processed']) / int(workflow_result['size_to_process']) * 100)
        except Exception:
            percentage = 0
        values += "<td>" + str(human_readable(workflow_result['size_processed'])) + " (" + str(percentage) + "%) </td>"

        results_tasks += "<tr>" + values + "</tr>"

    results_tasks = "<tbody>" + results_tasks + "</tbody>"
    return results_tasks

def construct_logs_per_workflow_html(log_records):
    results_logs = ""

    for record in log_records:
        # distinguish active statuses
        style = ""
        if record['status'] in settings.ACTIVE_STATUSSES:
            style = "active"

        line = "<tr><td><b>" + record['name'] + "</b></td>" \
               '<td class="' + style + '">' + record['status'] + \
               "</td><td>" + str(record['cpu_cycles']) + \
               "</td><td>" + str(record['wall_clock_time']) + "</td></tr>"
        results_logs += line

    return results_logs

def construct_dashboard_html(request):
    # --- Progress of tasks per active workflow ---
    workflow_results = aggregate_resources_tasks()
    results_tasks = construct_tasks_per_workflow_html(request, workflow_results)

    # --- logentries ---
    log_records = aggregate_resources_logs()
    results_logs = construct_logs_per_workflow_html(log_records)

    return results_tasks, results_logs
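
# Hypothetical usage from a view (the actual view code lives in views.py):
#   def dashboard(request):
#       results_tasks, results_logs = construct_dashboard_html(request)
#       return render(request, "dashboard.html",
#                     {"results_tasks": results_tasks, "results_logs": results_logs})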