-
Jörn Künsemöller authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
specification.py 51.78 KiB
"""
This file contains the viewsets (based on the elsewhere defined data models and serializers)
"""
from django.shortcuts import get_object_or_404, get_list_or_404, render
from django.http import JsonResponse
from django.contrib.auth.models import User
from django_filters import rest_framework as filters
import django_property_filter as property_filters
from rest_framework.viewsets import ReadOnlyModelViewSet
from rest_framework import status
from rest_framework.response import Response
from rest_framework.decorators import permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import action
from drf_yasg.utils import swagger_auto_schema
from drf_yasg.openapi import Parameter
from lofar.sas.tmss.tmss.tmssapp.viewsets.lofar_viewset import LOFARViewSet, LOFARNestedViewSet, AbstractTemplateViewSet, LOFARCopyViewSet
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp import serializers
from django.http import JsonResponse
from datetime import datetime
from lofar.common.json_utils import get_default_json_object_for_schema
from lofar.common.datetimeutils import formatDatetime
from lofar.sas.tmss.tmss.tmssapp.tasks import *
from lofar.sas.tmss.tmss.tmssapp.subtasks import *
from django.urls import resolve, get_script_prefix,Resolver404
import json
import logging
from django.core.exceptions import ObjectDoesNotExist
logger = logging.getLogger(__name__)
# This is required for keeping a user reference as ForeignKey in other models
# (I think so that the HyperlinkedModelSerializer can generate a URI)
class UserViewSet(ReadOnlyModelViewSet):
    """Read-only viewset over all Django ``User`` objects, rendered with UserSerializer."""
    serializer_class = serializers.UserSerializer
    queryset = User.objects.all()
class TagsViewSet(LOFARViewSet):
    """Viewset over all Tags objects, rendered with TagsSerializer."""
    serializer_class = serializers.TagsSerializer
    queryset = models.Tags.objects.all()
class CommonSchemaTemplateViewSet(AbstractTemplateViewSet):
    """Template viewset over all CommonSchemaTemplate objects."""
    serializer_class = serializers.CommonSchemaTemplateSerializer
    queryset = models.CommonSchemaTemplate.objects.all()
class GeneratorTemplateViewSet(AbstractTemplateViewSet):
    """Template viewset over all GeneratorTemplate objects."""
    serializer_class = serializers.GeneratorTemplateSerializer
    queryset = models.GeneratorTemplate.objects.all()
class DefaultGeneratorTemplateViewSet(LOFARViewSet):
    """Viewset over all DefaultGeneratorTemplate objects."""
    serializer_class = serializers.DefaultGeneratorTemplateSerializer
    queryset = models.DefaultGeneratorTemplate.objects.all()
class SchedulingUnitObservingStrategyTemplateViewSet(LOFARViewSet):
    """Viewset over all SchedulingUnitObservingStrategyTemplate objects.

    Adds the ``create_scheduling_unit`` detail action, which creates a new
    SchedulingUnitDraft from the selected strategy template.
    """
    queryset = models.SchedulingUnitObservingStrategyTemplate.objects.all()
    serializer_class = serializers.SchedulingUnitObservingStrategyTemplateSerializer

    @swagger_auto_schema(responses={status.HTTP_201_CREATED: 'The newly created scheduling unit',
                                    status.HTTP_400_BAD_REQUEST: 'scheduling_set_id is missing',
                                    status.HTTP_403_FORBIDDEN: 'forbidden'},
                         operation_description="Create a new SchedulingUnit based on this SchedulingUnitObservingStrategyTemplate, with the given <name> and <description> and make it a child of the given <scheduling_set_id>",
                         manual_parameters=[Parameter(name='scheduling_set_id', required=True, type='integer', in_='query',
                                                      description="the id of the scheduling_set which will be the parent of the newly created scheduling_unit"),
                                            Parameter(name='name', required=False, type='string', in_='query',
                                                      description="The name for the newly created scheduling_unit"),
                                            Parameter(name='description', required=False, type='string', in_='query',
                                                      description="The description for the newly created scheduling_unit")])
    @action(methods=['get'], detail=True)
    def create_scheduling_unit(self, request, pk=None):
        """Create a SchedulingUnitDraft from the strategy template with primary key *pk*.

        Query parameters:
            scheduling_set_id: (required) id of the SchedulingSet that becomes the parent.
            name: (optional) name of the new draft, defaults to "scheduling unit".
            description: (optional) description of the new draft, defaults to "".

        Returns a 201 response holding the serialized draft and a ``Location``
        header, a 400 response when ``scheduling_set_id`` is not supplied, or
        raises Http404 when the template or the scheduling set does not exist.
        """
        strategy_template = get_object_or_404(models.SchedulingUnitObservingStrategyTemplate, pk=pk)

        # The parameter is mandatory; indexing query_params directly would raise an
        # unhandled MultiValueDictKeyError (HTTP 500) when the client omits it.
        scheduling_set_id = request.query_params.get('scheduling_set_id')
        if scheduling_set_id is None:
            return Response({'Error': 'scheduling_set_id is missing'}, status=status.HTTP_400_BAD_REQUEST)

        # NOTE(review): add_defaults_to_json_object_for_schema is not imported by name in this
        # file; presumably it is provided by one of the star imports -- confirm.
        spec = add_defaults_to_json_object_for_schema(strategy_template.template,
                                                      strategy_template.scheduling_unit_template.schema)

        scheduling_set = get_object_or_404(models.SchedulingSet, pk=scheduling_set_id)

        scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(name=request.query_params.get('name', "scheduling unit"),
                                                                          description=request.query_params.get('description', ""),
                                                                          requirements_doc=spec,
                                                                          scheduling_set=scheduling_set,
                                                                          requirements_template=strategy_template.scheduling_unit_template,
                                                                          observation_strategy_template=strategy_template)

        # derive the url of the new draft from the url of this request
        scheduling_unit_observation_strategy_template_path = request._request.path
        base_path = scheduling_unit_observation_strategy_template_path[:scheduling_unit_observation_strategy_template_path.find('/scheduling_unit_observing_strategy_template')]
        scheduling_unit_draft_path = '%s/scheduling_unit_draft/%s/' % (base_path, scheduling_unit_draft.id,)

        # return a response with the new serialized SchedulingUnitDraft, and a Location to the new instance in the header
        return Response(serializers.SchedulingUnitDraftSerializer(scheduling_unit_draft, context={'request':request}).data,
                        status=status.HTTP_201_CREATED,
                        headers={'Location': scheduling_unit_draft_path})
class SchedulingUnitObservingStrategyTemplateNestedViewSet(LOFARNestedViewSet):
    """Viewset over SchedulingUnitObservingStrategyTemplate objects, optionally nested under a scheduling set."""
    queryset = models.SchedulingUnitObservingStrategyTemplate.objects.all()
    serializer_class = serializers.SchedulingUnitObservingStrategyTemplateSerializer

    def get_queryset(self):
        """Return the strategy templates visible through this (possibly nested) route.

        Without a ``scheduling_set_id`` url kwarg all templates are returned.
        With it, only the templates referenced by the SchedulingUnitDrafts of
        that scheduling set are returned; Http404 is raised when the set has
        no drafts.
        """
        if 'scheduling_set_id' not in self.kwargs:
            return models.SchedulingUnitObservingStrategyTemplate.objects.all()

        scheduling_unit_drafts = get_list_or_404(models.SchedulingUnitDraft,
                                                 scheduling_set__id=self.kwargs['scheduling_set_id'])

        # Return a real QuerySet instead of a plain list, so that filter/ordering
        # backends (which call queryset methods) keep working. Collecting the ids
        # in a set also drops duplicates and drafts without a strategy template.
        template_ids = {draft.observation_strategy_template_id for draft in scheduling_unit_drafts}
        template_ids.discard(None)
        return models.SchedulingUnitObservingStrategyTemplate.objects.filter(pk__in=template_ids)
class SchedulingUnitTemplateFilter(filters.FilterSet):
    """FilterSet for SchedulingUnitTemplate: exact match on name; lt/gt/exact on version."""

    class Meta:
        # lookup expressions allowed per model field
        fields = {'name': ['exact'], 'version': ['lt', 'gt', 'exact']}
        model = models.SchedulingUnitTemplate
class SchedulingUnitTemplateViewSet(AbstractTemplateViewSet):
    """Template viewset over all SchedulingUnitTemplate objects, filterable via SchedulingUnitTemplateFilter."""
    filter_class = SchedulingUnitTemplateFilter
    serializer_class = serializers.SchedulingUnitTemplateSerializer
    queryset = models.SchedulingUnitTemplate.objects.all()
class DefaultSchedulingUnitTemplateViewSet(LOFARViewSet):
    """Viewset over all DefaultSchedulingUnitTemplate objects."""
    serializer_class = serializers.DefaultSchedulingUnitTemplateSerializer
    queryset = models.DefaultSchedulingUnitTemplate.objects.all()
class SchedulingConstraintsTemplateFilter(filters.FilterSet):
    """FilterSet for SchedulingConstraintsTemplate: exact match on name; lt/gt/exact on version."""

    class Meta:
        # lookup expressions allowed per model field
        fields = {'name': ['exact'], 'version': ['lt', 'gt', 'exact']}
        model = models.SchedulingConstraintsTemplate
class SchedulingConstraintsTemplateViewSet(AbstractTemplateViewSet):
    """Template viewset over all SchedulingConstraintsTemplate objects, filterable via SchedulingConstraintsTemplateFilter."""
    filter_class = SchedulingConstraintsTemplateFilter
    serializer_class = serializers.SchedulingConstraintsTemplateSerializer
    queryset = models.SchedulingConstraintsTemplate.objects.all()
class DefaultSchedulingConstraintsTemplateViewSet(LOFARViewSet):
    """Viewset over all DefaultSchedulingConstraintsTemplate objects."""
    serializer_class = serializers.DefaultSchedulingConstraintsTemplateSerializer
    queryset = models.DefaultSchedulingConstraintsTemplate.objects.all()
class TaskTemplateFilter(filters.FilterSet):
    """FilterSet for TaskTemplate: exact match on name; lt/gt/exact on version."""

    class Meta:
        # lookup expressions allowed per model field
        fields = {'name': ['exact'], 'version': ['lt', 'gt', 'exact']}
        model = models.TaskTemplate
class TaskTemplateViewSet(AbstractTemplateViewSet):
    """Template viewset over all TaskTemplate objects, filterable via TaskTemplateFilter."""
    filter_class = TaskTemplateFilter
    serializer_class = serializers.TaskTemplateSerializer
    queryset = models.TaskTemplate.objects.all()
class DefaultTaskTemplateViewSet(LOFARViewSet):
    """Viewset over all DefaultTaskTemplate objects."""
    serializer_class = serializers.DefaultTaskTemplateSerializer
    queryset = models.DefaultTaskTemplate.objects.all()
class TaskRelationSelectionTemplateViewSet(AbstractTemplateViewSet):
    """Template viewset over all TaskRelationSelectionTemplate objects."""
    serializer_class = serializers.TaskRelationSelectionTemplateSerializer
    queryset = models.TaskRelationSelectionTemplate.objects.all()
class DefaultTaskRelationSelectionTemplateViewSet(LOFARViewSet):
    """Viewset over all DefaultTaskRelationSelectionTemplate objects."""
    serializer_class = serializers.DefaultTaskRelationSelectionTemplateSerializer
    queryset = models.DefaultTaskRelationSelectionTemplate.objects.all()
class DefaultReservationTemplateViewSet(LOFARViewSet):
    """Viewset over all DefaultReservationTemplate objects."""
    serializer_class = serializers.DefaultReservationTemplateSerializer
    queryset = models.DefaultReservationTemplate.objects.all()
class ReservationTemplateViewSet(AbstractTemplateViewSet):
    """Template viewset over all ReservationTemplate objects."""
    serializer_class = serializers.ReservationTemplateSerializer
    queryset = models.ReservationTemplate.objects.all()
class ReservationViewSet(LOFARViewSet):
    """Viewset over all Reservation objects."""
    serializer_class = serializers.ReservationSerializer
    queryset = models.Reservation.objects.all()
class RoleViewSet(LOFARViewSet):
    """Viewset over all Role objects."""
    serializer_class = serializers.RoleSerializer
    queryset = models.Role.objects.all()
class SchedulingRelationPlacement(LOFARViewSet):
    """Viewset over all SchedulingRelationPlacement objects."""
    serializer_class = serializers.SchedulingRelationPlacementSerializer
    queryset = models.SchedulingRelationPlacement.objects.all()
class DatatypeViewSet(LOFARViewSet):
    """Viewset over all Datatype objects."""
    serializer_class = serializers.DatatypeSerializer
    queryset = models.Datatype.objects.all()
class DataformatViewSet(LOFARViewSet):
    """Viewset over all Dataformat objects."""
    serializer_class = serializers.DataformatSerializer
    queryset = models.Dataformat.objects.all()
class CopyReasonViewSet(LOFARViewSet):
    """Viewset over all CopyReason objects."""
    serializer_class = serializers.CopyReasonSerializer
    queryset = models.CopyReason.objects.all()
class TaskConnectorTypeViewSet(LOFARViewSet):
    """Viewset over all TaskConnectorType objects."""
    serializer_class = serializers.TaskConnectorTypeSerializer
    queryset = models.TaskConnectorType.objects.all()
class CycleViewSet(LOFARViewSet):
    """Viewset over all Cycle objects; declares ``start`` as its ordering field."""
    ordering = ['start']
    serializer_class = serializers.CycleSerializer
    queryset = models.Cycle.objects.all()
class CycleQuotaViewSet(LOFARViewSet):
    """Viewset over CycleQuota objects, optionally narrowed by a ``project`` query parameter."""
    serializer_class = serializers.CycleQuotaSerializer
    queryset = models.CycleQuota.objects.all()

    def get_queryset(self):
        """Return all CycleQuota objects, or only those of the project given as ``?project=``."""
        requested_project = self.request.query_params.get('project')
        all_quota = models.CycleQuota.objects.all()
        if requested_project is None:
            return all_quota
        # query by project
        return all_quota.filter(project=requested_project)
class ProjectViewSet(LOFARViewSet):
    """Viewset over Project objects, optionally narrowed by a ``cycle`` query parameter."""
    ordering = ['name']
    serializer_class = serializers.ProjectSerializer
    queryset = models.Project.objects.all()

    def get_queryset(self):
        """Return all Project objects, or only those linked to the cycle named in ``?cycle=``."""
        requested_cycle = self.request.query_params.get('cycle')
        all_projects = models.Project.objects.all()
        if requested_cycle is None:
            return all_projects
        # query by cycle
        return all_projects.filter(cycles__name=requested_cycle)
class ProjectNestedViewSet(LOFARNestedViewSet):
    """Viewset over Project objects, optionally nested under a cycle."""
    serializer_class = serializers.ProjectSerializer
    queryset = models.Project.objects.all()

    def get_queryset(self):
        """Return the projects of the cycle named by the ``cycle_id`` url kwarg, or all projects without it.

        Raises Http404 when the requested cycle does not exist.
        """
        if 'cycle_id' not in self.kwargs:
            return models.Project.objects.all()
        parent_cycle = get_object_or_404(models.Cycle, pk=self.kwargs['cycle_id'])
        return parent_cycle.projects.all()
class ProjectQuotaViewSet(LOFARViewSet):
    """Viewset over ProjectQuota objects, optionally narrowed by a ``project`` query parameter."""
    serializer_class = serializers.ProjectQuotaSerializer
    queryset = models.ProjectQuota.objects.all()

    def get_queryset(self):
        """Return all ProjectQuota objects, or only those of the project given as ``?project=``."""
        requested_project = self.request.query_params.get('project')
        all_quota = models.ProjectQuota.objects.all()
        if requested_project is None:
            return all_quota
        # query by project
        return all_quota.filter(project=requested_project)
class ProjectQuotaArchiveLocationViewSet(LOFARViewSet):
    """Viewset over ProjectQuotaArchiveLocation objects, optionally narrowed by a ``project`` query parameter."""
    serializer_class = serializers.ProjectQuotaArchiveLocationSerializer
    queryset = models.ProjectQuotaArchiveLocation.objects.all()

    def get_queryset(self):
        """Return all ProjectQuotaArchiveLocation objects, or only those matching ``?project=``."""
        requested_project = self.request.query_params.get('project')
        all_locations = models.ProjectQuotaArchiveLocation.objects.all()
        if requested_project is None:
            return all_locations
        # query by project
        return all_locations.filter(project=requested_project)
class ResourceTypeViewSet(LOFARViewSet):
    """Viewset over all ResourceType objects."""
    serializer_class = serializers.ResourceTypeSerializer
    queryset = models.ResourceType.objects.all()
class SchedulingSetViewSet(LOFARViewSet):
    """Viewset over all SchedulingSet objects."""
    serializer_class = serializers.SchedulingSetSerializer
    queryset = models.SchedulingSet.objects.all()
class FlagViewSet(LOFARViewSet):
    """Viewset over all Flag objects."""
    serializer_class = serializers.FlagSerializer
    queryset = models.Flag.objects.all()
class SettingViewSet(LOFARViewSet):
    """Viewset over all Setting objects."""
    serializer_class = serializers.SettingSerializer
    queryset = models.Setting.objects.all()
class QuantityViewSet(LOFARViewSet):
    """Viewset over all Quantity objects."""
    serializer_class = serializers.QuantitySerializer
    queryset = models.Quantity.objects.all()
class PeriodCategoryViewSet(LOFARViewSet):
    """Viewset over all PeriodCategory objects."""
    serializer_class = serializers.PeriodCategorySerializer
    queryset = models.PeriodCategory.objects.all()
class ProjectCategoryViewSet(LOFARViewSet):
    """Viewset over all ProjectCategory objects."""
    serializer_class = serializers.ProjectCategorySerializer
    queryset = models.ProjectCategory.objects.all()
class SchedulingUnitDraftPropertyFilter(property_filters.PropertyFilterSet):
    """Property filter set for SchedulingUnitDraft, exposing a char filter on ``project``."""
    project = property_filters.PropertyCharFilter(field_name='project')

    class Meta:
        fields = ['project']
        model = models.SchedulingUnitDraft
class SchedulingUnitDraftViewSet(LOFARViewSet):
    """Viewset over SchedulingUnitDraft objects.

    On top of what LOFARViewSet provides, it offers detail actions that turn
    a draft into blueprints (optionally with subtasks and scheduling) and an
    action that creates the task drafts of a scheduling unit draft.
    """
    serializer_class = serializers.SchedulingUnitDraftSerializer
    filter_class = SchedulingUnitDraftPropertyFilter

    queryset = models.SchedulingUnitDraft.objects.all()

    # prefetch all reverse related references from other models on their related_name to avoid a ton of duplicate queries
    queryset = queryset.prefetch_related('copied_from', 'scheduling_unit_blueprints', 'task_drafts')

    # use select_related for forward related references, to avoid even more duplicate queries
    queryset = queryset.select_related('copies', 'copy_reason', 'scheduling_set', 'requirements_template',
                                       'observation_strategy_template', 'scheduling_constraints_template')

    def _blueprint_created_response(self, request, scheduling_unit_blueprint):
        """Build the 201 response shared by the blueprint-creating actions.

        The body is the serialized blueprint; the ``Location`` header points to
        the blueprint url, which is derived from the path of this request.
        """
        # url path magic to construct the new scheduling_unit_blueprint_path url
        scheduling_unit_draft_path = request._request.path
        base_path = scheduling_unit_draft_path[:scheduling_unit_draft_path.find('/scheduling_unit_draft')]
        scheduling_unit_blueprint_path = '%s/scheduling_unit_blueprint/%s/' % (base_path, scheduling_unit_blueprint.id,)

        return Response(serializers.SchedulingUnitBlueprintSerializer(scheduling_unit_blueprint, context={'request':request}).data,
                        status=status.HTTP_201_CREATED,
                        headers={'Location': scheduling_unit_blueprint_path})

    # NOTE(review): url_name "create_task_blueprint" does not match the action it names;
    # left untouched because url names may be reversed elsewhere -- confirm.
    @swagger_auto_schema(responses={201: 'The Created SchedulingUnitBlueprint, see Location in Response header',
                                    403: 'forbidden'},
                         operation_description="Carve SchedulingUnitDraft in stone, and make an (uneditable) blueprint out of it.")
    @action(methods=['get'], detail=True, url_name="create_task_blueprint", name="Create SchedulingUnitBlueprint")
    def create_scheduling_unit_blueprint(self, request, pk=None):
        """Create a SchedulingUnitBlueprint from the draft with primary key *pk* (Http404 if absent)."""
        scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=pk)
        scheduling_unit_blueprint = create_scheduling_unit_blueprint_from_scheduling_unit_draft(scheduling_unit_draft)
        return self._blueprint_created_response(request, scheduling_unit_blueprint)

    @swagger_auto_schema(responses={201: 'The Created SchedulingUnitBlueprint, see Location in Response header',
                                    403: 'forbidden'},
                         operation_description="Carve this SchedulingUnitDraft and its TaskDraft(s) in stone, and make blueprint(s) out of it and create their subtask(s), and schedule the ones that are not dependend on predecessors")
    @action(methods=['get'], detail=True, url_name="create_blueprints_and_schedule", name="Create Blueprints-Tree and Schedule")
    def create_blueprints_and_schedule(self, request, pk=None):
        """Create blueprints and subtasks for the draft with primary key *pk* and schedule them (Http404 if absent)."""
        scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=pk)
        scheduling_unit_blueprint = create_task_blueprints_and_subtasks_and_schedule_subtasks_from_scheduling_unit_draft(scheduling_unit_draft)
        return self._blueprint_created_response(request, scheduling_unit_blueprint)

    @swagger_auto_schema(responses={201: 'The Created SchedulingUnitBlueprint, see Location in Response header',
                                    403: 'forbidden'},
                         operation_description="Carve this SchedulingUnitDraft and its TaskDraft(s) in stone, and make blueprint(s) out of it and create their subtask(s)")
    @action(methods=['get'], detail=True, url_name="create_blueprints_and_subtasks", name="Create Blueprints-Tree")
    def create_blueprints_and_subtasks(self, request, pk=None):
        """Create blueprints and subtasks for the draft with primary key *pk* (Http404 if absent)."""
        scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=pk)
        scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft)
        return self._blueprint_created_response(request, scheduling_unit_blueprint)

    @swagger_auto_schema(responses={201: 'The updated scheduling_unit_draft with references to its created task_drafts',
                                    403: 'forbidden'},
                         operation_description="Create Task Drafts from SchedulingUnitDraft.")
    @action(methods=['get'], detail=True, url_name="create_task_drafts", name="Create Task Drafts from Requirement doc")
    def create_task_drafts(self, request, pk=None):
        """Create the task drafts of the draft with primary key *pk* and return the serialized draft (201)."""
        scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=pk)
        create_task_drafts_from_scheduling_unit_draft(scheduling_unit_draft)

        # just return as a response the serialized scheduling_unit_draft (with references to the created task_draft(s))
        return Response(serializers.SchedulingUnitDraftSerializer(scheduling_unit_draft,
                                                                  context={'request':request}).data,
                        status=status.HTTP_201_CREATED)
class SchedulingUnitDraftExtendedViewSet(SchedulingUnitDraftViewSet):
    """Same as SchedulingUnitDraftViewSet, but rendered with SchedulingUnitDraftExtendedSerializer."""
    serializer_class = serializers.SchedulingUnitDraftExtendedSerializer
class SchedulingUnitDraftNestedViewSet(LOFARNestedViewSet):
    """Viewset over SchedulingUnitDraft objects, optionally nested under a scheduling set."""
    serializer_class = serializers.SchedulingUnitDraftSerializer
    queryset = models.SchedulingUnitDraft.objects.all()

    def get_queryset(self):
        """Return the drafts of the set named by the ``scheduling_set_id`` url kwarg, or all drafts without it.

        Raises Http404 when the requested scheduling set does not exist.
        """
        if 'scheduling_set_id' not in self.kwargs:
            return models.SchedulingUnitDraft.objects.all()
        parent_set = get_object_or_404(models.SchedulingSet, pk=self.kwargs['scheduling_set_id'])
        return parent_set.scheduling_unit_drafts.all()
class TaskDraftCopyViewSet(LOFARCopyViewSet):
    """Viewset whose ``create`` copies an existing TaskDraft into a new TaskDraft."""
    queryset = models.TaskDraft.objects.all()
    serializer_class = serializers.TaskDraftSerializer

    @swagger_auto_schema(responses={201: 'The new Task Draft',
                                    403: 'forbidden'},
                         operation_description="Copy a Task Draft to a new Task Draft")
    def create(self, request, *args, **kwargs):
        """Copy the TaskDraft identified by the ``task_draft_id`` url kwarg.

        The optional JSON request body may hold a ``copy_reason``; a value that
        matches no CopyReason is replaced by None. An empty body is treated as
        an empty JSON object.

        Returns a 201 response with the serialized copy and a ``Location``
        header, or a 404 response when ``task_draft_id`` is not in the url
        kwargs. Raises Http404 when the TaskDraft does not exist.
        """
        if 'task_draft_id' not in kwargs:
            # the original message wrongly named scheduling_unit_draft_id here
            content = {'Error': 'task_draft_id is missing'}
            return Response(content, status=status.HTTP_404_NOT_FOUND)

        task_draft = get_object_or_404(models.TaskDraft, pk=kwargs["task_draft_id"])

        # copy_reason is optional, so a request without a body must not crash json.loads
        body_unicode = request.body.decode('utf-8')
        body_data = json.loads(body_unicode) if body_unicode.strip() else {}

        copy_reason = body_data.get('copy_reason', None)
        try:
            # lookup only serves as validation; the object itself is not needed
            models.CopyReason.objects.get(value=copy_reason)
        except ObjectDoesNotExist:
            logger.info("CopyReason matching query does not exist.")
            # if a non valid copy_reason is specified, set copy_reason to None
            copy_reason = None

        task_draft_copy = copy_task_draft(task_draft, copy_reason)

        # url path magic to construct the new task_draft_path url
        task_draft_path = request._request.path
        base_path = task_draft_path[:task_draft_path.find('/task_draft')]
        task_draft_copy_path = '%s/task_draft/%s/' % (base_path, task_draft_copy.id,)

        # return a response with the new serialized TaskDraft, and a Location to the new instance in the header
        return Response(serializers.TaskDraftSerializer(task_draft_copy, context={'request':request}).data,
                        status=status.HTTP_201_CREATED,
                        headers={'Location': task_draft_copy_path})
class SchedulingUnitDraftCopyViewSet(LOFARCopyViewSet):
    """Viewset whose ``create`` copies an existing SchedulingUnitDraft into a new one."""
    queryset = models.SchedulingUnitDraft.objects.all()
    serializer_class = serializers.SchedulingUnitDraftCopySerializer

    @swagger_auto_schema(responses={201: 'The new scheduling_unit_draft',
                                    403: 'forbidden'},
                         operation_description="Copy a Scheduling Unit Draft to a new Scheduling Unit Draft")
    def create(self, request, *args, **kwargs):
        """Copy the SchedulingUnitDraft identified by the ``scheduling_unit_draft_id`` url kwarg.

        The optional JSON request body may hold:
            copy_reason: replaced by None when it matches no CopyReason.
            scheduling_set_id: target SchedulingSet for the copy; when absent or
                unknown, the scheduling set of the original draft is used.
        An empty body is treated as an empty JSON object.

        Returns a 201 response with the serialized copy and a ``Location``
        header, or a 404 response when ``scheduling_unit_draft_id`` is not in
        the url kwargs. Raises Http404 when the draft does not exist.
        """
        if 'scheduling_unit_draft_id' not in kwargs:
            content = {'Error': 'scheduling_unit_draft_id is missing'}
            return Response(content, status=status.HTTP_404_NOT_FOUND)

        scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=kwargs['scheduling_unit_draft_id'])
        # default target: the scheduling set of the original draft
        scheduling_set = scheduling_unit_draft.scheduling_set

        # all body fields are optional, so a request without a body must not crash json.loads
        body_unicode = request.body.decode('utf-8')
        body_data = json.loads(body_unicode) if body_unicode.strip() else {}

        copy_reason = body_data.get('copy_reason', None)
        try:
            # lookup only serves as validation; the object itself is not needed
            models.CopyReason.objects.get(value=copy_reason)
        except ObjectDoesNotExist:
            logger.info("CopyReason matching query does not exist.")
            # if a non valid copy_reason is specified, set copy_reason to None
            copy_reason = None

        scheduling_set_id = body_data.get('scheduling_set_id', None)
        logger.info(scheduling_set_id)
        if scheduling_set_id is not None:
            try:
                scheduling_set = models.SchedulingSet.objects.get(id=scheduling_set_id)
            except ObjectDoesNotExist:
                # keep the scheduling set of the original draft as the target
                logger.info("scheduling Set does not exist.")

        scheduling_unit_draft_copy = copy_scheduling_unit_draft(scheduling_unit_draft, scheduling_set, copy_reason)

        # url path magic to construct the new scheduling_unit_draft_path url
        scheduling_unit_draft_path = request._request.path
        base_path = scheduling_unit_draft_path[:scheduling_unit_draft_path.find('/scheduling_unit_draft')]
        scheduling_unit_draft_copy_path = '%s/scheduling_unit_draft/%s/' % (base_path, scheduling_unit_draft_copy.id,)

        # return a response with the new serialized SchedulingUnitDraft, and a Location to the new instance in the header
        return Response(serializers.SchedulingUnitDraftSerializer(scheduling_unit_draft_copy, context={'request':request}).data,
                        status=status.HTTP_201_CREATED,
                        headers={'Location': scheduling_unit_draft_copy_path})
class SchedulingUnitDraftCopyFromSchedulingSetViewSet(LOFARCopyViewSet):
    """Viewset whose ``create`` copies every SchedulingUnitDraft of a scheduling set."""
    queryset = models.SchedulingUnitDraft.objects.all()
    serializer_class = serializers.SchedulingUnitDraftCopyFromSchedulingSetSerializer

    def get_queryset(self):
        """Return the drafts of the set named by the ``scheduling_set_id`` url kwarg, or all drafts without it.

        Raises Http404 when the requested scheduling set does not exist.
        """
        if 'scheduling_set_id' not in self.kwargs:
            return models.SchedulingUnitDraft.objects.all()
        scheduling_set = get_object_or_404(models.SchedulingSet, pk=self.kwargs['scheduling_set_id'])
        return scheduling_set.scheduling_unit_drafts.all()

    @swagger_auto_schema(responses={201: "The TaskDrafts copied from the TaskDrafts in this Scheduling Unit Set",
                                    403: 'forbidden'},
                         operation_description="Create a copy of all the TaskDrafts in this Scheduling Unit Set.")
    def create(self, request, *args, **kwargs):
        """Copy all SchedulingUnitDrafts of the set identified by the ``scheduling_set_id`` url kwarg.

        The optional JSON request body may hold a ``copy_reason``; a value that
        matches no CopyReason is replaced by None. An empty body is treated as
        an empty JSON object. Each copy is created in the same scheduling set.

        Returns a 201 response with the serialized scheduling set, or a 404
        response when ``scheduling_set_id`` is not in the url kwargs. Raises
        Http404 when the scheduling set does not exist.
        """
        if 'scheduling_set_id' not in kwargs:
            content = {'Error': 'scheduling_set_id is missing'}
            return Response(content, status=status.HTTP_404_NOT_FOUND)

        scheduling_set = get_object_or_404(models.SchedulingSet, pk=kwargs['scheduling_set_id'])
        # Take a snapshot of the drafts first: the copies are added to this same
        # scheduling set and must not be picked up by the loop below.
        scheduling_unit_drafts = list(scheduling_set.scheduling_unit_drafts.all())

        # copy_reason is optional, so a request without a body must not crash json.loads
        body_unicode = request.body.decode('utf-8')
        body_data = json.loads(body_unicode) if body_unicode.strip() else {}

        copy_reason = body_data.get('copy_reason', None)
        try:
            # lookup only serves as validation; the object itself is not needed
            models.CopyReason.objects.get(value=copy_reason)
        except ObjectDoesNotExist:
            logger.info("CopyReason matching query does not exist.")
            # if a non valid copy_reason is specified, set copy_reason to None
            copy_reason = None

        # (the original also assembled a list of copy urls here that was never used)
        for scheduling_unit_draft in scheduling_unit_drafts:
            copy_scheduling_unit_draft(scheduling_unit_draft, scheduling_set, copy_reason)

        # just return as a response the serialized scheduling_set (with references to the created copies)
        return Response(serializers.SchedulingSetSerializer(scheduling_set, context={'request':request}).data,status=status.HTTP_201_CREATED)
class SchedulingUnitBlueprintCopyToSchedulingUnitDraftViewSet(LOFARCopyViewSet):
    """Viewset whose ``create`` makes a new SchedulingUnitDraft out of a SchedulingUnitBlueprint."""
    queryset = models.SchedulingUnitBlueprint.objects.all()
    serializer_class = serializers.SchedulingUnitBlueprintCopyToSchedulingUnitDraftSerializer

    @swagger_auto_schema(responses={201: "The copy of the SchedulingUnitDraft",
                                    403: 'forbidden'},
                         operation_description="Create a SchedulingUnitDraft from the SchedulingUnitBlueprint")
    def create(self, request, *args, **kwargs):
        """Create a draft from the blueprint identified by the ``scheduling_unit_blueprint_id`` url kwarg.

        The optional JSON request body may hold a ``copy_reason``; a value that
        matches no CopyReason is replaced by None. An empty body is treated as
        an empty JSON object.

        Returns a 201 response with the serialized draft, or a 404 response
        when ``scheduling_unit_blueprint_id`` is not in the url kwargs. Raises
        Http404 when the blueprint does not exist.
        """
        if 'scheduling_unit_blueprint_id' not in kwargs:
            # the original message wrongly named scheduling_unit_draft_id here
            content = {'Error': 'scheduling_unit_blueprint_id is missing'}
            return Response(content, status=status.HTTP_404_NOT_FOUND)

        scheduling_unit_blueprint = get_object_or_404(models.SchedulingUnitBlueprint, pk=kwargs['scheduling_unit_blueprint_id'])

        # copy_reason is optional, so a request without a body must not crash json.loads
        body_unicode = request.body.decode('utf-8')
        body_data = json.loads(body_unicode) if body_unicode.strip() else {}

        copy_reason = body_data.get('copy_reason', None)
        try:
            # lookup only serves as validation; the object itself is not needed
            models.CopyReason.objects.get(value=copy_reason)
        except ObjectDoesNotExist:
            logger.info("CopyReason matching query does not exist.")
            # if a non valid copy_reason is specified, set copy_reason to None
            copy_reason = None

        scheduling_unit_draft = create_scheduling_unit_draft_from_scheduling_unit_blueprint(scheduling_unit_blueprint, copy_reason)

        # return a response with the new serialized scheduling_unit_draft
        return Response(serializers.SchedulingUnitDraftSerializer(scheduling_unit_draft, context={'request':request}).data,
                        status=status.HTTP_201_CREATED)
class TaskBlueprintCopyToTaskDraftViewSet(LOFARCopyViewSet):
    """Viewset whose ``create`` copies a TaskBlueprint into a new TaskDraft."""
    # NOTE(review): queryset and serializer_class refer to SchedulingUnitBlueprint although this
    # viewset handles TaskBlueprints -- looks like a copy/paste leftover, confirm before changing.
    serializer_class = serializers.SchedulingUnitBlueprintCopyToSchedulingUnitDraftSerializer
    queryset = models.SchedulingUnitBlueprint.objects.all()

    @swagger_auto_schema(responses={201: "The TaskDraft created from this TaskBlueprint",
                                    403: 'forbidden'},
                         operation_description="Copy this TaskBlueprint to a new TaskDraft.")
    def create(self, request, *args, **kwargs):
        """Copy the TaskBlueprint identified by the ``task_blueprint_id`` url kwarg to a TaskDraft.

        Returns a 201 response with the serialized TaskDraft, or a 404 response
        when ``task_blueprint_id`` is not in the url kwargs. Raises Http404
        when the TaskBlueprint does not exist.
        """
        if 'task_blueprint_id' not in kwargs:
            return Response({'Error': 'task_blueprint_id is missing'}, status=status.HTTP_404_NOT_FOUND)

        source_blueprint = get_object_or_404(models.TaskBlueprint, pk=kwargs['task_blueprint_id'])
        new_draft = copy_task_blueprint_to_task_draft(source_blueprint)

        # respond with the serialized, newly created task draft
        serialized_draft = serializers.TaskDraftSerializer(new_draft, context={'request':request}).data
        return Response(serialized_draft, status=status.HTTP_201_CREATED)
class SchedulingUnitBlueprintPropertyFilter(property_filters.PropertyFilterSet):
    """Property filter set for SchedulingUnitBlueprint.

    Exposes ISO datetime from/to range filters on ``start_time`` and
    ``stop_time``, and a char filter on ``project``.
    """
    start_time = property_filters.PropertyIsoDateTimeFromToRangeFilter(field_name='start_time')
    stop_time = property_filters.PropertyIsoDateTimeFromToRangeFilter(field_name='stop_time')
    project = property_filters.PropertyCharFilter(field_name='project')

    class Meta:
        fields = ['start_time', 'stop_time', 'project']
        model = models.SchedulingUnitBlueprint
class SchedulingUnitBlueprintViewSet(LOFARViewSet):
    """Viewset over SchedulingUnitBlueprint objects.

    Offers detail actions that create the task blueprints of a scheduling
    unit blueprint (optionally with subtasks and scheduling), and one that
    lists the log urls of its subtasks.
    """
    serializer_class = serializers.SchedulingUnitBlueprintSerializer
    filter_class = SchedulingUnitBlueprintPropertyFilter

    # prefetch the reverse related task_blueprints and select the forward related
    # references in the same query, to avoid a ton of duplicate queries
    queryset = models.SchedulingUnitBlueprint.objects.all() \
                     .prefetch_related('task_blueprints') \
                     .select_related('requirements_template', 'draft')

    @swagger_auto_schema(responses={201: "This SchedulingUnitBlueprint, with references to its created TaskBlueprints and (scheduled) Subtasks.",
                                    403: 'forbidden'},
                         operation_description="Create TaskBlueprint(s) for this scheduling unit, create subtasks, and schedule the ones that are not dependend on predecessors.")
    @action(methods=['get'], detail=True, url_name="create_taskblueprints_subtasks_and_schedule", name="Create TaskBlueprint(s), their Subtask(s) and schedule them.")
    def create_taskblueprints_subtasks_and_schedule(self, request, pk=None):
        """Create task blueprints and subtasks for blueprint *pk* and schedule them; respond 201 with the blueprint."""
        blueprint = create_task_blueprints_and_subtasks_and_schedule_subtasks_from_scheduling_unit_blueprint(
            get_object_or_404(models.SchedulingUnitBlueprint, pk=pk))

        # respond with the serialized blueprint, which references what was just created
        payload = serializers.SchedulingUnitBlueprintSerializer(blueprint, context={'request':request}).data
        return Response(payload, status=status.HTTP_201_CREATED)

    @swagger_auto_schema(responses={201: "This SchedulingUnitBlueprint, with references to its created TaskBlueprints and Subtasks.",
                                    403: 'forbidden'},
                         operation_description="Create TaskBlueprint(s) for this scheduling unit and create subtasks.")
    @action(methods=['get'], detail=True, url_name="create_taskblueprints_subtasks", name="Create TaskBlueprint(s) and their Subtask(s)")
    def create_taskblueprints_subtasks(self, request, pk=None):
        """Create task blueprints and subtasks for blueprint *pk*; respond 201 with the blueprint."""
        blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_blueprint(
            get_object_or_404(models.SchedulingUnitBlueprint, pk=pk))

        # respond with the serialized blueprint, which references what was just created
        payload = serializers.SchedulingUnitBlueprintSerializer(blueprint, context={'request':request}).data
        return Response(payload, status=status.HTTP_201_CREATED)

    @swagger_auto_schema(responses={201: "This SchedulingUnitBlueprint, with references to its created TaskBlueprints.",
                                    403: 'forbidden'},
                         operation_description="Create the TaskBlueprint(s).")
    @action(methods=['get'], detail=True, url_name="create_taskblueprints", name="Create TaskBlueprint(s)")
    def create_taskblueprints(self, request, pk=None):
        """Create the task blueprints for blueprint *pk*; respond 201 with the blueprint."""
        blueprint = create_task_blueprints_from_scheduling_unit_blueprint(
            get_object_or_404(models.SchedulingUnitBlueprint, pk=pk))

        # respond with the serialized blueprint, which references what was just created
        payload = serializers.SchedulingUnitBlueprintSerializer(blueprint, context={'request':request}).data
        return Response(payload, status=status.HTTP_201_CREATED)

    @swagger_auto_schema(responses={200: 'The available logging urls for all Subtasks of this SchedulingUnitBlueprint.',
                                    403: 'forbidden'},
                         operation_description="Get the subtask logging urls of this schedulingunit blueprint.")
    @action(methods=['get'], detail=True, url_name='get_all_subtasks_log_urls')
    def get_all_subtasks_log_urls(self, request, pk=None):
        """Return a JSON list with id, type and log url of every subtask of blueprint *pk* that has a log url."""
        log_entries = [{"subtaskid": subtask.id,
                        "type": subtask.specifications_template.type.value,
                        "log_url": subtask.log_url}
                       for subtask in models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint_id=pk)
                       if subtask.log_url != ""]

        # JsonResponse only serializes non-dict objects (here: a list of dicts) when safe=False
        return JsonResponse(log_entries, safe=False)
class SchedulingUnitBlueprintExtendedViewSet(SchedulingUnitBlueprintViewSet):
    """Same as SchedulingUnitBlueprintViewSet, but rendered with SchedulingUnitBlueprintExtendedSerializer."""
    serializer_class = serializers.SchedulingUnitBlueprintExtendedSerializer
class SchedulingUnitBlueprintNestedViewSet(LOFARNestedViewSet):
    """Viewset over SchedulingUnitBlueprint objects, optionally nested under a scheduling unit draft."""
    serializer_class = serializers.SchedulingUnitBlueprintSerializer
    queryset = models.SchedulingUnitBlueprint.objects.all()

    def get_queryset(self):
        """Return the blueprints of the draft named by the ``scheduling_unit_draft_id`` url kwarg, or all blueprints without it.

        Raises Http404 when the requested draft does not exist.
        """
        if 'scheduling_unit_draft_id' not in self.kwargs:
            return models.SchedulingUnitBlueprint.objects.all()
        parent_draft = get_object_or_404(models.SchedulingUnitDraft, pk=self.kwargs['scheduling_unit_draft_id'])
        return parent_draft.scheduling_unit_blueprints.all()
class TaskDraftViewSet(LOFARViewSet):
    # ViewSet for TaskDraft objects. Besides what LOFARViewSet provides, it offers detail actions
    # that turn a draft into a TaskBlueprint (optionally with subtasks, optionally scheduled),
    # and actions that list the predecessor/successor task drafts of a draft.
    queryset = models.TaskDraft.objects.all()
    serializer_class = serializers.TaskDraftSerializer

    #permission_classes = (TMSSPermissions,)   # todo: move to LOFARViewSet eventually? # Note: this should work the same, but something funny is going on: [IsProjectMember | TMSSDjangoModelPermissions]
    #filter_backends = (IsProjectMemberFilterBackend,)  # todo: move to LOFARViewSet eventually?

    # prefetch all reverse related references from other models on their related_name to avoid a ton of duplicate queries
    queryset = queryset.prefetch_related('first_scheduling_relation') \
                       .prefetch_related('second_scheduling_relation')\
                       .prefetch_related('produced_by')\
                       .prefetch_related('consumed_by')\
                       .prefetch_related('task_blueprints')\
                       .prefetch_related('copied_from')

    # prefetch nested references in reverse models to avoid duplicate lookup queries
    queryset = queryset.prefetch_related('first_scheduling_relation__placement') \
                       .prefetch_related('second_scheduling_relation__placement')

    # select all references to other models to avoid even more duplicate queries
    queryset = queryset.select_related('copies') \
                       .select_related('copy_reason')

    # do not permit listing if the queryset contains objects that the user has not permission for.
    # Note: this is not required if we apply correct filtering, but it's a nice check that we do filter correctly.
    # Do not check on other actions, as the queryset might contain items that will be filtered later.
    # todo: see if there is something like a get_filtered_queryset that always only includes what will be returned
    #  to the user, so we can always check object permissions on everything.
    def get_queryset(self):
        qs = super().get_queryset()

        if self.action == 'list':
            qs = self.filter_queryset(qs)
            for obj in qs:
                self.check_object_permissions(self.request, obj)

        return qs

    def _created_task_blueprint_response(self, request, task_blueprint):
        """Build the 201 response shared by the blueprint-creating actions of this viewset.

        :param request: the request that triggered the action; its path is used to derive the Location header.
        :param task_blueprint: the TaskBlueprint to serialize into the response body.
        :return: a Response with the serialized task_blueprint, status 201 and a Location header.
        """
        # url path magic to construct the new task_blueprint_path url:
        # keep everything in front of the first '/task_draft' and append the blueprint's path.
        # partition (instead of find + slice) leaves the path intact when the marker is absent,
        # where find's -1 would silently chop off the last character.
        base_path, _, _ = request._request.path.partition('/task_draft')
        task_blueprint_path = '%s/task_blueprint/%s/' % (base_path, task_blueprint.id,)

        # return a response with the new serialized TaskBlueprint, and a Location to the new instance in the header
        return Response(serializers.TaskBlueprintSerializer(task_blueprint, context={'request': request}).data,
                        status=status.HTTP_201_CREATED,
                        headers={'Location': task_blueprint_path})

    @swagger_auto_schema(responses={201: 'The created task blueprint, see Location in Response header',
                                    403: 'forbidden'},
                         operation_description="Carve this draft task specification in stone, and make an (uneditable) blueprint out of it.")
    @action(methods=['get'], detail=True, url_name="create_task_blueprint", name="Create TaskBlueprint") # todo: I think these actions should be 'post'-only, since they alter the DB ?!
    def create_task_blueprint(self, request, pk=None):
        task_draft = get_object_or_404(models.TaskDraft, pk=pk)

        # todo: it seems there is no way to pass IsProjectMember as a permission class to actions without explicitly
        #  registering them in the router for some reason. but explicitly doing a check_object_permission works as well.
        #  We may want to extend get_object_or_404 to also perform a permission check before returning an object...?
        self.check_object_permissions(self.request, task_draft)  # or request.user.has_perm('create_task_blueprint')

        task_blueprint = create_task_blueprint_from_task_draft(task_draft)
        return self._created_task_blueprint_response(request, task_blueprint)

    @swagger_auto_schema(responses={201: "This TaskBlueprint, with its created (and some scheduled) subtasks",
                                    403: 'forbidden'},
                         operation_description="Create subtasks, and schedule the ones that are not dependend on predecessors.")
    @action(methods=['get'], detail=True, url_name="create_task_blueprint_subtasks_and_schedule", name="Create TaskBlueprint, its Subtask(s) and Schedule")
    def create_task_blueprint_subtasks_and_schedule(self, request, pk=None):
        task_draft = get_object_or_404(models.TaskDraft, pk=pk)
        # NOTE(review): unlike create_task_blueprint, this action does not call check_object_permissions
        #  on the draft before altering the DB - confirm whether that is intended.
        task_blueprint = create_task_blueprint_and_subtasks_and_schedule_subtasks_from_task_draft(task_draft)
        return self._created_task_blueprint_response(request, task_blueprint)

    @swagger_auto_schema(responses={201: "This TaskBlueprint, with its created subtask(s)",
                                    403: 'forbidden'},
                         operation_description="Create subtasks.")
    @action(methods=['get'], detail=True, url_name="create_task_blueprint_subtasks", name="Create TaskBlueprint and its Subtask(s)")
    def create_task_blueprint_and_subtasks(self, request, pk=None):
        task_draft = get_object_or_404(models.TaskDraft, pk=pk)
        # NOTE(review): unlike create_task_blueprint, this action does not call check_object_permissions
        #  on the draft before altering the DB - confirm whether that is intended.
        task_blueprint = create_task_blueprint_and_subtasks_from_task_draft(task_draft)
        return self._created_task_blueprint_response(request, task_blueprint)

    @swagger_auto_schema(responses={200: 'The predecessor task draft of this task draft',
                                    403: 'forbidden'},
                         operation_description="Get the predecessor task draft of this task draft.")
    @action(methods=['get'], detail=True, url_name="predecessors")
    def predecessors(self, request, pk=None):
        # respond with the (filtered) predecessors of the draft, serialized with this viewset's serializer
        task_draft = get_object_or_404(models.TaskDraft, pk=pk)
        predecessors = self.filter_queryset(task_draft.predecessors)
        serializer = self.get_serializer(predecessors, many=True)
        return Response(serializer.data)

    # The schema texts below used to talk about 'subtasks'; this action returns task drafts.
    @swagger_auto_schema(responses={200: 'The successor task drafts of this task draft',
                                    403: 'forbidden'},
                         operation_description="Get the successor task drafts of this task draft.")
    @action(methods=['get'], detail=True, url_name="successors")
    def successors(self, request, pk=None):
        # respond with the (filtered) successors of the draft, serialized with this viewset's serializer
        task_draft = get_object_or_404(models.TaskDraft, pk=pk)
        successors = self.filter_queryset(task_draft.successors)
        serializer = self.get_serializer(successors, many=True)
        return Response(serializer.data)
class TaskDraftNestedViewSet(LOFARNestedViewSet):
    # TaskDrafts, narrowed down to those of a single SchedulingUnitDraft
    # when the url kwargs carry a 'scheduling_unit_draft_id'.
    queryset = models.TaskDraft.objects.all()
    serializer_class = serializers.TaskDraftSerializer

    def get_queryset(self):
        # no parent scheduling unit draft given in the url: fall back to all task drafts
        if 'scheduling_unit_draft_id' not in self.kwargs:
            return models.TaskDraft.objects.all()

        # parent given: 404 if it does not exist, else only its task drafts
        parent_unit_draft = get_object_or_404(models.SchedulingUnitDraft,
                                              pk=self.kwargs['scheduling_unit_draft_id'])
        return parent_unit_draft.task_drafts.all()
class TaskBlueprintViewSet(LOFARViewSet):
    # ViewSet for TaskBlueprint objects. Besides what LOFARViewSet provides, it offers detail actions
    # that create and/or schedule the subtasks of a blueprint, and actions that list the
    # predecessor/successor task blueprints of a blueprint.
    queryset = models.TaskBlueprint.objects.all()
    serializer_class = serializers.TaskBlueprintSerializer

    # prefetch all reverse related references from other models on their related_name to avoid a ton of duplicate queries
    queryset = queryset.prefetch_related('first_scheduling_relation')\
                       .prefetch_related('second_scheduling_relation')\
                       .prefetch_related('produced_by')\
                       .prefetch_related('consumed_by')\
                       .prefetch_related('subtasks')

    # prefetch nested references in reverse models to avoid duplicate lookup queries
    queryset = queryset.prefetch_related('first_scheduling_relation__placement') \
                       .prefetch_related('second_scheduling_relation__placement') \
                       .prefetch_related('subtasks__specifications_template')

    # use select_related for forward related references
    queryset = queryset.select_related('draft', 'specifications_template', 'specifications_template__type', 'scheduling_unit_blueprint')

    def _refreshed_task_blueprint_response(self, request, task_blueprint):
        """Reload task_blueprint from the database and return it serialized in a 201 response.

        :param request: the request that triggered the action; passed on as serializer context.
        :param task_blueprint: the TaskBlueprint an action has just operated on.
        :return: a Response with the serialized, refreshed task_blueprint and status 201.
        """
        # reload first, so that the serialized blueprint reflects what the action just changed in the DB
        task_blueprint.refresh_from_db()
        return Response(serializers.TaskBlueprintSerializer(task_blueprint, context={'request': request}).data,
                        status=status.HTTP_201_CREATED)

    @swagger_auto_schema(responses={201: "This TaskBlueprint, with it is created subtasks",
                                    403: 'forbidden'},
                         operation_description="Create subtasks.")
    @action(methods=['get'], detail=True, url_name="create_subtasks", name="Create Subtasks")
    def create_subtasks(self, request, pk=None):
        task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
        # the return value is not needed here: the response is built from the refreshed blueprint
        create_subtasks_from_task_blueprint(task_blueprint)

        # return a response with the new serialized task_blueprint (with references to the created subtasks)
        return self._refreshed_task_blueprint_response(request, task_blueprint)

    @swagger_auto_schema(responses={201: "This TaskBlueprint, with it's created (and some scheduled) subtasks",
                                    403: 'forbidden'},
                         operation_description="Create subtasks, and schedule the ones that are not dependend on predecessors.")
    @action(methods=['get'], detail=True, url_name="create_subtasks_and_schedule", name="Create Subtasks and Schedule")
    def create_subtasks_and_schedule(self, request, pk=None):
        task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
        # the return value is not needed here: the response is built from the refreshed blueprint
        create_and_schedule_subtasks_from_task_blueprint(task_blueprint)

        # return a response with the new serialized task_blueprint (with references to the created (and scheduled) subtasks)
        return self._refreshed_task_blueprint_response(request, task_blueprint)

    @swagger_auto_schema(responses={201: "This TaskBlueprint, with the scheduled subtasks",
                                    403: 'forbidden'},
                         operation_description="Schedule the Subtasks that are not dependend on predecessors.")
    @action(methods=['get'], detail=True, url_name="schedule_independent_subtasks", name="Schedule independend Subtasks")
    def schedule_independent_subtasks(self, request, pk=None):
        task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
        schedule_independent_subtasks_in_task_blueprint(task_blueprint)

        # return a response with the new serialized task_blueprint (with references to the scheduled subtasks)
        return self._refreshed_task_blueprint_response(request, task_blueprint)

    # The schema texts below used to talk about 'task draft'; this action returns task blueprints.
    @swagger_auto_schema(responses={200: 'The predecessor task blueprints of this task blueprint',
                                    403: 'forbidden'},
                         operation_description="Get the predecessor task blueprints of this task blueprint.")
    @action(methods=['get'], detail=True, url_name="predecessors")
    def predecessors(self, request, pk=None):
        # respond with the (filtered) predecessors of the blueprint, serialized with this viewset's serializer
        task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
        predecessors = self.filter_queryset(task_blueprint.predecessors)
        serializer = self.get_serializer(predecessors, many=True)
        return Response(serializer.data)

    # The schema texts below used to talk about 'subtasks'; this action returns task blueprints.
    @swagger_auto_schema(responses={200: 'The successor task blueprints of this task blueprint',
                                    403: 'forbidden'},
                         operation_description="Get the successor task blueprints of this task blueprint.")
    @action(methods=['get'], detail=True, url_name="successors")
    def successors(self, request, pk=None):
        # respond with the (filtered) successors of the blueprint, serialized with this viewset's serializer
        task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
        successors = self.filter_queryset(task_blueprint.successors)
        serializer = self.get_serializer(successors, many=True)
        return Response(serializer.data)
class TaskBlueprintNestedViewSet(LOFARNestedViewSet):
    # TaskBlueprints, narrowed down to those of a single TaskDraft
    # when the url kwargs carry a 'task_draft_id'.
    queryset = models.TaskBlueprint.objects.all()
    serializer_class = serializers.TaskBlueprintSerializer

    def get_queryset(self):
        # no parent task draft given in the url: fall back to all task blueprints
        if 'task_draft_id' not in self.kwargs:
            return models.TaskBlueprint.objects.all()

        # parent given: 404 if it does not exist, else only its task blueprints
        parent_task_draft = get_object_or_404(models.TaskDraft, pk=self.kwargs['task_draft_id'])
        return parent_task_draft.task_blueprints.all()
class TaskRelationDraftViewSet(LOFARViewSet):
    # All TaskRelationDraft objects, serialized with the TaskRelationDraftSerializer.
    queryset = models.TaskRelationDraft.objects.all()
    serializer_class = serializers.TaskRelationDraftSerializer
class TaskRelationDraftNestedViewSet(LOFARNestedViewSet):
    # TaskRelationDrafts, narrowed down to those attached to a single TaskDraft
    # (via its produced_by and consumed_by relations) when the url kwargs carry a 'task_draft_id'.
    queryset = models.TaskRelationDraft.objects.all()
    serializer_class = serializers.TaskRelationDraftSerializer

    def get_queryset(self):
        # no parent task draft given in the url: fall back to all task relation drafts
        if 'task_draft_id' not in self.kwargs:
            return models.TaskRelationDraft.objects.all()

        # parent given: 404 if it does not exist, else the union of both of its relation sets
        parent_task_draft = get_object_or_404(models.TaskDraft, pk=self.kwargs['task_draft_id'])
        produced_relations = parent_task_draft.produced_by.all()
        consumed_relations = parent_task_draft.consumed_by.all()
        return produced_relations | consumed_relations
class TaskRelationBlueprintViewSet(LOFARViewSet):
    # All TaskRelationBlueprint objects, serialized with the TaskRelationBlueprintSerializer.
    queryset = models.TaskRelationBlueprint.objects.all()
    serializer_class = serializers.TaskRelationBlueprintSerializer
class TaskSchedulingRelationBlueprintViewSet(LOFARViewSet):
    # All TaskSchedulingRelationBlueprint objects, serialized with the TaskSchedulingRelationBlueprintSerializer.
    queryset = models.TaskSchedulingRelationBlueprint.objects.all()
    serializer_class = serializers.TaskSchedulingRelationBlueprintSerializer
class TaskSchedulingRelationDraftViewSet(LOFARViewSet):
    # All TaskSchedulingRelationDraft objects, serialized with the TaskSchedulingRelationDraftSerializer.
    queryset = models.TaskSchedulingRelationDraft.objects.all()
    serializer_class = serializers.TaskSchedulingRelationDraftSerializer
class TaskRelationBlueprintNestedViewSet(LOFARNestedViewSet):
    # TaskRelationBlueprints, narrowed down by whichever parent the url kwargs carry:
    # a 'task_blueprint_id' (checked first) or a 'task_relation_draft_id'.
    queryset = models.TaskRelationBlueprint.objects.all()
    serializer_class = serializers.TaskRelationBlueprintSerializer

    def get_queryset(self):
        url_kwargs = self.kwargs

        # parent task blueprint given: 404 if it does not exist, else the union of both of its relation sets
        if 'task_blueprint_id' in url_kwargs:
            parent_task_blueprint = get_object_or_404(models.TaskBlueprint, pk=url_kwargs['task_blueprint_id'])
            produced_relations = parent_task_blueprint.produced_by.all()
            consumed_relations = parent_task_blueprint.consumed_by.all()
            return produced_relations | consumed_relations

        # parent task relation draft given: 404 if it does not exist, else its related blueprints
        if 'task_relation_draft_id' in url_kwargs:
            parent_relation_draft = get_object_or_404(models.TaskRelationDraft,
                                                      pk=url_kwargs['task_relation_draft_id'])
            return parent_relation_draft.related_task_relation_blueprint.all()

        # no parent given in the url: fall back to all task relation blueprints
        return models.TaskRelationBlueprint.objects.all()
class TaskTypeViewSet(LOFARViewSet):
    # All TaskType objects, serialized with the TaskTypeSerializer.
    queryset = models.TaskType.objects.all()
    serializer_class = serializers.TaskTypeSerializer