# Newer
# Older
from django.core.exceptions import ObjectDoesNotExist
from django.http import HttpResponseRedirect
from django.shortcuts import redirect, render
from django.views.generic import CreateView, DeleteView, DetailView, UpdateView
from django.views.generic.list import ListView
from rest_framework import generics, status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.schemas.openapi import AutoSchema
from .models import (
ATDBProcessingSite,
DataFilterType,
DataLocation,
DataProduct,
DataProductFilter,
WorkSpecification,
)
from .serializers import (
ATDBProcessingSiteSerializer,
DataLocationSerializer,
DataProductFlatSerializer,
DataProductSerializer,
WorkSpecificationSerializer,
)
from .tasks import insert_task_into_atdb
def compute_size_of_inputs(inputs: dict) -> Tuple[int, int, float]:
    """Sum the sizes of all files described by a (possibly nested) structure.

    A dict that has a ``"size"`` key is treated as a single file and counted
    once with that size; its other keys are not inspected.  Any other dict,
    list or tuple is a container whose values/items are walked recursively.
    Anything else contributes nothing.

    Args:
        inputs: File description, or a nested dict/list/tuple of them.

    Returns:
        ``(total_size, number_of_files, average_file_size)``; the average is
        ``0`` when no file was found.
    """
    total_size = 0
    number_of_files = 0

    if isinstance(inputs, dict) and "size" in inputs:
        # Leaf: one file entry.
        total_size = inputs["size"]
        number_of_files = 1
    elif isinstance(inputs, (dict, list, tuple)):
        values = inputs.values() if isinstance(inputs, dict) else inputs
        for value in values:
            # Recurse so that arbitrarily nested containers are accounted for.
            item_total, item_count, _ = compute_size_of_inputs(value)
            total_size += item_total
            number_of_files += item_count

    average_file_size = total_size / number_of_files if number_of_files else 0
    return total_size, number_of_files, average_file_size
def format_size(num, suffix="B"):
    """Render a size as a short human-readable string in powers of 1024.

    Args:
        num: Size to format (typically a byte count).
        suffix: Text appended after the unit letter.

    Returns:
        ``"-"`` when ``num`` is exactly zero, otherwise the value scaled to
        the first unit in which its magnitude is below 1024, with one
        decimal, e.g. ``"1.5KB"``.
    """
    if num == 0:
        return "-"
    for unit in ("", "K", "M", "G", "T", "P", "E", "Z"):
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    # Past "Z": use a bare "Y" so the unit matches the style of all the others
    # (the list above has no "i" infix).
    return f"{num:.1f}Y{suffix}"
filter_class = None
def __init__(self, *args, **kwargs):
    """Register the dynamic filters, then defer to the parent constructor.

    All positional and keyword arguments are passed through unchanged.
    """
    # Load first so that ``base_filters`` is already populated when the
    # parent constructor runs.
    self._load_filters()
    super().__init__(*args, **kwargs)
def _load_filters(self):
if self.Meta.filter_class is None:
for item in self.Meta.filter_class.objects.all():
field_obj = self.Meta.model._meta.get_field(item.field)
filter_class, *_ = self.filter_for_lookup(field_obj, item.lookup_type)
self.base_filters[item.name] = filter_class(item.field)
# --- Filters ---
class DataProductFilterSet(DynamicFilterSet):
    """Filter set for ``DataProduct`` whose filters come from ``DataProductFilter``."""

    class Meta:
        # Model holding the filter definitions, and the model being filtered.
        filter_class = DataProductFilter
        model = DataProduct
# ---------- GUI Views -----------
atdb_hosts = ATDBProcessingSite.objects.values("name", "url")
return render(request, "lofardata/api.html", {"atdb_hosts": atdb_hosts})
def preprocess_filters_specification_view(specification):
    """Return all data product filters, annotated for rendering.

    Each ``DataProductFilter`` gets a ``default`` attribute: the value stored
    under its field name in ``specification.filters`` when there is one, else
    an empty string.  Filters of the ``DROPDOWN`` type additionally get a
    ``choices`` attribute holding the distinct values of that field across
    ``DataProduct``.  The attributes are set on the in-memory objects only;
    nothing is saved here.

    Args:
        specification: The work specification being shown, or ``None``.

    Returns:
        The annotated ``DataProductFilter`` queryset.
    """
    available_filters = DataProductFilter.objects.all()

    for entry in available_filters:
        field_name = entry.field

        chosen = specification.filters if specification is not None else None
        entry.default = chosen[field_name] if chosen and field_name in chosen else ""

        if entry.filter_type == DataFilterType.DROPDOWN:
            distinct_products = DataProduct.objects.distinct(field_name)
            entry.choices = distinct_products.values_list(field_name)

    return available_filters
class Specifications(ListView):
    """List page of work specifications, newest first.

    Staff members and superusers see every specification; any other user
    only sees those whose ``created_by`` matches their own id.
    """

    model = WorkSpecification
    template_name = "lofardata/index.html"
    serializer_class = WorkSpecificationSerializer
    ordering = ["-created_on"]

    def get_queryset(self):
        """Return the specifications visible to the requesting user."""
        current_user: User = self.request.user
        visible = WorkSpecification.objects.all()
        if not (current_user.is_staff or current_user.is_superuser):
            visible = visible.filter(created_by=current_user.id)
        return visible.order_by("-created_on")
class WorkSpecificationCreateUpdateView(UpdateView):
    """Single form view used both to create and to edit a work specification.

    Without a ``pk`` URL argument the form is bound to a fresh, unsaved
    ``WorkSpecification``; with one, to the stored specification.
    """

    template_name = "lofardata/workspecification/create_update.html"
    model = WorkSpecification
    form_class = WorkSpecificationForm

    def get_object(self, queryset=None):
        """Return the specification to edit, or a blank one when no pk is given."""
        pk = self.kwargs.get("pk")
        if pk is None:
            return WorkSpecification()
        return WorkSpecification.objects.get(pk=pk)

    def get_context_data(self, **kwargs):
        """Add the renderable filter list under ``context["filters"]``."""
        context = super().get_context_data(**kwargs)
        try:
            specification = WorkSpecification.objects.get(pk=context["object"].pk)
        except ObjectDoesNotExist:
            # Not stored yet (create mode): render the filters without defaults.
            specification = None
        context["filters"] = preprocess_filters_specification_view(specification)
        return context

    def create_successor(self, specification):
        """Store a new specification that follows ``specification``.

        The successor points back to ``specification`` as its predecessor and
        uses the same processing site.

        Returns:
            The URL of the successor's update page.
        """
        successor = WorkSpecification()
        successor.predecessor_specification = specification
        successor.processing_site = specification.processing_site
        successor.save()
        return self.get_success_url(pk=successor.pk)

    def form_valid(self, form):
        """Handle a valid form according to the submitted ``action`` value.

        * ``"Submit"``: reset ``async_task_result`` and ``is_ready`` before
          the specification is saved.
        * ``"Send"``: queue ``insert_task_into_atdb`` for the specification.
        * ``"Successor"``: save the specification, create its successor and
          redirect to the successor's update page.

        For every action other than ``"Successor"`` the parent implementation
        takes over, which saves the form and redirects to
        ``get_success_url()``.
        """
        action_ = form.data["action"]
        specification = form.instance

        if action_ == "Submit":
            specification.async_task_result = None
            specification.is_ready = False

        if action_ == "Send":
            insert_task_into_atdb.delay(specification.pk)

        if action_ == "Successor":
            specification.save()
            # NOTE(review): a discarded local built here previously also copied
            # ``selected_workflow`` onto a successor that was never saved;
            # ``create_successor`` does not copy it. Confirm which is intended.
            return HttpResponseRedirect(self.create_successor(specification))

        return super().form_valid(form)

    def get_success_url(self, **kwargs):
        """Return the update URL for ``pk`` when given, else the index URL."""
        pk = kwargs.get("pk")
        if pk is None:
            return reverse_lazy("index")
        return reverse_lazy("specification-update", kwargs={"pk": pk})
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
class WorkSpecificationDetailView(DetailView):
    """Read-only page for one work specification, including input size figures."""

    model = WorkSpecification
    template_name = "lofardata/workspecification/detail.html"

    def get_context_data(self, **kwargs):
        """Extend the context with the filters and the input size summary.

        Adds ``filters``, ``number_of_files``, ``total_input_size`` and
        ``size_per_task``.  The per-task size is the average file size times
        ``batch_size`` when the batch size is positive, otherwise the total
        input size.
        """
        context = super().get_context_data(**kwargs)
        specification = WorkSpecification.objects.get(pk=context["object"].pk)
        context["filters"] = preprocess_filters_specification_view(specification)

        sizes = compute_size_of_inputs(specification.inputs)
        total_bytes, file_count, mean_bytes = sizes

        if specification.batch_size > 0:
            per_task_bytes = mean_bytes * specification.batch_size
        else:
            per_task_bytes = total_bytes

        context["number_of_files"] = file_count
        context["total_input_size"] = format_size(total_bytes)
        context["size_per_task"] = format_size(per_task_bytes)
        return context
class WorkSpecificationDeleteView(DeleteView):
    """Delete page for a work specification; redirects to the index URL."""

    model = WorkSpecification
    success_url = reverse_lazy("index")
    template_name = "lofardata/workspecification/delete.html"
class WorkSpecificationInputsView(DetailView):
    """Detail page rendering a work specification with the inputs template."""

    model = WorkSpecification
    template_name = "lofardata/workspecification/inputs.html"
class WorkSpecificationATDBTasksView(DetailView):
    """Detail page rendering a work specification with the tasks template."""

    model = WorkSpecification
    template_name = "lofardata/workspecification/tasks.html"
# ---------- REST API views ----------
class DataProductView(generics.ListCreateAPIView):
    """REST endpoint over data products, ordered by ``obs_id``."""

    queryset = DataProduct.objects.all().order_by("obs_id")
    serializer_class = DataProductSerializer
    model = DataProduct

    # using the Django Filter Backend - https://django-filter.readthedocs.io/en/latest/index.html
    filter_backends = (filters.DjangoFilterBackend,)
class ATDBProcessingSiteView(viewsets.ReadOnlyModelViewSet):
    """Read-only REST viewset over ATDB processing sites, ordered by primary key."""

    queryset = ATDBProcessingSite.objects.all().order_by("pk")
    serializer_class = ATDBProcessingSiteSerializer
    model = ATDBProcessingSite

# Roy de Goei
# committed
class DataProductDetailsView(generics.RetrieveUpdateDestroyAPIView):
    """REST endpoint for a single data product."""

    queryset = DataProduct.objects.all()
    serializer_class = DataProductSerializer
    model = DataProduct
class DataLocationView(generics.ListCreateAPIView):
    """REST endpoint over data locations, ordered by ``name``."""

    queryset = DataLocation.objects.all().order_by("name")
    serializer_class = DataLocationSerializer
    model = DataLocation
class InsertWorkSpecificationSchema(AutoSchema):
def get_operation_id_base(self, path, method, action):
class InsertMultiDataproductView(generics.CreateAPIView):
    """
    Add a single DataProduct, or several when the payload is a list.
    """

    queryset = DataProduct.objects.all()
    serializer_class = DataProductFlatSerializer

    def get_serializer(self, *args, **kwargs):
        """if an array is passed, set serializer to many"""
        # The check has to live here: ``kwargs`` only exists per call, not at
        # class definition time.
        if isinstance(kwargs.get("data", {}), list):
            kwargs["many"] = True
        return super().get_serializer(*args, **kwargs)
class WorkSpecificationViewset(viewsets.ModelViewSet):
    """REST viewset over work specifications, restricted per user."""

    queryset = WorkSpecification.objects.all()
    serializer_class = WorkSpecificationSerializer

    def get_queryset(self):
        """Return the specifications the requesting user may access.

        Users that are both staff and superuser get every specification;
        everyone else only those whose ``created_by`` matches their id.
        """
        current_user: User = self.request.user
        # Go through the parent so each request works on a fresh queryset.
        queryset = super().get_queryset()
        # NOTE(review): the ``Specifications`` list view shows everything to
        # users that are staff OR superuser, while this check requires both.
        # The stricter rule is kept here; confirm which one is intended.
        if not current_user.is_staff or not current_user.is_superuser:
            return queryset.filter(created_by=current_user.id)
        return queryset

    @action(detail=True, methods=["POST"])
    def submit(self, request, pk=None) -> Response:
        """Queue the specification for insertion into ATDB, then redirect.

        The response is a redirect to the ``specification-detail`` page of
        the same specification.
        """
        # TODO: check that there are some matches in the request?
        insert_task_into_atdb.delay(pk)
        time.sleep(1)  # allow for some time to pass
        return redirect("specification-detail", pk=pk)