Skip to content
Snippets Groups Projects
Select Git revision
  • fb9d9b204c3a798044ae5170d2a20afa3b2d972e
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

feedback_handling.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    tmss_http_rest_client.py 25.96 KiB
    import logging
    import time
    
    logger = logging.getLogger(__name__)
    
    import requests
    from http.client import responses
    import os
    import json
    from datetime import datetime, timedelta
    from lofar.common.datetimeutils import formatDatetime
    from lofar.common.dbcredentials import DBCredentials
    
    
    # usage example:
    #
    # with TMSSsession('paulus', 'pauluspass', 'localhost', 8000) as tmsssession:
    #    response = tmsssession.session.get(url='http://localhost:8000/api/task_draft/')
    #    print(response)
    
    
    #TODO: add unittests!
    class TMSSsession(object):
    
        OPENID = "openid"
        BASICAUTH = "basicauth"
    
        def __init__(self, username, password, host, port: int=8000, authentication_method=OPENID):
            '''Remember the credentials and derive the TMSS urls; nothing is sent over the network here.

            :param username: user name used to log in when the session is opened.
            :param password: password belonging to username.
            :param host: host name or address of the TMSS server.
            :param port: tcp port of the TMSS server.
            :param authentication_method: TMSSsession.OPENID or TMSSsession.BASICAUTH.
            '''
            self.username = username
            self.password = password
            self.authentication_method = authentication_method

            # every request url is built on top of these two base urls
            self.host_url = "http://%s:%d" % (host, port)
            self.api_url = "%s/api" % (self.host_url,)

            # the underlying requests session; it is configured and logged in by open()
            self.session = requests.session()
    
        @staticmethod
        def create_from_dbcreds_for_ldap(dbcreds_name: str=None) -> 'TMSSsession':
            '''Factory method: create a TMSSsession from the credentials stored under the given dbcreds_name.

            The DBCredentials mechanism is (mis)used to store a host/port/user/password for TMSS.
            The entry below is used to construct a url like http://localhost:8000/api
               [database:TMSS]
                host=localhost
                user=<username>
                password=<password>
                type=http
                port=8000

            :param dbcreds_name: name of the credentials entry. When None, the name is taken from the
                                 TMSS_CLIENT_DBCREDENTIALS environment variable, or else "TMSSClient".
            :return: a new (not yet opened) TMSSsession.
            '''
            name = dbcreds_name
            if name is None:
                name = os.environ.get("TMSS_CLIENT_DBCREDENTIALS", "TMSSClient")

            return TMSSsession.create_from_dbcreds(DBCredentials().get(name))
    
        @staticmethod
        def create_from_dbcreds(dbcreds: DBCredentials) -> 'TMSSsession':
            '''Factory method: create a TMSSsession from the user, password, host and port of the given dbcreds object.
            The created session uses the BASICAUTH authentication method.
            See also: create_from_dbcreds_for_ldap

            :param dbcreds: object with the attributes user, password, host and port.
            :return: a new (not yet opened) TMSSsession.
            '''
            return TMSSsession(dbcreds.user, dbcreds.password, dbcreds.host, dbcreds.port,
                               authentication_method=TMSSsession.BASICAUTH)
    
        @staticmethod
        def check_connection(dbcreds_name: str=None):
            '''Open a connection to TMSS using the credentials for the given dbcreds_name and fetch the api root.

            :param dbcreds_name: name of the credentials entry, see create_from_dbcreds_for_ldap.
            :raises ConnectionError: when the api root cannot be fetched. The original error is chained as cause.
                                     Errors raised while opening the session itself propagate unchanged.
            '''
            with TMSSsession.create_from_dbcreds_for_ldap(dbcreds_name) as client:
                try:
                    client.get_path_as_string("")
                except Exception as e:
                    # only translate real errors; KeyboardInterrupt/SystemExit must not be reported as a connection problem
                    raise ConnectionError("Cannot connect to TMSS REST API %s for user '%s'" % (client.api_url, client.username)) from e

                logger.info("Http REST login to TMSS working on %s for user '%s'", client.api_url, client.username)
    
        @staticmethod
        def check_connection_and_exit_on_error(dbcreds_name: str=None):
            '''Open a connection to TMSS using the credentials for the given dbcreds_name.
            Any error is logged, after which the program exits with exit code 1.

            :param dbcreds_name: name of the credentials entry, see create_from_dbcreds_for_ldap.
            '''
            # imported locally: sys.exit is always available, whereas the exit() builtin
            # is only installed by the site module (absent with 'python -S' or in frozen apps)
            import sys

            try:
                TMSSsession.check_connection(dbcreds_name)
            except Exception as e:
                logger.error(e)
                sys.exit(1)
    
        def __enter__(self):
            self.open()
    
            # return the request session for use within the context
            return self
    
        def __exit__(self, type, value, traceback):
            self.close()
    
        def open(self):
            '''open the request session and login'''
            self.session.__enter__()
            self.session.verify = False
            self.session.headers['Accept'] = 'application/json'
    
            if self.authentication_method == self.OPENID:
                # get authentication page of OIDC through TMSS redirect
                response = self.session.get(self.api_url.replace('/api', '/oidc/authenticate/'), allow_redirects=True)
                csrftoken = self.session.cookies['csrftoken']
    
                # post user credentials to login page, also pass csrf token
                data = {'username': self.username, 'password': self.password, 'csrfmiddlewaretoken': csrftoken}
                response = self.session.post(url=response.url, data=data, allow_redirects=True)
    
                # raise when sth went wrong
                if "The username and/or password you specified are not correct" in response.content.decode('utf8'):
                    raise ValueError("The username and/or password you specified are not correct")
                if response.status_code != 200:
                    raise ConnectionError(response.content.decode('utf8'))
    
            if self.authentication_method == self.BASICAUTH:
                self.session.auth = (self.username, self.password)
    
        def close(self):
            '''close the request session and logout'''
            try:
                # logout user
                self.session.get(self.api_url + '/logout/', allow_redirects=True)
                self.session.close()
            except:
                pass
    
        def set_subtask_status(self, subtask_id: int, status: str) -> {}:
            '''PATCH the state of the given subtask to the given status.
            For the 'finishing' and 'cancelling' states the current UTC time is sent along as stop_time.

            :return: the json-decoded body of the response (the subtask) when a 2xx status is returned.
            :raises Exception: for any non-2xx response.
            '''
            subtask_url = '%s/subtask/%s/' % (self.api_url, subtask_id)
            patch_doc = {'state': "%s/subtask_state/%s/" % (self.api_url, status)}
            if status in ('finishing', 'cancelling'):
                patch_doc['stop_time'] = datetime.utcnow().isoformat()

            logger.info("updating subtask id=%s status to '%s'", subtask_id, status)
            response = self.session.patch(url=subtask_url, json=patch_doc)

            body = response.content.decode('utf-8')
            if 200 <= response.status_code < 300:
                return json.loads(body)

            raise Exception("Could not set status with url %s - %s %s - %s" % (response.request.url, response.status_code, responses.get(response.status_code), body))
    
        def get_subtask_parset(self, subtask_id) -> str:
            '''get the lofar parameterset (as text) for the given subtask'''
            result = self.session.get(url=self.get_full_url_for_path('/subtask/%s/parset' % (subtask_id,)),
                                      headers={'Accept': 'text/plain'})
            if result.status_code >= 200 and result.status_code < 300:
                return result.content.decode('utf-8')
            raise Exception("Could not get parameterset for subtask %s.\nResponse: %s" % (subtask_id, result))
    
        def get_subtask_predecessors(self, subtask_id: int, state: str=None) -> list:
            '''get the subtask's predecessors as list of dict for the given subtask'''
    
            clauses = {}
            if state is not None:
                clauses["state__value"] = state
    
            path = 'subtask/%s/predecessors' % (subtask_id,)
            return self.get_path_as_json_object(path, clauses)
    
        def get_subtask_successors(self, subtask_id: int, state: str=None) -> list:
            '''get the subtask's successors as list of dict for the given subtask'''
            clauses = {}
            if state is not None:
                clauses["state__value"] = state
    
            path = 'subtask/%s/successors' % (subtask_id,)
            return self.get_path_as_json_object(path, clauses)
    
        def get_subtask(self, subtask_id: int) -> dict:
            '''get the subtask as dict for the given subtask'''
            path = 'subtask/%s' % (subtask_id,)
            return self.get_path_as_json_object(path)
    
        def get_subtasks(self, state: str=None,
                         cluster: str=None,
                         start_time_less_then: datetime=None, start_time_greater_then: datetime=None,
                         stop_time_less_then: datetime = None, stop_time_greater_then: datetime = None) -> list:
            '''get subtasks (as list of dicts) filtered by the given parameters'''
            clauses = {}
            if state is not None:
                clauses["state__value"] = state
            if cluster is not None:
                clauses["cluster__name"] = cluster
            if start_time_less_then is not None:
                clauses["start_time__lt="] = formatDatetime(start_time_less_then)
            if start_time_greater_then is not None:
                clauses["start_time__gt"] = formatDatetime(start_time_greater_then)
            if stop_time_less_then is not None:
                clauses["stop_time__lt"] = formatDatetime(stop_time_less_then)
            if stop_time_greater_then is not None:
                clauses["stop_time__gt"] = formatDatetime(stop_time_greater_then)
    
            return self.get_path_as_json_object("subtask", clauses)
    
        def get_full_url_for_path(self, path: str) -> str:
            '''get the full URL for the given path'''
            return '%s/%s' % (self.api_url, path.strip('/'))
    
        def get_path_as_json_object(self, path: str, params={}) -> object:
            '''get resource at the given path, interpret it as json, and return it as as native object (usually a dict or a list of dicts)'''
            return self.get_url_as_json_object(self.get_full_url_for_path(path=path), params=params)
    
        def get_path_as_string(self, path: str, params={}) -> str:
            '''get resource at the given path, and return it as as plain text'''
            return self.get_url_as_string(self.get_full_url_for_path(path=path), params=params)
    
        def get_url_as_string(self, full_url: str, params=None) -> str:
            '''Get the resource at the given full url (including http://<base_url>), and return it as plain text.
            Each request is logged with its status and duration, marked SLOW! when it took more than a second.

            :param params: optional dict with query parameters. Defaults to no parameters.
            :return: the utf-8 decoded response body for a 2xx response.
            :raises Exception: for any non-2xx response.
            '''
            # a fresh dict per call instead of one shared mutable default argument
            params = {} if params is None else params

            response = self.session.get(url=full_url, params=params, timeout=100000)
            logger.info("%s %s %s in %.1fms%s on %s", response.request.method.upper(), response.status_code, responses.get(response.status_code),
                                                      response.elapsed.total_seconds()*1000, ' SLOW!' if response.elapsed > timedelta(seconds=1) else '',
                                                      response.request.url)

            if 200 <= response.status_code < 300:
                return response.content.decode('utf-8')

            # ugly error message parsing
            content = response.text
            try:
                error_msg = content.split('\n')[1] # magic! error message is at 2nd line of response...
            except IndexError:
                # single line response: use all of it
                error_msg = content

            raise Exception("Could not get %s - %s %s - %s" % (full_url, response.status_code, responses.get(response.status_code), error_msg))
    
        def get_url_as_json_object(self, full_url: str, params={}) -> object:
            '''get resource at the given full url (including http://<base_url>, interpret it as json, and return it as as native object (usually a dict or a list of dicts)'''
            result = self.get_url_as_string(full_url, params)
            result = json.loads(result)
            if isinstance(result, dict):
                result_object = result.get('results', result) # return the 'results' list if any, or else just the object itself
    
                if result.get('next'):
                    # recurse, get the 'next' url, and return a concatenation of the results
                    return result_object + self.get_url_as_json_object(result['next'])
                return result_object
            return result
    
        def post_to_url_and_get_result_as_as_string(self, full_url: str, json_data:dict=None) -> str:
            '''Post to the given full_url (including http://<base_url>), and return the response as plain text.
            A non-2xx response is retried: at most 3 attempts are made, with a 10sec interval in between.
            Each attempt is logged with its status and duration, marked SLOW! when it took more than a second.

            :param json_data: optional dict which is sent as json body.
            :return: the utf-8 decoded response body of the first 2xx response.
            :raises Exception: when the last attempt also resulted in a non-2xx response.
            '''
            ATTEMPT_COUNT = 3
            RETRY_INTERVAL = 10
            for attempt_nr in range(ATTEMPT_COUNT):
                response = self.session.post(url=full_url, timeout=100000, json=json_data)
                logger.info("%s %s %s in %.1fms%s on %s", response.request.method.upper(), response.status_code, responses.get(response.status_code),
                                                          response.elapsed.total_seconds()*1000, ' SLOW!' if response.elapsed > timedelta(seconds=1) else '',
                                                          response.request.url)

                if 200 <= response.status_code < 300:
                    return response.content.decode('utf-8')

                # only wait when another attempt follows; there is no point in sleeping before raising
                if attempt_nr < ATTEMPT_COUNT - 1:
                    time.sleep(RETRY_INTERVAL)

            # ugly error message parsing
            content = response.text
            try:
                error_msg = content.split('\n')[1] # magic! error message is at 2nd line of response...
            except IndexError:
                # single line response: use all of it
                error_msg = content

            raise Exception("Could not post to %s - %s %s - %s" % (full_url, response.status_code, responses.get(response.status_code), error_msg))
    
        def post_to_url_and_get_result_as_json_object(self, full_url: str, json_data:dict=None) -> object:
            '''post to the given full_url (including http://<base_url>), and return the response as native object (usually a dict or a list of dicts)'''
            result = self.post_to_url_and_get_result_as_as_string(full_url, json_data=json_data)
            return json.loads(result)
    
        def post_to_path_and_get_result_as_json_object(self, path: str, json_data:dict=None) -> object:
            '''post to the given path, and return the response as native object (usually a dict or a list of dicts)'''
            return self.post_to_url_and_get_result_as_json_object(self.get_full_url_for_path(path=path), json_data=json_data)
    
        def _get_template(self, template_type_name: str, name: str, version: int=None) -> dict:
            '''get the template of the given type as dict for the given name (and version)'''
            clauses = {}
            if name is not None:
                clauses["name"] = name
            if version is not None:
                clauses["version"] = version
            result = self.get_path_as_json_object(template_type_name, clauses)
            if isinstance(result, list):
                if len(result) > 1:
                    raise ValueError("Found more then one %s for clauses: %s" % (template_type_name, clauses))
                elif len(result) == 1:
                    return result[0]
                return None
            return result
    
        def get_schedulingunit_template(self, name: str, version: int=None) -> dict:
            '''get the schedulingunit_template as dict for the given name (and version)'''
            return self._get_template('scheduling_unit_template', name, version)
    
        def get_task_template(self, name: str, version: int=None) -> dict:
            '''get the task_template as dict for the given name (and version)'''
            return self._get_template('task_template', name, version)
    
        def get_subtask_template(self, name: str, version: int=None) -> dict:
            '''get the subtask_template as dict for the given name (and version)'''
            return self._get_template('subtask_template', name, version)
    
        def get_schedulingunit_template_default_specification(self, name: str, version: int=None) -> dict:
            template = self.get_schedulingunit_template(name=name, version=version)
            return self.get_url_as_json_object(template['url']+"/default")
    
        def get_task_template_default_specification(self, name: str, version: int=None) -> dict:
            template = self.get_task_template(name=name, version=version)
            return self.get_url_as_json_object(template['url']+"/default")
    
        def get_subtask_template_default_specification(self, name: str, version: int=None) -> dict:
            template = self.get_subtask_template(name=name, version=version)
            return self.get_url_as_json_object(template['url']+"/default")
    
        def get_subtask_output_dataproducts(self, subtask_id: int) -> []:
            '''get the output dataproducts of the subtask with the given subtask_id'''
            return self.get_path_as_json_object('subtask/%s/output_dataproducts' % subtask_id)
    
        def get_subtask_input_dataproducts(self, subtask_id: int) -> []:
            '''get the input dataproducts of the subtask with the given subtask_id'''
            return self.get_path_as_json_object('subtask/%s/input_dataproducts' % subtask_id)
    
        def get_dataproduct_SIP(self, dataproduct_id: int) -> str:
            '''get the SIP for the dataproduct with the given dataproduct_id as an XML string'''
            return self.get_path_as_string('dataproduct/%s/sip' % dataproduct_id)
    
        def get_subtask_transformed_output_dataproduct(self, subtask_id: int, input_dataproduct_id: int) -> {}:
            '''get the transformed output dataproduct of the subtask with the given subtask_id and input_dataproduct_id'''
            return self.get_path_as_json_object('subtask/%s/transformed_output_dataproduct?input_dataproduct_id=%s' % (subtask_id, input_dataproduct_id))
    
        def post_dataproduct_archive_information(self, dataproduct_id: int, storage_ticket: str,
                                                 srm_url: str, file_size: int,
                                                 md5_checksum: str = None, adler32_checksum: str = None) -> {}:
            '''Post the archive information to <api>/dataproduct/<dataproduct_id>/post_archive_information.
            The storage_ticket, srm_url and file_size are always sent; each checksum only when it is non-empty.
            The request and the response are logged. Nothing is returned and the response status is not checked.
            '''
            json_data = {'storage_ticket': storage_ticket,
                         'srm_url': srm_url,
                         'file_size': file_size}
            for key, checksum in (('md5_checksum', md5_checksum), ('adler32_checksum', adler32_checksum)):
                if checksum:
                    json_data[key] = checksum

            url = self.get_full_url_for_path('dataproduct/%s/post_archive_information' % (dataproduct_id,))
            response = self.session.post(url=url, json=json_data)
            logger.info("post_dataproduct_archive_information: json_doc: %s response: %s", json_data, response.text)

            # NOTE(review): the message below talks about a 'template', which looks copy-pasted -- confirm the intended wording.
            # NOTE(review): a non-201 response is only visible in the log line above -- confirm that callers do not need an error.
            if response.status_code == 201:
                logger.info("created new template: %s", json.loads(response.text)['url'])
    
        def specify_observation_task(self, task_id: int) -> requests.Response:
            """specify observation for the given draft task by just doing a REST API call """
            result = self.session.get(url=self.get_full_url_for_path('/task/%s/specify_observation' % (task_id,)))
            if result.status_code >= 200 and result.status_code < 300:
                return result.content.decode('utf-8')
            raise Exception("Could not specify observation for task %s.\nResponse: %s" % (task_id, result))
    
        def schedule_subtask(self, subtask_id: int, start_time: datetime=None) -> {}:
            """schedule the subtask for the given subtask_id at the given start_time. If start_time==None then already (pre)set start_time is used.
            returns the scheduled subtask upon success, or raises."""
            if start_time is not None:
                self.session.patch(self.get_full_url_for_path('subtask/%s' % subtask_id), {'start_time': datetime.utcnow()})
            return self.post_to_path_and_get_result_as_json_object('subtask/%s/schedule' % subtask_id)
    
        def unschedule_subtask(self, subtask_id: int) -> {}:
            """unschedule the subtask for the given subtask_id.
            returns the unscheduled subtask upon success, or raises."""
            return self.post_to_path_and_get_result_as_json_object('subtask/%s/unschedule' % subtask_id)
    
        def cancel_subtask(self, subtask_id: int) -> {}:
            """cancel the subtask for the given subtask_id, either preventing it to start, or to kill it while running.
            returns the cancelled subtask upon success, or raises."""
            return self.post_to_path_and_get_result_as_json_object('subtask/%s/cancel' % subtask_id)
    
        def cancel_task_blueprint(self, task_blueprint_id: int) -> {}:
            """cancel the task_blueprint for the given task_blueprint_id, either preventing it to start, or to kill it while running.
            returns the cancelled task_blueprint upon success, or raises."""
            return self.post_to_path_and_get_result_as_json_object('task_blueprint/%s/cancel' % task_blueprint_id)
    
        def cancel_scheduling_unit_blueprint(self, scheduling_unit_blueprint_id: int) -> {}:
            """cancel the scheduling_unit_blueprint for the given scheduling_unit_blueprint_id, either preventing it to start, or to kill it while running.
            returns the cancelled scheduling_unit_blueprint upon success, or raises."""
            return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/cancel' % scheduling_unit_blueprint_id)
    
        def create_blueprints_and_subtasks_from_scheduling_unit_draft(self, scheduling_unit_draft_id: int) -> {}:
            """create a scheduling_unit_blueprint, its specified taskblueprints and subtasks for the given scheduling_unit_draft_id.
            returns the scheduled subtask upon success, or raises."""
            return self.post_to_path_and_get_result_as_json_object('scheduling_unit_draft/%s/create_blueprints_and_subtasks' % scheduling_unit_draft_id)
    
        def create_scheduling_unit_draft_from_strategy_template(self, scheduling_unit_observing_strategy_template_id: int, parent_scheduling_set_id: int) -> {}:
            """create a scheduling_unit_blueprint, its specified taskblueprints and subtasks for the given scheduling_unit_draft_id.
            returns the created scheduling_unit_draft upon success, or raises."""
            return self.post_to_path_and_get_result_as_json_object('scheduling_unit_observing_strategy_template/%s/create_scheduling_unit?scheduling_set_id=%s' % (scheduling_unit_observing_strategy_template_id, parent_scheduling_set_id))
    
        def get_schedulingunit_draft(self, scheduling_unit_draft_id: str, extended: bool=True) -> dict:
            '''get the schedulingunit_draft as dict for the given scheduling_unit_draft_id. When extended==True then you get the full scheduling_unit,task,subtask tree.'''
            return self.get_path_as_json_object('scheduling_unit_draft%s/%s' % ('_extended' if extended else '', scheduling_unit_draft_id))
    
        def get_schedulingunit_blueprint(self, scheduling_unit_blueprint_id: str, extended: bool=True) -> dict:
            '''get the schedulingunit_blueprint as dict for the given scheduling_unit_blueprint_id. When extended==True then you get the full scheduling_unit,task,subtask tree.'''
            return self.get_path_as_json_object('scheduling_unit_blueprint%s/%s' % ('_extended' if extended else '', scheduling_unit_blueprint_id))
    
        def get_subtask_progress(self, subtask_id: int) -> {}:
            """get the progress [0.0, 1.0] of a running subtask.
            returns a dict with the 'id' and 'progress', or raises."""
            return self.get_path_as_json_object('subtask/%s/get_progress' % subtask_id)
    
        def get_subtasks_in_same_scheduling_unit(self, subtask: dict) -> []:
            """get all subtasks in the same scheduling_unit for the given subtask.
            returns a list of subtask-dicts upon success, or raises."""
            task_blueprint = self.get_url_as_json_object(subtask['task_blueprint'])
            scheduling_unit_blueprint = self.get_url_as_json_object(task_blueprint['scheduling_unit_blueprint'])
            subtasks = self.get_url_as_json_object(full_url=scheduling_unit_blueprint['url'].rstrip('/') + '/subtasks')
            return subtasks
    
        def get_setting(self, setting_name: str) -> {}:
            """get the value of a TMSS setting.
            returns the setting value upon success, or raises."""
            response = self.session.get(url=self.get_full_url_for_path('/setting/%s/' % (setting_name,)),
                                        params={'format': 'json'})
    
            if response.status_code >= 200 and response.status_code < 300:
                return json.loads(response.content.decode('utf-8'))['value']
    
            content = response.content.decode('utf-8')
            raise Exception("Could not get setting with name %s.\nResponse: %s" % (setting_name, content))
    
        def set_setting(self, setting_name: str, setting_value: bool) -> {}:
            """set a value for a TMSS setting.
            returns the setting value upon success, or raises."""
            response = self.session.patch(url=self.get_full_url_for_path('/setting/%s/' % (setting_name,)),
                                          json={'value': setting_value})
    
            if response.status_code >= 200 and response.status_code < 300:
                return json.loads(response.content.decode('utf-8'))['value']
    
            content = response.content.decode('utf-8')
            raise Exception("Could not set status with url %s - %s %s - %s" % (response.request.url, response.status_code, responses.get(response.status_code), content))
    
        def post_template(self, template_path:str, name: str, description: str, version: int, schema: str=None, template: str=None, **kwargs):
            '''POST a template at <BASE_URL>/<template_path> with the given name, description and version'''
            json_data = {'name': name,
                         'description': description,
                         'version': version}
            if schema is not None:
                json_data['schema'] = json.loads(schema) if isinstance(schema, str) else schema
            if template is not None:
                json_data['template'] = json.loads(template) if isinstance(template, str) else template
            json_data.update(**kwargs)
    
            response = self.session.post(url=self.get_full_url_for_path(template_path), json=json_data)
            if response.status_code == 201:
                logger.info("created new template with name=%s: %s", name, json.loads(response.text)['url'])
            else:
                raise Exception("Could not POST template with name=%s: %s" % (name,response.text))
    
        def process_feedback_and_set_to_finished_if_complete(self, subtask_id: int, feedback: str) -> {}:
            '''Process the feedback_doc (which can be for one or more or all dataproducts), store/append it in the subtask's raw_feedback, and process it into json feedback per dataproduct. Sets the subtask to finished if all dataproducts are processed, which may require multiple postings of partial feedback docs.
            Return the updated subtask, or raise an error'''
            response = self.session.post(url=self.get_full_url_for_path('/subtask/%s/process_feedback_and_set_to_finished_if_complete' % (subtask_id,)),
                                         data=feedback)
    
            if response.status_code >= 200 and response.status_code < 300:
                return json.loads(response.content.decode('utf-8'))
    
            content = response.content.decode('utf-8')
            raise Exception("Could not process feedback with url %s - %s %s - %s" % (
            response.request.url, response.status_code, responses.get(response.status_code), content))
    
        def reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete(self, subtask_id) -> {}:
            '''Reprocess the raw_feedback in the subtask into json feedback per dataproduct. Sets the subtask to finished if all dataproducts are processed.
            Return the updated subtask, or raise an error'''
            return self.get_path_as_json_object('/subtask/%s/reprocess_raw_feedback_for_subtask_and_set_to_finished_if_complete' % (subtask_id,))