Skip to content
Snippets Groups Projects
Commit 9661180e authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-320: added methods to get/compute the progress of a subtask,...

TMSS-320: added methods to get/compute the progress of a subtask, process_archive_information for a dataproduct, and to find the transformed_output_dataproduct for a given input dataproduct
parent f8d1bf11
No related branches found
No related tags found
2 merge requests: !308 Resolve TMSS-495, !306 Resolve TMSS-320
...@@ -103,8 +103,10 @@ class TMSSsession(object): ...@@ -103,8 +103,10 @@ class TMSSsession(object):
def set_subtask_status(self, subtask_id: int, status: str) -> {}: def set_subtask_status(self, subtask_id: int, status: str) -> {}:
'''set the status for the given subtask, and return the subtask with its new state, or raise on error''' '''set the status for the given subtask, and return the subtask with its new state, or raise on error'''
json_doc = {'state': "%s/subtask_state/%s/" % (self.base_url, status)} json_doc = {'state': "%s/subtask_state/%s/" % (self.base_url, status)}
if status == 'finishing': if status == 'finishing' or status == 'cancelling':
json_doc['stop_time'] = datetime.utcnow().isoformat() json_doc['stop_time'] = datetime.utcnow().isoformat()
if status == 'cancelling':
json_doc['do_cancel'] = json_doc['stop_time']
response = self.session.patch(url='%s/subtask/%s/' % (self.base_url, subtask_id), response = self.session.patch(url='%s/subtask/%s/' % (self.base_url, subtask_id),
json=json_doc, json=json_doc,
...@@ -244,14 +246,38 @@ class TMSSsession(object): ...@@ -244,14 +246,38 @@ class TMSSsession(object):
template = self.get_subtask_template(name=name, version=version) template = self.get_subtask_template(name=name, version=version)
return self.get_url_as_json_object(template['url']+"/default") return self.get_url_as_json_object(template['url']+"/default")
def get_subtask_output_dataproducts(self, subtask_id: int) -> []:
    '''Fetch the output dataproducts of the subtask identified by subtask_id.

    The result is whatever get_path_as_json_object returns for the
    'subtask/<id>/output_dataproducts' path.
    '''
    path = 'subtask/%s/output_dataproducts' % subtask_id
    return self.get_path_as_json_object(path)
def get_subtask_input_dataproducts(self, subtask_id: int) -> []:
    '''Fetch the input dataproducts of the subtask identified by subtask_id.

    The result is whatever get_path_as_json_object returns for the
    'subtask/<id>/input_dataproducts' path.
    '''
    path = 'subtask/%s/input_dataproducts' % subtask_id
    return self.get_path_as_json_object(path)
def get_dataproduct_SIP(self, dataproduct_id: int) -> str:
    '''Fetch the SIP of the dataproduct identified by dataproduct_id.

    The SIP is requested from the 'dataproduct/<id>/sip' path.
    '''
    # NOTE(review): the SIP is described as an XML string, yet it is fetched via the
    # JSON helper - confirm that get_path_as_json_object copes with a non-JSON body.
    sip_path = 'dataproduct/%s/sip' % dataproduct_id
    return self.get_path_as_json_object(sip_path)
def get_subtask_transformed_output_dataproduct(self, subtask_id: int, input_dataproduct_id: int) -> {}:
    '''Fetch the transformed output dataproduct of a subtask for one of its input dataproducts.

    The input_dataproduct_id is passed to the server as a query parameter of the
    'subtask/<id>/transformed_output_dataproduct' path.
    '''
    path_template = 'subtask/%s/transformed_output_dataproduct?input_dataproduct_id=%s'
    path = path_template % (subtask_id, input_dataproduct_id)
    return self.get_path_as_json_object(path)
def post_dataproduct_archive_information(self, dataproduct_id: int, storage_ticket: str,
                                         srm_url: str, file_size: int,
                                         md5_checksum: str = None, adler32_checksum: str = None) -> {}:
    '''post the archive information for the dataproduct with the given dataproduct_id,
    and return the response of the server as a json object, or raise on error.

    The md5_checksum and adler32_checksum are only sent along when they are given (non-empty).
    '''
    json_data = {'storage_ticket': storage_ticket,
                 'srm_url': srm_url,
                 'file_size': file_size}
    if md5_checksum:
        json_data['md5_checksum'] = md5_checksum
    if adler32_checksum:
        json_data['adler32_checksum'] = adler32_checksum

    url = self.get_full_url_for_path('dataproduct/%s/process_archive_information' % (dataproduct_id,))
    response = self.session.post(url=url, json=json_data)
    logger.info("post_dataproduct_archive_information: json_doc: %s response: %s", json_data, response.text)

    # accept any 2xx status: success is not necessarily signalled with a 201.
    if 200 <= response.status_code < 300:
        return json.loads(response.text)

    # do not fail silently: the caller needs to know that the archive information was not processed.
    raise Exception("Could not post archive information for dataproduct id=%s to url %s - %s - %s" % (
        dataproduct_id, url, response.status_code, response.text))
def specify_observation_task(self, task_id: int) -> requests.Response: def specify_observation_task(self, task_id: int) -> requests.Response:
"""specify observation for the given draft task by just doing a REST API call """ """specify observation for the given draft task by just doing a REST API call """
result = self.session.get(url=self.get_full_url_for_path('/task/%s/specify_observation' % (task_id,))) result = self.session.get(url=self.get_full_url_for_path('/task/%s/specify_observation' % (task_id,)))
...@@ -269,6 +295,11 @@ class TMSSsession(object): ...@@ -269,6 +295,11 @@ class TMSSsession(object):
returns the scheduled subtask upon success, or raises.""" returns the scheduled subtask upon success, or raises."""
return self.get_path_as_json_object('subtask/%s/schedule' % subtask_id) return self.get_path_as_json_object('subtask/%s/schedule' % subtask_id)
def get_subtask_progress(self, subtask_id: int) -> {}:
    """Fetch the progress of the subtask identified by subtask_id.

    The result is whatever get_path_as_json_object returns for the
    'subtask/<id>/get_progress' path."""
    progress_path = 'subtask/%s/get_progress' % subtask_id
    return self.get_path_as_json_object(progress_path)
def get_setting(self, setting_name: str) -> {}: def get_setting(self, setting_name: str) -> {}:
"""get the value of a TMSS setting. """get the value of a TMSS setting.
returns the setting value upon success, or raises.""" returns the setting value upon success, or raises."""
......
...@@ -225,6 +225,41 @@ class Subtask(BasicCommon): ...@@ -225,6 +225,41 @@ class Subtask(BasicCommon):
''' '''
return Dataproduct.objects.filter(producer__subtask_id=self.id) return Dataproduct.objects.filter(producer__subtask_id=self.id)
def get_transformed_output_dataproduct(self, input_dataproduct_id: int) -> 'Dataproduct':
    '''Look up, among this subtask's output dataproducts, the one whose producers
    have the given input_dataproduct_id as input.

    Any exception raised by the underlying .get() lookup propagates to the caller.
    '''
    lookup = {'producers__input_id': input_dataproduct_id}
    return self.output_dataproducts.get(**lookup)
@property
def progress(self) -> float:
    '''Get the progress of this subtask ranging from 0.0 before it is started, up to 1.0 when finished.

    Raises NotImplementedError when the progress cannot be computed for the
    current state or subtask type.
    '''
    state_value = self.state.value

    # NOTE(review): DEFINED replaces a duplicated DEFINING entry; this assumes
    # SubtaskState.Choices.DEFINED exists - confirm against the SubtaskState model.
    not_yet_running_states = (SubtaskState.Choices.DEFINING.value, SubtaskState.Choices.DEFINED.value,
                              SubtaskState.Choices.SCHEDULING.value, SubtaskState.Choices.SCHEDULED.value,
                              SubtaskState.Choices.QUEUEING.value, SubtaskState.Choices.QUEUED.value,
                              SubtaskState.Choices.STARTING.value)
    if state_value in not_yet_running_states:
        return 0.0

    if state_value == SubtaskState.Choices.FINISHED.value:
        return 1.0

    if state_value in (SubtaskState.Choices.STARTED.value, SubtaskState.Choices.FINISHING.value):
        # subtask is running, compute progress if possible.
        subtask_type = self.specifications_template.type.value

        if subtask_type == SubtaskType.Choices.INGEST.value:
            # progress for an ingest subtask is the ratio of archived output dataproducts over the total
            num_dataproducts = self.output_dataproducts.count()
            if num_dataproducts == 0:
                # nothing to archive (yet): report no progress instead of dividing by zero
                return 0.0
            num_archived_dataproducts = self.output_dataproducts.filter(archive_info__isnull=False, hash__isnull=False).distinct('id').count()
            return float(num_archived_dataproducts) / float(num_dataproducts)

        if subtask_type == SubtaskType.Choices.OBSERVATION.value:
            # progress for an observation is just how far we are into the duration
            duration_in_seconds = float(self.duration.total_seconds())
            if duration_in_seconds <= 0:
                # a running subtask has already passed a zero-length duration; also avoids dividing by zero
                return 1.0
            num_seconds_running = max(0, (datetime.utcnow() - self.start_time).total_seconds())
            return min(1.0, float(num_seconds_running) / duration_in_seconds)

        # TODO: add more progress computations for more subtask types if possible

    raise NotImplementedError("Could not get progress for subtask id=%s, type=%s state=%s" % (self.id,
                                                                                            self.specifications_template.type.value,
                                                                                            self.state))
def save(self, force_insert=False, force_update=False, using=None, update_fields=None): def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
creating = self._state.adding # True on create, False on update creating = self._state.adding # True on create, False on update
......
...@@ -14,6 +14,7 @@ from rest_framework.filters import OrderingFilter ...@@ -14,6 +14,7 @@ from rest_framework.filters import OrderingFilter
from drf_yasg import openapi from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema from drf_yasg.utils import swagger_auto_schema
from drf_yasg.inspectors import SwaggerAutoSchema from drf_yasg.inspectors import SwaggerAutoSchema
from drf_yasg.openapi import Parameter
from rest_framework.decorators import action from rest_framework.decorators import action
from django.http import HttpResponse, JsonResponse, HttpResponseRedirect, HttpResponseNotFound from django.http import HttpResponse, JsonResponse, HttpResponseRedirect, HttpResponseNotFound
...@@ -29,11 +30,12 @@ from lofar.common.json_utils import get_default_json_object_for_schema ...@@ -29,11 +30,12 @@ from lofar.common.json_utils import get_default_json_object_for_schema
from lofar.common.datetimeutils import formatDatetime from lofar.common.datetimeutils import formatDatetime
from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset
from drf_yasg.renderers import _SpecRenderer from drf_yasg.renderers import _SpecRenderer
import json
from lofar.sas.tmss.tmss.tmssapp.renderers import PlainTextRenderer from lofar.sas.tmss.tmss.tmssapp.renderers import PlainTextRenderer
from rest_framework.views import APIView from rest_framework.views import APIView
from rest_framework.decorators import api_view, renderer_classes from rest_framework.decorators import api_view, renderer_classes
from django.core.exceptions import ObjectDoesNotExist
class TextPlainAutoSchema(SwaggerAutoSchema): class TextPlainAutoSchema(SwaggerAutoSchema):
...@@ -267,6 +269,26 @@ class SubtaskViewSet(LOFARViewSet): ...@@ -267,6 +269,26 @@ class SubtaskViewSet(LOFARViewSet):
serializer = serializers.DataproductSerializer(dataproducts, many=True, context={'request': request}) serializer = serializers.DataproductSerializer(dataproducts, many=True, context={'request': request})
return RestResponse(serializer.data) return RestResponse(serializer.data)
@swagger_auto_schema(responses={200: 'The transformed output dataproduct of this subtask for the given input_dataproduct_id.',
                                400: 'bad request: the input_dataproduct_id query parameter is missing or not an integer',
                                403: 'forbidden',
                                404: 'no transformed output dataproduct found for the given input_dataproduct_id'},
                     operation_description='Get the transformed output dataproduct for the given input_dataproduct_id.',
                     manual_parameters=[Parameter(name='input_dataproduct_id', required=True, type='integer', in_='query',
                                                  description="the id of the input dataproduct for which you want to get the transformed output dataproduct")])
@action(methods=['get'], detail=True, url_name='transformed_output_dataproduct')
def transformed_output_dataproduct(self, request, pk=None):
    '''return the transformed output dataproduct for the given input_dataproduct_id.

    Responds with 400 when the input_dataproduct_id query parameter is missing or not an integer,
    and with 404 when the subtask or the transformed output dataproduct cannot be found.
    '''
    subtask = get_object_or_404(models.Subtask, pk=pk)

    # validate the query parameter ourselves, so a client mistake yields a 400 instead of a 500
    raw_input_dataproduct_id = request.query_params.get('input_dataproduct_id')
    try:
        input_dataproduct_id = int(raw_input_dataproduct_id)
    except (TypeError, ValueError):
        return HttpResponse("The query parameter 'input_dataproduct_id' is required and should be an integer, got: %s" % (raw_input_dataproduct_id,),
                            status=400)

    try:
        output_dataproduct = subtask.get_transformed_output_dataproduct(input_dataproduct_id)
    except models.Dataproduct.DoesNotExist:
        return HttpResponseNotFound('Cannot find transformed output dataproduct for subtask id=%s and input_dataproduct_id=%s.' % (pk, input_dataproduct_id))

    serializer = serializers.DataproductSerializer(output_dataproduct, many=False, context={'request': request})
    return RestResponse(serializer.data)
@swagger_auto_schema(responses={200: 'The finished version of this subtask.', @swagger_auto_schema(responses={200: 'The finished version of this subtask.',
403: 'forbidden', 403: 'forbidden',
500: 'The feedback of this subtask could not be processed'}, 500: 'The feedback of this subtask could not be processed'},
...@@ -279,6 +301,15 @@ class SubtaskViewSet(LOFARViewSet): ...@@ -279,6 +301,15 @@ class SubtaskViewSet(LOFARViewSet):
serializer = self.get_serializer(finished_subtask) serializer = self.get_serializer(finished_subtask)
return RestResponse(serializer.data) return RestResponse(serializer.data)
@swagger_auto_schema(responses={200: 'Get progress of this subtask ranging from 0.0 when just started up to 1.0 when finished.',
                                403: 'forbidden',
                                500: 'Could not compute the progress'},
                     operation_description="Get progress of this subtask ranging from 0.0 when just started up to 1.0 when finished.")
@action(methods=['get'], detail=True, url_name='get_progress')
def get_progress(self, request, pk=None):
    '''return a json document with the 'id' and the 'progress' of the subtask with the given pk.'''
    requested_subtask = get_object_or_404(models.Subtask, pk=pk)
    # exceptions raised while reading the progress property are not handled here
    progress_doc = {'id': requested_subtask.id,
                    'progress': requested_subtask.progress}
    return JsonResponse(progress_doc)
class SubtaskNestedViewSet(LOFARNestedViewSet): class SubtaskNestedViewSet(LOFARNestedViewSet):
queryset = models.Subtask.objects.all() queryset = models.Subtask.objects.all()
...@@ -329,6 +360,35 @@ class DataproductViewSet(LOFARViewSet): ...@@ -329,6 +360,35 @@ class DataproductViewSet(LOFARViewSet):
from lofar.sas.tmss.tmss.tmssapp.adapters.sip import generate_sip_for_dataproduct from lofar.sas.tmss.tmss.tmssapp.adapters.sip import generate_sip_for_dataproduct
return HttpResponse(visualizer.visualize_sip(generate_sip_for_dataproduct(dataproduct)), content_type='image/svg+xml') return HttpResponse(visualizer.visualize_sip(generate_sip_for_dataproduct(dataproduct)), content_type='image/svg+xml')
@swagger_auto_schema(responses={200: 'The dataproduct, updated with the posted archive information.',
                                400: 'bad request: the posted archive information is missing or malformed',
                                403: 'forbidden'},
                     operation_description="Process the posted archive information (file_size, srm_url, and optionally storage_ticket, md5_checksum and adler32_checksum) for this dataproduct.")
@action(methods=['post'], detail=True, url_name='process_archive_information')
def process_archive_information(self, request, pk=None):
    '''store the posted archive information for the dataproduct with the given pk, and return the updated dataproduct.

    The request body should be a json document with the keys 'file_size' and 'srm_url',
    and optionally 'storage_ticket', 'md5_checksum' and 'adler32_checksum'.
    Responds with 400 when the body is not valid json or a required key is missing or malformed.
    '''
    # local import: keeps this action self-contained without touching the module's import block
    from django.db import transaction

    dataproduct = get_object_or_404(models.Dataproduct, pk=pk)

    # parse and validate all input before touching the database
    try:
        json_doc = json.loads(request.body.decode('utf-8'))
        file_size = int(json_doc['file_size'])
        # the last path element of the srm_url is the filename, the rest is the directory
        directory, filename = json_doc['srm_url'].rsplit('/', maxsplit=1)
    except (KeyError, TypeError, ValueError, AttributeError) as e:
        return HttpResponse('Invalid archive information for dataproduct id=%s: %s' % (pk, e), status=400)

    # all-or-nothing: do not leave archive info or hashes behind when a later step fails
    with transaction.atomic():
        dataproduct.size = file_size
        dataproduct.directory, dataproduct.filename = directory, filename

        if 'storage_ticket' in json_doc:
            models.DataproductArchiveInfo.objects.create(dataproduct=dataproduct, storage_ticket=json_doc['storage_ticket'])

        if 'md5_checksum' in json_doc:
            models.DataproductHash.objects.create(dataproduct=dataproduct,
                                                  algorithm=models.Algorithm.objects.get(value=models.Algorithm.Choices.MD5.value),
                                                  hash=json_doc['md5_checksum'])

        if 'adler32_checksum' in json_doc:
            models.DataproductHash.objects.create(dataproduct=dataproduct,
                                                  algorithm=models.Algorithm.objects.get(value=models.Algorithm.Choices.ADLER32.value),
                                                  hash=json_doc['adler32_checksum'])

        dataproduct.save()

    serializer = self.get_serializer(dataproduct)
    return RestResponse(serializer.data)
class AntennaSetViewSet(LOFARViewSet): class AntennaSetViewSet(LOFARViewSet):
queryset = models.AntennaSet.objects.all() queryset = models.AntennaSet.objects.all()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment