diff --git a/SAS/TMSS/client/lib/tmss_http_rest_client.py b/SAS/TMSS/client/lib/tmss_http_rest_client.py
index a23c4ba587d35ef7fea7f4ac2aadbe038bf9c2f6..61dcd49d1408a5d025f84ada66507022c62aceb6 100644
--- a/SAS/TMSS/client/lib/tmss_http_rest_client.py
+++ b/SAS/TMSS/client/lib/tmss_http_rest_client.py
@@ -103,8 +103,12 @@ class TMSSsession(object):
     def set_subtask_status(self, subtask_id: int, status: str) -> {}:
         '''set the status for the given subtask, and return the subtask with its new state, or raise on error'''
         json_doc = {'state': "%s/subtask_state/%s/" % (self.base_url, status)}
-        if status == 'finishing':
+        if status in ('finishing', 'cancelling'):
+            # finishing and cancelling both record 'now' (UTC) as the stop_time
             json_doc['stop_time'] = datetime.utcnow().isoformat()
+        if status == 'cancelling':
+            # do_cancel carries the same timestamp as stop_time
+            json_doc['do_cancel'] = json_doc['stop_time']
 
         response = self.session.patch(url='%s/subtask/%s/' % (self.base_url, subtask_id),
                                       json=json_doc,
@@ -244,14 +248,45 @@ class TMSSsession(object):
         template = self.get_subtask_template(name=name, version=version)
         return self.get_url_as_json_object(template['url']+"/default")
 
-    def get_subtask_output_dataproducts(self, subtask_id: int) -> []:
+    def get_subtask_output_dataproducts(self, subtask_id: int) -> []:
         '''get the output dataproducts of the subtask with the given subtask_id'''
         return self.get_path_as_json_object('subtask/%s/output_dataproducts' % subtask_id)
 
-    def get_subtask_input_dataproducts(self, subtask_id: int) -> []:
+    def get_subtask_input_dataproducts(self, subtask_id: int) -> []:
         '''get the input dataproducts of the subtask with the given subtask_id'''
         return self.get_path_as_json_object('subtask/%s/input_dataproducts' % subtask_id)
 
+    def get_dataproduct_SIP(self, dataproduct_id: int) -> str:
+        '''get the SIP for the dataproduct with the given dataproduct_id as an XML string'''
+        # NOTE(review): fetched via get_path_as_json_object like the other getters - confirm that helper hands back the XML text unchanged
+        return self.get_path_as_json_object('dataproduct/%s/sip' % dataproduct_id)
+
+    def get_subtask_transformed_output_dataproduct(self, subtask_id: int, input_dataproduct_id: int) -> {}:
+        '''get the transformed output dataproduct of the subtask with the given subtask_id and input_dataproduct_id'''
+        return self.get_path_as_json_object('subtask/%s/transformed_output_dataproduct?input_dataproduct_id=%s' % (subtask_id, input_dataproduct_id))
+
+    def post_dataproduct_archive_information(self, dataproduct_id: int, storage_ticket: str,
+                                             srm_url: str, file_size: int,
+                                             md5_checksum: str = None, adler32_checksum: str = None) -> {}:
+        '''post the archive information (storage ticket, srm url, file size and optional checksums)
+        for the dataproduct with the given dataproduct_id.
+        returns the json-decoded response body upon success, or raises (requests.HTTPError for a 4xx/5xx answer).'''
+        json_data = {'storage_ticket': storage_ticket,
+                     'srm_url': srm_url,
+                     'file_size': file_size}
+        # the checksums are optional: only send the ones that were actually supplied
+        if md5_checksum:
+            json_data['md5_checksum'] = md5_checksum
+        if adler32_checksum:
+            json_data['adler32_checksum'] = adler32_checksum
+
+        response = self.session.post(url=self.get_full_url_for_path('dataproduct/%s/process_archive_information' % (dataproduct_id,)), json=json_data)
+        logger.info("post_dataproduct_archive_information: json_doc: %s response: %s", json_data, response.text)
+
+        # a 4xx/5xx answer means the archive information was not stored, so do not fail silently
+        response.raise_for_status()
+        return json.loads(response.text)
+
     def specify_observation_task(self, task_id: int) -> requests.Response:
         """specify observation for the given draft task by just doing a REST API call """
         result = self.session.get(url=self.get_full_url_for_path('/task/%s/specify_observation' % (task_id,)))
@@ -269,6 +304,11 @@ class TMSSsession(object):
         returns the scheduled subtask upon success, or raises."""
         return self.get_path_as_json_object('subtask/%s/schedule' % subtask_id)
 
+    def get_subtask_progress(self, subtask_id: int) -> {}:
+        """get the progress [0.0, 1.0] of a running subtask.
+        returns a dict with the 'id' and 'progress', or raises."""
+        return self.get_path_as_json_object('subtask/%s/get_progress' % subtask_id)
+
     def get_setting(self, setting_name: str) -> {}:
         """get the value of a TMSS setting.
         returns the setting value upon success, or raises."""
diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py
index 4930b0a7887f41dbdde9ddca6b8e4bbb6217d630..38f26a66ab3160d2d16a438bd125e639bd0c5b0a 100644
--- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py
+++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py
@@ -225,6 +225,47 @@ class Subtask(BasicCommon):
         '''
         return Dataproduct.objects.filter(producer__subtask_id=self.id)
 
+    def get_transformed_output_dataproduct(self, input_dataproduct_id: int) -> 'Dataproduct':
+        '''return the transformed output dataproduct for the given input_dataproduct_id.
+        raises Dataproduct.DoesNotExist when this subtask has no such output dataproduct.'''
+        return self.output_dataproducts.get(producers__input_id=input_dataproduct_id)
+
+    @property
+    def progress(self) -> float:
+        '''Get the progress of this subtask ranging from 0.0 before it is started, up to 1.0 when finished.
+        Raises NotImplementedError when no progress can be determined for this subtask's type/state.'''
+        if self.state.value in [SubtaskState.Choices.DEFINING.value, SubtaskState.Choices.DEFINED.value,
+                                SubtaskState.Choices.SCHEDULING.value, SubtaskState.Choices.SCHEDULED.value,
+                                SubtaskState.Choices.QUEUEING.value, SubtaskState.Choices.QUEUED.value,
+                                SubtaskState.Choices.STARTING.value]:
+            return 0.0
+
+        if self.state.value == SubtaskState.Choices.FINISHED.value:
+            return 1.0
+
+        if self.state.value in [SubtaskState.Choices.STARTED.value, SubtaskState.Choices.FINISHING.value]:
+            # subtask is running, compute progress if possible.
+            if self.specifications_template.type.value == SubtaskType.Choices.INGEST.value:
+                # progress for an ingest subtask is the ratio of archived output dataproducts over the total
+                num_dataproducts = self.output_dataproducts.count()
+                if num_dataproducts == 0:
+                    # nothing to archive (yet): report no progress instead of dividing by zero
+                    return 0.0
+                num_archived_dataproducts = self.output_dataproducts.filter(archive_info__isnull=False, hash__isnull=False).distinct('id').count()
+                return float(num_archived_dataproducts) / float(num_dataproducts)
+
+            if self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
+                # progress for an observation is just how far we are into the duration
+                # NOTE(review): assumes start_time is set and duration is non-zero for a started observation - confirm
+                num_seconds_running = max(0, (datetime.utcnow() - self.start_time).total_seconds())
+                return min(1.0, float(num_seconds_running) / float(self.duration.total_seconds()))
+
+            # TODO: add more progress computations for more subtask types if possible
+
+        raise NotImplementedError("Could not get progress for subtask id=%s, type=%s state=%s" % (self.id,
+                                                                                                    self.specifications_template.type.value,
+                                                                                                    self.state))
+
     def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
         creating = self._state.adding  # True on create, False on update
 
diff --git a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py
index bcd3eaf22671451c5d005e36c178c56f66b1c0f3..ef961ea3995769576ddb385080363e82a9fb5aec 100644
--- a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py
+++ b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py
@@ -14,6 +14,7 @@ from rest_framework.filters import OrderingFilter
 from drf_yasg import openapi
 from drf_yasg.utils import swagger_auto_schema
 from drf_yasg.inspectors import SwaggerAutoSchema
+from drf_yasg.openapi import Parameter
 
 from rest_framework.decorators import action
 from django.http import HttpResponse, JsonResponse, HttpResponseRedirect, HttpResponseNotFound
@@ -29,11 +30,12 @@ from lofar.common.json_utils import get_default_json_object_for_schema
 from lofar.common.datetimeutils import formatDatetime
 from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset
 from drf_yasg.renderers import _SpecRenderer
-
+import json
 
 from lofar.sas.tmss.tmss.tmssapp.renderers import PlainTextRenderer
 from rest_framework.views import APIView
 from rest_framework.decorators import api_view, renderer_classes
+from django.core.exceptions import ObjectDoesNotExist
 
 
 class TextPlainAutoSchema(SwaggerAutoSchema):
@@ -267,6 +269,36 @@ class SubtaskViewSet(LOFARViewSet):
         serializer = serializers.DataproductSerializer(dataproducts, many=True, context={'request': request})
         return RestResponse(serializer.data)
 
+
+    @swagger_auto_schema(responses={200: 'The transformed output dataproduct of this subtask for the given input_dataproduct_id.',
+                                    400: 'The input_dataproduct_id query parameter is missing or not an integer',
+                                    403: 'forbidden',
+                                    404: 'No transformed output dataproduct found for the given input_dataproduct_id'},
+                         operation_description='Get the transformed output dataproduct for the given input_dataproduct_id.',
+                         manual_parameters=[Parameter(name='input_dataproduct_id', required=True, type='integer', in_='query',
+                                                      description="the id of the input dataproduct for which you want to get the transformed output dataproduct")])
+    @action(methods=['get'], detail=True, url_name='transformed_output_dataproduct')
+    def transformed_output_dataproduct(self, request, pk=None):
+        '''return the transformed output dataproduct for the given input_dataproduct_id.
+        responds with 400 if the input_dataproduct_id query parameter is missing or not an integer,
+        and with 404 if the subtask or the transformed output dataproduct cannot be found.'''
+        subtask = get_object_or_404(models.Subtask, pk=pk)
+
+        # validate the query parameter here, so a bad request is answered with a 400 instead of a 500
+        try:
+            input_dataproduct_id = int(request.query_params['input_dataproduct_id'])
+        except (KeyError, ValueError):
+            return HttpResponse('Please provide an integer input_dataproduct_id query parameter.', status=400)
+
+        try:
+            output_dataproduct = subtask.get_transformed_output_dataproduct(input_dataproduct_id)
+        except models.Dataproduct.DoesNotExist:
+            return HttpResponseNotFound('Cannot find transformed output dataproduct for subtask id=%s and input_dataproduct_id=%s.' % (pk, input_dataproduct_id))
+
+        serializer = serializers.DataproductSerializer(output_dataproduct, many=False, context={'request': request})
+        return RestResponse(serializer.data)
+
+
     @swagger_auto_schema(responses={200: 'The finished version of this subtask.',
                                     403: 'forbidden',
                                     500: 'The feedback of this subtask could not be processed'},
@@ -279,6 +311,16 @@ class SubtaskViewSet(LOFARViewSet):
         serializer = self.get_serializer(finished_subtask)
         return RestResponse(serializer.data)
 
+    @swagger_auto_schema(responses={200: 'Get progress of this subtask ranging from 0.0 when just started up to 1.0 when finished.',
+                                    403: 'forbidden',
+                                    500: 'Could not compute the progress'},
+                         operation_description="Get progress of this subtask ranging from 0.0 when just started up to 1.0 when finished.")
+    @action(methods=['get'], detail=True, url_name='get_progress')
+    def get_progress(self, request, pk=None):
+        '''return a json document with the 'id' of this subtask and its 'progress'.'''
+        subtask = get_object_or_404(models.Subtask, pk=pk)
+        return JsonResponse({'id': subtask.id, 'progress': subtask.progress})
+
 
 class SubtaskNestedViewSet(LOFARNestedViewSet):
     queryset = models.Subtask.objects.all()
@@ -329,6 +371,45 @@ class DataproductViewSet(LOFARViewSet):
         from lofar.sas.tmss.tmss.tmssapp.adapters.sip import generate_sip_for_dataproduct
         return HttpResponse(visualizer.visualize_sip(generate_sip_for_dataproduct(dataproduct)), content_type='image/svg+xml')
 
+    @swagger_auto_schema(responses={200: 'The dataproduct, updated with the posted archive information.',
+                                    400: 'The posted archive information is not valid',
+                                    403: 'forbidden'},
+                         operation_description="Store the archive information (file size, srm url, storage ticket and checksums) for this dataproduct.")
+    @action(methods=['post'], detail=True, url_name='process_archive_information')
+    def process_archive_information(self, request, pk=None):
+        '''store the archive information from the posted json document in this dataproduct.
+        required keys: 'file_size', 'srm_url'. optional keys: 'storage_ticket', 'md5_checksum', 'adler32_checksum'.
+        returns the updated dataproduct, or responds with a 400 when the posted document is not valid.'''
+        dataproduct = get_object_or_404(models.Dataproduct, pk=pk)
+
+        # parse and validate the complete document first, so a bad request cannot leave half of the records behind
+        try:
+            json_doc = json.loads(request.body.decode('utf-8'))
+            file_size = int(json_doc['file_size'])
+            directory, filename = json_doc['srm_url'].rsplit('/', maxsplit=1)
+        except (KeyError, TypeError, ValueError, AttributeError) as e:
+            return HttpResponse('Invalid archive information: %s' % (e,), status=400)
+
+        dataproduct.size = file_size
+        dataproduct.directory, dataproduct.filename = directory, filename
+
+        if 'storage_ticket' in json_doc:
+            models.DataproductArchiveInfo.objects.create(dataproduct=dataproduct, storage_ticket=json_doc['storage_ticket'])
+
+        if 'md5_checksum' in json_doc:
+            models.DataproductHash.objects.create(dataproduct=dataproduct,
+                                                  algorithm=models.Algorithm.objects.get(value=models.Algorithm.Choices.MD5.value),
+                                                  hash=json_doc['md5_checksum'])
+
+        if 'adler32_checksum' in json_doc:
+            models.DataproductHash.objects.create(dataproduct=dataproduct,
+                                                  algorithm=models.Algorithm.objects.get(value=models.Algorithm.Choices.ADLER32.value),
+                                                  hash=json_doc['adler32_checksum'])
+
+        dataproduct.save()
+        serializer = self.get_serializer(dataproduct)
+        return RestResponse(serializer.data)
+
 
 class AntennaSetViewSet(LOFARViewSet):
     queryset = models.AntennaSet.objects.all()