Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
algorithms.py 10.68 KiB
"""
    File name: algorithms.py
    Author: Nico Vermaas - Astron
    Date created: 2019-04-04
    Description:  Business logic for ATDB. These functions are called from the views (views.py).
"""

from django.db.models import Q,Sum
import logging
from .common import timeit
from ..models import Task, LogEntry, Workflow
from django.conf import settings

# strftime/strptime-style format strings.
# NOTE(review): none of these is used by the functions visible in this part of the
# module (convert_logentries_to_html hard-codes its own timestamp pattern);
# confirm they are referenced elsewhere before removing them.
DATE_FORMAT = "%Y-%m-%d"
# the trailing 'Z' is a literal character appended after the seconds, not a directive
TIME_FORMAT = "%Y-%m-%d %H:%M:%SZ"
DJANGO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# module-level logger, named after this module
logger = logging.getLogger(__name__)

@timeit
def get_size(status_list):
    """
    Aggregate 'size_to_process' over all tasks whose status is in status_list.

    :param status_list: iterable of status values to include in the aggregation
    :return: the summed 'size_to_process'; 0.0 when the database sum is NULL
             (no matching task, or all matching sizes are NULL)
    """
    # lazy %-formatting: the message is only rendered when INFO is enabled
    logger.info("get_size(%s)", status_list)

    field = 'size_to_process'
    tasks = Task.objects.filter(status__in=status_list)
    sum_value = tasks.aggregate(Sum(field))[field + '__sum']

    if sum_value is None:
        sum_value = 0.0
    return sum_value


def convert_logentries_to_html(log_entries):
    """
    Render log entries as HTML table markup: a header row followed by a
    <tbody> containing one row per entry.

    Each entry must expose step_name, status, timestamp (formatted with
    strftime), cpu_cycles, wall_clock_time and url_to_log_file (None when
    there is no logfile). Text coming from the entries is HTML-escaped.

    :param log_entries: iterable of log entry objects
    :return: HTML string (without the surrounding <table>), or a single
             "no data" row when rendering any entry fails
    """
    import html

    rows = []
    try:
        for log in log_entries:
            if log.url_to_log_file is not None:
                link = ('<a href="' + html.escape(str(log.url_to_log_file))
                        + '" target="_blank">logfile</a>')
            else:
                link = "-"
            status = html.escape(str(log.status))
            rows.append(
                "<tr><td><b>" + html.escape(str(log.step_name)) + "</b></td>"
                + '<td class="' + status + '" >' + status + "</td>"
                + "<td>" + log.timestamp.strftime("%m-%d-%Y, %H:%M:%S") + "</td>"
                + "<td>" + str(log.cpu_cycles) + "</td>"
                + "<td>" + str(log.wall_clock_time) + "</td>"
                + "<td>" + link + "</td></tr>"
            )
    except Exception:
        # best effort: any malformed entry degrades the whole table to a placeholder
        return "<tr><td>no data</td></tr>"

    header = ("<tr><th>service</th><th>status</th><th>timestamp</th>"
              "<th>cpu_cycles</th><th>wall_clock_time</th><th>logfile</th></tr>")
    return header + "<tbody>" + "".join(rows) + "</tbody>"


def convert_list_of_dicts_to_html(my_list):
    """
    Render a list of dicts as HTML table rows, one <tr> per key/value pair.

    String values containing "://" are rendered as a link whose text is the
    key. Keys and values are HTML-escaped.

    :param my_list: iterable of dicts
    :return: HTML rows, or a single "no data" row when rendering fails
             (e.g. my_list is not iterable or contains a non-dict)
    """
    import html

    rows = []
    try:
        for my_dict in my_list:
            for key, value in my_dict.items():
                key_html = html.escape(str(key))
                # explicit type check instead of relying on a swallowed TypeError
                if isinstance(value, str) and "://" in value:
                    value_html = '<a href="' + html.escape(value) + '">' + key_html + "</a>"
                else:
                    value_html = html.escape(str(value))
                rows.append("<tr><td><b>" + key_html + "</b></td><td>" + value_html + "</td></tr>")
    except Exception:
        # best effort: any failure degrades the whole table to a placeholder
        return "<tr><td>no data</td></tr>"
    return "".join(rows)


def convert_config_to_html(querylist):
    """
    Render key/value records as HTML table rows, one <tr> per record.

    Each record must expose ``key`` and ``value`` attributes (presumably
    configuration model instances; confirm against the caller). String values
    containing "://" are rendered as a link whose text is the key. Keys and
    values are HTML-escaped.

    :param querylist: iterable of records with ``key`` and ``value``
    :return: HTML rows, or a single "no data" row when rendering fails
             (e.g. querylist is not iterable or a record lacks key/value)
    """
    import html

    rows = []
    try:
        for record in querylist:
            key_html = html.escape(str(record.key))
            value = record.value
            # explicit type check instead of relying on a swallowed TypeError
            if isinstance(value, str) and "://" in value:
                value_html = '<a href="' + html.escape(value) + '">' + key_html + "</a>"
            else:
                value_html = html.escape(str(value))
            rows.append("<tr><td><b>" + key_html + "</b></td><td>" + value_html + "</td></tr>")
    except Exception:
        # best effort: any failure degrades the whole table to a placeholder
        return "<tr><td>no data</td></tr>"
    return "".join(rows)


def aggregate_resources_tasks():
    """
    Aggregate task information per workflow that has at least one active task.

    For each such workflow a dict is returned with the keys:
      - 'size_to_process', 'size_processed': sums over all tasks of the
        workflow (None when the database sum is NULL)
      - 'id', 'name' (the workflow's workflow_uri)
      - 'nr_of_tasks', 'nr_of_active_tasks'
      - 'nr_of_tasks_per_status': {status: count} for every status in
        settings.ALL_STATUSSES (in that order, 0 when absent), followed by
        'active' and 'total'

    :return: list of per-workflow dicts
    """
    from django.db.models import Count

    workflow_results = []

    # ids of all workflows that still have active tasks. An explicit ordering on
    # the raw FK column replaces any default Meta ordering of Task, which Django
    # would otherwise add to the SELECT DISTINCT and so defeat the de-duplication.
    active_workflow_ids = (
        Task.objects.filter(status__in=settings.ACTIVE_STATUSSES)
        .values_list('workflow', flat=True)
        .order_by('workflow_id')
        .distinct()
    )

    for workflow_id in active_workflow_ids:
        workflow = Workflow.objects.get(id=workflow_id)

        # all tasks of this workflow, for the 'grand total'
        tasks_per_workflow = Task.objects.filter(workflow=workflow)
        nr_of_tasks_per_workflow = tasks_per_workflow.count()

        # both size sums in a single query
        sums = tasks_per_workflow.aggregate(Sum('size_to_process'), Sum('size_processed'))

        nr_of_active_tasks_per_workflow = tasks_per_workflow.filter(
            status__in=settings.ACTIVE_STATUSSES).count()

        # one GROUP BY query instead of one COUNT per status; order_by() clears
        # any default ordering so it does not leak into the GROUP BY
        counts = dict(
            tasks_per_workflow.order_by()
            .values('status')
            .annotate(n=Count('pk'))
            .values_list('status', 'n')
        )
        # keep the settings.ALL_STATUSSES order and include zero counts: the
        # dashboard renders these values in the same order as its header
        nr_per_status = {status: counts.get(status, 0) for status in settings.ALL_STATUSSES}
        nr_per_status['active'] = nr_of_active_tasks_per_workflow
        nr_per_status['total'] = nr_of_tasks_per_workflow

        workflow_results.append({
            'size_to_process': sums['size_to_process__sum'],
            'size_processed': sums['size_processed__sum'],
            'id': workflow.id,
            'name': workflow.workflow_uri,
            'nr_of_tasks': nr_of_tasks_per_workflow,
            'nr_of_active_tasks': nr_of_active_tasks_per_workflow,
            'nr_of_tasks_per_status': nr_per_status,
        })

    return workflow_results


def aggregate_resources_logs():
    """
    Sum cpu_cycles and wall_clock_time of log entries per workflow and per
    active status.

    Only workflows that currently have active tasks are considered, and only
    log entries whose task is itself active contribute to the sums.

    :return: list of dicts with keys 'name' ("<workflow id> - <workflow_uri>"),
             'status', 'cpu_cycles' and 'wall_clock_time'; one dict per
             (workflow, status in settings.ACTIVE_STATUSSES). A sum is None
             when no log entry matches.
    """
    records = []

    # ids of all workflows that still have active tasks; the explicit ordering
    # replaces any default Meta ordering that would break SELECT DISTINCT
    active_workflow_ids = (
        Task.objects.filter(status__in=settings.ACTIVE_STATUSSES)
        .values_list('workflow', flat=True)
        .order_by('workflow_id')
        .distinct()
    )

    for workflow_id in active_workflow_ids:
        workflow = Workflow.objects.get(id=workflow_id)
        name = str(workflow.id) + ' - ' + workflow.workflow_uri

        # one grouped query per workflow instead of two aggregates per status;
        # order_by() keeps any default LogEntry ordering out of the GROUP BY
        grouped = (
            LogEntry.objects.filter(status__in=settings.ACTIVE_STATUSSES)
            .filter(task__status__in=settings.ACTIVE_STATUSSES)
            .filter(task__workflow=workflow)
            .order_by()
            .values('status')
            .annotate(cpu_cycles_sum=Sum('cpu_cycles'),
                      wall_clock_time_sum=Sum('wall_clock_time'))
        )
        sums_per_status = {row['status']: row for row in grouped}

        # one record per active status, also when it has no log entries
        for status in settings.ACTIVE_STATUSSES:
            sums = sums_per_status.get(status, {})
            records.append({
                'name': name,
                'status': status,
                'cpu_cycles': sums.get('cpu_cycles_sum'),
                'wall_clock_time': sums.get('wall_clock_time_sum'),
            })

    return records


def construct_link(request, status, workflow_id, count):
    """
    Return an HTML link displaying ``count`` that opens the task list filtered
    on ``status`` and ``workflow__id`` in a new tab.

    Best effort: when the URL cannot be built (e.g. no usable request), the
    bare count is returned as text instead.

    :param request: the current request, used to determine the host
    :param status: task status to filter on
    :param workflow_id: id of the workflow to filter on
    :param count: the number to display
    :return: HTML anchor string, or str(count) on failure
    """
    from urllib.parse import urlencode

    link = str(count)
    try:
        # encode the values so reserved characters cannot break the query string
        query = "?" + urlencode({'status': status, 'workflow__id': workflow_id})
        # exact comparison with True kept deliberately: a truthy non-bool DEV
        # value (e.g. a string read from the environment) must not select this branch
        if settings.DEV == True:  # noqa: E712
            url = request.build_absolute_uri('/atdb/tasks') + query
        else:
            # build_absolute_uri does not return 'https' in production, probably
            # because TLS is terminated outside the container (by Traefik) and
            # ATDB is not aware of it, so the scheme is forced here.
            url = "https://" + request.get_host() + '/atdb/tasks' + query
        link = '<a href="' + url + '" target="_blank">' + str(count) + "</a>"
    except Exception:
        # keep the best-effort fallback, but leave a trace instead of hiding the error
        logger.debug("construct_link: could not build link, showing plain count", exc_info=True)
    return link


def human_readable(size_in_bytes):
    """
    Format a byte count with a 1024-based unit, e.g. 1536 -> '1.5 KB'.

    The value is divided by 1024 until its magnitude drops below 1024, stepping
    through 'Bytes', 'KB', 'MB', 'GB' and 'TB'; anything larger is expressed
    in 'PB'.

    :param size_in_bytes: number of bytes (may be negative)
    :return: the value with one decimal followed by its unit
    """
    value = size_in_bytes
    for unit in ('Bytes', 'KB', 'MB', 'GB', 'TB'):
        if abs(value) < 1024.0:
            break
        value /= 1024.0
    else:
        # no unit up to TB was small enough
        unit = 'PB'
    return "%3.1f %s" % (value, unit)


def construct_tasks_per_workflow_html(request, workflow_results=None):
    """
    Render the 'progress of tasks per (active) workflow' table as HTML.

    One row per workflow contains these columns:
      - the workflow id;
      - the percentage and count of tasks for every status in
        settings.ALL_STATUSSES, followed by 'active' and 'total' (each count
        links to the filtered task list);
      - the total size to process;
      - the processed size with its percentage.

    :param request: the current request, used to build the links
    :param workflow_results: list of dicts as produced by
                             aggregate_resources_tasks(); computed here when None
    :return: HTML string
    """
    # only aggregate when the caller did not already do so: the aggregation
    # runs many database queries
    if workflow_results is None:
        workflow_results = aggregate_resources_tasks()

    header = "<th>Workflow</th>" + "".join(
        "<th>" + status + "</th>" for status in settings.ALL_STATUSSES)
    results_tasks = ("<p>Progress of tasks per (active) workflow</p>" + header
                     + '<th class="active">active</th><th>total</th>'
                     + '<th>to process</th><th>processed</th>')

    for workflow_result in workflow_results:
        nr_per_status = workflow_result['nr_of_tasks_per_status']
        nr_of_tasks = int(workflow_result['nr_of_tasks'])
        values = "<td><b>" + str(workflow_result['id']) + "</b></td>"

        for key, count in nr_per_status.items():
            # a workflow without tasks shows 0% instead of raising ZeroDivisionError
            percentage = round(int(count) / nr_of_tasks * 100) if nr_of_tasks else 0

            # highlight the active statuses and the 'active' summary column
            style = "active" if key in settings.ACTIVE_STATUSSES or key == 'active' else ""

            link = construct_link(request, key, workflow_result['id'], count)
            values += '<td class="' + style + '">' + str(percentage) + "% (" + link + ")</td>"

        # size sums can be None (NULL in the database); render those as 0
        size_to_process = workflow_result['size_to_process'] or 0
        size_processed = workflow_result['size_processed'] or 0
        values += "<td>" + str(human_readable(size_to_process)) + "</td>"
        try:
            percentage = round(int(size_processed) / int(size_to_process) * 100)
        except (TypeError, ValueError, OverflowError, ZeroDivisionError):
            percentage = 0
        values += "<td>" + str(human_readable(size_processed)) + " (" + str(percentage) + "%) </td>"

        results_tasks += "<tr>" + values + "</tr>"

    return "<tbody>" + results_tasks + "</tbody>"


def construct_logs_per_workflow_html(log_records):
    """
    Render per-workflow log-entry aggregates as HTML table rows.

    :param log_records: iterable of dicts with keys 'name', 'status',
                        'cpu_cycles' and 'wall_clock_time' (as produced by
                        aggregate_resources_logs())
    :return: HTML string with one <tr> per record
    """
    rows = []
    for record in log_records:
        # highlight active statuses
        style = "active" if record['status'] in settings.ACTIVE_STATUSSES else ""

        # every row is properly closed with </tr> (it used to open a new, empty row)
        rows.append(
            "<tr><td><b>" + record['name'] + "</b></td>"
            + '<td class="' + style + '" >' + record['status']
            + "</td><td>" + str(record['cpu_cycles'])
            + "</td><td>" + str(record['wall_clock_time']) + "</td></tr>"
        )

    return "".join(rows)


def construct_dashboard_html(request):
    """
    Build the two HTML fragments shown on the dashboard.

    :param request: the current request, passed on for building task links
    :return: tuple (tasks_html, logs_html): the task-progress table per active
             workflow and the log-entry aggregation rows per workflow
    """
    tasks_html = construct_tasks_per_workflow_html(request, aggregate_resources_tasks())
    logs_html = construct_logs_per_workflow_html(aggregate_resources_logs())
    return tasks_html, logs_html