import logging import json from datetime import datetime, timedelta try: import matplotlib.pyplot as plt except: # enable debugging, but without matplotlib pass import psycopg2 from . import config from django.contrib.auth.decorators import login_required from django.views.generic import ListView from django.contrib import messages from rest_framework import generics from rest_framework.response import Response from django.http import JsonResponse from django_filters import rest_framework as filters from django_filters.views import FilterView from django_tables2.views import SingleTableMixin from django.shortcuts import render, redirect, reverse from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger from django.contrib.admin.views.decorators import staff_member_required from rest_framework.request import Request #from silk.profiling.profiler import silk_profile from django.conf import settings from .models import Activity, Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor from .models import check_if_summary from .services.common import State from .services.signals import disconnect_signals, connect_signals from .tables import TaskTable from .forms import QualityAnnotationForm, DiscardAnnotationForm from django.db.models import Q from .serializers import \ TaskWriteSerializer, \ TaskReadSerializer, \ TaskReadSerializerFast, \ ActivitySerializer, \ WorkflowSerializer, \ LogEntrySerializer, \ ConfigurationSerializer, \ JobSerializer, \ PostProcessingRuleSerializer, \ MonitorSerializer, LatestMonitorSerializer from .services import algorithms, activities_handler logger = logging.getLogger(__name__) def redirect_with_params(view_name, params): return redirect(reverse(view_name) + params) # ---------- filters (in the REST API) --------- class TaskFilter(filters.FilterSet): class Meta: model = Task fields = { 'task_type': ['exact', 'icontains', 'in'], 'is_summary': ['exact'], 'creationTime': ['icontains'], 'filter': ['exact', 'icontains'], 'workflow__id': ['exact', 'icontains'], 'project': ['exact', 'icontains'], 'sas_id': ['exact', 'icontains', 'in'], 'status': ['exact', 'icontains', 'in', 'startswith'], 'quality': ['exact', 'icontains', 'in', 'startswith'], 'purge_policy': ['exact'], 'cleanup_policy': ['exact','icontains','in'], 'priority': ['exact', 'lte', 'gte'], 'resume': ['exact'], # http://localhost:8000/atdb/tasks/?predecessor__isnull=True 'predecessor': ['isnull'], 'predecessor__status': ['exact', 'icontains', 'in', 'startswith'], 'activity' : ['isnull'], 'activity__id': ['exact'], 'activity__ingested_fraction' : ['exact','lt', 'lte', 'gt', 'gte','isnull'], } class TaskFilterQueryPage(filters.FilterSet): #resume = django_filters.BooleanFilter(field_name='resume',lookup_expr='exact', label='Resuming') class Meta: model = Task fields = { 'id': ['exact', 'gte', 'lte'], 'workflow__id': ['exact'], 'filter': ['exact', 'icontains'], 'priority': ['exact', 'gte', 'lte'], 'status': ['icontains', 'in'], 'quality': ['icontains', 'in'], 'project': ['exact', 'icontains', 'in'], 'sas_id': ['exact', 'icontains', 'in'], 'purge_policy': ['icontains'], #'sas_id_archived': ['exact', 'icontains', 'in'], } class ActivityFilter(filters.FilterSet): class Meta: model = Activity fields = { 'sas_id': ['exact', 'icontains', 'in'], 'filter': ['exact', 'icontains'], #'workflow__id': ['exact', 'icontains'], 'workflow_id': ['exact', 'icontains'], 'project': ['exact', 'icontains'], 'sas_id': ['exact', 'icontains', 'in'], 'status': ['exact', 'icontains', 'in', 'startswith'], #'ingestq_status': ['icontains'], 'ingested_fraction' : ['exact','lt', 'lte', 'gt', 'gte'], 'finished_fraction': ['exact', 'lt', 'lte', 'gt', 'gte'], 'total_size': ['exact', 'lt', 'lte', 'gt', 'gte'], 'remaining': ['exact', 'lt', 'lte', 'gt', 'gte'], } class WorkflowFilter(filters.FilterSet): class Meta: model = Workflow fields = { 'description': ['icontains'], 'repository': ['exact', 'icontains'], 'commit_id': ['exact', 'icontains'], 'path': ['exact', 'icontains'], } class LogEntryFilter(filters.FilterSet): class Meta: model = LogEntry fields = { 'task__id': ['exact'], 'step_name': ['exact', 'icontains', 'in', 'startswith'], 'status': ['exact', 'in'], } class ConfigurationFilter(filters.FilterSet): class Meta: model = Configuration fields = { 'filter': ['exact', 'icontains'], 'key': ['exact', 'icontains'], 'value': ['exact', 'icontains'], } class JobFilter(filters.FilterSet): class Meta: model = Job fields = { 'type': ['exact', 'icontains', 'in'], 'task_id': ['exact', 'in'], 'job_id': ['exact', 'in'], } class PostProcessingFilter(filters.FilterSet): class Meta: model = PostProcessingRule fields = { 'aggregation_key': ['exact', 'icontains', 'in'], 'trigger_status': ['exact', 'icontains', 'in'], 'workflow_to_process__id': ['exact'], 'workflow_to_apply__id': ['exact'], } class MonitorFilter(filters.FilterSet): class Meta: model = Monitor fields = { 'name': ['exact', 'icontains', 'in'], 'hostname': ['exact', 'icontains', 'in'], 'process_id': ['exact'], 'timestamp': ['icontains'], 'status': ['exact', 'icontains', 'in'], } class LatestMonitorFilter(filters.FilterSet): class Meta: model = LatestMonitor fields = { 'name': ['exact', 'icontains', 'in'], 'hostname': ['exact', 'icontains', 'in'], } # ---------- Tables2 Views (experimental) ----------- class QueryView(SingleTableMixin, FilterView): table_class = TaskTable model = Task # queryset = Task.objects.filter(task_type='regular') queryset = Task.objects.all() template_name = "taskdatabase/query/index.html" filterset_class = TaskFilterQueryPage def get_table_data(self): # https://stackoverflow.com/questions/7763115/django-passing-data-between-views #nv:16jan2024, this would be scary, but perhaps needed #how large is the list that can go on the session? #query_list_of_ids = list(self.object_list.values_list('id')) query_list_of_ids = list(self.object_list.values_list('id'))[:settings.QUERY_LIMIT_MULTI_CHANGE] filtered_tasks_as_list = [] for id in query_list_of_ids: filtered_tasks_as_list.append(id[0]) # store on the session self.request.session['filtered_tasks_as_list'] = filtered_tasks_as_list return self.object_list # ---------- GUI Views ----------- class IndexView(ListView): """ This is the main view of ATDB. It shows a pagination list of tasks, sorted by creationTime. """ template_name = 'taskdatabase/index.html' # by default this returns the list in an object called object_list, so use 'object_list' in the html page. # but if 'context_object_name' is defined, then this returned list is named and can be accessed that way in html. context_object_name = 'my_tasks' #@silk_profile(name='IndexView') def get_queryset(self): tasks = get_filtered_tasks(self.request) paginator = Paginator(tasks, config.TASKS_PER_PAGE) # Show 50 tasks per page page = self.request.GET.get('page') try: # check if there was a page on the session, if so, use it. if page == None: page = self.request.session['page'] self.request.session['page'] = None except: pass try: tasks = paginator.page(page) except PageNotAnInteger: # If page is not an integer, deliver first page. tasks = paginator.page(1) except EmptyPage: # If page is out of range (e.g. 9999), deliver last page of results. tasks = paginator.page(paginator.num_pages) return tasks class PostProcessingTasksView(ListView): """ This is the main view of ATDB. It shows a pagination list of tasks, sorted by creationTime. """ template_name = 'taskdatabase/postprocessing.html' # by default this returns the list in an object called object_list, so use 'object_list' in the html page. # but if 'context_object_name' is defined, then this returned list is named and can be accessed that way in html. context_object_name = 'my_tasks' def get_queryset(self): tasks = get_filtered_tasks(self.request) # only return the 'postprocessing' tasks, and not the 'regular' tasks in the GUI tasks = tasks.filter(task_type='postprocessing') paginator = Paginator(tasks, config.TASKS_PER_PAGE) # Show 50 tasks per page page = self.request.GET.get('page') try: # check if there was a page on the session, if so, use it. if page == None: page = self.request.session['page'] self.request.session['page'] = None except: pass try: tasks = paginator.page(page) except PageNotAnInteger: # If page is not an integer, deliver first page. tasks = paginator.page(1) except EmptyPage: # If page is out of range (e.g. 9999), deliver last page of results. tasks = paginator.page(paginator.num_pages) return tasks class ShowQualityPage(ListView): """ This shows the tasks that have a quality statistics in its outputs[0]['quality']. Not that the global filter is also applied """ template_name = 'taskdatabase/quality/page.html' # by default this returns the list in an object called object_list, so use 'object_list' in the html page. # but if 'context_object_name' is defined, then this returned list is named and can be accessed that way in html. context_object_name = 'my_tasks' #@silk_profile(name='ShowQualityPage') def get_queryset(self): tasks = get_filtered_tasks(self.request) # exclude the tasks without quality information tasks = tasks.exclude(outputs__quality__isnull=True) # tasks = tasks.exclude(outputs__0__quality__isnull=True) paginator = Paginator(tasks, config.TASKS_PER_PAGE) # Show 50 tasks per page page = self.request.GET.get('page') try: # check if there was a page on the session, if so, use it. if page == None: page = self.request.session['page'] self.request.session['page'] = None except: pass try: tasks = paginator.page(page) except PageNotAnInteger: # If page is not an integer, deliver first page. tasks = paginator.page(1) except EmptyPage: # If page is out of range (e.g. 9999), deliver last page of results. tasks = paginator.page(paginator.num_pages) return tasks class ShowValidationPage(ListView): """ This shows the tasks that are ready for validation Note that the global filter is also applied """ template_name = 'taskdatabase/validation/page.html' # by default this returns the list in an object called object_list, so use 'object_list' in the html page. # but if 'context_object_name' is defined, then this returned list is named and can be accessed that way in html. context_object_name = 'my_tasks' #@silk_profile(name='ShowValidationPage') def get_queryset(self): tasks = get_filtered_tasks(self.request, None, "sas_id").filter(status__icontains=State.STORED.value) # exclude the failed tasks tasks = tasks.exclude(status__icontains=State.FAILED.value) paginator = Paginator(tasks, config.TASKS_PER_PAGE_SMALL) # Show 50 tasks per page page = self.request.GET.get('page') try: # check if there was a page on the session, if so, use it. if page == None: page = self.request.session['page'] self.request.session['page'] = None except: pass try: tasks = paginator.page(page) except PageNotAnInteger: # If page is not an integer, deliver first page. tasks = paginator.page(1) except EmptyPage: # If page is out of range (e.g. 9999), deliver last page of results. tasks = paginator.page(paginator.num_pages) return tasks class ShowFailuresPage(ListView): """ This shows the tasks that are ready failed in one of the steps Note that the global filter is also applied """ template_name = 'taskdatabase/failures/page.html' # by default this returns the list in an object called object_list, so use 'object_list' in the html page. # but if 'context_object_name' is defined, then this returned list is named and can be accessed that way in html. context_object_name = 'my_tasks' #@silk_profile(name='ShowFailuresPage') def get_queryset(self): tasks = get_filtered_tasks(self.request).filter(status__icontains=State.FAILED.value) paginator = Paginator(tasks, config.TASKS_PER_PAGE_SMALL) # Show 50 tasks per page page = self.request.GET.get('page') try: # check if there was a page on the session, if so, use it. if page == None: page = self.request.session['page'] self.request.session['page'] = None except: pass try: tasks = paginator.page(page) except PageNotAnInteger: # If page is not an integer, deliver first page. tasks = paginator.page(1) except EmptyPage: # If page is out of range (e.g. 9999), deliver last page of results. tasks = paginator.page(paginator.num_pages) return tasks class ShowDiscardedPage(ListView): """ This shows the tasks that are discarded Note that the global filter is also applied """ template_name = 'taskdatabase/discarded/page.html' # by default this returns the list in an object called object_list, so use 'object_list' in the html page. # but if 'context_object_name' is defined, then this returned list is named and can be accessed that way in html. context_object_name = 'my_tasks' #@silk_profile(name='ShowDiscardedPage') def get_queryset(self): discarded_tasks = Task.objects.filter(status__icontains='discarded') tasks = get_filtered_tasks(self.request, discarded_tasks) paginator = Paginator(tasks, config.TASKS_PER_PAGE) # Show 50 tasks per page page = self.request.GET.get('page') try: # check if there was a page on the session, if so, use it. if page == None: page = self.request.session['page'] self.request.session['page'] = None except: pass try: tasks = paginator.page(page) except PageNotAnInteger: # If page is not an integer, deliver first page. tasks = paginator.page(1) except EmptyPage: # If page is out of range (e.g. 9999), deliver last page of results. tasks = paginator.page(paginator.num_pages) return tasks class ShowIngestQPage(ListView): """ This shows aggregated tasks per sas_id that are queued for ingest or archiving Note that the global filter is also applied """ template_name = 'taskdatabase/ingest/page.html' context_object_name = 'my_tasks' # @silk_profile(name='ShowIngestPage') def get_queryset(self): ingest_tasks = Task.objects.only('workflow','project','filter').filter( Q(status__icontains=State.SCRUBBED.value) | Q(status__icontains=State.ARCHIVING.value) | Q(status__icontains=State.ARCHIVED.value) | Q(status__icontains=State.FINISHING.value)) tasks = get_filtered_tasks(self.request, ingest_tasks, "sas_id") # exclude the failed tasks tasks = tasks.exclude(status__icontains=State.FAILED.value) paginator = Paginator(tasks, config.TASKS_PER_PAGE_SMALL) # Show 10 tasks per page page = self.request.GET.get('page') try: # check if there was a page on the session, if so, use it. if page == None: page = self.request.session['page'] self.request.session['page'] = None except: pass try: tasks = paginator.page(page) except PageNotAnInteger: # If page is not an integer, deliver first page. tasks = paginator.page(1) except EmptyPage: # If page is out of range (e.g. 9999), deliver last page of results. tasks = paginator.page(paginator.num_pages) return tasks class ShowFinishedPage(ListView): """ This shows the tasks that are finished Note that the global filter is also applied """ template_name = 'taskdatabase/archived/page.html' # by default this returns the list in an object called object_list, so use 'object_list' in the html page. # but if 'context_object_name' is defined, then this returned list is named and can be accessed that way in html. context_object_name = 'my_tasks' #@silk_profile(name='ShowFinishedPage') def get_queryset(self): ###archived_tasks = Task.objects.filter(status=State.FINISHED.value) ###tasks = get_filtered_tasks(self.request, archived_tasks) tasks = get_filtered_tasks(self.request).filter(status=State.FINISHED.value) paginator = Paginator(tasks, config.TASKS_PER_PAGE) # Show 50 tasks per page page = self.request.GET.get('page') try: # check if there was a page on the session, if so, use it. if page == None: page = self.request.session['page'] self.request.session['page'] = None except: pass try: tasks = paginator.page(page) except PageNotAnInteger: # If page is not an integer, deliver first page. tasks = paginator.page(1) except EmptyPage: # If page is out of range (e.g. 9999), deliver last page of results. tasks = paginator.page(paginator.num_pages) return tasks # this provides a broad range of filters for the search_box in the GUI #@silk_profile(name='get_filtered_tasks') def get_filtered_tasks(request, pre_filtered_tasks=None, distinct=None): filtered_tasks_as_list = None # there was an attempt to pre_filter tasks, but it has yielded no results. # further filtering is then useless, return an empty queryset if pre_filtered_tasks is not None: if pre_filtered_tasks.count() == 0: return pre_filtered_tasks try: my_sort = request.session['sort'] except: my_sort = '-creationTime' # if there is already a 'filtered_tasks_as_list' on the session, then show that. # this is a way to propagate an earlier filter from the FILTER page to several pages # nv:16jan24, this has a potential issue, because it limits to 5000 results # users said that that is no problem, the advantage of this functionalty outweighs this potential issue. try: filtered_tasks_as_list = request.session['filtered_tasks_as_list'] except: pass if filtered_tasks_as_list: # there is an earlier list of filtered tasks on the session, use that as starting point if pre_filtered_tasks: # there is a list of pre-filtered tasks given as extra argument to this function (most pages do that) filtered_tasks = pre_filtered_tasks.filter(id__in=filtered_tasks_as_list).defer('inputs','outputs') # check if there is an ingest filter active try: ingest_filter = request.session['ingest_filter'] if ingest_filter != 'all': if type(ingest_filter) is list: filtered_tasks = filtered_tasks.filter(status__in=ingest_filter) else: filtered_tasks = filtered_tasks.filter(status__icontains=ingest_filter) except: pass else: # there is no list of filtered tasks given as extra argument to this function filtered_tasks = Task.objects.filter(id__in=filtered_tasks_as_list).defer('inputs','outputs') else: # there is no list of previously filtered tasks on the session, use all tasks if pre_filtered_tasks: filtered_tasks = pre_filtered_tasks else: #filtered_tasks = Task.objects.all() filtered_tasks = Task.objects.defer('inputs','outputs') # check if there is an ingest filter active try: ingest_filter = request.session['ingest_filter'] if ingest_filter != 'all': if type(ingest_filter) is list: filtered_tasks = filtered_tasks.filter(status__in=ingest_filter) else: filtered_tasks = filtered_tasks.filter(status__icontains=ingest_filter) except: pass # check if there is a status filter active try: status_filter = request.session['task_filter'] if status_filter != 'all': if type(status_filter) is list: filtered_tasks = filtered_tasks.filter(status__in=status_filter) else: filtered_tasks = filtered_tasks.filter(status__icontains=status_filter) except: pass # check if the on_hold filter is active try: onhold = request.session['task_onhold_filter'] if onhold != None: filtered_tasks = filtered_tasks.filter(resume=not onhold) except: pass search = request.GET.get('search_box', None) if (search is not None): filtered_tasks = filtered_tasks.filter( Q(id__contains=search) | Q(sas_id__contains=search) | Q(creationTime__icontains=search) | Q(filter__icontains=search) | Q(status__icontains=search) | Q(status__in=search) | Q(project__icontains=search)) if (Task.objects.defer('inputs','outputs','metrics','remarks','meta_scheduling').count() == filtered_tasks.count()): request.session['filtered'] = False else: request.session['filtered'] = True if distinct: # this does not seem to work, the distinct tasks are not sorted. my_distinct_tasks = filtered_tasks.order_by(distinct,my_sort).distinct(distinct) return filtered_tasks.filter(id__in=my_distinct_tasks).order_by(my_sort) else: return filtered_tasks.order_by(my_sort) def TaskDetails(request, id=0, page=0): try: task = Task.objects.get(id=id) # store the requested task_id on the session request.session['task_id'] = task.id except: # when an invalid id is given, like '/atdb/task_details/0/', # then look if there is a task stored on the session try: task_on_session = request.session['task_id'] task = Task.objects.get(id=task_on_session) except: messages.add_message(request, messages.WARNING, 'no task selected.') return redirect('index') # store the current page on the session request.session['page'] = page log_entries = LogEntry.objects.filter(task=task).order_by('-timestamp') logentries_html = algorithms.convert_logentries_to_html(log_entries) return render(request, "taskdatabase/tasks/task_details.html", {'task': task, 'logentries': logentries_html}) def ShowTaskQuality(request, id=0, page=0): try: task = Task.objects.get(id=id) # store the requested task_id on the session request.session['task_id'] = task.id except: # when an invalid id is given, like '/atdb/task_details/0/', # then look if there is a task stored on the session try: task_on_session = request.session['task_id'] task = Task.objects.get(id=task_on_session) except: messages.add_message(request, messages.WARNING, 'no task selected.') return redirect('index') # store the current page on the session request.session['page'] = page quality_html = algorithms.convert_quality_to_html(task) #summary_html = algorithms.construct_summary(task) plots_html = algorithms.construct_inspectionplots(task) return render(request, "taskdatabase/tasks/task_quality.html", {'task': task, 'quality': quality_html, 'plots' : plots_html}) def AnnotateQualityTaskId(request, id=0, page=0): # a POST means that the form is filled in and should be stored in the database if request.method == "POST": form = QualityAnnotationForm(request.POST) if form.is_valid(): task = Task.objects.get(id=id) try: task.remarks['quality_taskid'] = request.POST.get("annotation", "") except: task.remarks = {} task.remarks['quality_taskid'] = request.POST.get("annotation", "") task.save() return redirect_with_params('quality', '?page=' + request.POST.get("return_to_page", 1)) else: # a GET means that the form should be presented to be filled in task = Task.objects.get(id=id) try: quality_remarks = task.remarks['quality_taskid'] except: quality_remarks = "" form = QualityAnnotationForm(initial={'annotation': quality_remarks, 'return_to_page': page}) return render(request, "taskdatabase/quality/annotate_quality_taskid.html", {'task': task, 'page': page, 'form': form}) def AnnotateQualitySasId(request, id=0, page=0): # a POST means that the form is filled in and should be stored in the database if request.method == "POST": form = QualityAnnotationForm(request.POST) if form.is_valid(): task = Task.objects.get(id=id) tasks = Task.objects.filter(sas_id=task.sas_id) for task in tasks: try: remark_per_sasid = request.POST.get("annotation", "") task.remarks['quality_sasid'] = remark_per_sasid except: task.remarks = {} task.remarks['quality_sasid'] = request.POST.get("annotation", "") task.save() return redirect_with_params('validation', '?page=' + request.POST.get("return_to_page", 1)) else: # a GET means that the form should be presented to be filled in task = Task.objects.get(id=id) try: quality_remarks = task.remarks['quality_sasid'] except: quality_remarks = "" form = QualityAnnotationForm(initial={'annotation': quality_remarks, 'return_to_page': page}) return render(request, "taskdatabase/validation/annotate_quality_sasid.html", {'task': task, 'page': page, 'form': form}) def ClearAnnotationsSasID(request, id=0): task = Task.objects.get(id=id) tasks = Task.objects.filter(sas_id=task.sas_id) for task in tasks: try: task.remarks['quality_sasid'] = None except: task.remarks = {} task.remarks['quality_sasid'] = None task.save() return redirect('validation') def ShowInspectionPlots(request, id=0, page=0): # a GET means that the form should be presented to be filled in task = Task.objects.get(id=id) # convert the path to a url plots_html = algorithms.construct_inspectionplots(task, source='task_id', expand_image=True) return render(request, "taskdatabase/validation/inspection_plots.html", {'task': task, 'my_plots': plots_html}) def ShowInspectionPlotsSasId(request, id=0, expand_image="False"): # a GET means that the form should be presented to be filled in task = Task.objects.get(id=id) # convert the path to a url plots_html = algorithms.construct_inspectionplots(task, expand_image, source='sas_id') return render(request, "taskdatabase/validation/inspection_plots.html", {'task': task, 'my_plots': plots_html}) def ShowSummarySasId(request, id=0, page=0): # a GET means that the form should be presented to be filled in task = Task.objects.get(id=id) # convert the path to a url summary_html = algorithms.construct_summary(task) return render(request, "taskdatabase/validation/summary.html", {'task': task, 'my_summary': summary_html}) def ShowInputs(request, id): task = Task.objects.get(id=id) # convert the json to a presentable piece of html for the output template results = algorithms.convert_json_to_nested_table(task.inputs) return render(request, "taskdatabase/details/inputs.html", {'results': results}) def ShowOutputs(request, id): task = Task.objects.get(id=id) # convert the json to a presentable piece of html for the output template results = algorithms.convert_json_to_nested_table(task.outputs) return render(request, "taskdatabase/details/outputs.html", {'results': results}) def ShowMetrics(request, id): task = Task.objects.get(id=id) # convert the json to a presentable piece of html for the output template results = algorithms.convert_list_of_dicts_to_html(task.metrics) return render(request, "taskdatabase/details/metrics.html", {'results': results}) def ShowConfig(request): configuration = Configuration.objects.all() results = algorithms.convert_config_to_html(configuration) return render(request, "taskdatabase/config.html", {'results': results}) def ShowDashboard(request, selection): # gather the results results_tasks, results_logs = algorithms.construct_dashboard_html(request, selection) return render(request, "taskdatabase/dashboard/dashboard.html", {'results_tasks': results_tasks, 'results_logs': results_logs, 'selection': selection}) def WorkflowDetails(request, id): workflow = Workflow.objects.get(id=id) return render(request, "taskdatabase/details/workflow_details.html", {'workflow': workflow}) def ShowMonitoring(request): # get the latest value of each unique combination of service name and hostname. # distinct_services_per_host = Monitor.objects.all().order_by('name', 'hostname', '-timestamp').distinct('name', 'hostname') distinct_services_per_host = LatestMonitor.objects.all().order_by('name', 'hostname', '-timestamp').distinct('name', 'hostname') monitor_results = algorithms.convert_monitor_to_html(request, distinct_services_per_host) return render(request, "taskdatabase/monitoring_page.html", {'monitor_results': monitor_results}) class DiagramView(ListView): model = Task template_name = "taskdatabase/diagram.html" # ---------- REST API views ----------- # example: /atdb/tasks/ # show all tasks (regular and postprocessing) class TaskListViewAPI(generics.ListCreateAPIView): """ A pagination list of tasks, unsorted. """ model = Task queryset = Task.objects.all().order_by('-priority', 'id') # using the Django Filter Backend - https://django-filter.readthedocs.io/en/latest/index.html filter_backends = (filters.DjangoFilterBackend,) filter_class = TaskFilter def get_serializer_class(self): if self.request.method in ['GET']: return TaskReadSerializer else: return TaskWriteSerializer class PostProcessingTaskListViewAPI(generics.ListCreateAPIView): """ A pagination list of tasks, unsorted. """ model = Task queryset = Task.objects.filter(task_type='postprocessing').order_by('-priority', 'id') # serializer_class = TaskSerializer # using the Django Filter Backend - https://django-filter.readthedocs.io/en/latest/index.html filter_backends = (filters.DjangoFilterBackend,) filter_class = TaskFilter def get_serializer_class(self): if self.request.method in ['GET']: return TaskReadSerializer else: return TaskWriteSerializer # all tasks class AllTaskListViewAPI(generics.ListCreateAPIView): """ A pagination list of tasks, unsorted. """ model = Task queryset = Task.objects.all().order_by('-priority', 'id') # serializer_class = TaskSerializer # using the Django Filter Backend - https://django-filter.readthedocs.io/en/latest/index.html filter_backends = (filters.DjangoFilterBackend,) filter_class = TaskFilter def get_serializer_class(self): if self.request.method in ['GET']: return TaskReadSerializer else: return TaskWriteSerializer class TaskListViewAPIFast(generics.ListAPIView): """ A pagination list of tasks, unsorted. """ model = Task queryset = Task.objects.all().order_by('-priority', 'id') serializer_class = TaskReadSerializerFast # using the Django Filter Backend - https://django-filter.readthedocs.io/en/latest/index.html filter_backends = (filters.DjangoFilterBackend,) filter_class = TaskFilter # example: /atdb/tasks/5/ # calling this view serializes a task in the REST API class TaskDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): """ Detailed view of a task. """ model = Task queryset = Task.objects.all() # serializer_class = TaskSerializer def get_serializer_class(self): if self.request.method in ['GET']: return TaskReadSerializer else: return TaskWriteSerializer class TaskDetailsViewAPIFast(generics.RetrieveUpdateDestroyAPIView): """ Detailed view of a task. """ model = Task queryset = Task.objects.all() serializer_class = TaskReadSerializerFast # example: /atdb/activities/ # show all tasks (regular and postprocessing) class ActivityListViewAPI(generics.ListCreateAPIView): """ A pagination list of tasks, unsorted. """ model = Activity queryset = Activity.objects.all() # using the Django Filter Backend - https://django-filter.readthedocs.io/en/latest/index.html filter_backends = (filters.DjangoFilterBackend,) filter_class = ActivityFilter serializer_class = ActivitySerializer class ActivityDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): model = Activity queryset = Activity.objects.all() serializer_class = ActivitySerializer # example: /atdb/workflows/ class WorkflowListViewAPI(generics.ListCreateAPIView): model = Workflow queryset = Workflow.objects.all() serializer_class = WorkflowSerializer filter_backends = (filters.DjangoFilterBackend,) filter_class = WorkflowFilter # example: /atdb/workflows/5/ class WorkflowDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): model = Workflow queryset = Workflow.objects.all() serializer_class = WorkflowSerializer # example: /atdb/logentries/ class LogEntryListViewAPI(generics.ListCreateAPIView): model = LogEntry queryset = LogEntry.objects.all() serializer_class = LogEntrySerializer filter_backends = (filters.DjangoFilterBackend,) filter_class = LogEntryFilter # overriding the POST, because the status that comes in with the LogEntry # also needs to propagate to the task.new_status def perform_create(self, serializer): log_entry = serializer.save() task = log_entry.task task.new_status = log_entry.status task.save() # example: /atdb/workflows/5/ # calling this view serializes a task in the REST API class LogEntryDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): """ Detailed view of a LogEntry. """ model = LogEntry queryset = LogEntry.objects.all() serializer_class = LogEntrySerializer # overriding the POST, because the status that comes in with the LogEntry # also needs to propagate to the task.new_status def perform_create(self, serializer): log_entry = serializer.save() task = log_entry.task task.new_status = log_entry.status task.save() # example: /atdb/configuration/ class ConfigurationListViewAPI(generics.ListCreateAPIView): model = Configuration queryset = Configuration.objects.all() serializer_class = ConfigurationSerializer filter_backends = (filters.DjangoFilterBackend,) filter_class = ConfigurationFilter # example: /atdb/configuration/5/ class ConfigurationDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): model = Configuration queryset = Configuration.objects.all() serializer_class = ConfigurationSerializer # example: /atdb/job/ class JobListViewAPI(generics.ListCreateAPIView): model = Job queryset = Job.objects.all().order_by('id') serializer_class = JobSerializer filter_backends = (filters.DjangoFilterBackend,) filter_class = JobFilter # example: /atdb/job/5/ class JobDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): model = Job queryset = Job.objects.all() serializer_class = JobSerializer # example: /atdb/postprocessing/ class PostProcessingRuleListViewAPI(generics.ListCreateAPIView): model = PostProcessingRule queryset = PostProcessingRule.objects.all() serializer_class = PostProcessingRuleSerializer filter_backends = (filters.DjangoFilterBackend,) filter_class = PostProcessingFilter # example: /atdb/postprocessing/5/ class PostProcessingRuleDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): model = PostProcessingRule queryset = PostProcessingRule.objects.all() serializer_class = PostProcessingRuleSerializer # example: /atdb/monitor/ class MonitorListViewAPI(generics.ListCreateAPIView): model = Monitor queryset = Monitor.objects.all().order_by('-timestamp') serializer_class = MonitorSerializer filter_backends = (filters.DjangoFilterBackend,) filter_class = MonitorFilter # example: /atdb/latest-monitor/ class LatestMonitorListViewAPI(generics.ListCreateAPIView): model = LatestMonitor queryset = LatestMonitor.objects.all() serializer_class = LatestMonitorSerializer filter_backends = (filters.DjangoFilterBackend,) filter_class = LatestMonitorFilter class LatestMonitorDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): model = LatestMonitor queryset = LatestMonitor.objects.all() serializer_class = LatestMonitorSerializer @login_required def ClearInactiveServices(request): LatestMonitor.objects.all().delete() return redirect("monitoring") # example: /atdb/job/5/ class MonitorDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): model = Monitor queryset = Monitor.objects.all() serializer_class = MonitorSerializer # --- controller resources, triggered by a button in the GUI or directoy with a URL --- # set task status to 'new_status' - called from the GUI @login_required def Hold(request, pk, hold_it, page=0): model = Task task = Task.objects.get(pk=pk) task.resume = (hold_it == 'resume') task.save() if page == 0: # redirect to details screen return redirect('query') else: # redirect to tasks list return redirect_with_params('index', '?page=' + page) def HoldQuery(request, pk, hold_it, query_params): model = Task task = Task.objects.get(pk=pk) task.resume = (hold_it == 'resume') task.save() current_query_params = convert_query_params_to_url(query_params) return redirect_with_params('query', '?' + current_query_params) def PurgeQuery(request, pk, purge_policy, query_params): model = Task task = Task.objects.get(pk=pk) task.purge_policy = purge_policy task.save() current_query_params = convert_query_params_to_url(query_params) return redirect_with_params('query', '?' + current_query_params) @login_required def ServiceHoldResume(request, name, hostname, enabled): model = LatestMonitor service = LatestMonitor.objects.get(name=name, hostname=hostname) metadata = service.metadata if not metadata: metadata = {} metadata['enabled'] = enabled service.metadata = metadata service.save() return redirect('monitoring') @login_required def TaskSetStatus(request, pk, new_status, page=0): model = Task task = Task.objects.get(pk=pk) task.new_status = new_status task.save() if page == 0: # redirect to details screen return redirect('task-details') else: # redirect to tasks list return redirect_with_params('index', '?page=' + page) @login_required def TaskValidateSasId(request, pk, quality, new_status, page=0): """ find all tasks with the same SAS_ID of the given task (pk), and set its quality to all of them This is used by the 'P/M/G/Validate' buttons on the Validation Page There is one special 'quality', if its value is 'calculated' then use the calculated quality of the task. Unless there is no calculated quality, then don't change the quality and just set the status to 'validated' """ task = Task.objects.get(pk=pk) # find all tasks with the same SAS_ID, and set this quality to all of them sas_id = task.sas_id tasks = Task.objects.filter(sas_id=sas_id) for task in tasks: if task.status == State.STORED.value or task.status == State.VALIDATED.value: if quality == 'calculated': try: quality = task.calculated_qualities['per_sasid'] except: # no calculated quality present, just the existing quality (so no change) quality = task.quality task.quality = quality task.new_status = new_status task.save() if page == 0: # redirect to details screen return redirect('validation') else: # redirect to tasks list return redirect_with_params('validation', '?page=' + page) @login_required def TaskValidateTask(request, pk, quality, new_status, page=0): """ Find the task (pk), and set its quality to the calculated quality (if present, otherwise just set the status to 'validated') This is used by the 'Validate' button on the Quality Page """ task = Task.objects.get(pk=pk) if quality == 'calculated': try: quality = task.calculated_qualities['per_task'] except: # no calculated quality present, just the existing quality (so no change) quality = task.quality task.quality = quality task.new_status = new_status task.save() if page == 0: # redirect to details screen return redirect('quality') else: # redirect to tasks list return redirect_with_params('quality', '?page=' + page) @login_required def TaskRetry(request, pk, new_status, page=0): task = Task.objects.get(pk=pk) task.new_status = new_status task.save() if page == 0: # redirect to details screen return redirect('task-details') else: # redirect to tasks list return redirect_with_params('failures', '?page=' + page) @login_required def TaskDiscard(request, pk, new_status, page=0): task = Task.objects.get(pk=pk) if request.method == "POST": form = DiscardAnnotationForm(request.POST) if form.is_valid(): try: task.remarks['discard_reason'] = request.POST.get("annotation", "") except: task.remarks = {} task.remarks['discard_reason'] = request.POST.get("annotation", "") # currently, the cleanup_policy is set to the old status. # That will tell the cleanup service what to do with it. task.cleanup_policy = task.status # set the status to discard and save it task.new_status = new_status task.save() # return to the failures page if int(page) == 0: return redirect('task-details') else: return redirect_with_params('failures', '?page=' + page) else: # a GET means that the form should be presented to be filled in try: discard_reason = task.remarks['discard_reason'] except: discard_reason = "" my_form = DiscardAnnotationForm(initial={'annotation': discard_reason, 'return_to_page': page}) # if not a POST, then render the confirmaton page, which will return to this function with a POST. return render(request, "taskdatabase/failures/confirm_discard.html", {'task': task, 'my_form': my_form, 'page': page}) def TaskDiscardSasId(request, pk, new_status, page=0): task = Task.objects.get(pk=pk) sas_id = task.sas_id tasks = Task.objects.filter(sas_id=sas_id) if request.method == "POST": form = DiscardAnnotationForm(request.POST) for task in tasks: task.cleanup_policy = task.status if form.is_valid(): try: task.remarks['discard_reason'] = request.POST.get("annotation", "") except: task.remarks = {} task.remarks['discard_reason'] = request.POST.get("annotation", "") task.new_status = new_status task.save() # return to the validation page return redirect_with_params('validation', '?page=' + page) else: # a GET means that the form should be presented to be filled in try: discard_reason = task.remarks.get("discard_reason", "") except: discard_reason = "" count=tasks.count() my_form = DiscardAnnotationForm(initial={'annotation': discard_reason, 'return_to_page': page}) # if not a POST, then render the confirmaton page, which will return to this function with a POST. return render(request, "taskdatabase/validation/confirm_discard.html", {'task': task, 'my_form': my_form, 'page': page, 'sas_id': sas_id, 'count': count}) # set a filter value in the session, used later by the 'get_searched_tasks' mechanism def TaskSetFilter(request, filter, redirect_to_page): request.session['task_filter'] = filter # switch off the other filters if filter == 'all': request.session['task_onhold_filter'] = None if redirect_to_page == 'quality': return redirect('quality') if redirect_to_page == 'ingest': return redirect('ingest') if redirect_to_page == 'failures': return redirect('failures') return redirect_with_params('index', '?page=1') def TaskSetIngestFilter(request, filter): request.session['ingest_filter'] = filter return redirect_with_params('ingest', '?page=1') # set the defined list of ACTIVE_STATUSSES on the session, used later by the 'get_searched_tasks' mechanism def TaskSetActiveFilter(request, redirect_to_page): request.session['task_filter'] = settings.ACTIVE_STATUSSES request.session['task_onhold_filter'] = None if redirect_to_page == 'quality': return redirect('quality') return redirect_with_params('index', '?page=1') def TaskSetOnHoldFilter(request, onhold, redirect_to_page): request.session['task_onhold_filter'] = onhold if redirect_to_page == 'quality': return redirect('quality') return redirect_with_params('index', '?page=1') def TaskClearFilter(request, redirect_to_page): request.session['task_filter'] = 'all' request.session['task_onhold_filter'] = None request.session['filtered_tasks_as_list'] = [] request.session['search_box'] = '' request.session['filtered'] = False request.session['ingest_filter'] = 'all' try: return redirect(redirect_to_page) except: return redirect_with_params('index', '?page=1') @login_required def ChangePriority(request, pk, priority_change, page=0): task = Task.objects.get(pk=pk) priority = task.priority + int(priority_change) if priority < 0: priority = 0 task.priority = priority task.save() if page == 0: # redirect to details screen return redirect('task-details') else: # redirect to tasks list return redirect_with_params('index', '?page=' + page) @login_required def ChangePrioritySasID(request, pk, priority_change, page=0): task = Task.objects.get(pk=pk) tasks = Task.objects.filter(sas_id=task.sas_id) for task in tasks: if task.status not in ['discarded','suspended']: priority = task.priority + int(priority_change) if priority < 0: priority = 0 task.priority = priority task.save() return redirect_with_params('ingest', '?page=' + page) def SortTasks(request, sort, redirect_to_page): # store the sort field on the session request.session['sort'] = sort if redirect_to_page == 'atdb': return redirect('index') else: return redirect(redirect_to_page) def convert_query_params_to_url(query_params): # to keep the state of the current query, # loop through the current query_params and send them back into the next request # because the query_params come in as a QueryDict converted to a string # it needs some converting to a json string that can be loaded into a dict s = query_params.replace('<QueryDict: ', '')[:-1] s = s.replace('[', '') s = s.replace(']', '') s = s.replace('\'', '"') # read the constructed json as a dict d = json.loads(s) # construct the dict to a proper url params = "" for key in d: params = params + "&" + key + "=" + d[key] return params @login_required def TaskSetStatusTables2(request, pk, new_status, query_params): model = Task task = Task.objects.get(pk=pk) task.new_status = new_status task.save() current_query_params = convert_query_params_to_url(query_params) # current_query_params = "id=&id__gte=&id__lte=&workflow__id=&filter=%09test&filter__icontains=&priority=&priority__gte=&priority__lte=&status__icontains=&status__in=&project=&project__icontains=&sas_id=&sas_id__icontains=&resume=unknown" return redirect_with_params('query', '?' + current_query_params) @login_required def TaskMultiStatus(request, new_status, query_params): # get the list of id's from the session filtered_tasks_as_list = request.session['filtered_tasks_as_list'] count = len(filtered_tasks_as_list) if request.method == "POST": form = DiscardAnnotationForm(request.POST) for id in filtered_tasks_as_list: task = Task.objects.get(id=id) # setting status to discard or discarded uses a different form if 'discard' in new_status: if form.is_valid(): try: task.remarks['discard_reason'] = request.POST.get("annotation", "") except: task.remarks = {} task.remarks['discard_reason'] = request.POST.get("annotation", "") # currently, the cleanup_policy is set to the old status. # That will tell the cleanup service what to do with it. task.cleanup_policy = task.status task.new_status = new_status task.save() current_query_params = request.session['current_query_params'] return redirect_with_params('query', '?' + current_query_params) else: # add the current query parameters to the session so that they survive # the request/response to the confirmation page (which has other query parameters) current_query_params = convert_query_params_to_url(query_params) request.session['current_query_params'] = current_query_params if 'discard' in new_status: my_form = DiscardAnnotationForm(initial={'annotation': '', 'return_to_page': 0}) return render(request, "taskdatabase/query/confirm_discard.html", {'my_form': my_form, 'new_value': new_status,'count': count}) else: return render(request, "taskdatabase/query/confirm_multi_change.html", {'new_value': new_status, 'count': count}) @login_required def TaskMultiHold(request, onhold, query_params): # get the list of id's from the session filtered_tasks_as_list = request.session['filtered_tasks_as_list'] count = len(filtered_tasks_as_list) if request.method == "POST": for id in filtered_tasks_as_list: task = Task.objects.get(id=id) task.resume = (onhold == 'resume') task.save() current_query_params = request.session['current_query_params'] return redirect_with_params('query', '?' + current_query_params) # add the current query parameters to the session so that they survive # the request/response to the confirmation page (which has other query parameters) current_query_params = convert_query_params_to_url(query_params) request.session['current_query_params'] = current_query_params return render(request, "taskdatabase/query/confirm_multi_change.html", {'new_value': onhold, 'count': count}) @login_required def TaskMultiPurge(request, purge_policy, query_params): # get the list of id's from the session filtered_tasks_as_list = request.session['filtered_tasks_as_list'] count = len(filtered_tasks_as_list) if request.method == "POST": for id in filtered_tasks_as_list: task = Task.objects.get(id=id) task.purge_policy = purge_policy task.save() current_query_params = request.session['current_query_params'] return redirect_with_params('query', '?' + current_query_params) # add the current query parameters to the session so that they survive # the request/response to the confirmation page (which has other query parameters) current_query_params = convert_query_params_to_url(query_params) request.session['current_query_params'] = current_query_params return render(request, "taskdatabase/query/confirm_multi_change.html", {'new_value': purge_policy, 'count': count}) # /atdb/get_size?status__in=defined,staged class GetSizeView(generics.ListAPIView): queryset = Task.objects.all() # override list and generate a custom response def list(self, request, *args, **kwargs): query_params = dict(self.request.query_params) try: status_in = query_params['status__in'] status_list = status_in[0].split(',') if status_list == ['']: status_list = settings.STATUSSES_WITH_DATA except: # if no 'status__in=' is given, then use the default list status_list = settings.STATUSSES_WITH_DATA try: type = query_params['type'][0] # should be 'processed' or 'to_process' except: # if no 'type=' is given, then use the default list type = 'to_process' size = algorithms.get_size(status_list, type) # return a response return Response({ 'total_size': size, }) # /atdb/get_min_start_and_max_end_time?sas_id=650065 class GetMinMaxTimeView(generics.ListAPIView): queryset = Task.objects.all() # override list and generate a custom response def list(self, request, *args, **kwargs): # read the arguments from the query try: sas_id = self.request.query_params['sas_id'] start_time, end_time = algorithms.get_min_start_and_max_end_time(sas_id) return Response({ 'start_time': start_time, 'end_time': end_time, }) except Exception as error: return Response({ 'error': str(error) }) # /atdb/get_unique_values_for_key/{key} class GetUniqueValuesForKey(generics.ListAPIView): queryset = Task.objects.all() model = Task filter_backends = (filters.DjangoFilterBackend,) filter_class = TaskFilter # override list and generate a custom response def list(self, request: Request, *args, **kwargs): try: aggregation_key = kwargs['aggregation_key'] queryset = self.get_queryset() queryset = self.filter_queryset(queryset) return Response({'aggregation_key': aggregation_key, 'results': algorithms.unique_values_for_aggregation_key( queryset, aggregation_key) }) except Exception as error: return Response({ 'error': str(error) }) @staff_member_required def AssociateActivities(request): # disconnect the signals to avoid unneccesary updates disconnect_signals() #tasks = Task.objects.filter(activity__isnull=True)[:10000] tasks = Task.objects.filter(activity__isnull=True) total = tasks.count() i = 0 for task in tasks: i+=1 if not task.activity: if task.status not in ['discarded', 'suspended']: # saving triggers a call to associate_task_with_activity(task) task.save() logger.info(f'{i} of {total}') connect_signals() return redirect('index') def UpdateActivitySasId(request, sas_id): # this function is called externally to avoid worker timeouts # http://localhost:8000/atdb/tasks/repair/update-activity/600907 try: activity = Activity.objects.get(sas_id=sas_id) tasks = Task.objects.filter(sas_id=activity.sas_id) total = tasks.count() i = 0 for task in tasks: i += 1 if task.status in ['stored', 'validated','scrubbed','archiving', 'archived','finished'] or 'failed' in task.status: activities_handler.update_activity(task) logger.info(f'{i} of {total} for sas_id = {sas_id}') logger.info(f'UpdateActivitySasId {sas_id}') except Exception as error: logger.error(error) return JsonResponse({ 'error': str(error) }) return JsonResponse({ 'total': total }) def UpdateSummaryFlag(request, task_id): # this function is called externally to avoid worker timeouts # http://localhost:8000/atdb/tasks/repair/update-summary-flag/12345 try: task = Task.objects.get(id=task_id) if not task.is_summary: is_summary = check_if_summary(task) if is_summary: # disconnect the signals to avoid unneccesary updates disconnect_signals() task.save() connect_signals() except Exception as error: logger.error(error) return JsonResponse({ 'error': str(error) }) return JsonResponse({ 'is_summary': task.is_summary }) def CreateStatusGraph(request): query_per_hour = """ SELECT DATE_TRUNC('hour', timestamp) AS hour, COUNT(*) AS num_records FROM taskdatabase_status WHERE name = %s AND timestamp >= %s -- Filter for records within the last x days GROUP BY DATE_TRUNC('hour', timestamp) ORDER BY DATE_TRUNC('hour', timestamp); """ query_per_day = """ SELECT DATE_TRUNC('day', timestamp) AS day, COUNT(*) AS num_records FROM taskdatabase_status WHERE name = %s AND timestamp >= %s -- Filter for records within the last x days GROUP BY DATE_TRUNC('day', timestamp) ORDER BY DATE_TRUNC('day', timestamp); """ if request.method == 'POST': status = request.POST.get('status', 'finished') days_to_check = int(request.POST.get('days', 60)) bin_size = request.POST.get('bin_size', 'day') # Calculate the date x days ago start_date = datetime.now() - timedelta(days=days_to_check) database = settings.DATABASES['default'] # Connect to your PostgreSQL database conn = psycopg2.connect( dbname=database['NAME'], user=database['USER'], password=database['PASSWORD'], host=database['HOST'], port=database['PORT'], ) # Execute the SQL query cur = conn.cursor() if bin_size == 'day': cur.execute(query_per_day, (status,start_date)) else: cur.execute(query_per_hour, (status, start_date)) # Fetch the results results = cur.fetchall() # Close cursor and connection cur.close() conn.close() # Extract hours and number of records from results records = [result[0] for result in results] num_records = [result[1] for result in results] # Plot the graph plt.figure(figsize=(10, 6)) plt.plot(records, num_records, marker='o') plt.xlabel(bin_size.capitalize()) plt.ylabel('Number of Records') plt.title(f'{status} per {bin_size} for the last {days_to_check} days') plt.xticks(rotation=45) plt.grid(True) plt.tight_layout() plt.savefig('taskdatabase/static/status_graph.png') # Render the template with the graph return render(request, 'taskdatabase/graphs/status_graph.html') return render(request, 'taskdatabase/graphs/status_graph_input_form.html', {'image_path': 'status_graph.png'})