-
Nico Vermaas authoredNico Vermaas authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
algorithms.py 43.69 KiB
"""
File name: algorithms.py
Author: Nico Vermaas - Astron
Description: Business logic for ATDB. These functions are called from the views (views.py).
"""
import json
import requests
import base64
from datetime import datetime
from django.db.models import Q, Sum
import logging
from .common import timeit, get_summary_flavour, SummaryFlavour
from django.urls import reverse
from ..models import Task, LogEntry, Workflow, Configuration
from django.conf import settings
DATE_FORMAT = "%Y-%m-%d"
TIME_FORMAT = "%Y-%m-%d %H:%M:%SZ"
DJANGO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
logger = logging.getLogger(__name__)
@timeit
def get_size(status_list, type):
    """
    Aggregate the sizes of all 'regular' tasks with a status in the list.

    :param status_list: list of statuses to consider for the aggregation
    :param type: 'processed' sums the ``size_processed`` field, any other
                 value sums the ``size_to_process`` field
    :return: summed sizes, or 0.0 when the aggregate yields no value
    """
    logger.info("get_size(" + str(status_list) + ")")

    # NOTE: the parameter name 'type' shadows the builtin; it is kept because
    # it is part of the signature that callers may rely on.
    field = 'size_processed' if type == 'processed' else 'size_to_process'

    tasks = Task.objects.filter(status__in=status_list).filter(task_type='regular')
    sum_value = tasks.aggregate(Sum(field))[field + '__sum']

    # the aggregate comes back as None when there was nothing to sum
    if sum_value is None:
        sum_value = 0.0
    return sum_value
@timeit
def get_min_start_and_max_end_time(sas_id):
    """
    Retrieve the minimum start time and maximum end time of a set of tasks (sas_id).

    The start time is the moment when the task started 'processing',
    the end time is the moment when the task was 'processed'. Both are taken
    from the latest matching LogEntry of the 'running' step of each task.

    :param sas_id: the sas_id whose tasks are inspected
    :return: tuple (min_start_time, max_end_time); both stay None when no
             task has both timestamps
    """
    min_start_time = None
    max_end_time = None

    logger.info("get_min_start_and_max_end_time(" + str(sas_id) + ")")

    for task in Task.objects.filter(sas_id=sas_id):
        try:
            running_logs = LogEntry.objects.filter(task=task.pk).filter(step_name='running')

            # If more entries are found for a status, use the latest one
            start_time = running_logs.filter(status='processing').latest('timestamp').timestamp
            end_time = running_logs.filter(status='processed').latest('timestamp').timestamp

            if min_start_time is None or start_time < min_start_time:
                min_start_time = start_time

            if max_end_time is None or end_time > max_end_time:
                max_end_time = end_time
        except Exception as err:
            # a task without (usable) timestamps is skipped on purpose;
            # unlike a bare 'except:' this no longer swallows SystemExit/KeyboardInterrupt
            logger.debug("no start/end time for task %s, skipped: %s", task.pk, err)

    return min_start_time, max_end_time
def convert_logentries_to_html(log_entries):
    """
    Render log entries as the header and body of an HTML table.

    :param log_entries: iterable of objects with the attributes service,
                        step_name, status, timestamp, cpu_cycles,
                        wall_clock_time, url_to_log_file and description
    :return: html string; when rendering fails, a single table row that
             contains the error message
    """
    try:
        rows = []
        for log in log_entries:
            line = "<tr><td><b>" + str(log.service) + '</b></td>'
            line += "<td><b>" + str(log.step_name) + '</b></td>'
            line += '<td class="' + log.status + '" >' + log.status + "</td>"

            try:
                line += "<td>" + str(log.timestamp.strftime("%m-%d-%Y, %H:%M:%S")) + "</td>"
            except (AttributeError, TypeError, ValueError):
                # e.g. timestamp is None
                line += "<td>no timestamp</td>"

            line += "<td>" + str(log.cpu_cycles) + "</td>"
            line += "<td>" + str(log.wall_clock_time) + "</td>"

            # prefer a link to the logfile, otherwise show the description
            if log.url_to_log_file is not None:
                link = '<a href="' + str(log.url_to_log_file) + '" target="_blank">logfile</a>'
            else:
                link = str(log.description)
            line += "<td>" + link + "</td>"

            # close the row (the closing tag used to be missing)
            rows.append(line + "</tr>")

        results = ('<th>service</th><th>step</th><th>status</th><th width="200px">timestamp</th>'
                   '<th>cpu_cycles</th><th>wall_clock_time</th><th>log</th>')
        results += "<tbody>" + "".join(rows) + "</tbody>"

    except Exception as err:
        # show the error in the table instead of failing the page
        results = "<tr><td>" + str(err) + "</td></tr>"

    return results
def convert_quality_to_html(task):
    """
    Render the quality information of a task as HTML table rows.

    The general task fields are always shown. The QA fields and the
    'details' fields from ``task.quality_json`` are added one by one; the
    first missing field ends that group, rows already added are kept.

    :param task: object with sas_id, project, filter, quality and quality_json
    :return: html string; when rendering fails, a single table row that
             contains the error message
    """
    def row(label, value):
        return "<tr><td><b>" + label + "</b></td><td>" + str(value) + "</td></tr>"

    # (label, key in quality_json)
    qa_fields = (
        ("QA uv-coverage", 'uv-coverage'),
        ("QA sensitivity", 'sensitivity'),
        ("QA observing-conditions", 'observing-conditions'),
    )

    # keys in quality_json['details'], the key is also used as label
    detail_fields = (
        'high_flagging',
        'elevation_score',
        'sun_interference',
        'moon_interference',
        'jupiter_interference',
        'full_array_incomplete',
        'dutch_array_incomplete',
        'full_array_incomplete_is',
        'dutch_array_flag_data_loss',
        'dutch_array_high_data_loss',
        'fill_array_missing_is_pair',
        'full_array_missing_important_pair',
        'dutch_array_missing_important_pair',
        'dutch_array_high_data_loss_on_important_pair',
    )

    try:
        results = row("SAS_ID", task.sas_id)
        results += row("Project", task.project)
        results += row("ATDB Filter", task.filter)
        results += row("Quality", task.quality)

        try:
            for label, key in qa_fields:
                results += row(label, task.quality_json[key])
        except Exception:
            # not all tasks have this QA information, if missing, continue (the show must go on)
            pass

        try:
            for key in detail_fields:
                results += row(key, task.quality_json['details'][key])
        except Exception:
            # same for the details: show what is there
            pass

    except Exception as err:
        results = "<tr><td>" + str(err) + "</td></tr>"

    return results
def convert_list_of_dicts_to_html(my_blob):
    """
    Render a dict, or a list of dicts, as HTML table rows (one row per key/value).

    A string value that contains '://' is shown as a link with the key as
    link text.

    :param my_blob: a dict or a list of dicts
    :return: html string, or a single 'no data' row when the input cannot
             be rendered
    """
    # if the parameter is not a list, then make it a list first
    my_list = my_blob if isinstance(my_blob, list) else [my_blob]

    try:
        rows = []
        for my_dict in my_list:
            # iterate through the dict of key/values
            for key, value in my_dict.items():
                # explicit type check instead of relying on a swallowed TypeError
                if isinstance(key, str) and isinstance(value, str) and "://" in value:
                    value = '<a href="' + value + '">' + key + "</a>"

                rows.append("<tr><td><b>" + str(key) + "</b></td><td>" + str(value) + "</td></tr>")
        results = "".join(rows)

    except Exception:
        # best effort: anything that is not dict-like results in 'no data'
        results = "<tr><td>no data</td></tr>"

    return results
import xml.etree.ElementTree as ElementTree
from typing import Union, List, Dict
def _generate_html_from_json_tree(json_blob: Union[List, Dict], element: ElementTree.Element):
    """
    Recursively append the contents of json_blob to element as nested tables.

    A list/tuple becomes one row per item, a dict becomes one row per
    key/value pair (key in bold), any other value becomes a single cell with
    its string representation. Unless element is the 'tbody' root, the rows
    of a list/dict are wrapped in a new 'table' element.

    :param json_blob: the (nested) structure to render
    :param element: the element to which the rendered structure is added
    """
    if isinstance(json_blob, (list, tuple, dict)):
        # the root 'tbody' holds the rows itself, deeper levels get their own table
        if element.tag != 'tbody':
            sub_table = ElementTree.SubElement(element, 'table')
        else:
            sub_table = element

        if isinstance(json_blob, dict):
            for key, value in json_blob.items():
                row = ElementTree.SubElement(sub_table, 'tr')
                key_element = ElementTree.SubElement(row, 'td')
                bold_key = ElementTree.SubElement(key_element, 'b')
                # str(): a non-string key (e.g. an int) cannot be serialized by ElementTree
                bold_key.text = str(key)
                value_element = ElementTree.SubElement(row, 'td')
                _generate_html_from_json_tree(value, value_element)
        else:
            for item in json_blob:
                row = ElementTree.SubElement(sub_table, 'tr')
                item_element = ElementTree.SubElement(row, 'td')
                _generate_html_from_json_tree(item, item_element)
    else:
        value = ElementTree.SubElement(element, 'td', attrib={"style": "max-width:25rem"})
        value.text = str(json_blob)


def convert_json_to_nested_table(json_blob):
    """
    Convert a (nested) json structure to an html 'tbody' with nested tables.

    :param json_blob: list, tuple, dict or a plain value
    :return: the serialized 'tbody' element as a string
    """
    root_element = ElementTree.Element('tbody')
    _generate_html_from_json_tree(json_blob, root_element)
    return ElementTree.tostring(root_element, method='xml').decode()
def convert_config_to_html(querylist):
    """
    Render configuration records as HTML table rows (filter, key, value).

    A string value that contains '://' is shown as a link with the key as
    link text.

    :param querylist: iterable of records with the attributes key, value and filter
    :return: html string, or a single 'no data' row when the input cannot
             be rendered
    """
    try:
        rows = []
        for record in querylist:
            key = record.key
            value = record.value
            # not called 'filter', to leave the builtin alone
            record_filter = record.filter

            # explicit type check instead of relying on a swallowed TypeError
            if isinstance(key, str) and isinstance(value, str) and "://" in value:
                value = '<a href="' + value + '">' + key + "</a>"

            rows.append("<tr><td><b>" + str(record_filter) + "</b></td> <td><b>" + str(key)
                        + "</b></td><td>" + str(value) + "</td></tr>")
        results = "".join(rows)

    except Exception:
        # best effort: anything that cannot be rendered results in 'no data'
        results = "<tr><td>no data</td></tr>"

    return results
def construct_link_to_monitor_history(request, title, name, hostname):
    """
    Construct an html link to the 'monitoring' view for a service on a host.

    :param request: the current request, used to construct the absolute url
    :param title: the text of the link
    :param name: name of the service (query parameter 'name')
    :param hostname: host of the service (query parameter 'hostname')
    :return: html link; the plain title when the url could not be constructed
    """
    query = "name=" + name + "&hostname=" + hostname

    # fallback, so that a failure below no longer ends in an UnboundLocalError
    link = str(title)
    try:
        if settings.DEV == True:
            url = request.build_absolute_uri(reverse('monitoring')) + '?' + query
        else:
            # Unclear why 'build_absolute_uri' doesn't return 'https' in production.
            # Probably because the https is handled fully outside the container by Traefik
            # and ATDB is not aware of that.
            url = "https://" + request.get_host() + reverse('monitoring') + '?' + query

        link = '<a href="' + url + '" target="_blank">' + title + "</a>"
    except Exception:
        # best effort: keep the plain title
        pass

    return link
def convert_monitor_to_html(request, monitor_data):
    """
    Render the service monitor records as HTML table rows.

    A service whose last heartbeat is older than
    settings.SERVICES_LATE_WARNING_SECONDS is shown with status 'unknown'
    and its timestamp marked as '(late)'. Superusers get a Hold/Resume
    button next to the enabled state.

    :param request: the current request (user and link construction)
    :param monitor_data: iterable of records with the attributes name,
                         hostname, status, enabled, timestamp, process_id
                         and description
    :return: html string, or a single 'no data' row when rendering fails
    """
    results = ""

    try:
        # NOTE(review): assumes record.timestamp is in UTC once tzinfo is stripped - confirm
        now = datetime.utcnow().replace(tzinfo=None)

        for record in monitor_data:
            heartbeat = record.timestamp.replace(tzinfo=None)

            # total_seconds() and not .seconds: the latter is only the part
            # below one day, so a heartbeat of more than 24 hours ago could look on time
            seconds_since_heartbeat = (now - heartbeat).total_seconds()

            # http://localhost:8000/atdb/monitor/?name=stager&hostname=localhost
            link_to_service_history = construct_link_to_monitor_history(
                request, record.name, record.name, record.hostname)

            line = '<tr>'
            if "error" in record.status:
                line = '<tr class="' + record.status + '" >'

            line += "<td><b>" + link_to_service_history + "</b></td>"
            line += "<td>" + str(record.hostname) + "</td>"

            # only provide the hold/resume buttons for superusers, otherwise just show the state
            service_enabled = str(record.enabled)
            if request.user.is_superuser:
                hold_resume_url = 'service_hold_resume/' + record.name + '/' + record.hostname
                if record.enabled == "True":
                    service_enabled += ' <a href="' + hold_resume_url + '/False" class="btn btn-warning btn-sm" role="button"><i class="fas fa-pause"></i> Hold</a>'
                else:
                    service_enabled += ' <a href="' + hold_resume_url + '/True" class="btn btn-success btn-sm" role="button"><i class="fas fa-play"></i> Resume</a>'
            line += "<td>" + service_enabled + "</td>"

            # if the heartbeat is too late, show '(late)' in red
            if seconds_since_heartbeat > settings.SERVICES_LATE_WARNING_SECONDS:
                line += "<td><i>unknown</i></td>"
                line += '<td class="error">' + str(record.timestamp.strftime(TIME_FORMAT)) + " - (late)</td>"
            else:
                line += '<td class="' + record.status + '" >' + str(record.status) + "</td>"
                line += '<td>' + str(record.timestamp.strftime(TIME_FORMAT)) + "</td>"

            line += "<td>" + str(record.process_id) + "</td>"
            line += "<td>" + str(record.description) + "</td>"
            line += "</tr>"

            results = results + line

    except Exception:
        # best effort: a record that cannot be rendered replaces the whole table by 'no data'
        results = "<tr><td>no data</td></tr>"

    return results
# aggregate information from the tasks table per workflow per status
def aggregate_resources_tasks(request, selection, filtered_tasks):
    """
    Aggregate sizes, processing time and task counts per workflow.

    :param request: not used by this function
    :param selection: when it contains 'active', only the workflows that have
                      active regular tasks are reported, otherwise all workflows
    :param filtered_tasks: queryset of tasks to aggregate
    :return: list of dicts, one per workflow, with the keys size_to_process,
             size_processed, total_processing_time, id, name, nr_of_tasks,
             nr_of_active_tasks and nr_of_tasks_per_status
    """
    if 'active' in selection:
        # get all active tasks
        active_tasks = filtered_tasks.filter(status__in=settings.ACTIVE_STATUSSES).filter(task_type='regular')

        # construct the list of unique workflows from the active tasks (cheap)
        my_workflows = []
        for w in active_tasks.values('workflow').distinct():
            try:
                my_workflows.append(Workflow.objects.get(id=w['workflow']))
            except Workflow.DoesNotExist:
                # e.g. a task without a workflow
                continue
    else:
        my_workflows = Workflow.objects.all()

    workflow_results = []

    # iterate through the workflows
    for workflow in my_workflows:
        workflow_result = {}

        # all tasks for this workflow for the 'grand total'
        tasks_per_workflow = filtered_tasks.filter(workflow=workflow).filter(task_type='regular')
        nr_of_tasks_per_workflow = tasks_per_workflow.count()

        # one query for the three sums instead of one query per sum
        sums = tasks_per_workflow.aggregate(
            Sum('size_to_process'), Sum('size_processed'), Sum('total_processing_time'))
        workflow_result['size_to_process'] = sums['size_to_process__sum']
        workflow_result['size_processed'] = sums['size_processed__sum']
        workflow_result['total_processing_time'] = sums['total_processing_time__sum']

        # all the active tasks
        nr_of_active_tasks_per_workflow = tasks_per_workflow.filter(
            status__in=settings.ACTIVE_STATUSSES).count()

        # split per status, to see the progress
        nr_per_status = {
            'failed': tasks_per_workflow.filter(status__icontains='failed').count(),
            'active': nr_of_active_tasks_per_workflow,
            'total': nr_of_tasks_per_workflow,
        }
        for status in settings.ALL_STATUSSES:
            nr_per_status[status] = tasks_per_workflow.filter(status=status).count()

        # store the results in a dict
        workflow_result['id'] = workflow.id
        workflow_result['name'] = workflow.workflow_uri
        workflow_result['nr_of_tasks'] = nr_of_tasks_per_workflow
        workflow_result['nr_of_active_tasks'] = nr_of_active_tasks_per_workflow
        workflow_result['nr_of_tasks_per_status'] = nr_per_status

        workflow_results.append(workflow_result)

    return workflow_results
# aggregate information from the logentries table per workflow per status
def aggregate_resources_logs(selection):
    """
    Aggregate cpu_cycles and wall_clock_time of the log entries per workflow per status.

    Only log entries of tasks with an active status are taken into account.

    :param selection: when it contains 'active', only the workflows that have
                      active regular tasks are reported, otherwise all workflows
    :return: list of dicts, one per workflow, with the keys id, name and
             records_per_status (per status a dict with cpu_cycles and
             wall_clock_time)
    """
    if 'active' in selection:
        # get all active tasks
        active_tasks = Task.objects.filter(status__in=settings.ACTIVE_STATUSSES).filter(task_type='regular')

        # construct the list of unique workflows from the active tasks (cheap)
        my_workflows = []
        for w in active_tasks.values('workflow').distinct():
            try:
                my_workflows.append(Workflow.objects.get(id=w['workflow']))
            except Workflow.DoesNotExist:
                # e.g. a task without a workflow
                continue
    else:
        my_workflows = Workflow.objects.all()

    workflow_results = []

    for workflow in my_workflows:
        record_per_status = {}

        for status in settings.ALL_STATUSSES:
            # aggregate logentries per step for all active statusses (expensive)
            logs = LogEntry.objects.filter(status=status) \
                .filter(task__status__in=settings.ACTIVE_STATUSSES) \
                .filter(task__workflow=workflow)

            # one query for both sums instead of one query per sum
            sums = logs.aggregate(Sum('cpu_cycles'), Sum('wall_clock_time'))
            record_per_status[status] = {
                'cpu_cycles': sums['cpu_cycles__sum'],
                'wall_clock_time': sums['wall_clock_time__sum'],
            }

        workflow_results.append({
            'id': workflow.id,
            'name': workflow.workflow_uri,
            'records_per_status': record_per_status,
        })

    return workflow_results
def construct_link_to_tasks_api(request, status, workflow_id, count):
    """
    Construct an html link to the 'tasks' view, filtered on workflow and status.

    :param request: the current request, used to construct the absolute url
    :param status: a status from settings.ALL_STATUSSES filters on that status,
                   a value containing 'failed' filters on all failed statuses,
                   anything else filters on the workflow only
    :param workflow_id: id of the workflow to filter on
    :param count: the number that is shown as the text of the link
    :return: html link; the plain count when the url could not be constructed
    """
    link = str(count)

    try:
        workflow_filter = "workflow__id=" + str(workflow_id)
        if status in settings.ALL_STATUSSES:
            query = "?status=" + status + "&" + workflow_filter
        elif 'failed' in status:
            query = "?status__icontains=failed&" + workflow_filter
        else:
            query = "?" + workflow_filter

        if settings.DEV == True:
            url = request.build_absolute_uri(reverse('tasks')) + query
        else:
            # Unclear why 'build_absolute_uri' doesn't return 'https' in production.
            # Probably because the https is handled fully outside the container by Traefik
            # and ATDB is not aware of that.
            url = "https://" + request.get_host() + reverse('tasks') + query

        link = '<a href="' + url + '" target="_blank">' + str(count) + "</a>"
    except Exception:
        # best effort: keep the plain count
        pass

    return link
def construct_link_to_workflow_api(request, workflow_result):
    """
    Construct an html link to a workflow in the 'workflows' view.

    :param request: the current request, used to construct the absolute url
    :param workflow_result: dict with at least the keys 'id' and 'name'
    :return: html link with '<id> - <name>' as text; the plain text when the
             url could not be constructed
    """
    workflow_id = str(workflow_result['id'])
    title = workflow_id + ' - ' + str(workflow_result['name'])
    link = title

    try:
        if settings.DEV == True:
            url = request.build_absolute_uri(reverse('workflows')) + '/' + workflow_id
        else:
            # Unclear why 'build_absolute_uri' doesn't return 'https' in production.
            # Probably because the https is handled fully outside the container by Traefik
            # and ATDB is not aware of that.
            url = "https://" + request.get_host() + reverse('workflows') + '/' + workflow_id

        link = '<a href="' + url + '" target="_blank">' + title + "</a>"
    except Exception:
        # best effort: keep the plain title
        pass

    return link
def human_readable(size_in_bytes):
    """
    Format a size in bytes with a binary unit (steps of 1024).

    :param size_in_bytes: the size as a number
    :return: the size with one decimal and its unit ('Bytes' up to 'PB'),
             or "0" when the value is not a number (e.g. None)
    """
    try:
        for unit in ('Bytes', 'KB', 'MB', 'GB', 'TB'):
            if -1024.0 < size_in_bytes < 1024.0:
                return "%3.1f %s" % (size_in_bytes, unit)
            size_in_bytes /= 1024.0

        # everything beyond TB is expressed in PB
        return "%3.1f %s" % (size_in_bytes, 'PB')
    except (TypeError, ValueError, ArithmeticError):
        return "0"
def highlight_value(values, value_to_highlight):
    """
    Mark the cell that holds value_to_highlight by changing its class to 'max'.

    The first occurrence of the value is located and in the (at most) 15
    characters left of it 'inactive' (or otherwise 'active') is replaced by 'max'.

    :param values: html string
    :param value_to_highlight: the value to look for (converted to a string)
    :return: the html string with the replaced class; unchanged when the
             value does not occur
    """
    pos_value = values.find(str(value_to_highlight))
    if pos_value < 0:
        # not found: without this guard the slices below would be taken
        # relative to the end of the string
        return values

    # find 'class' left of the value; clamp at the start of the string to
    # avoid a negative (wrapping) slice index
    window_start = max(pos_value - 15, 0)

    # split up the values, left and right of the search area
    part1 = values[:window_start]
    substring = values[window_start:pos_value]
    part2 = values[pos_value:]

    if 'inactive' in substring:
        new_substring = substring.replace('inactive', 'max')
    else:
        new_substring = substring.replace('active', 'max')

    return part1 + new_substring + part2
def construct_tasks_per_workflow_html(request, workflow_results):
    """
    Render the progress of tasks per workflow as an html table body.

    Per workflow a summary row (link, sizes, processing time) is followed by
    one cell per status with the percentage of tasks and a link to those
    tasks. The cell with the highest percentage is highlighted.

    :param request: the current request, used to construct the links
    :param workflow_results: list of dicts as returned by aggregate_resources_tasks
    :return: html string
    """
    # --- Progress of tasks per active workflow ---
    results_tasks = "<p>Progress of tasks per workflow</p>"

    # construct the header
    header = '<th class="aggregate_failed">failed</th><th class="aggregate">active</th><th class="aggregate">total</th>'
    header += "".join("<th>" + status + "</th>" for status in settings.ALL_STATUSSES)
    results_tasks += header

    for workflow_result in workflow_results:
        link = construct_link_to_workflow_api(request, workflow_result)
        values = "<tr class='info'><td colspan='6'><b>" + link + "</b></td>"

        # add sizes
        values += "<td><b>size to process:</b> " + str(human_readable(workflow_result['size_to_process'])) + "</td>"
        try:
            percentage = round(int(workflow_result['size_processed']) / int(workflow_result['size_to_process']) * 100)
        except (KeyError, TypeError, ValueError, ZeroDivisionError):
            # no sizes known (yet)
            percentage = 0
        values += "<td><b>size processed:</b> " + str(
            human_readable(workflow_result['size_processed'])) + " (<b>" + str(percentage) + "%</b>) </td>"
        values += "<td><b>processing time:</b> " + str(workflow_result['total_processing_time']) + "</td>"

        values += "<td colspan='8'></td></tr><tr>"

        d = workflow_result['nr_of_tasks_per_status']
        # highest percentage of the real statuses (not called 'max', to leave the builtin alone)
        max_percentage = 0

        for key in d:
            try:
                percentage = round(int(d[key]) / int(workflow_result['nr_of_tasks']) * 100)
                if (percentage > max_percentage) and (key in settings.ALL_STATUSSES):
                    max_percentage = percentage
            except (KeyError, TypeError, ValueError, ZeroDivisionError):
                # e.g. a workflow without tasks
                percentage = 0

            # distinguish active statusses
            style = "inactive"
            if key in settings.ACTIVE_STATUSSES or key == 'active':
                style = "active"

            if key in settings.AGGREGATES:
                style = "aggregate"

            # bonus: add a query link
            link = construct_link_to_tasks_api(request, key, workflow_result['id'], d[key])
            values += "<td class=" + style + ">" + str(percentage) + "% (" + link + ")</td>"

        if max_percentage > 0:
            values = highlight_value(values, max_percentage)

        results_tasks += "</tr><tr>" + values + "</tr>"

    results_tasks = "<tbody>" + results_tasks + "</tbody>"
    return results_tasks
def construct_logs_per_workflow_html_version1(log_records):
    """
    Render log records as an html table body, one row per record.

    :param log_records: iterable of dicts with the keys name, status,
                        cpu_cycles and wall_clock_time
    :return: html string
    """
    results_logs = "<p>Resources used per step per active workflow</p>"

    # construct the header
    results_logs += "<th>Workflow</th><th>Status</th><th>CPU cycles</th><th>wall clock time</th>"

    for record in log_records:
        # distinguish active statusses
        style = "active" if record['status'] in settings.ACTIVE_STATUSSES else ""

        # the row is closed with '</tr>' (this used to be a second '<tr>')
        results_logs += (
            "<tr><td><b>" + record['name'] + "</b></td>"
            + '<td class="' + style + '" >' + record['status']
            + "</td><td>" + str(record['cpu_cycles'])
            + "</td><td>" + str(record['wall_clock_time']) + "</td></tr>"
        )

    return "<tbody>" + results_logs + "</tbody>"
def construct_logs_per_workflow_html(request, workflow_results):
    """
    Render the resources used per workflow per status as an html table body.

    Every cell shows 'cpu_cycles/wall_clock_time', a missing value is shown as 0.

    :param request: the current request, used to construct the workflow links
    :param workflow_results: list of dicts as returned by aggregate_resources_logs
    :return: html string
    """
    html = "<p>Resources used per step per workflow: <b>cpu_cycles/wall_clock_time (seconds)</b></p>"

    # header: the workflow column followed by one column per status
    html += "<th>Workflow</th>"
    for status_name in settings.ALL_STATUSSES:
        html += "<th>" + status_name + "</th>"

    for workflow_result in workflow_results:
        per_status = workflow_result['records_per_status']

        cells = "<td><b>" + construct_link_to_workflow_api(request, workflow_result) + "</b></td>"

        for status_name, record in per_status.items():
            # active statusses get their own style
            is_active = status_name in settings.ACTIVE_STATUSSES or status_name == 'active'
            css_class = "active" if is_active else ""

            # an empty aggregate (None) or zero is shown as '0'
            cpu_cycles = str(record['cpu_cycles']) if record['cpu_cycles'] else '0'
            wall_clock_time = str(record['wall_clock_time']) if record['wall_clock_time'] else '0'

            cells += "<td class=" + css_class + ">" + cpu_cycles + '/' + wall_clock_time + "</td>"

        html += "<tr>" + cells + "</tr>"

    return "<tbody>" + html + "</tbody>"
def construct_dashboard_html(request, selection):
    """
    Gather and construct the dashboard based on the requested selection.

    :param request: the current request (session and link construction)
    :param selection: collection of options; 'applyfilter' restricts the tasks
                      to the ids stored in the session under
                      'filtered_tasks_as_list', 'resources' adds the
                      (expensive) resources table
    :return: tuple (results_tasks, results_logs) of html strings
    """
    filtered_tasks = Task.objects.all()
    try:
        if 'applyfilter' in selection:
            filtered_tasks_as_list = request.session['filtered_tasks_as_list']
            filtered_tasks = Task.objects.filter(id__in=filtered_tasks_as_list)
    except Exception as err:
        # best effort: without a usable filter in the session, all tasks are used
        logger.debug("no task filter applied to the dashboard: %s", err)

    # --- Progress of tasks per active workflow ---
    workflow_results = aggregate_resources_tasks(request, selection, filtered_tasks)
    results_tasks = construct_tasks_per_workflow_html(request, workflow_results)

    # --- logentries ---
    results_logs = "<p>Resources not shown. Click the 'Resources Invisible' toggle to show resources.</p>"
    if 'resources' in selection:
        log_records = aggregate_resources_logs(selection)
        results_logs = construct_logs_per_workflow_html(request, log_records)

    return results_tasks, results_logs
def unique_values_for_aggregation_key(queryset, aggregation_key):
    """
    Return the distinct values of one field of a queryset.

    :param queryset: queryset to take the values from
    :param aggregation_key: name of the field
    :return: list with the values of ``queryset.values(aggregation_key).distinct()``
    """
    return [row[aggregation_key] for row in queryset.values(aggregation_key).distinct()]
def add_plots(task, results, expand_image="False", timeout=30):
    """
    Add a table row per unique plot of a task to the html in results.

    The location (surl) of a plot is translated to a url and extended with
    the token for that location. A plot is either shown as a link, or (png
    files, when expand_image is "True") downloaded and embedded in a
    collapsible element. When the download fails, the plain link is shown.

    :param task: task with the plots in task.quality_json["plots"]
    :param results: html string to which the rows are added
    :param expand_image: the string "True" embeds png images, any other value gives links
    :param timeout: timeout in seconds for the download of an image
    :return: results, extended with the rows for the plots
    """
    # checksums of the plots that were already added, to check uniqueness
    plot_files = set()

    # translate the path to a url for the original plots location
    try:
        srm_to_url = Configuration.objects.get(key='dcache:srm_to_url').value
    except Exception:
        # not (uniquely) configured, use the default
        srm_to_url = "srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data/lofar/ops/disk/ldv/::https://webdav.grid.surfsara.nl/"

    # translate the path to a url for when the plots are moved by the archiver
    try:
        srm_to_url_archive_disk = Configuration.objects.get(key='dcache:srm_to_url_archive_disk').value
    except Exception:
        # not (uniquely) configured, use the default
        srm_to_url_archive_disk = "srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data/lofar/ops/disk/projects/::https://webdav.grid.surfsara.nl/projects/"

    # retrieve the current tokens for both the original and archived locations
    token_original = str(Configuration.objects.get(key='dcache:token').value)
    token_archive_disk = str(Configuration.objects.get(key='dcache:token_archive_disk').value)

    # placeholders are filled in with str.format, so a value can never be
    # mistaken for another placeholder
    collapseable_template = (
        "\n"
        "<p>\n"
        '<button class="btn btn-primary" type="button" data-toggle="collapse" data-target="#{element_id}" aria-expanded="false" aria-controls="{element_id}">\n'
        '<i class="fas fa-image"></i> {basename}\n'
        "</button>\n"
        "</p>\n"
        '<div class="collapse" id="{element_id}">\n'
        '<div class="card card-body">\n'
        "<a href=\"{url}\" target='_blank'>{image}</a>\n"
        "<hr>\n"
        "<i>(click on image for original)</i>\n"
        "</div>\n"
        "</div>\n"
    )

    plots = task.quality_json["plots"]
    count = 0

    for plot in plots:
        # check where the plot lives and provide the translation
        try:
            # is the plot already moved to its final location on disk in the lta?
            base_surl = plot['surl_lta']
            translation = srm_to_url_archive_disk.split("::")
            token = token_archive_disk
        except (KeyError, AttributeError):
            # assume the original location.
            base_surl = plot['surl']
            translation = srm_to_url.split("::")
            token = token_original

        basename = plot['basename']
        plot_file = plot['checksum']

        # only add unique files
        if plot_file in plot_files:
            continue
        plot_files.add(plot_file)

        count = count + 1
        surl = base_surl + "?action=show&authz=" + str(token)
        url = surl.replace(translation[0], translation[1])

        image = None
        if basename.endswith('png') and expand_image == "True":
            # retrieve the url, the binary data is added to the html below.
            # The url holds the token, so only the basename is logged.
            try:
                response = requests.get(url, timeout=timeout)
                if response.status_code == 200:
                    image = response.content
                else:
                    logger.warning("plot %s not retrieved, status %s", basename, response.status_code)
            except requests.RequestException as error:
                logger.warning("plot %s not retrieved: %s", basename, type(error).__name__)

        if image is not None:
            content_as_string = base64.b64encode(image).decode("utf-8")
            img_html = '<img width="800" src = "data:image/png;base64,' + content_as_string + '">'

            collapseable = collapseable_template.format(
                element_id="plot" + str(task.id) + '_' + str(count),
                basename=basename,
                url=url,
                image=img_html,
            )
            results += '<tr><td>' + collapseable + '</td></tr>'
        else:
            results += '<tr><td><a href="' + url + '" target="_blank">' + basename + '</a></td></tr>'

    return results
def construct_inspectionplots(task, expand_image="False", source='task_id'):
    """
    Construct the html with the inspection plots of a task, or of all tasks of its SAS_ID.

    :param task: the task to show the plots for
    :param expand_image: passed on to add_plots
    :param source: 'task_id' shows the plots of this task only, 'sas_id' shows
                   the plots of all tasks with the same sas_id (except the
                   'suspended' and 'discarded' ones)
    :return: html string; empty for an unknown source
    """
    # an unknown source gives an empty result instead of an UnboundLocalError
    results = ""

    # find the plots in the quality json structure
    if source == 'task_id':
        results = "<h4>Inspection Plots and Summary Logs</h4>"
        results += "<p>Clicking a link will redirect to SURF SARA in a new browser window. </p>"
        results = add_plots(task, results, expand_image)

    elif source == 'sas_id':
        sas_id = task.sas_id
        results = "<h4>(Unique) Inspection Plots and Summary Logs for SAS_ID " + str(sas_id) + "</h4>"
        results += "<p>Clicking a link will redirect to SURF SARA in a new browser window. </p>"

        for sas_task in Task.objects.filter(sas_id=sas_id):
            # skip 'suspended' and 'discarded' tasks
            if sas_task.status in ('suspended', 'discarded'):
                continue

            try:
                results += '<tr style="background-color:#7EB1C4"><td colspan="3"><b>Task ' + str(sas_task.id) + '</b></td></tr>'
                results = add_plots(sas_task, results, expand_image)
            except Exception as error:
                logger.error(error)
                logger.info('task ' + str(sas_task) + ' has no plots, skipped.')

    return results
def construct_default_summary(task):
    """
    Construct the html summary for all tasks with the same SAS_ID as the given task.

    Per task ('suspended' and 'discarded' tasks are skipped) the records in
    task.quality_json["summary"] are shown: input, output, ratio and, when
    present, the RFI percentage, the calculated quality and the added and
    deleted files. A task whose summary is missing or incomplete only
    contributes what was rendered before the problem occurred.

    The totals (sizes, ratio, calculated quality per SAS_ID, quality
    statistics and thresholds) are placed in front of the task details.

    :param task: a task of the SAS_ID to summarize
    :return: html string
    """
    results = ""
    total_size_input = 0
    total_size_output = 0
    quality_values = {'poor': 0, 'moderate': 0, 'good': 0}

    # calculated qualities of the last task for which they were read; used for
    # the totals. Initialised, so the totals do not depend on a swallowed NameError.
    calculated_qualities = None

    sas_id = task.sas_id
    title = "<h4>Summary File for SAS_ID " + str(sas_id) + "</h4> "

    for sas_task in Task.objects.filter(sas_id=sas_id):

        # skip 'suspended' and 'discarded' tasks
        if sas_task.status in ('suspended', 'discarded'):
            continue

        results += '<tr style="background-color:#7EB1C4"><td colspan="3"><b>Task ' + str(sas_task.id) + '</b></td></tr>'

        # find the summary in the quality json structure
        try:
            summary = sas_task.quality_json["summary"]

            for key in summary:
                record = summary[key]

                total_size_input += record['input_size']
                total_size_output += record['output_size']

                line = ''
                line += '<tr style="background-color:#7EB100"><td colspan="3"><b>' + key + '</b></td></tr>'

                line += '<th></th><th>Name</th><th>Size</th>'
                line += '<tr><td><b>Input</b></td>'
                line += '<td>' + record['input_name'] + '</td>'
                line += '<td>' + str(record['input_size']) + ' (' + record['input_size_str'] + ')</td>'
                line += '</tr>'
                line += '<tr><td><b>Output</b></td>'
                line += '<td>' + record['output_name'] + '</td>'
                line += '<td>' + str(record['output_size']) + ' (' + record['output_size_str'] + ')</td>'
                line += '</tr>'
                line += '<tr><td><b>Ratio</b></td>'
                line += '<td colspan="2">' + str(round(record['size_ratio'], 3)) + '</td>'
                line += '</tr>'

                if 'rfi_percent' in record:
                    # add RFI percentage (if present)
                    rfi = record['rfi_percent']
                    line += '<tr><td><b>RFI percentage</b></td>'
                    line += '<td colspan="2">' + str(rfi) + '</td>'
                    line += '</tr>'

                try:
                    # add calculated quality (if present)
                    calculated_qualities = sas_task.calculated_qualities
                    if calculated_qualities:
                        task_quality = calculated_qualities['per_task']

                        line += '<tr><td><b>Calculated Quality</b></td>'
                        line += '<td class="' + task_quality + '">' + str(task_quality) + '</td>'
                        line += '</tr>'
                except Exception:
                    pass

                try:
                    added = record['added']
                    if added:
                        line += '<th>Added</th>'
                        for filename in added:
                            line += '<tr><td colspan="3">' + filename + '</td></tr>'
                except Exception:
                    # no (usable) list of added files
                    pass

                try:
                    deleted = record['deleted']
                    if deleted:
                        line += '<th>Deleted</th>'
                        for filename in deleted:
                            line += '<tr><td colspan="3">' + filename + '</td></tr>'
                except Exception:
                    # no (usable) list of deleted files
                    pass

                try:
                    quality = sas_task.calculated_qualities['per_task']
                    quality_values[quality] = quality_values[quality] + 1
                except Exception:
                    # ignore the tasks that have no calculated quality.
                    pass

                results += line

        except Exception:
            # no (complete) summary for this task, continue with the next task
            pass

    totals = '<th>Totals</th><th></th><th width="35%"></th>'
    try:
        totals += '<tr><td colspan="2"><b>Input size</b></td><td>' + str(total_size_input) + '</td></tr>'
        totals += '<tr><td colspan="2"><b>Output size</b></td><td>' + str(total_size_output) + '</td></tr>'
        # without input (division by zero) the ratio and everything below it is left out
        totals += '<tr><td colspan="2"><b>Ratio</b></td><td>' + str(round(total_size_output / total_size_input, 3)) + '</td></tr>'

        try:
            # add calculated quality per sasid (if present)
            if calculated_qualities:
                sasid_quality = calculated_qualities['per_sasid']
                totals += '<tr><td colspan="2"><b>Calculated Quality</b></td>'
                totals += '<td class="' + sasid_quality + '">' + str(sasid_quality) + '</td></tr>'

                totals += '<tr><td colspan="2"><b>Quality Statistics</b></td><td>' + str(quality_values) + '</td></tr>'

                try:
                    quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)

                    totals += '<tr>'
                    totals += '<td><b>RFI thresholds</b></td>'
                    totals += '<td>Per Task</td><td>M, rfi>' + str(quality_thresholds['poor']) + '% = P, rfi<=' + str(quality_thresholds['moderate']) + '% = G</td>'
                    totals += '</tr>'

                    totals += '<tr>'
                    totals += '<td></td>'
                    totals += '<td>Per SAS_ID</td><td>M, >' + str(quality_thresholds['overall_poor']) + '% P = P, >' + str(quality_thresholds['overall_good']) + '% G = G</td>'
                    totals += '</tr>'
                except Exception:
                    # thresholds not (completely) configured
                    pass
        except Exception:
            # no (usable) quality per sasid
            pass

    except (ZeroDivisionError, TypeError):
        pass

    results = title + totals + results
    return results
def construct_imaging_summary(task):
    """
    Construct the html summary for the 'imaging' flavour.

    All tasks that share the sas_id of the given task are combined into a single html fragment:
    a title, a 'Totals' section and one section per task, filled with the values found in the
    'summary' of the quality_json of that task.
    Tasks with the status 'suspended' or 'discarded' are left out.

    The summary is built on a best effort basis: when the quality_json of a task lacks an expected
    key, the rows that were already built for that task are kept and the rest of the task is skipped.

    :param task: the task for which the summary is requested, only its sas_id is used
    :return: the summary as a string with html (a title followed by table rows)
    """
    # keys of the summary that are shown as a '<key> | <value>' row, in this order
    summary_keys = (
        'applied_fixes',
        'rfi_perc_total',
        'elevation_score',
        'sun_interference',
        'unfixable_issues',
        'moon_interference',
        'jupiter_interference',
        'degree_incompleteness_array',
        'array_missing_important_pairs_is',
        'array_missing_important_pairs_dutch',
        'aggregated_array_data_losses_percentage',
        'array_high_data_loss_on_is_important_pair',
        'array_high_data_loss_on_dutch_important_pair',
    )
    section_row = '<tr style="background-color:#7EB100"><td colspan="3"><b>{}</b></td></tr>'

    sas_id = task.sas_id
    title = "<h4>Summary File for SAS_ID " + str(sas_id) + "</h4> "

    results = ""
    total_size_to_process = 0
    total_size_processed = 0

    # calculated qualities of the last task that reached the quality step below,
    # the 'per_sasid' value of it is shown in the totals
    calculated_qualities = None

    for sas_task in Task.objects.filter(sas_id=sas_id):
        # skip 'suspended' and 'discarded' tasks
        if sas_task.status in ('suspended', 'discarded'):
            continue

        results += '<tr style="background-color:#7EB1C4"><td colspan="3"><b>Task ' + str(sas_task.id) + '</b></td></tr>'

        # a size that is not set (None) counts as 0 instead of breaking the whole summary
        total_size_to_process += sas_task.size_to_process or 0
        total_size_processed += sas_task.size_processed or 0

        try:
            # find the summary in the quality json structure
            summary = sas_task.quality_json["summary"]

            results += '<tr><td><b>size_to_process</b></td><td>' + str(sas_task.size_to_process) + '</td></tr>'
            results += '<tr><td><b>size_processed</b></td><td>' + str(sas_task.size_processed) + '</td></tr>'
            for key in summary_keys:
                results += '<tr><td><b>' + key + '</b></td><td>' + str(summary[key]) + '</td></tr>'

            results += section_row.format('Details')
            details = summary["details"]
            results += '<tr><td><b>Antenna configuration</b></td><td>' + str(details['antenna_configuration']) + '</td></tr>'
            results += '<tr><td><b>Antennas not available</b></td><td>' + str(details['antennas_not_available']) + '</td></tr>'

            if 'rfi_percentage' in details:
                # add RFI percentage (if present)
                results += '<tr><td><b>RFI percentage</b></td><td colspan="2">' + str(details['rfi_percentage']) + '</td></tr>'

            # add calculated quality (if present)
            calculated_qualities = sas_task.calculated_qualities
            if calculated_qualities:
                try:
                    task_quality = calculated_qualities['per_task']
                    # the row is built as one expression, so a failure never leaves half a row behind
                    results += '<tr><td><b>Calculated Quality</b></td><td class="' + task_quality + '">' + str(task_quality) + '</td></tr>'
                except (KeyError, TypeError):
                    logger.debug("task %s has no usable 'per_task' quality", sas_task.id)

            results += section_row.format('Target')
            results += f'<tr><td>{details["target"]}</td></tr>'

            results += section_row.format('Pointing')
            results += f'<tr><td colspan="3">{details["pointing"]}</td></tr>'

            # the stations are read from the 'DStDev' key
            stations = details["DStDev"]
            results += section_row.format('Stations')
            results += f'<tr><td colspan="3">{stations}</td></tr>'

            antennas = details["antennas"]
            results += section_row.format('Antennas')
            results += f'<tr><td colspan="3">{antennas}</td></tr>'

        except (KeyError, TypeError):
            # no (complete) summary for this task: keep what was built so far and continue with the next task
            logger.debug("incomplete quality summary for task %s", sas_task.id, exc_info=True)

    totals = '<th>Totals</th><th></th><th width="35%"></th>'
    totals += '<tr><td><b>Size to process</b></td><td colspan="2">' + str(total_size_to_process) + '</td></tr>'
    totals += '<tr><td><b>Size processed</b></td><td colspan="2">' + str(total_size_processed) + '</td></tr>'

    # add calculated quality per sasid (if present)
    if calculated_qualities:
        try:
            sasid_quality = calculated_qualities['per_sasid']
            totals += '<tr><td><b>Calculated Quality</b></td><td colspan="2" class="' + sasid_quality + '">' + str(sasid_quality) + '</td></tr>'
        except (KeyError, TypeError):
            logger.debug("no usable 'per_sasid' quality for sas_id %s", sas_id)
        else:
            # the thresholds explain the quality, so they are only shown together with it
            try:
                quality_thresholds = json.loads(Configuration.objects.get(key='quality_thresholds').value)
                totals += (
                    '<tr>'
                    + '<td><b>RFI thresholds</b></td>'
                    + '<td colspan="2">M, rfi>' + str(quality_thresholds['poor'])
                    + '% = P, rfi<=' + str(quality_thresholds['moderate']) + '% = G</td>'
                    + '</tr>'
                )
            except Exception:
                # best effort: a missing or invalid 'quality_thresholds' configuration must not break the summary
                logger.warning("could not add the 'quality_thresholds' to the summary", exc_info=True)

    return title + totals + results
def construct_summary(task):
    """
    Construct the html summary of a task, in the flavour that get_summary_flavour() selects for it.

    :param task: the task for which the summary is constructed
    :return: the html that the selected summary function returns
    """
    summary_flavour = get_summary_flavour(task)

    # construct the appropriate summary html
    if summary_flavour == SummaryFlavour.IMAGING.value:
        return construct_imaging_summary(task)

    if summary_flavour != SummaryFlavour.DEFAULT.value:
        # a flavour without a dedicated summary used to end in an UnboundLocalError on 'html',
        # fall back to the default summary instead
        logger.warning("unknown summary flavour '%s', using the default summary", summary_flavour)

    return construct_default_summary(task)