Select Git revision
MANIFEST.in
-
Klaas Kliffen authoredKlaas Kliffen authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
specification.py 71.19 KiB
"""
This file contains the viewsets (based on the elsewhere defined data models and serializers)
"""
from django.shortcuts import get_object_or_404, get_list_or_404, render
from django.http import JsonResponse
from django.contrib.auth import get_user_model
User = get_user_model()
from django_filters import rest_framework as filters
import django_property_filter as property_filters
from rest_framework.viewsets import ReadOnlyModelViewSet
from rest_framework import status
from rest_framework.response import Response
from rest_framework.decorators import permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import action
from rest_framework.response import Response as RestResponse
from drf_yasg.utils import swagger_auto_schema
from drf_yasg.openapi import Parameter
from lofar.sas.tmss.tmss.tmssapp.viewsets.lofar_viewset import LOFARViewSet, LOFARNestedViewSet, AbstractTemplateViewSet, LOFARCopyViewSet, LOFARFilterBackend
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp import serializers
from lofar.sas.tmss.tmss.tmssapp.adapters.reports import create_cycle_report, create_project_report
from lofar.sas.tmss.tmss.tmssapp.adapters.keycloak import get_users_by_role_in_project
from django.http import JsonResponse
from datetime import datetime
from lofar.common.json_utils import get_default_json_object_for_schema
from lofar.common.datetimeutils import formatDatetime
from lofar.sas.tmss.tmss.tmssapp.tasks import *
from lofar.sas.tmss.tmss.tmssapp.subtasks import *
from lofar.sas.tmss.tmss.tmssapp.viewsets.permissions import TMSSDjangoModelPermissions, get_project_roles_for_user
from django.urls import resolve, get_script_prefix,Resolver404
from rest_framework.filters import OrderingFilter
import json
import logging
import dateutil
from django.core.exceptions import ObjectDoesNotExist
logger = logging.getLogger(__name__)
# This is required for keeping a user reference as ForeignKey in other models
# (I think so that the HyperlinkedModelSerializer can generate a URI)
class UserViewSet(ReadOnlyModelViewSet):
queryset = User.objects.all()
serializer_class = serializers.UserSerializer
class TagsViewSet(LOFARViewSet):
queryset = models.Tags.objects.all()
serializer_class = serializers.TagsSerializer
class CommonSchemaTemplateViewSet(AbstractTemplateViewSet):
queryset = models.CommonSchemaTemplate.objects.all()
serializer_class = serializers.CommonSchemaTemplateSerializer
class GeneratorTemplateViewSet(AbstractTemplateViewSet):
queryset = models.GeneratorTemplate.objects.all()
serializer_class = serializers.GeneratorTemplateSerializer
class DefaultGeneratorTemplateViewSet(LOFARViewSet):
queryset = models.DefaultGeneratorTemplate.objects.all()
serializer_class = serializers.DefaultGeneratorTemplateSerializer
class SchedulingUnitObservingStrategyTemplateViewSet(LOFARViewSet):
queryset = models.SchedulingUnitObservingStrategyTemplate.objects.all()
serializer_class = serializers.SchedulingUnitObservingStrategyTemplateSerializer
@swagger_auto_schema(responses={status.HTTP_201_CREATED: 'The newly created scheduling unit',
status.HTTP_403_FORBIDDEN: 'forbidden'},
operation_description="Create a new SchedulingUnit based on this SchedulingUnitObservingStrategyTemplate, with the given <name> and <description> and make it a child of the given <scheduling_set_id>",
manual_parameters=[Parameter(name='scheduling_set_id', required=True, type='integer', in_='query',
description="the id of the scheduling_set which will be the parent of the newly created scheduling_unit"),
Parameter(name='name', required=False, type='string', in_='query',
description="The name for the newly created scheduling_unit"),
Parameter(name='description', required=False, type='string', in_='query',
description="The description for the newly created scheduling_unit"),
Parameter(name='requirements_doc_overrides', required=False, type='dict', in_='body',
description="a JSON dict containing the override values for the parameters in the template")
])
@action(methods=['post'], detail=True)
def create_scheduling_unit(self, request, pk=None):
strategy_template = get_object_or_404(models.SchedulingUnitObservingStrategyTemplate, pk=pk)
scheduling_set = get_object_or_404(models.SchedulingSet, pk=request.query_params['scheduling_set_id'])
name = request.query_params.get('name', "scheduling unit")
description = request.query_params.get('description', "")
requirements_doc_overrides = request.data if isinstance(request.data, dict) else None
scheduling_unit_draft = create_scheduling_unit_draft_from_observing_strategy_template(strategy_template, scheduling_set, name, description, requirements_doc_overrides)
# return a response with the new serialized SchedulingUnitDraft, and a Location to the new instance in the header
scheduling_unit_observation_strategy_template_path = request._request.path
base_path = scheduling_unit_observation_strategy_template_path[:scheduling_unit_observation_strategy_template_path.find('/scheduling_unit_observing_strategy_template')]
scheduling_unit_draft_path = '%s/scheduling_unit_draft/%s/' % (base_path, scheduling_unit_draft.id,)
return Response(serializers.SchedulingUnitDraftSerializer(scheduling_unit_draft, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': scheduling_unit_draft_path})
@swagger_auto_schema(responses={200: 'The template document as JSON object with all defaults filled in.',
403: 'forbidden'},
operation_description="Get the template document as JSON object with all defaults filled in. This template document is the foundation from which a scheduling unit draft is created. The intended use is to carefully inspect this JSON document for all specification details for this strategy template.")
@action(methods=['get'], detail=True)
def template_doc_complete_with_defaults(self, request, pk=None):
strategy_template = get_object_or_404(models.SchedulingUnitObservingStrategyTemplate, pk=pk)
return JsonResponse(strategy_template.template_doc_complete_with_defaults, json_dumps_params={'indent': 2})
@swagger_auto_schema(responses={200: 'The template document as JSON object with just the items that are in the parameters list.',
403: 'forbidden'},
operation_description="Get the slimmed down version of the template document as JSON object with all defaults filled in, but then only for the allowed-to-be-edited parameters. This template document is the foundation from which a scheduling unit draft is created. The intended use is to carefully inspect this JSON document for all specification details for this strategy template.")
@action(methods=['get'], detail=True)
def template_doc_with_just_the_parameters(self, request, pk=None):
strategy_template = get_object_or_404(models.SchedulingUnitObservingStrategyTemplate, pk=pk)
return JsonResponse(strategy_template.template_doc_with_just_the_parameters, json_dumps_params={'indent': 2})
@swagger_auto_schema(responses={200: 'Get a trigger JSON document ready to be used for trigger submission',
403: 'forbidden'},
operation_description="Get a trigger JSON document ready to be used for trigger submission.")
@action(methods=['get'], detail=True)
def trigger_doc(self, request, pk=None):
'''Convenience endpoint to easily fetch a JSON doc for the requested strategy_template which can be used for trigger submission'''
strategy_template = get_object_or_404(models.SchedulingUnitObservingStrategyTemplate, pk=pk)
trigger_template = get_object_or_404(models.CommonSchemaTemplate, name='triggers')
triger_doc = trigger_template.get_default_json_document_for_schema()
triger_doc['name'] = 'my_trigger_name'
triger_doc['description'] = 'my_description_for_this_trigger'
triger_doc['scheduling_unit_observing_strategy_template']['name'] = strategy_template.name
triger_doc['scheduling_unit_observing_strategy_template']['version'] = int(strategy_template.version)
triger_doc['scheduling_unit_observing_strategy_template']['overrides'] = strategy_template.template_doc_with_just_the_parameters
return JsonResponse(triger_doc, json_dumps_params={'indent': 2})
class SchedulingUnitObservingStrategyTemplateNestedViewSet(LOFARNestedViewSet):
queryset = models.SchedulingUnitObservingStrategyTemplate.objects.all()
serializer_class = serializers.SchedulingUnitObservingStrategyTemplateSerializer
def get_queryset(self):
if 'scheduling_set_id' in self.kwargs:
scheduling_unit_drafts = get_list_or_404(models.SchedulingUnitDraft, scheduling_set__id=self.kwargs['scheduling_set_id'])
return [draft.observation_strategy_template for draft in scheduling_unit_drafts]
else:
return models.SchedulingUnitObservingStrategyTemplate.objects.all()
class SchedulingUnitTemplateFilter(filters.FilterSet):
class Meta:
model = models.SchedulingUnitTemplate
fields = {
'name': ['exact'],
'version': ['lt', 'gt', 'exact']
}
class SchedulingUnitTemplateViewSet(AbstractTemplateViewSet):
queryset = models.SchedulingUnitTemplate.objects.all()
serializer_class = serializers.SchedulingUnitTemplateSerializer
filter_class = SchedulingUnitTemplateFilter
class DefaultSchedulingUnitTemplateViewSet(LOFARViewSet):
queryset = models.DefaultSchedulingUnitTemplate.objects.all()
serializer_class = serializers.DefaultSchedulingUnitTemplateSerializer
class SchedulingConstraintsTemplateFilter(filters.FilterSet):
class Meta:
model = models.SchedulingConstraintsTemplate
fields = {
'name': ['exact'],
'version': ['lt', 'gt', 'exact']
}
class SchedulingConstraintsTemplateViewSet(AbstractTemplateViewSet):
queryset = models.SchedulingConstraintsTemplate.objects.all()
serializer_class = serializers.SchedulingConstraintsTemplateSerializer
filter_class = SchedulingConstraintsTemplateFilter
class DefaultSchedulingConstraintsTemplateViewSet(LOFARViewSet):
queryset = models.DefaultSchedulingConstraintsTemplate.objects.all()
serializer_class = serializers.DefaultSchedulingConstraintsTemplateSerializer
class TaskTemplateFilter(filters.FilterSet):
class Meta:
model = models.TaskTemplate
fields = {
'name': ['exact'],
'version': ['lt', 'gt', 'exact']
}
class TaskTemplateViewSet(AbstractTemplateViewSet):
queryset = models.TaskTemplate.objects.all()
serializer_class = serializers.TaskTemplateSerializer
filter_class = TaskTemplateFilter
class DefaultTaskTemplateViewSet(LOFARViewSet):
queryset = models.DefaultTaskTemplate.objects.all()
serializer_class = serializers.DefaultTaskTemplateSerializer
class TaskRelationSelectionTemplateViewSet(AbstractTemplateViewSet):
queryset = models.TaskRelationSelectionTemplate.objects.all()
serializer_class = serializers.TaskRelationSelectionTemplateSerializer
class DefaultTaskRelationSelectionTemplateViewSet(LOFARViewSet):
queryset = models.DefaultTaskRelationSelectionTemplate.objects.all()
serializer_class = serializers.DefaultTaskRelationSelectionTemplateSerializer
class ReservationStrategyTemplateViewSet(LOFARViewSet):
queryset = models.ReservationStrategyTemplate.objects.all()
serializer_class = serializers.ReservationStrategyTemplateSerializer
@swagger_auto_schema(responses={status.HTTP_201_CREATED: 'The newly created reservation',
status.HTTP_403_FORBIDDEN: 'forbidden'},
operation_description="Create a new Reservation based on this ReservationStrategyTemplate, "
"with the given <name>, <description>, <start_time> and <stop_time>",
manual_parameters=[Parameter(name='start_time', required=True, type='string', in_='query',
description="The start time as a timestamp string in isoformat"),
Parameter(name='stop_time', required=True, type='string', in_='query',
description="The stop time as a timestamp string in isoformat"),
Parameter(name='name', required=False, type='string', in_='query',
description="The name for the newly created reservation"),
Parameter(name='description', required=False, type='string', in_='query',
description="The description for the newly created reservation"),
Parameter(name='project_id', required=False, type='integer', in_='query',
description="the id of the project which will be the parent of the newly created reservation"),
])
@action(methods=['post'], detail=True)
def create_reservation(self, request, pk=None):
strategy_template = get_object_or_404(models.ReservationStrategyTemplate, pk=pk)
reservation_template_spec = add_defaults_to_json_object_for_schema(strategy_template.template,
strategy_template.reservation_template.schema)
start_time = request.query_params.get('start_time', None)
stop_time = request.query_params.get('stop_time', None)
if start_time:
start_time = dateutil.parser.parse(start_time) # string to datetime
else:
start_time = datetime.now()
if stop_time:
stop_time = dateutil.parser.parse(stop_time) # string to datetime
else:
stop_time = None
project_id = request.query_params.get('project_id', None)
if project_id:
project = get_object_or_404(models.Project, pk=request.query_params['project_id'])
else:
project = None
reservation = Reservation.objects.create(name=request.query_params.get('name', "reservation"),
description=request.query_params.get('description', ""),
project=project,
specifications_template=strategy_template.reservation_template,
specifications_doc=reservation_template_spec,
start_time=start_time,
stop_time=stop_time)
reservation_strategy_template_path = request._request.path
base_path = reservation_strategy_template_path[:reservation_strategy_template_path.find('/reservation_strategy_template')]
reservation_path = '%s/reservation/%s/' % (base_path, reservation.id,)
# return a response with the new serialized Reservation, and a Location to the new instance in the header
return Response(serializers.ReservationSerializer(reservation, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': reservation_path})
class DefaultReservationTemplateViewSet(LOFARViewSet):
queryset = models.DefaultReservationTemplate.objects.all()
serializer_class = serializers.DefaultReservationTemplateSerializer
class ReservationTemplateViewSet(AbstractTemplateViewSet):
queryset = models.ReservationTemplate.objects.all()
serializer_class = serializers.ReservationTemplateSerializer
class ReservationViewSet(LOFARViewSet):
queryset = models.Reservation.objects.all()
serializer_class = serializers.ReservationSerializer
class RoleViewSet(LOFARViewSet):
queryset = models.Role.objects.all()
serializer_class = serializers.RoleSerializer
class IOTypeViewSet(LOFARViewSet):
queryset = models.IOType.objects.all()
serializer_class = serializers.IOTypeSerializer
class SchedulingRelationPlacementViewSet(LOFARViewSet):
queryset = models.SchedulingRelationPlacement.objects.all()
serializer_class = serializers.SchedulingRelationPlacementSerializer
class DatatypeViewSet(LOFARViewSet):
queryset = models.Datatype.objects.all()
serializer_class = serializers.DatatypeSerializer
class DataformatViewSet(LOFARViewSet):
queryset = models.Dataformat.objects.all()
serializer_class = serializers.DataformatSerializer
class CopyReasonViewSet(LOFARViewSet):
queryset = models.CopyReason.objects.all()
serializer_class = serializers.CopyReasonSerializer
class TaskConnectorTypeViewSet(LOFARViewSet):
queryset = models.TaskConnectorType.objects.all()
serializer_class = serializers.TaskConnectorTypeSerializer
class CycleViewSet(LOFARViewSet):
permission_classes = (TMSSDjangoModelPermissions,) # override default project permission
filter_backends = (LOFARFilterBackend, OrderingFilter) # override default project permission
queryset = models.Cycle.objects.all()
serializer_class = serializers.CycleSerializer
ordering = ['start']
@swagger_auto_schema(responses={200: 'The Report information',
403: 'forbidden'},
operation_description="Get Report information for the cycle.")
@action(methods=['get'], detail=True, url_name="report", name="Get Report")
def report(self, request, pk=None):
cycle = get_object_or_404(models.Cycle, pk=pk)
result = create_cycle_report(request, cycle)
return Response(result, status=status.HTTP_200_OK)
class CycleQuotaViewSet(LOFARViewSet):
queryset = models.CycleQuota.objects.all()
serializer_class = serializers.CycleQuotaSerializer
def get_queryset(self):
queryset = models.CycleQuota.objects.all()
# query by project
project = self.request.query_params.get('project', None)
if project is not None:
return queryset.filter(project=project)
return queryset
class ProjectViewSet(LOFARViewSet):
queryset = models.Project.objects.all()
serializer_class = serializers.ProjectSerializer
ordering = ['name']
def get_queryset(self):
queryset = models.Project.objects.all()
# query by cycle
cycle = self.request.query_params.get('cycle', None)
if cycle is not None:
return queryset.filter(cycles__name=cycle)
return queryset
@swagger_auto_schema(responses={200: 'The Report information',
403: 'forbidden'},
operation_description="Get Report information for the project.")
@action(methods=['get'], detail=True, url_name="report", name="Get Report")
def report(self, request, pk=None):
project = get_object_or_404(models.Project, pk=pk)
result = create_project_report(request, project)
return Response(result, status=status.HTTP_200_OK)
@swagger_auto_schema(responses={200: 'List of users that have the "friend of project" role for this project',
403: 'forbidden'},
operation_description="Get the list of users that have the 'friend of project' role for this project")
@action(methods=['get'], detail=True, url_name="friend", name="Friend(s) of this project")
def friend(self, request, pk=None):
project = get_object_or_404(models.Project, pk=pk)
result = get_users_by_role_in_project(models.ProjectRole.Choices.FRIEND_OF_PROJECT.value, project.name)
return Response(result, status=status.HTTP_200_OK)
@swagger_auto_schema(responses={200: 'List of roles that the requesting user has in this project',
403: 'forbidden'},
operation_description="Get the list of roles that the requesting user has in this project")
@action(methods=['get'], detail=True, url_name="my_roles", name="My roles in this project")
def my_roles(self, request, pk=None):
project = get_object_or_404(models.Project, pk=pk)
project_roles = get_project_roles_for_user(request.user)
result = []
for project_role in project_roles:
if project.name == project_role['project']:
result.append(project_role['role'])
return Response(result, status=status.HTTP_200_OK)
class ProjectNestedViewSet(LOFARNestedViewSet):
queryset = models.Project.objects.all()
serializer_class = serializers.ProjectSerializer
def get_queryset(self):
if 'cycle_id' in self.kwargs:
cycle = get_object_or_404(models.Cycle, pk=self.kwargs['cycle_id'])
return cycle.projects.all()
else:
return models.Project.objects.all()
class ProjectQuotaViewSet(LOFARViewSet):
queryset = models.ProjectQuota.objects.all()
serializer_class = serializers.ProjectQuotaSerializer
def get_queryset(self):
queryset = models.ProjectQuota.objects.all()
# query by project
project = self.request.query_params.get('project', None)
if project is not None:
return queryset.filter(project=project)
return queryset
class ProjectQuotaArchiveLocationViewSet(LOFARViewSet):
queryset = models.ProjectQuotaArchiveLocation.objects.all()
serializer_class = serializers.ProjectQuotaArchiveLocationSerializer
def get_queryset(self):
queryset = models.ProjectQuotaArchiveLocation.objects.all()
# query by project
project = self.request.query_params.get('project', None)
if project is not None:
return queryset.filter(project=project)
return queryset
class ResourceTypeViewSet(LOFARViewSet):
queryset = models.ResourceType.objects.all()
serializer_class = serializers.ResourceTypeSerializer
class SchedulingSetViewSet(LOFARViewSet):
queryset = models.SchedulingSet.objects.all()
serializer_class = serializers.SchedulingSetSerializer
class SystemSettingFlagViewSet(LOFARViewSet):
queryset = models.SystemSettingFlag.objects.all()
serializer_class = serializers.SystemSettingFlagSerializer
class SettingViewSet(LOFARViewSet):
queryset = models.Setting.objects.all()
serializer_class = serializers.SettingSerializer
class QuantityViewSet(LOFARViewSet):
queryset = models.Quantity.objects.all()
serializer_class = serializers.QuantitySerializer
class PeriodCategoryViewSet(LOFARViewSet):
queryset = models.PeriodCategory.objects.all()
serializer_class = serializers.PeriodCategorySerializer
class ProjectCategoryViewSet(LOFARViewSet):
queryset = models.ProjectCategory.objects.all()
serializer_class = serializers.ProjectCategorySerializer
class SchedulingUnitDraftPropertyFilter(property_filters.PropertyFilterSet):
project = property_filters.PropertyCharFilter(field_name='project')
class Meta:
model = models.SchedulingUnitDraft
fields = '__all__'
filter_overrides = {
models.JSONField: {
'filter_class': property_filters.CharFilter,
},
models.ArrayField: {
'filter_class': property_filters.CharFilter,
'extra': lambda f: {'lookup_expr': 'icontains'}
},
}
class SchedulingUnitDraftViewSet(LOFARViewSet):
queryset = models.SchedulingUnitDraft.objects.all()
serializer_class = serializers.SchedulingUnitDraftSerializer
filter_class = SchedulingUnitDraftPropertyFilter # note that this breaks other filter backends from LOFARViewSet
# prefetch all reverse related references from other models on their related_name to avoid a ton of duplicate queries
queryset = queryset.prefetch_related('copied_from') \
.prefetch_related('scheduling_unit_blueprints')\
.prefetch_related('task_drafts')
# preselect all references to other models to avoid even more duplicate queries
queryset = queryset.select_related('copies') \
.select_related('copy_reason') \
.select_related('scheduling_set')
# use select_related for forward related references
queryset = queryset.select_related('copy_reason', 'scheduling_set', 'requirements_template', 'observation_strategy_template', 'scheduling_constraints_template')
@swagger_auto_schema(responses={201: 'The Created SchedulingUnitBlueprint, see Location in Response header',
403: 'forbidden'},
operation_description="Carve SchedulingUnitDraft in stone, and make an (uneditable) blueprint out of it.")
@action(methods=['post'], detail=True, url_name="create_scheduling_unit_blueprint", name="Create SchedulingUnitBlueprint")
def create_scheduling_unit_blueprint(self, request, pk=None):
scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=pk)
scheduling_unit_blueprint = create_scheduling_unit_blueprint_from_scheduling_unit_draft(scheduling_unit_draft)
# url path magic to construct the new scheduling_unit_blueprint_path url
scheduling_unit_draft_path = request._request.path
base_path = scheduling_unit_draft_path[:scheduling_unit_draft_path.find('/scheduling_unit_draft')]
scheduling_unit_blueprint_path = '%s/scheduling_unit_blueprint/%s/' % (base_path, scheduling_unit_blueprint.id,)
# return a response with the new serialized SchedulingUnitBlueprintSerializer, and a Location to the new instance in the header
return Response(serializers.SchedulingUnitBlueprintSerializer(scheduling_unit_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': scheduling_unit_blueprint_path})
@swagger_auto_schema(responses={201: 'The Created SchedulingUnitBlueprint, see Location in Response header',
403: 'forbidden'},
operation_description="Carve this SchedulingUnitDraft and its TaskDraft(s) in stone, and make blueprint(s) out of it and create their subtask(s), and schedule the ones that are not dependend on predecessors")
@action(methods=['post'], detail=True, url_name="create_blueprints_and_schedule", name="Create Blueprints-Tree and Schedule")
def create_blueprints_and_schedule(self, request, pk=None):
scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=pk)
scheduling_unit_blueprint = create_task_blueprints_and_subtasks_and_schedule_subtasks_from_scheduling_unit_draft(scheduling_unit_draft)
# url path magic to construct the new scheduling_unit_blueprint_path url
scheduling_unit_draft_path = request._request.path
base_path = scheduling_unit_draft_path[:scheduling_unit_draft_path.find('/scheduling_unit_draft')]
scheduling_unit_blueprint_path = '%s/scheduling_unit_blueprint/%s/' % (base_path, scheduling_unit_blueprint.id,)
# return a response with the new serialized SchedulingUnitBlueprintSerializer, and a Location to the new instance in the header
return Response(serializers.SchedulingUnitBlueprintSerializer(scheduling_unit_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': scheduling_unit_blueprint_path})
@swagger_auto_schema(responses={201: 'The Created SchedulingUnitBlueprint, see Location in Response header',
403: 'forbidden'},
operation_description="Carve this SchedulingUnitDraft and its TaskDraft(s) in stone, and make blueprint(s) out of it and create their subtask(s)")
@action(methods=['post'], detail=True, url_name="create_blueprints_and_subtasks", name="Create Blueprints-Tree")
def create_blueprints_and_subtasks(self, request, pk=None):
scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=pk)
scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft)
# url path magic to construct the new scheduling_unit_blueprint_path url
scheduling_unit_draft_path = request._request.path
base_path = scheduling_unit_draft_path[:scheduling_unit_draft_path.find('/scheduling_unit_draft')]
scheduling_unit_blueprint_path = '%s/scheduling_unit_blueprint/%s/' % (base_path, scheduling_unit_blueprint.id,)
# return a response with the new serialized SchedulingUnitBlueprintSerializer, and a Location to the new instance in the header
return Response(serializers.SchedulingUnitBlueprintSerializer(scheduling_unit_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': scheduling_unit_blueprint_path})
@swagger_auto_schema(responses={201: 'The updated scheduling_unit_draft with references to its created task_drafts',
403: 'forbidden'},
operation_description="Create Task Drafts from SchedulingUnitDraft.")
@action(methods=['post'], detail=True, url_name="create_task_drafts", name="Create Task Drafts from Requirement doc")
def create_task_drafts(self, request, pk=None):
scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=pk)
create_task_drafts_from_scheduling_unit_draft(scheduling_unit_draft)
# just return as a response the serialized scheduling_unit_draft (with references to the created task_draft(s)
return Response(serializers.SchedulingUnitDraftSerializer(scheduling_unit_draft,
context={'request':request}).data,
status=status.HTTP_201_CREATED)
class SchedulingUnitDraftExtendedViewSet(SchedulingUnitDraftViewSet):
serializer_class = serializers.SchedulingUnitDraftExtendedSerializer
class SchedulingUnitDraftNestedViewSet(LOFARNestedViewSet):
queryset = models.SchedulingUnitDraft.objects.all()
serializer_class = serializers.SchedulingUnitDraftSerializer
def get_queryset(self):
if 'scheduling_set_id' in self.kwargs:
scheduling_set = get_object_or_404(models.SchedulingSet, pk=self.kwargs['scheduling_set_id'])
return scheduling_set.scheduling_unit_drafts.all()
else:
return models.SchedulingUnitDraft.objects.all()
class TaskDraftCopyViewSet(LOFARCopyViewSet):
queryset = models.TaskDraft.objects.all()
serializer_class = serializers.TaskDraftSerializer
@swagger_auto_schema(responses={201: 'The new Task Draft',
400: 'bad request',
403: 'forbidden',
404: 'not found'},
operation_description="Copy this Task Draft to a new Task Draft")
def create(self, request, *args, **kwargs):
if 'id' in kwargs:
task_draft = get_object_or_404(models.TaskDraft, pk=kwargs["id"])
body_unicode = request.body.decode('utf-8')
body_data = json.loads(body_unicode)
copy_reason = body_data.get('copy_reason', None)
if copy_reason == None:
content = {'Error': 'copy_reason is missing'}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
try:
copy_reason_obj = models.CopyReason.objects.get(value=copy_reason)
except ObjectDoesNotExist:
logger.info("CopyReason matching query does not exist.")
#if a non valid copy_reason is specified, raise a 400 error
choices = [c.value for c in models.CopyReason.Choices]
content = {'Error': 'The provided copy_reason is not defined in the system. Possible choices are: %s' % choices}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
# TODO: Update copy_task_draft() accordingly if needed.
task_draft_copy = copy_task_draft(task_draft,copy_reason_obj)
# url path magic to construct the new task_draft_path url
task_draft_path = request._request.path
base_path = task_draft_path[:task_draft_path.find('/task_draft')]
task_draft_copy_path = '%s/task_draft/%s/' % (base_path, task_draft_copy.id,)
# return a response with the new serialized SchedulingUnitBlueprintSerializer, and a Location to the new instance in the header
return Response(serializers.TaskDraftSerializer(task_draft_copy, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': task_draft_copy_path})
else:
content = {'Error': 'SchedulingUnitDraft id is missing'}
return Response(content, status=status.HTTP_404_NOT_FOUND)
class SchedulingUnitDraftCopyViewSet(LOFARCopyViewSet):
queryset = models.SchedulingUnitDraft.objects.all()
serializer_class = serializers.SchedulingUnitDraftCopySerializer
@swagger_auto_schema(responses={201: 'The new SchedulingUnitDraft',
400: 'bad request',
403: 'forbidden',
404: 'not found'},
operation_description="Copy a SchedulingUnitDraft to a new SchedulingUnitDraft")
def create(self, request, *args, **kwargs):
if 'id' in kwargs:
scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=kwargs['id'])
scheduling_set = scheduling_unit_draft.scheduling_set
body_unicode = request.body.decode('utf-8')
body_data = json.loads(body_unicode)
copy_reason = body_data.get('copy_reason', None)
if copy_reason == None:
content = {'Error': 'copy_reason is missing'}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
try:
copy_reason_obj = models.CopyReason.objects.get(value=copy_reason)
except ObjectDoesNotExist:
logger.info("CopyReason matching query does not exist.")
# if a non valid copy_reason is specified, raise a 400 error
choices = [c.value for c in models.CopyReason.Choices]
content = {'Error': 'The provided copy_reason is not defined in the system. Possible choices are: %s' % choices}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
scheduling_set_id = body_data.get('scheduling_set_id', None)
logger.info(scheduling_set_id)
if scheduling_set_id is not None:
try:
scheduling_set = models.SchedulingSet.objects.get(id=scheduling_set_id)
except ObjectDoesNotExist:
logger.info("scheduling Set does not exist.")
# TODO: Change parameter from copy_reason to copy_reason_obj and update copy_scheduling_unit_draft() accordingly.
scheduling_unit_draft_copy = copy_scheduling_unit_draft(scheduling_unit_draft,scheduling_set,copy_reason)
# url path magic to construct the new scheduling_unit_draft_path url
scheduling_unit_draft_path = request._request.path
base_path = scheduling_unit_draft_path[:scheduling_unit_draft_path.find('/scheduling_unit_draft')]
scheduling_unit_draft_copy_path = '%s/scheduling_unit_draft/%s/' % (base_path, scheduling_unit_draft_copy.id,)
# return a response with the new serialized SchedulingUnitBlueprintSerializer, and a Location to the new instance in the header
return Response(serializers.SchedulingUnitDraftSerializer(scheduling_unit_draft_copy, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': scheduling_unit_draft_copy_path})
else:
content = {'Error': 'SchedulingUnitDraft id is missing'}
return Response(content, status=status.HTTP_404_NOT_FOUND)
class SchedulingUnitDraftCopyFromSchedulingSetViewSet(LOFARCopyViewSet):
queryset = models.SchedulingUnitDraft.objects.all()
serializer_class = serializers.SchedulingUnitDraftCopyFromSchedulingSetSerializer
def get_queryset(self):
if 'scheduling_set_id' in self.kwargs:
scheduling_set = get_object_or_404(models.SchedulingSet, pk=self.kwargs['scheduling_set_id'])
return scheduling_set.scheduling_unit_drafts.all()
else:
return models.SchedulingUnitDraft.objects.all()
@swagger_auto_schema(responses={201: "The SchedulingUnitSet which will also contain the created new drafts",
400: 'bad request',
403: 'forbidden',
404: 'not found'},
operation_description="Copy the SchedulingUnitDrafts in this SchedulingUnitSet to new SchedulingUnitDrafts")
def create(self, request, *args, **kwargs):
if 'id' in kwargs:
scheduling_set = get_object_or_404(models.SchedulingSet, pk=kwargs['id'])
scheduling_unit_drafts = scheduling_set.scheduling_unit_drafts.all()
body_unicode = request.body.decode('utf-8')
body_data = json.loads(body_unicode)
copy_reason = body_data.get('copy_reason', None)
if copy_reason == None:
content = {'Error': 'copy_reason is missing'}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
try:
copy_reason_obj = models.CopyReason.objects.get(value=copy_reason)
except ObjectDoesNotExist:
logger.info("CopyReason matching query does not exist.")
# if a non valid copy_reason is specified, raise a 400 error
choices = [c.value for c in models.CopyReason.Choices]
content = {'Error': 'The provided copy_reason is not defined in the system. Possible choices are: %s' % choices}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
scheduling_unit_draft_copy_path=[]
for scheduling_unit_draft in scheduling_unit_drafts:
# TODO: Change parameter from copy_reason to copy_reason_obj and update copy_scheduling_unit_draft() accordingly.
scheduling_unit_draft_copy = copy_scheduling_unit_draft(scheduling_unit_draft,scheduling_set,copy_reason)
# url path magic to construct the new scheduling_unit_draft url
copy_scheduling_unit_draft_path = request._request.path
base_path = copy_scheduling_unit_draft_path[:copy_scheduling_unit_draft_path.find('/copy_scheduling_unit_drafts')]
scheduling_unit_draft_copy_path += ['%s/copy_scheduling_unit_drafts/%s/' % (base_path, scheduling_unit_draft_copy.id,)]
# just return as a response the serialized scheduling_set (with references to the created copy_scheduling_unit_draft(s)
return Response(serializers.SchedulingSetSerializer(scheduling_set, context={'request':request}).data,status=status.HTTP_201_CREATED)
else:
content = {'Error': 'SchedulingSet id is missing'}
return Response(content, status=status.HTTP_404_NOT_FOUND)
class SchedulingUnitBlueprintCopyToSchedulingUnitDraftViewSet(LOFARCopyViewSet):
queryset = models.SchedulingUnitBlueprint.objects.all()
serializer_class = serializers.SchedulingUnitBlueprintCopyToSchedulingUnitDraftSerializer
@swagger_auto_schema(responses={201: "The new SchedulingUnitDraft copied from this SchedulingUnitBlueprint",
400: 'bad request',
403: 'forbidden',
404: 'not found'},
operation_description="Copy the SchedulingUnitBlueprint to a new SchedulingUnitDraft")
def create(self, request, *args, **kwargs):
if 'id' in kwargs:
scheduling_unit_blueprint = get_object_or_404(models.SchedulingUnitBlueprint, pk=kwargs['id'])
body_unicode = request.body.decode('utf-8')
body_data = json.loads(body_unicode)
copy_reason = body_data.get('copy_reason', None)
if copy_reason == None:
content = {'Error': 'copy_reason is missing'}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
try:
copy_reason_obj = models.CopyReason.objects.get(value=copy_reason)
except ObjectDoesNotExist:
logger.info("CopyReason matching query does not exist.")
# if a non valid copy_reason is specified, raise a 400 error
choices = [c.value for c in models.CopyReason.Choices]
content = {'Error': 'The provided copy_reason is not defined in the system. Possible choices are: %s' % choices}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
# TODO: Update create_scheduling_unit_draft_from_scheduling_unit_blueprint() accordingly if needed.
scheduling_unit_draft = create_scheduling_unit_draft_from_scheduling_unit_blueprint(scheduling_unit_blueprint,copy_reason_obj)
# return a response with the new serialized scheduling_unit_blueprint (with references to the created task_blueprint(s) and (scheduled) subtasks)
return Response(serializers.SchedulingUnitDraftSerializer(scheduling_unit_draft, context={'request':request}).data,
status=status.HTTP_201_CREATED)
else:
content = {'Error': 'scheduling_unit_draft_id is missing'}
return Response(content, status=status.HTTP_404_NOT_FOUND)
class TaskBlueprintCopyToTaskDraftViewSet(LOFARCopyViewSet):
queryset = models.SchedulingUnitBlueprint.objects.all()
serializer_class = serializers.SchedulingUnitBlueprintCopyToSchedulingUnitDraftSerializer
@swagger_auto_schema(responses={201: "The new TaskDraft created from this TaskBlueprint",
400: 'bad request',
403: 'forbidden',
404: 'not found'},
operation_description="Copy this TaskBlueprint into a new TaskDraft.")
def create(self, request, *args, **kwargs):
if 'id' in kwargs:
task_blueprint = get_object_or_404(models.TaskBlueprint, pk=kwargs['id'])
body_unicode = request.body.decode('utf-8')
body_data = json.loads(body_unicode)
copy_reason = body_data.get('copy_reason', None)
if copy_reason == None:
content = {'Error': 'copy_reason is missing'}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
try:
copy_reason_obj = models.CopyReason.objects.get(value=copy_reason)
except ObjectDoesNotExist:
logger.info("CopyReason matching query does not exist.")
# if a non valid copy_reason is specified, raise a 400 error
choices = [c.value for c in models.CopyReason.Choices]
content = {'Error': 'The provided copy_reason is not defined in the system. Possible choices are: %s' % choices}
return Response(content, status=status.HTTP_400_BAD_REQUEST)
# TODO: Add copy_reason_obj and update the copy_task_blueprint_to_task_draft() accordingly.
task_draft = copy_task_blueprint_to_task_draft(task_blueprint)
# return a response with the new serialized scheduling_unit_blueprint (with references to the created task_blueprint(s) and (scheduled) subtasks)
return Response(serializers.TaskDraftSerializer(task_draft, context={'request':request}).data,
status=status.HTTP_201_CREATED)
else:
content = {'Error': 'TaskBlueprint id is missing'}
return Response(content, status=status.HTTP_404_NOT_FOUND)
class SchedulingUnitBlueprintPropertyFilter(property_filters.PropertyFilterSet):
start_time = property_filters.PropertyIsoDateTimeFromToRangeFilter(field_name='start_time')
stop_time = property_filters.PropertyIsoDateTimeFromToRangeFilter(field_name='stop_time')
project = property_filters.PropertyCharFilter(field_name='project')
status = property_filters.PropertyCharFilter(field_name='status')
class Meta:
model = models.SchedulingUnitBlueprint
fields = '__all__'
filter_overrides = {
models.JSONField: {
'filter_class': property_filters.CharFilter,
},
models.ArrayField: {
'filter_class': property_filters.CharFilter,
'extra': lambda f: {'lookup_expr': 'icontains'}
},
}
class SchedulingUnitBlueprintViewSet(LOFARViewSet):
queryset = models.SchedulingUnitBlueprint.objects.all()
serializer_class = serializers.SchedulingUnitBlueprintSerializer
filter_class = SchedulingUnitBlueprintPropertyFilter # note that this breaks other filter backends from LOFARViewSet
# prefetch all reverse related references from other models on their related_name to avoid a ton of duplicate queries
queryset = queryset.prefetch_related('task_blueprints')
# use select_related for forward related references
queryset = queryset.select_related('requirements_template', 'draft')
@swagger_auto_schema(responses={201: "This SchedulingUnitBlueprint, with references to its created TaskBlueprints and (scheduled) Subtasks.",
403: 'forbidden'},
operation_description="Create TaskBlueprint(s) for this scheduling unit, create subtasks, and schedule the ones that are not dependend on predecessors.")
@action(methods=['post'], detail=True, url_name="create_taskblueprints_subtasks_and_schedule", name="Create TaskBlueprint(s), their Subtask(s) and schedule them.")
def create_taskblueprints_subtasks_and_schedule(self, request, pk=None):
scheduling_unit_blueprint = get_object_or_404(models.SchedulingUnitBlueprint, pk=pk)
scheduling_unit_blueprint = create_task_blueprints_and_subtasks_and_schedule_subtasks_from_scheduling_unit_blueprint(scheduling_unit_blueprint)
# return a response with the new serialized scheduling_unit_blueprint (with references to the created task_blueprint(s) and (scheduled) subtasks)
return Response(serializers.SchedulingUnitBlueprintSerializer(scheduling_unit_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED)
@swagger_auto_schema(responses={201: "This SchedulingUnitBlueprint, with references to its created TaskBlueprints and Subtasks.",
403: 'forbidden'},
operation_description="Create TaskBlueprint(s) for this scheduling unit and create subtasks.")
@action(methods=['post'], detail=True, url_name="create_taskblueprints_subtasks", name="Create TaskBlueprint(s) and their Subtask(s)")
def create_taskblueprints_subtasks(self, request, pk=None):
scheduling_unit_blueprint = get_object_or_404(models.SchedulingUnitBlueprint, pk=pk)
scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_blueprint(scheduling_unit_blueprint)
# return a response with the new serialized scheduling_unit_blueprint (with references to the created task_blueprint(s) and (scheduled) subtasks)
return Response(serializers.SchedulingUnitBlueprintSerializer(scheduling_unit_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED)
@swagger_auto_schema(responses={201: "This SchedulingUnitBlueprint, with references to its created TaskBlueprints.",
403: 'forbidden'},
operation_description="Create the TaskBlueprint(s).")
@action(methods=['post'], detail=True, url_name="create_taskblueprints", name="Create TaskBlueprint(s)")
def create_taskblueprints(self, request, pk=None):
scheduling_unit_blueprint = get_object_or_404(models.SchedulingUnitBlueprint, pk=pk)
scheduling_unit_blueprint = create_task_blueprints_from_scheduling_unit_blueprint(scheduling_unit_blueprint)
# return a response with the new serialized scheduling_unit_blueprint (with references to the created task_blueprint(s) and (scheduled) subtasks)
return Response(serializers.SchedulingUnitBlueprintSerializer(scheduling_unit_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED)
@swagger_auto_schema(responses={200: 'The available logging urls for all Subtasks of this SchedulingUnitBlueprint.',
403: 'forbidden'},
operation_description="Get the subtask logging urls of this schedulingunit blueprint.")
@action(methods=['get'], detail=True, url_name='get_all_subtasks_log_urls')
def get_all_subtasks_log_urls(self, request, pk=None):
subtasks = models.Subtask.objects.filter(task_blueprints__scheduling_unit_blueprint_id=pk)
result = []
for subtask in subtasks:
if subtask.log_url != "":
result.append({"subtaskid": subtask.id, "type": subtask.specifications_template.type.value, "log_url": subtask.log_url})
# TypeError: In order to allow non-dict objects to be serialized set the safe parameter to False.
# result is list of dict so thats why
return JsonResponse(result, safe=False)
@swagger_auto_schema(responses={200: 'The cancelled version of this scheduling_unit',
403: 'forbidden',
500: 'The scheduling_unit could not be cancelled'},
operation_description="Try to cancel this scheduling_unit.")
@action(methods=['post'], detail=True, url_name="cancel")
def cancel(self, request, pk=None):
scheduling_unit_blueprint = get_object_or_404(models.SchedulingUnitBlueprint, pk=pk)
from lofar.sas.tmss.tmss.tmssapp.tasks import cancel_scheduling_unit_blueprint
scheduling_unit_blueprint = cancel_scheduling_unit_blueprint(scheduling_unit_blueprint)
serializer = self.get_serializer(scheduling_unit_blueprint)
return RestResponse(serializer.data)
@swagger_auto_schema(responses={200: 'The obsolete version of this scheduling_unit',
403: 'forbidden',
500: 'The scheduling_unit could not be marked as obsolete. Was it in a state from which it is allowed to mark it as obsolete?'},
operation_description="Try to mark this scheduling_unit as obsolete (when the status is cancelled/error)")
@action(methods=['post'], detail=True, url_name="mark_as_obsolete")
def mark_as_obsolete(self, request, pk=None):
scheduling_unit_blueprint = get_object_or_404(models.SchedulingUnitBlueprint, pk=pk)
from lofar.sas.tmss.tmss.tmssapp.tasks import mark_scheduling_unit_blueprint_as_obsolete
scheduling_unit_blueprint = mark_scheduling_unit_blueprint_as_obsolete(scheduling_unit_blueprint)
serializer = self.get_serializer(scheduling_unit_blueprint)
return RestResponse(serializer.data)
@swagger_auto_schema(responses={200: "All Subtasks in this SchedulingUnitBlueprint",
403: 'forbidden'},
operation_description="Get all subtasks for this scheduling_unit")
@action(methods=['get'], detail=True, url_name="subtasks", name="all subtasks in this scheduling_unit")
def subtasks(self, request, pk=None):
subtasks = models.Subtask.objects.all().filter(task_blueprints__scheduling_unit_blueprint_id=pk). \
select_related('state', 'specifications_template', 'specifications_template__type', 'cluster', 'created_or_updated_by_user').all()
# return a response with the new serialized scheduling_unit_blueprint (with references to the created task_blueprint(s) and (scheduled) subtasks)
return Response(serializers.SubtaskSerializer(subtasks, many=True, context={'request':request}).data,
status=status.HTTP_200_OK)
@swagger_auto_schema(responses={201: "This SchedulingUnitBlueprint, with references to the created Cleanup TaskBlueprints.",
403: 'forbidden'},
operation_description="Create a cleanup task for this scheduling unit.")
@action(methods=['post'], detail=True, url_name="create_cleanuptask", name="Create a cleanup task for this scheduling unit")
def create_cleanuptask_for_scheduling_unit_blueprint(self, request, pk=None):
scheduling_unit_blueprint = get_object_or_404(models.SchedulingUnitBlueprint, pk=pk)
scheduling_unit_blueprint = create_cleanuptask_for_scheduling_unit_blueprint(scheduling_unit_blueprint)
# return a response with the new serialized scheduling_unit_blueprint (with references to the created task_blueprint and subtask)
return Response(serializers.SchedulingUnitBlueprintSerializer(scheduling_unit_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED)
class SchedulingUnitBlueprintExtendedViewSet(SchedulingUnitBlueprintViewSet):
serializer_class = serializers.SchedulingUnitBlueprintExtendedSerializer
class SchedulingUnitBlueprintNestedViewSet(LOFARNestedViewSet):
queryset = models.SchedulingUnitBlueprint.objects.all()
serializer_class = serializers.SchedulingUnitBlueprintSerializer
def get_queryset(self):
if 'scheduling_unit_draft_id' in self.kwargs:
scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=self.kwargs['scheduling_unit_draft_id'])
return scheduling_unit_draft.scheduling_unit_blueprints.all()
else:
return models.SchedulingUnitBlueprint.objects.all()
class TaskDraftViewSet(LOFARViewSet):
queryset = models.TaskDraft.objects.all()
serializer_class = serializers.TaskDraftSerializer
# prefetch all reverse related references from other models on their related_name to avoid a ton of duplicate queries
queryset = queryset.prefetch_related('first_scheduling_relation') \
.prefetch_related('second_scheduling_relation')\
.prefetch_related('produced_by')\
.prefetch_related('consumed_by')\
.prefetch_related('task_blueprints')\
.prefetch_related('copied_from')
# prefetch nested references in reverse models to avoid duplicate lookup queries
queryset = queryset.prefetch_related('first_scheduling_relation__placement') \
.prefetch_related('second_scheduling_relation__placement') \
.prefetch_related('specifications_template__type') \
.prefetch_related('specifications_template__connector_types')
# select all references to other models to avoid even more duplicate queries
queryset = queryset.select_related('copies') \
.select_related('copy_reason')
# # do not permit listing if the queryset contains objects that the user has not permission for.
# # Note: this is not required if we apply correct filtering, but it's a nice check that we do filter correctly.
# # Do not check on other actions, as the queryset might contain items that will be filtered later.
# # todo: see if there is something like a get_filtered_queryset that always only includes what will be returned
# # to the user, so we can always check object permissions on everything.
# # todo: this causes a recursion error if has_permission is called from has_object_permission in TMSSPermissions
# # Probably a non-issue since we do the filtering anyway.
# def get_queryset(self):
# qs = super().get_queryset()
# if self.action == 'list':
# qs = self.filter_queryset(qs)
# for obj in qs:
# self.check_object_permissions(self.request, obj)
# return qs
@swagger_auto_schema(responses={201: 'The created task blueprint, see Location in Response header',
403: 'forbidden'},
operation_description="Carve this draft task specification in stone, and make an (uneditable) blueprint out of it.")
@action(methods=['post'], detail=True, url_name="create_task_blueprint", name="Create TaskBlueprint")
def create_task_blueprint(self, request, pk=None):
task_draft = get_object_or_404(models.TaskDraft, pk=pk)
task_blueprint = create_task_blueprint_from_task_draft(task_draft)
# url path magic to construct the new task_blueprint_path url
task_draft_path = request._request.path
base_path = task_draft_path[:task_draft_path.find('/task_draft')]
task_blueprint_path = '%s/task_blueprint/%s/' % (base_path, task_blueprint.id,)
# return a response with the new serialized TaskBlueprint, and a Location to the new instance in the header
return Response(serializers.TaskBlueprintSerializer(task_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': task_blueprint_path})
@swagger_auto_schema(responses={201: "This TaskBlueprint, with its created (and some scheduled) subtasks",
403: 'forbidden'},
operation_description="Create subtasks, and schedule the ones that are not dependend on predecessors.")
@action(methods=['post'], detail=True, url_name="create_task_blueprint_subtasks_and_schedule", name="Create TaskBlueprint, its Subtask(s) and Schedule")
def create_task_blueprint_subtasks_and_schedule(self, request, pk=None):
task_draft = get_object_or_404(models.TaskDraft, pk=pk)
task_blueprint = create_task_blueprint_and_subtasks_and_schedule_subtasks_from_task_draft(task_draft)
# url path magic to construct the new task_blueprint_path url
task_draft_path = request._request.path
base_path = task_draft_path[:task_draft_path.find('/task_draft')]
task_blueprint_path = '%s/task_blueprint/%s/' % (base_path, task_blueprint.id,)
# return a response with the new serialized TaskBlueprint, and a Location to the new instance in the header
return Response(serializers.TaskBlueprintSerializer(task_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': task_blueprint_path})
@swagger_auto_schema(responses={201: "This TaskBlueprint, with its created subtask(s)",
403: 'forbidden'},
operation_description="Create subtasks.")
@action(methods=['post'], detail=True, url_name="create_task_blueprint_subtasks", name="Create TaskBlueprint and its Subtask(s)")
def create_task_blueprint_and_subtasks(self, request, pk=None):
task_draft = get_object_or_404(models.TaskDraft, pk=pk)
task_blueprint = create_task_blueprint_and_subtasks_from_task_draft(task_draft)
# url path magic to construct the new task_blueprint_path url
task_draft_path = request._request.path
base_path = task_draft_path[:task_draft_path.find('/task_draft')]
task_blueprint_path = '%s/task_blueprint/%s/' % (base_path, task_blueprint.id,)
# return a response with the new serialized TaskBlueprint, and a Location to the new instance in the header
return Response(serializers.TaskBlueprintSerializer(task_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED,
headers={'Location': task_blueprint_path})
@swagger_auto_schema(responses={200: 'The predecessor task draft of this task draft',
403: 'forbidden'},
operation_description="Get the predecessor task draft of this task draft.")
@action(methods=['get'], detail=True, url_name="predecessors")
def predecessors(self, request, pk=None):
task_draft = get_object_or_404(models.TaskDraft, pk=pk)
predecessors = self.filter_queryset(task_draft.predecessors)
serializer = self.get_serializer(predecessors, many=True)
return Response(serializer.data)
@swagger_auto_schema(responses={200: 'The successor subtasks of this subtask',
403: 'forbidden'},
operation_description="Get the successor subtasks of this subtask.")
@action(methods=['get'], detail=True, url_name="successors")
def successors(self, request, pk=None):
task_draft = get_object_or_404(models.TaskDraft, pk=pk)
successors = self.filter_queryset(task_draft.successors)
serializer = self.get_serializer(successors, many=True)
return Response(serializer.data)
class TaskDraftNestedViewSet(LOFARNestedViewSet):
queryset = models.TaskDraft.objects.all()
serializer_class = serializers.TaskDraftSerializer
def get_queryset(self):
if 'scheduling_unit_draft_id' in self.kwargs:
scheduling_unit_draft = get_object_or_404(models.SchedulingUnitDraft, pk=self.kwargs['scheduling_unit_draft_id'])
return scheduling_unit_draft.task_drafts.all()
else:
return models.TaskDraft.objects.all()
class TaskBlueprintViewSet(LOFARViewSet):
queryset = models.TaskBlueprint.objects.all()
serializer_class = serializers.TaskBlueprintSerializer
# prefetch all reverse related references from other models on their related_name to avoid a ton of duplicate queries
queryset = queryset.prefetch_related('first_scheduling_relation')\
.prefetch_related('second_scheduling_relation')\
.prefetch_related('produced_by')\
.prefetch_related('consumed_by')\
.prefetch_related('subtasks')
# prefetch nested references in reverse models to avoid duplicate lookup queries
queryset = queryset.prefetch_related('first_scheduling_relation__placement') \
.prefetch_related('second_scheduling_relation__placement') \
.prefetch_related('subtasks__specifications_template') \
.prefetch_related('specifications_template__connector_types')
# use select_related for forward related references
queryset = queryset.select_related('draft', 'specifications_template', 'specifications_template__type', 'scheduling_unit_blueprint')
@swagger_auto_schema(responses={201: "This TaskBlueprint, with it is created subtasks",
403: 'forbidden'},
operation_description="Create subtasks.")
@action(methods=['post'], detail=True, url_name="create_subtasks", name="Create Subtasks")
def create_subtasks(self, request, pk=None):
task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
subtasks = create_subtasks_from_task_blueprint(task_blueprint)
task_blueprint.refresh_from_db()
# return a response with the new serialized task_blueprint (with references to the created subtasks)
return Response(serializers.TaskBlueprintSerializer(task_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED)
@swagger_auto_schema(responses={201: "This TaskBlueprint, with it's created (and some scheduled) subtasks",
403: 'forbidden'},
operation_description="Create subtasks, and schedule the ones that are not dependend on predecessors.")
@action(methods=['post'], detail=True, url_name="create_subtasks_and_schedule", name="Create Subtasks and Schedule")
def create_subtasks_and_schedule(self, request, pk=None):
task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
subtasks = create_and_schedule_subtasks_from_task_blueprint(task_blueprint)
task_blueprint.refresh_from_db()
# return a response with the new serialized task_blueprint (with references to the created (and scheduled) subtasks)
return Response(serializers.TaskBlueprintSerializer(task_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED)
@swagger_auto_schema(responses={201: "This TaskBlueprint, with the scheduled subtasks",
403: 'forbidden'},
operation_description="Schedule the Subtasks that are not dependend on predecessors.")
@action(methods=['post'], detail=True, url_name="schedule_independent_subtasks", name="Schedule independend Subtasks")
def schedule_independent_subtasks(self, request, pk=None):
task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
schedule_independent_subtasks_in_task_blueprint(task_blueprint)
task_blueprint.refresh_from_db()
# return a response with the new serialized task_blueprint (with references to the created (and scheduled) subtasks)
return Response(serializers.TaskBlueprintSerializer(task_blueprint, context={'request':request}).data,
status=status.HTTP_201_CREATED)
@swagger_auto_schema(responses={200: 'The predecessor task draft of this task draft',
403: 'forbidden'},
operation_description="Get the predecessor task draft of this task draft.")
@action(methods=['get'], detail=True, url_name="predecessors")
def predecessors(self, request, pk=None):
task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
predecessors = self.filter_queryset(task_blueprint.predecessors)
serializer = self.get_serializer(predecessors, many=True)
return Response(serializer.data)
@swagger_auto_schema(responses={200: 'The successor subtasks of this subtask',
403: 'forbidden'},
operation_description="Get the successor subtasks of this subtask.")
@action(methods=['get'], detail=True, url_name="successors")
def successors(self, request, pk=None):
task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
successors = self.filter_queryset(task_blueprint.successors)
serializer = self.get_serializer(successors, many=True)
return Response(serializer.data)
@swagger_auto_schema(responses={200: 'The cancelled version of this task',
403: 'forbidden',
500: 'The task could not be cancelled'},
operation_description="Try to cancel this task.")
@action(methods=['post'], detail=True, url_name="cancel")
def cancel(self, request, pk=None):
task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
from lofar.sas.tmss.tmss.tmssapp.tasks import cancel_task_blueprint
task_blueprint = cancel_task_blueprint(task_blueprint)
serializer = self.get_serializer(task_blueprint)
return RestResponse(serializer.data)
@swagger_auto_schema(responses={200: 'The obsolete version of this task_blueprint',
403: 'forbidden',
500: 'The task_blueprint could not be marked as obsolete. Was it in a state from which it is allowed to mark it as obsolete?'},
operation_description="Try to mark this task_blueprint as obsolete (when the status is cancelled/error)")
@action(methods=['post'], detail=True, url_name="mark_as_obsolete")
def mark_as_obsolete(self, request, pk=None):
task_blueprint = get_object_or_404(models.TaskBlueprint, pk=pk)
from lofar.sas.tmss.tmss.tmssapp.tasks import mark_task_blueprint_as_obsolete
task_blueprint = mark_task_blueprint_as_obsolete(task_blueprint)
serializer = self.get_serializer(task_blueprint)
return RestResponse(serializer.data)
class TaskBlueprintNestedViewSet(LOFARNestedViewSet):
queryset = models.TaskBlueprint.objects.all()
serializer_class = serializers.TaskBlueprintSerializer
def get_queryset(self):
if 'task_draft_id' in self.kwargs:
task_draft = get_object_or_404(models.TaskDraft, pk=self.kwargs['task_draft_id'])
return task_draft.task_blueprints.all()
else:
return models.TaskBlueprint.objects.all()
class TaskRelationDraftViewSet(LOFARViewSet):
queryset = models.TaskRelationDraft.objects.all()
serializer_class = serializers.TaskRelationDraftSerializer
class TaskRelationDraftNestedViewSet(LOFARNestedViewSet):
queryset = models.TaskRelationDraft.objects.all()
serializer_class = serializers.TaskRelationDraftSerializer
def get_queryset(self):
if 'task_draft_id' in self.kwargs:
task_draft = get_object_or_404(models.TaskDraft, pk=self.kwargs['task_draft_id'])
return task_draft.produced_by.all() | task_draft.consumed_by.all()
else:
return models.TaskRelationDraft.objects.all()
class TaskRelationBlueprintViewSet(LOFARViewSet):
queryset = models.TaskRelationBlueprint.objects.all()
serializer_class = serializers.TaskRelationBlueprintSerializer
class TaskSchedulingRelationBlueprintViewSet(LOFARViewSet):
queryset = models.TaskSchedulingRelationBlueprint.objects.all()
serializer_class = serializers.TaskSchedulingRelationBlueprintSerializer
class TaskSchedulingRelationDraftViewSet(LOFARViewSet):
queryset = models.TaskSchedulingRelationDraft.objects.all()
serializer_class = serializers.TaskSchedulingRelationDraftSerializer
class TaskRelationBlueprintNestedViewSet(LOFARNestedViewSet):
queryset = models.TaskRelationBlueprint.objects.all()
serializer_class = serializers.TaskRelationBlueprintSerializer
def get_queryset(self):
if 'task_blueprint_id' in self.kwargs:
task_blueprint = get_object_or_404(models.TaskBlueprint, pk=self.kwargs['task_blueprint_id'])
return task_blueprint.produced_by.all() | task_blueprint.consumed_by.all()
elif 'task_relation_draft_id' in self.kwargs:
task_relation_draft = get_object_or_404(models.TaskRelationDraft, pk=self.kwargs['task_relation_draft_id'])
return task_relation_draft.related_task_relation_blueprint.all()
else:
return models.TaskRelationBlueprint.objects.all()
class TaskTypeViewSet(LOFARViewSet):
queryset = models.TaskType.objects.all()
serializer_class = serializers.TaskTypeSerializer
class PriorityQueueTypeViewSet(LOFARViewSet):
queryset = models.PriorityQueueType.objects.all()
serializer_class = serializers.PriorityQueueTypeSerializer