import logging
import time
import typing

logger = logging.getLogger(__name__)

# see https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
# and https://stackoverflow.com/questions/27981545/suppress-insecurerequestwarning-unverified-https-request-is-being-made-in-pytho
import urllib3
import requests
if requests.__version__ >= '2.16.0':
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
else:
    try:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    except ImportError:
        pass

from http.client import responses
import os
try:
    # try to use the fast ujson module, else plain json
    import ujson as json
except ImportError:
    import json
from datetime import datetime, timedelta
import html
from urllib.parse import quote
from typing import Union
import socket
from threading import RLock

# usage example:
#
# with TMSSsession('paulus', 'pauluspass', 'localhost', 8000) as tmsssession:
#    response = tmsssession.session.get(url='http://localhost/api/task_draft/')
#    print(response)


#TODO: add unittests!
class TMSSsession(object):

    OPENID = "openid"
    BASICAUTH = "basicauth"

    DEFAULT_RETRY_COUNT = 0 # default number of retries (excluding the first normal attempt)

    def __init__(self, username, password, host, port: int=8000, authentication_method=OPENID):
        self.session = requests.session()
        self.username = username
        self.password = password
        self.host_url = "https://%s" % (host,) if port==443 else "http://%s:%s" % (host, port)
        self.api_url = "%s/api" % (self.host_url,)
        self.authentication_method = authentication_method
        self._lock = RLock()

    @staticmethod
    def create_from_dbcreds_for_ldap(dbcreds_name: str=None) -> 'TMSSsession':
        '''Factory method to create a TMSSsession object which uses the credentials in the ~/.lofar/dbcredentials/<dbcreds_name>.ini file.
           It (mis)uses the DBCredentials to get a url with user/pass for TMSS.
           The contents below are used to construct a url like this: http://localhost:8000/api
           [database:TMSS]
            host=localhost
            user=<username>
            password=<password>
            type=http
            port=8000
         '''
        if dbcreds_name is None:
            dbcreds_name = os.environ.get("TMSS_CLIENT_DBCREDENTIALS", "TMSSClient")

        from lofar.common.dbcredentials import DBCredentials
        dbcreds = DBCredentials().get(dbcreds_name)
        return TMSSsession.create_from_dbcreds(dbcreds)

    @staticmethod
    def create_from_dbcreds(dbcreds: 'DBCredentials') -> 'TMSSsession':
        '''Factory method to create a TMSSsession object which uses the credentials in the given dbcreds object.
        See also: create_from_dbcreds_for_ldap
         '''
        return TMSSsession(username=dbcreds.user, password=dbcreds.password,
                           host=dbcreds.host,
                           port=dbcreds.port,
                           authentication_method=TMSSsession.BASICAUTH)
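
    # Usage sketch for the factory methods above (assuming a ~/.lofar/dbcredentials/TMSSClient.ini
    # file as described in create_from_dbcreds_for_ldap; the dbcreds name and subtask id are hypothetical):
    #
    #   with TMSSsession.create_from_dbcreds_for_ldap('TMSSClient') as client:
    #       subtask = client.get_subtask(12345)
    #       print(subtask['state_value'])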

    @staticmethod
    def check_connection(dbcreds_name: str=None):
        '''Open a connection to TMSS using the credentials for the given dbcreds_name
        raises if no connection possible'''
        with TMSSsession.create_from_dbcreds_for_ldap(dbcreds_name) as client:
            try:
                client.get_path_as_string("")
                logger.info("Http REST login to TMSS working on %s for user '%s'", client.api_url, client.username)
            except Exception as e:
                raise ConnectionError("Cannot connect to TMSS REST API %s for user '%s'" % (client.api_url, client.username)) from e

    @staticmethod
    def check_connection_and_exit_on_error(dbcreds_name: str=None):
        '''Open a connection to TMSS using the credentials for the given dbcreds_name
        exit(1) upon ConnectionError'''
        try:
            TMSSsession.check_connection(dbcreds_name)
        except Exception as e:
            logger.error(e)
            exit(1)

    def __enter__(self):
        self.open()

        # return the request session for use within the context
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def open(self):
        '''open the request session and login'''
        with self._lock:
            self.session.__enter__()
            self.session.verify = False
            self.session.headers['Accept'] = 'application/json'
            self.session.headers['Accept-Encoding'] = 'gzip'

            if self.authentication_method == self.OPENID:
                # get authentication page of OIDC through TMSS redirect
                response = self.session.get(self.api_url.replace('/api', '/oidc/authenticate/'), allow_redirects=True)
                for resp in response.history:
                    logger.info("via %s: %s" % (resp.status_code, resp.url))
                logger.info("got %s: %s" % (response.status_code, response.url))
                # post user credentials to login page, also pass csrf token if present
                if 'csrftoken' in self.session.cookies:
                    # Mozilla OIDC provider
                    csrftoken = self.session.cookies['csrftoken']
                    data = {'username': self.username, 'password': self.password, 'csrfmiddlewaretoken': csrftoken}
                    response = self.session.post(url=response.url, data=data, allow_redirects=True)
                else:
                    # Keycloak
                    content = response.content.decode('utf-8')
                    if 'action' not in content:
                        raise Exception('Could not determine login form action from server response: %s' % content)
                    action = content.split('action="')[1].split('"')[0]
                    data = {'username': self.username, 'password': self.password, 'credentialId': ''}
                    response = self.session.post(url=html.unescape(action), data=data, allow_redirects=True)

                for resp in response.history:
                    logger.info("via %s: %s" % (resp.status_code, resp.url))
                logger.info("got %s: %s" % (response.status_code, response.url))

                # raise when sth went wrong
                if "The username and/or password you specified are not correct" in response.content.decode('utf8') \
                        or "Invalid username or password" in response.content.decode('utf8'):
                    raise ValueError("The username and/or password you specified are not correct")
                if response.status_code != 200:
                    raise ConnectionError(response.content.decode('utf8'))

            if self.authentication_method == self.BASICAUTH:
                self.session.auth = (self.username, self.password)

    def close(self):
        '''close the request session and logout'''
        with self._lock:
            try:
                # logout user
                self.session.post(self.api_url + '/logout/', allow_redirects=True)
                self.session.close()
            except:
                pass

    def do_request_and_get_result_as_string(self, method: str, full_url: str, json_data: dict=None, params: dict=None, retry_count: int=DEFAULT_RETRY_COUNT, headers: dict=None) -> str:
        '''do a GET/PUT/POST/PATCH/HEAD/OPTIONS request to the given full_url including http://<base_url>, and return the response as plain text
        Upon recoverable error(s), the request is retried <retry_count> times with increasing intervals.
        '''
        attempt_count = retry_count+1
        retry_interval = 1  # start with 1 second wait time between retry attempts
        method = method.upper()
        if method == 'GET':
            request_method = self.session.get
        elif method == 'POST':
            request_method = self.session.post
        elif method == 'PUT':
            request_method = self.session.put
        elif method == 'PATCH':
            request_method = self.session.patch
        elif method == 'HEAD':
            request_method = self.session.head
        elif method == 'OPTIONS':
            request_method = self.session.options
        else:
            raise ValueError("Unsupported http method '%s'" % (method,))

        if 'csrftoken' in self.session.cookies:
            if headers is None:
                headers = dict()
            headers.update({'Referer': self.host_url, 'X-CSRFToken': self.session.cookies['csrftoken']})

        for attempt_nr in range(attempt_count):
            with self._lock:
                response = request_method(url=full_url, timeout=100000, json=json_data, params=params, headers=headers)
            self._log_response(response)

            if response.status_code >= 200 and response.status_code < 300:
                result = response.content.decode('utf-8')
                return result

            if response.status_code == 401:
                logger.warning("Unauthorized to %s %s - did you login?", method, full_url)

            if response.status_code in (404, 408, 409, 410, 502, 503, 504):
                # retry recoverable error
                if attempt_nr < retry_count:
                    logger.info("Waiting %.1f seconds before retrying %s %s", retry_interval, method, full_url)
                    time.sleep(retry_interval)
                    retry_interval *= 2  # wait longer after more retries (allow the server to recover/reboot/reduce_load/etc)
            else:
                # do not retry
                break

        # ugly error message parsing
        content = response.text
        try:
            error_msg = content.split('\n')[1] # magic! error message is at 2nd line of response...
        except Exception:
            error_msg = content

        raise Exception("Could not %s %s - %s %s - %s" % (method, full_url, response.status_code, responses.get(response.status_code), error_msg))

    def do_request_and_get_result_as_json_object(self, method: str, full_url: str, json_data: dict=None, params: dict=None, retry_count: int=DEFAULT_RETRY_COUNT) -> dict:
        '''do a GET/PUT/POST/PATCH/HEAD/OPTIONS request to the given full_url including http://<base_url>, and return the response as a json object
        Upon recoverable error(s), the request is retried <retry_count> times with increasing intervals.
        '''
        return json.loads(self.do_request_and_get_result_as_string(method=method, full_url=full_url, json_data=json_data, params=params, retry_count=retry_count))
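
    # Example sketch: the two do_request_* methods above are the low-level workhorses used by all
    # convenience methods below. A direct call looks like this (the subtask id is hypothetical):
    #
    #   client.do_request_and_get_result_as_json_object('GET',
    #                                                    client.get_full_url_for_path('subtask/12345'),
    #                                                    retry_count=3)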


    def set_subtask_status(self, subtask_id: int, status: str, error_reason=None, retry_count: int=DEFAULT_RETRY_COUNT) -> {}:
        '''set the status for the given subtask, and return the subtask with its new state, or raise on error'''
        json_doc = {'state': "%s/subtask_state/%s/" % (self.api_url, status)}
        if status == 'finishing' or status == 'cancelling':
            json_doc['actual_process_stop_time'] = datetime.utcnow().isoformat()
        if status == 'error':
            hostname = socket.gethostname()
            json_doc['error_reason'] = error_reason if error_reason is not None else f"set to error via REST client on host '{hostname}'"
        logger.info("updating subtask id=%s status to '%s'", subtask_id, status)
        return self.do_request_and_get_result_as_json_object('PATCH',
                                                             '%s/subtask/%s/' % (self.api_url, subtask_id),
                                                             json_data=json_doc,
                                                             retry_count=retry_count)


    def wait_for_subtask_status(self, subtask_id: int, expected_status: typing.Union[str,list,tuple], timeout: int=10, poll_interval: float=1.0) -> dict:
        """wait for the subtask with the given id to have/get the expected status.
        If the expected_status is a string, then wait until the subtask has exactly this expected status (or times out)
        If the expected_status is a list/tuple of strings, then wait until the subtask has exactly one of the expected statuses (or times out)
        Returns the subtask once it has the expected_status.
        """
        start_wait_timestamp = datetime.utcnow()
        expected_statuses = (expected_status,) if isinstance(expected_status, str) else expected_status
        while (datetime.utcnow() - start_wait_timestamp).total_seconds() < timeout:
            start_fetch_timestamp = datetime.utcnow()
            subtask = self.get_subtask(subtask_id)

            if subtask['state_value'] in expected_statuses:
                return subtask

            # wait a little...
            fetch_elapsed = (datetime.utcnow() - start_fetch_timestamp).total_seconds()
            time.sleep(max(0, poll_interval-fetch_elapsed))
            # loop, try again

        raise TimeoutError("Timeout while waiting for subtask id=%s to get expected status '%s'. Current status: '%s'" % (subtask_id, expected_status, subtask['state_value']))
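
    # Example sketch: set a subtask to 'finishing' and wait until the backend has moved it to
    # 'finished' or 'error' (or raise a TimeoutError). The subtask id and timeout values are hypothetical.
    #
    #   client.set_subtask_status(12345, 'finishing')
    #   subtask = client.wait_for_subtask_status(12345, ('finished', 'error'), timeout=60, poll_interval=2.0)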


    def get_subtask_parset(self, subtask_id, retry_count: int=DEFAULT_RETRY_COUNT) -> str:
        '''get the lofar parameterset (as text) for the given subtask'''
        return self.do_request_and_get_result_as_string('GET',
                                                        full_url=self.get_full_url_for_path('/subtask/%s/parset' % (subtask_id,)),
                                                        headers={'Accept': 'text/plain'},
                                                        retry_count=retry_count)

    def get_subtask_predecessors(self, subtask_id: int, state: str=None) -> list:
        '''get the subtask's predecessors as list of dict for the given subtask'''
        clauses = {}
        if state is not None:
            clauses["state__value"] = state

        path = 'subtask/%s/predecessors' % (subtask_id,)
        return self.get_path_as_json_object(path, clauses)

    def get_subtask_successors(self, subtask_id: int, state: str=None) -> list:
        '''get the subtask's successors as list of dict for the given subtask'''
        clauses = {}
        if state is not None:
            clauses["state__value"] = state

        path = 'subtask/%s/successors' % (subtask_id,)
        return self.get_path_as_json_object(path, clauses)

    def get_subtask(self, subtask_id: int) -> dict:
        '''get the subtask as dict for the given subtask'''
        path = 'subtask/%s' % (subtask_id,)
        return self.get_path_as_json_object(path)

    def get_subtasks(self, state: str=None, subtask_type: str=None,
                     cluster: str=None,
                     scheduled_start_time_less_then: datetime=None, scheduled_start_time_greater_then: datetime=None,
                     scheduled_stop_time_less_then: datetime = None, scheduled_stop_time_greater_then: datetime = None,
                     is_using_lofar2_stations: bool=None,
                     project: str=None) -> list:
        '''get subtasks (as list of dicts) filtered by the given parameters'''
        clauses = {}
        if state is not None:
            clauses["state__value"] = state
        if subtask_type is not None:
            clauses["subtask_type"] = subtask_type
        if cluster is not None:
            clauses["cluster__name"] = cluster
        if scheduled_start_time_less_then is not None:
            clauses["scheduled_start_time__lt="] = scheduled_start_time_less_then.isoformat()
        if scheduled_start_time_greater_then is not None:
            clauses["scheduled_start_time__gt"] = scheduled_start_time_greater_then.isoformat()
        if scheduled_stop_time_less_then is not None:
            clauses["scheduled_stop_time__lt"] = scheduled_stop_time_less_then.isoformat()
        if scheduled_stop_time_greater_then is not None:
            clauses["scheduled_stop_time__gt"] = scheduled_stop_time_greater_then.isoformat()
        if is_using_lofar2_stations is not None:
            clauses["is_using_lofar2_stations"] = str(is_using_lofar2_stations)
        if project is not None:
            clauses["project"] = project

        return self.get_path_as_json_object("subtask", params=clauses)

    def get_full_url_for_path(self, path: str) -> str:
        '''get the full URL for the given path'''
        return '%s/%s' % (self.api_url, quote(path.strip('/'), safe='/:=?-_&'))

    def get_path_as_json_object(self, path: str, params={}, json_data=None, retry_count: int=DEFAULT_RETRY_COUNT) -> object:
        '''get the resource at the given path, interpret it as json, and return it as a native object (usually a dict or a list of dicts)'''
        full_url = self.get_full_url_for_path(path=path)
        return self.get_url_as_json_object(full_url, params=params, json_data=json_data, retry_count=retry_count)

    def get_path_as_string(self, path: str, params={}, json_data=None) -> str:
        '''get the resource at the given path, and return it as plain text'''
        full_url = self.get_full_url_for_path(path=path)
        return self.get_url_as_string(full_url, params=params, json_data=json_data)

    def get_url_as_string(self, full_url: str, params={}, json_data=None) -> str:
        '''get the resource at the given full url (including http://<base_url>), and return it as plain text'''
        return self.do_request_and_get_result_as_string('GET', full_url, params=params, json_data=json_data)
        
    def get_url_as_json_object(self, full_url: str, params={}, json_data=None, retry_count: int=DEFAULT_RETRY_COUNT) -> object:
        '''get the resource at the given full url (including http://<base_url>), interpret it as json, and return it as a native object (usually a dict or a list of dicts).
        Automatically fetch all results, even if paginated (resulting in more requests)'''
        result = self.do_request_and_get_result_as_json_object('GET', full_url, params=params, json_data=json_data, retry_count=retry_count)

        if isinstance(result, dict):
            result_object = result.get('results', result) # return the 'results' list if any, or else just the object itself

            if result.get('next'):
                # recurse, get the 'next' url, and return a concatenation of the results
                return result_object + self.get_url_as_json_object(result['next'])
            return result_object
        return result
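
    # Note: get_url_as_json_object transparently follows the REST API's pagination ('next' links),
    # so a call like the sketch below returns the full concatenated list, at the cost of one extra
    # request per page (the path and page-size parameter are just examples):
    #
    #   drafts = client.get_path_as_json_object('scheduling_unit_draft', params={'limit': 100})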

    def post_to_url_and_get_result_as_string(self, full_url: str, json_data:dict=None, retry_count: int=DEFAULT_RETRY_COUNT) -> str:
        '''post to the given full_url including http://<base_url>, and return the response as plain text
        '''
        return self.do_request_and_get_result_as_string('POST', full_url, json_data=json_data, retry_count=retry_count)

    def post_to_url_and_get_result_as_json_object(self, full_url: str, json_data:dict=None, retry_count: int=DEFAULT_RETRY_COUNT) -> object:
        '''post to the given full_url (including http://<base_url>), and return the response as native object (usually a dict or a list of dicts)'''
        result = self.post_to_url_and_get_result_as_string(full_url, json_data=json_data, retry_count=retry_count)
        return json.loads(result)

    def post_to_path_and_get_result_as_json_object(self, path: str, json_data:dict=None, retry_count: int=DEFAULT_RETRY_COUNT) -> object:
        '''post to the given path, and return the response as native object (usually a dict or a list of dicts)'''
        return self.post_to_url_and_get_result_as_json_object(self.get_full_url_for_path(path=path), json_data=json_data, retry_count=retry_count)

    def _log_response(self, response: requests.Response):
        logger.log(level=logging.INFO if response.status_code >= 200 and response.status_code < 300 else logging.WARNING,
                   msg="%s %s %s in %.1fms%s on %s %s" % (response.request.method.upper(), response.status_code, responses.get(response.status_code),
                                                          response.elapsed.total_seconds()*1000, ' SLOW!' if response.elapsed > timedelta(seconds=1) else '',
                                                          response.request.url,
                                                          "" if response.status_code >= 200 and response.status_code < 300 else response.text))

    def _get_schema_template(self, template_type_name: str, name: str, version: int=None) -> dict:
        '''get the schema template of the given type as dict for the given name (and version)'''
        clauses = {}
        if name is not None:
            clauses["name"] = name
        if version is not None:
            clauses["version"] = version
        else:
            # try to determine the latest version
            if name is not None:
                try:
                    schema = self.get_path_as_json_object('/schemas/%s/%s/latest' % (template_type_name.replace('_', ''), name), retry_count=1)
                    if schema and 'version' in schema:
                        clauses["version"] = schema['version']
                except:
                    # could not determine latest version
                    pass

        result = self.get_path_as_json_object(template_type_name, clauses)
        if isinstance(result, list):
            if len(result) > 1:
                raise ValueError("Found more then one %s for clauses: %s" % (template_type_name, clauses))
            elif len(result) == 1:
                return result[0]
            raise ValueError("Could not find any template of type='%s' and clauses='%s'" % (template_type_name, clauses))
        return result

    def get_schedulingunit_template(self, name: str, version: int=None) -> dict:
        '''get the schedulingunit_template as dict for the given name (and version)'''
        return self._get_schema_template('scheduling_unit_template', name, version)

    def get_task_template(self, name: str, version: int=None) -> dict:
        '''get the task_template as dict for the given name (and version)'''
        return self._get_schema_template('task_template', name, version)

    def get_subtask_template(self, name: str, version: int=None) -> dict:
        '''get the subtask_template as dict for the given name (and version)'''
        return self._get_schema_template('subtask_template', name, version)

    def get_scheduling_constraints_template(self, name: str='constraints', version: int=None) -> dict:
        '''get the scheduling_constraints_template as dict for the given name (and version)'''
        return self._get_schema_template('scheduling_constraints_template', name, version)

    def get_schedulingunit_template_default_specification(self, name: str, version: int=None) -> dict:
        template = self.get_schedulingunit_template(name=name, version=version)
        return self.get_url_as_json_object(template['url']+"/default")

    def get_task_template_default_specification(self, name: str, version: int=None) -> dict:
        template = self.get_task_template(name=name, version=version)
        return self.get_url_as_json_object(template['url']+"/default")

    def get_subtask_template_default_specification(self, name: str, version: int=None) -> dict:
        template = self.get_subtask_template(name=name, version=version)
        return self.get_url_as_json_object(template['url']+"/default")

    def get_dataproduct_specifications_template(self, name: str, version: int=None) -> dict:
        '''get the dataproduct_specifications_template as dict for the given name (and version)'''
        return self._get_schema_template('dataproduct_specifications_template', name, version)

    def get_dataproduct_specifications_template_default_specification(self, name: str, version: int=None) -> dict:
        '''get the default specification for the dataproduct_specifications_template with the given name (and version)'''
        template = self.get_dataproduct_specifications_template(name=name, version=version)
        return self.get_url_as_json_object(template['url']+"/default")

    def get_subtask_output_dataproducts(self, subtask_id: int) -> []:
        '''get the output dataproducts of the subtask with the given subtask_id'''
        return self.get_path_as_json_object('subtask/%s/output_dataproducts' % subtask_id)

    def mark_output_dataproducts_as_deleted(self, subtask_id: int) -> []:
        '''mark the output dataproducts of the subtask with the given subtask_id as deleted'''
        return self.post_to_path_and_get_result_as_json_object('subtask/%s/mark_output_dataproducts_as_deleted' % subtask_id)

    def get_subtask_input_dataproducts(self, subtask_id: int) -> []:
        '''get the input dataproducts of the subtask with the given subtask_id'''
        return self.get_path_as_json_object('subtask/%s/input_dataproducts' % subtask_id)

    def get_dataproduct_SIP(self, dataproduct_id: int, retry_count: int=3) -> str:
        '''get the SIP for the dataproduct with the given dataproduct_id as an XML string'''
        full_url = self.get_full_url_for_path(path='dataproduct/%s/sip' % dataproduct_id)
        return self.do_request_and_get_result_as_string('GET', full_url, retry_count=retry_count)

    def get_subtask_transformed_output_dataproduct(self, subtask_id: int, input_dataproduct_id: int, retry_count: int=DEFAULT_RETRY_COUNT) -> {}:
        '''get the transformed output dataproduct of the subtask with the given subtask_id and input_dataproduct_id'''
        return self.get_path_as_json_object('subtask/%s/transformed_output_dataproduct?input_dataproduct_id=%s' % (subtask_id, input_dataproduct_id), retry_count=retry_count)

    def post_dataproduct_archive_information(self, dataproduct_id: int, storage_ticket: str,
                                             srm_url: str, file_size: int,
                                             md5_checksum: str = None, adler32_checksum: str = None,
                                             retry_count: int=DEFAULT_RETRY_COUNT) -> {}:
        json_data={ 'storage_ticket': storage_ticket,
                    'srm_url': srm_url,
                    'file_size': file_size }
        if md5_checksum:
            json_data['md5_checksum'] = md5_checksum
        if adler32_checksum:
            json_data['adler32_checksum'] = adler32_checksum

        result = self.post_to_url_and_get_result_as_json_object(self.get_full_url_for_path('dataproduct/%s/post_archive_information' % (dataproduct_id,)),
                                                                json_data=json_data,
                                                                retry_count=retry_count)
        logger.info("post_dataproduct_archive_information: json_doc: %s response: %s", json_data, result)

    def delete_dataproduct_archive_information(self, dataproduct_id: int, retry_count: int=DEFAULT_RETRY_COUNT) -> {}:
        return self.post_to_url_and_get_result_as_json_object(self.get_full_url_for_path('dataproduct/%s/delete_archive_information' % (dataproduct_id,)),
                                                              retry_count=retry_count)
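
    # Example sketch of the archive bookkeeping flow above, with purely hypothetical values:
    #
    #   client.post_dataproduct_archive_information(dataproduct_id=42,
    #                                               storage_ticket='ABC123',
    #                                               srm_url='srm://example.org/some/path.tar',
    #                                               file_size=123456789,
    #                                               adler32_checksum='0x1a2b3c4d')
    #   # and, if the archived copy is removed again:
    #   client.delete_dataproduct_archive_information(dataproduct_id=42)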

    def schedule_subtask(self, subtask_id: int, scheduled_start_time: Union[str, datetime]=None, retry_count: int=DEFAULT_RETRY_COUNT) -> {}:
        """schedule the subtask for the given subtask_id at the given scheduled_start_time. If scheduled_start_time==None then already (pre)set scheduled_start_time is used.
        returns the scheduled subtask upon success, or raises."""
        if scheduled_start_time is not None:
            if isinstance(scheduled_start_time, datetime):
                scheduled_start_time = scheduled_start_time.isoformat()
            self.do_request_and_get_result_as_json_object('PATCH',
                                                          self.get_full_url_for_path('subtask/%s' % subtask_id),
                                                          json_data={'scheduled_start_time': scheduled_start_time},
                                                          retry_count=retry_count)
        return self.post_to_path_and_get_result_as_json_object('subtask/%s/schedule' % (subtask_id),
                                                               retry_count=retry_count)
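
    # Example sketch: (re)schedule a subtask a few minutes from now. The subtask id and time offset
    # are hypothetical; omitting scheduled_start_time keeps the already (pre)set start time.
    #
    #   from datetime import datetime, timedelta
    #   client.schedule_subtask(12345, scheduled_start_time=datetime.utcnow() + timedelta(minutes=3))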

    def unschedule_subtask(self, subtask_id: int, retry_count: int=0) -> {}:
        """unschedule the subtask for the given subtask_id.
        returns the unscheduled subtask upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('subtask/%s/unschedule' % (subtask_id),
                                                               retry_count=retry_count)

    def cancel_subtask(self, subtask_id: int, retry_count: int=0) -> {}:
        """cancel the subtask for the given subtask_id, either preventing it to start, or to kill it while running.
        returns the cancelled subtask upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('subtask/%s/cancel' % (subtask_id),
                                                               retry_count=retry_count)

    def cancel_task_blueprint(self, task_blueprint_id: int, retry_count: int=0) -> {}:
        """cancel the task_blueprint for the given task_blueprint_id, either preventing it to start, or to kill it while running.
        returns the cancelled task_blueprint upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('task_blueprint/%s/cancel' % (task_blueprint_id),
                                                               retry_count=retry_count)

    def cancel_scheduling_unit_blueprint(self, scheduling_unit_blueprint_id: int, retry_count: int=0) -> {}:
        """cancel the scheduling_unit_blueprint for the given scheduling_unit_blueprint_id, either preventing it to start, or to kill it while running.
        returns the cancelled scheduling_unit_blueprint upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/cancel' % (scheduling_unit_blueprint_id),
                                                               retry_count=retry_count)

    def mark_subtask_as_obsolete(self, subtask_id: int, retry_count: int=0) -> {}:
        """mark the subtask for the given subtask_id as obsolete.
        returns the marked_as_obsolete subtask upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('subtask/%s/mark_as_obsolete' % (subtask_id),
                                                               retry_count=retry_count)

    def mark_task_blueprint_as_obsolete(self, task_blueprint_id: int, retry_count: int=0) -> {}:
        """mark the task_blueprint for the given task_blueprint_id as obsolete.
        returns the marked_as_obsolete task_blueprint upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('task_blueprint/%s/mark_as_obsolete' % (task_blueprint_id),
                                                               retry_count=retry_count)

    def mark_scheduling_unit_blueprint_as_obsolete(self, scheduling_unit_blueprint_id: int, retry_count: int=0) -> {}:
        """mark the scheduling_unit_blueprint for the given scheduling_unit_blueprint_id as obsolete.
        returns the marked_as_obsolete scheduling_unit_blueprint upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/mark_as_obsolete' % (scheduling_unit_blueprint_id),
                                                               retry_count=retry_count)


    def mark_scheduling_unit_fixed_time_scheduled_at_scheduled_starttime(self, scheduling_unit_blueprint_id: int, retry_count: int=0) -> {}:
        """Try to mark this scheduling_unit as fixed_time-scheduled, with the current scheduled_start_time as 'at' constraint.
        returns the scheduling_unit_blueprint with updated constraints upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/mark_fixed_time_scheduled_at_scheduled_starttime' % (scheduling_unit_blueprint_id),
                                                               retry_count=retry_count)

    def mark_scheduling_unit_dynamically_scheduled(self, scheduling_unit_blueprint_id: int, retry_count: int=0) -> {}:
        """Try to mark this scheduling_unit as dynamically-scheduled, and let the scheduler find a start_time.
        returns the scheduling_unit_blueprint with updated constraints upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/mark_dynamically_scheduled' % (scheduling_unit_blueprint_id),
                                                               retry_count=retry_count)

    def schedule_scheduling_unit(self, scheduling_unit_blueprint_id: int, start_time: datetime, retry_count: int=0) -> {}:
        """Try to schedule this scheduling_unit directly, and also mark it as fixed_time schedule at the given starttime
        returns the scheduled scheduling_unit_blueprint upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/schedule' % (scheduling_unit_blueprint_id),
                                                               json_data={'start_time': start_time.strftime('%Y-%m-%dT%H:%M:%S')},
                                                               retry_count=retry_count)


    def unschedule_scheduling_unit(self, scheduling_unit_blueprint_id: int, retry_count: int=0) -> {}:
        """Try to unschedule this scheduling_unit.
        returns the unscheduled scheduling_unit_blueprint upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/unschedule' % (scheduling_unit_blueprint_id),
                                                               retry_count=retry_count)


    def update_task_graph_from_specifications_doc(self, scheduling_unit_draft_id: int, specifications_doc: dict, retry_count: int=0) -> {}:
        """create/update the task graph for the given scheduling_unit_draft_id with graph specified in the specifications_doc.
        returns the updated scheduling_unit_draft with its references to the tasks in the graph upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_draft/%s/update_task_graph_from_specifications_doc' % (scheduling_unit_draft_id),
                                                               retry_count=retry_count,
                                                               json_data={'specifications_doc':specifications_doc})

    def validate_template_specifications_doc(self, template_path_or_url: str, specifications_doc: dict) -> {}:
        """do a full validation of the submitted specifications_doc against the schema and validation rules of the given template, both checking the document against its json schema(s), and doing a full 'sanity check' if this document is observable.
        returns a json doc {'valid': true/false, 'message': <description of what is wrong>, 'specifications_doc': <the submitted doc>}"""
        template_path = template_path_or_url.replace(self.api_url,'',1)
        template_url = self.get_full_url_for_path(template_path)
        response = self.do_request_and_get_result_as_json_object('PUT',
                                                                 full_url=template_url.rstrip('/')+'/validate_specifications_doc',
                                                                 json_data={'specifications_doc': specifications_doc})
        return response

    def create_scheduling_unit_blueprint_and_tasks_and_subtasks_tree(self, scheduling_unit_draft_id: int, retry_count: int=0) -> {}:
        """create a scheduling_unit_blueprint, its specified taskblueprints and subtasks for the given scheduling_unit_draft_id.
        returns the created scheduling_unit_blueprint including the created task_blueprints and subtasks upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_draft/%s/create_scheduling_unit_blueprint_and_tasks_and_subtasks_tree' % (scheduling_unit_draft_id),
                                                               retry_count=retry_count)

    def update_task_blueprints_and_subtasks_graph_from_draft(self, scheduling_unit_blueprint_id: int, retry_count: int=0) -> {}:
        """update the task_blueprint graph for the scheduling_unit_blueprint, from the draft graph.
        returns the updated scheduling_unit_blueprint including the created task_blueprints and subtasks upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/update_task_blueprints_and_subtasks_graph_from_draft' % (scheduling_unit_blueprint_id),
                                                               retry_count=retry_count)

    def create_scheduling_unit_draft_from_strategy_template(self, scheduling_unit_observing_strategy_template_id: int, parent_scheduling_set_id: int, specifications_doc_overrides: dict=None, name: str=None, priority_queue: str=None, retry_count: int=0) -> {}:
        """create a scheduling_unit_blueprint, its specified taskblueprints and subtasks for the given scheduling_unit_draft_id.
        returns the created scheduling_unit_draft upon success, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_observing_strategy_template/%s/create_scheduling_unit?scheduling_set_id=%s&name=%s&priority_queue=%s' % (scheduling_unit_observing_strategy_template_id,
                                                                                                                                                                                         parent_scheduling_set_id,
                                                                                                                                                                                         name or 'scheduling unit',
                                                                                                                                                                                         priority_queue or 'A'),
                                                               json_data=specifications_doc_overrides,
                                                               retry_count=retry_count)
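
    # Example sketch: instantiate a scheduling_unit_draft from an observing strategy template and
    # turn it into a blueprint. The template id, scheduling set id and name are hypothetical, and
    # the returned draft is assumed to expose an 'id' field.
    #
    #   draft = client.create_scheduling_unit_draft_from_strategy_template(
    #               scheduling_unit_observing_strategy_template_id=1,
    #               parent_scheduling_set_id=2,
    #               name='my test unit')
    #   blueprint = client.create_scheduling_unit_blueprint_and_tasks_and_subtasks_tree(draft['id'])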

    def create_scheduling_unit_draft_from_specifications_doc(self, name: str, description: str, scheduling_set_id: int, specifications_doc: dict, retry_count: int=0) -> {}:
        """create a scheduling_unit_draft and instantiate the task graph from the given specifications_doc.
        returns the created scheduling_unit_draft upon success, or raises."""
        scheduling_constraints_template = self.get_scheduling_constraints_template(name=specifications_doc.get('scheduling_constraints_template', {'name': 'constraints'}).get('name'),
                                                                                   version=specifications_doc.get('scheduling_constraints_template', {}).get('version'))
        scheduling_constraints_doc = specifications_doc.get('scheduling_constraints_doc', self.get_url_as_json_object(scheduling_constraints_template['url']+'/default'))

        # assume we use the one and only scheduling_unit_template
        specifications_template_url = self.get_full_url_for_path('/scheduling_unit_template/1')

        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_draft', json_data={"name": name,
                                                                                                   "description": description,
                                                                                                   "scheduling_set": self.get_full_url_for_path('/scheduling_set/%s'%scheduling_set_id),
                                                                                                   "specifications_doc": specifications_doc,
                                                                                                   "specifications_template": specifications_template_url,
                                                                                                   "scheduling_constraints_doc": scheduling_constraints_doc,
                                                                                                   "scheduling_constraints_template": scheduling_constraints_template['url'],
                                                                                                   "scheduling_unit_blueprints": [],
                                                                                                   "task_drafts": [] },
                                                               retry_count=retry_count)

    def copy_scheduling_unit_blueprint_specifications_doc_back_into_draft(self, scheduling_unit_blueprint_id: int, including_copies_for_failed_tasks: bool=False, retry_count: int=0) -> {}:
        """Copy this blueprint's specifications_doc back into the originating draft scheduling unit.
        if including_copies_for_failed_tasks==True then extend the graph specification with copies for all failed tasks and link them.
        returns the scheduling_unit_blueprint (which contains a link to the draft with the updated specifications_doc), or raises."""
        action = 'copy_specifications_doc_including_copies_for_failed_tasks_back_into_draft' if including_copies_for_failed_tasks else 'copy_specifications_doc_back_into_draft'
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/%s' % (scheduling_unit_blueprint_id, action), retry_count=retry_count)


    def create_copies_of_failed_tasks_via_draft(self, scheduling_unit_blueprint_id: int, retry_count: int=0) -> {}:
        """Create new runnable copies of the failed task(s) in this scheduling_unit_blueprint via its draft.
        returns the scheduling_unit_blueprint (which contains link(s) to the new task_blueprint copies), or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/create_copies_of_failed_tasks_via_draft' % (scheduling_unit_blueprint_id, ), retry_count=retry_count)


    def copy_scheduling_unit_draft(self, scheduling_unit_draft_id: int, retry_count: int=0) -> {}:
        """Create a copy of the given scheduling_unit_draft including copies of all its task_drafts and relations.
        returns the copied scheduling_unit_draft, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_draft/%s/copy' % (scheduling_unit_draft_id, ), retry_count=retry_count)


    def create_cleanuptask_for_scheduling_unit_blueprint(self, scheduling_unit_blueprint_id: int, retry_count: int=0) -> {}:
        """Create a cleanup task for a given scheduling_unit_blueprint, including task_relations; also creates corresponding drafts.
        returns the created cleanup task_blueprint, or raises."""
        return self.post_to_path_and_get_result_as_json_object('scheduling_unit_blueprint/%s/create_cleanuptask_for_scheduling_unit_blueprint' % (scheduling_unit_blueprint_id, ), retry_count=retry_count)


    def copy_task_draft(self, task_draft_id: int, retry_count: int=0) -> {}:
        """Create a copy of the given task_draft including copies to the new copy of the task_relations and/or task_scheduling_relations.
        returns the copied task_draft, or raises."""
        return self.post_to_path_and_get_result_as_json_object('task_draft/%s/copy' % (task_draft_id, ), retry_count=retry_count)


    def get_scheduling_set(self, scheduling_set_id: str) -> dict:
        '''get the scheduling_set as dict for the given scheduling_set_id.'''
        scheduling_set = self.get_path_as_json_object('scheduling_set/%s' % (scheduling_set_id,))
        return scheduling_set

    def get_schedulingunit_draft(self, scheduling_unit_draft_id: str, extended: bool=True, include_specifications_doc: bool=False) -> dict:
        '''get the schedulingunit_draft as dict for the given scheduling_unit_draft_id.
        When extended==True then you get the full scheduling_unit,task,subtask tree.
        When include_specifications_doc==True then an exported/generated specifications_doc is included representing the task graph and its settings.'''
        scheduling_unit = self.get_path_as_json_object('scheduling_unit_draft%s/%s' % ('_extended' if extended else '', scheduling_unit_draft_id))
        if include_specifications_doc:
            scheduling_unit['specifications_doc'] = self.get_schedulingunit_draft_specifications_doc(scheduling_unit_draft_id)
        return scheduling_unit

    def get_schedulingunit_blueprint(self, scheduling_unit_blueprint_id: str, extended: bool=True, include_specifications_doc: bool=False) -> dict:
        '''get the schedulingunit_blueprint as dict for the given scheduling_unit_blueprint_id.
        When extended==True then you get the full scheduling_unit,task,subtask tree.
        When include_specifications_doc==True then an exported/generated specifications_doc is included representing the task graph and its settings.'''
        scheduling_unit = self.get_path_as_json_object('scheduling_unit_blueprint%s/%s' % ('_extended' if extended else '', scheduling_unit_blueprint_id))
        if include_specifications_doc:
            scheduling_unit['specifications_doc'] = self.get_schedulingunit_blueprint_specifications_doc(scheduling_unit_blueprint_id)
        return scheduling_unit

    def get_schedulingunit_draft_specifications_doc(self, scheduling_unit_draft_id: str) -> dict:
        '''get the exported/generated specifications_doc representing the task graph and its settings for the given scheduling_unit_draft_id.'''
        return self.get_path_as_json_object('scheduling_unit_draft/%s/specifications_doc' % scheduling_unit_draft_id)

    def get_schedulingunit_blueprint_specifications_doc(self, scheduling_unit_blueprint_id: str) -> dict:
        '''get the exported/generated specifications_doc representing the task graph and its settings for the given scheduling_unit_blueprint_id.'''
        return self.get_path_as_json_object('scheduling_unit_blueprint/%s/specifications_doc' % scheduling_unit_blueprint_id)

    def get_task_blueprint(self, task_blueprint_id: str, extended: bool=False) -> dict:
        '''get the task_blueprint as dict for the given task_blueprint_id.
        When extended==True then you get the full task,subtask tree.'''
        return self.get_path_as_json_object('task_blueprint%s/%s' % ('_extended' if extended else '', task_blueprint_id))

    def get_task_draft(self, task_draft_id: str, extended: bool=False) -> dict:
        '''get the task_draft as dict for the given task_draft_id.
        When extended==True then you get the full task,subtask tree.'''
        return self.get_path_as_json_object('task_draft%s/%s' % ('_extended' if extended else '', task_draft_id))

    def get_subtask_progress(self, subtask_id: int) -> {}:
        """get the progress [0.0, 1.0] of a running subtask.
        returns a dict with the 'id' and 'progress', or raises."""
        return self.get_path_as_json_object('subtask/%s/get_progress' % subtask_id)

    def get_subtasks_in_same_scheduling_unit(self, subtask: dict) -> []:
        """get all subtasks in the same scheduling_unit for the given subtask.
        returns a list of subtask-dicts upon success, or raises."""
        task_blueprint = self.get_url_as_json_object(subtask['task_blueprint'])
        scheduling_unit_blueprint = self.get_url_as_json_object(task_blueprint['scheduling_unit_blueprint'])
        return self.get_url_as_json_object(full_url=scheduling_unit_blueprint['url'].rstrip('/') + '/subtasks')

    def get_subtasks_in_scheduling_unit(self, scheduling_unit_blueprint_id: int) -> []:
        """get all subtasks in the scheduling_unit_blueprint for the given id.
        returns a list of subtask-dicts upon success, or raises."""
        return self.get_url_as_json_object(full_url=self.get_full_url_for_path('/scheduling_unit_blueprint/%s/subtasks' % (scheduling_unit_blueprint_id,)))

    def get_setting(self, setting_name: str) -> {}:
        """get the value of a TMSS setting.
        returns the setting value upon success, or raises."""
        return self.get_url_as_json_object(self.get_full_url_for_path('/setting/%s/' % (setting_name,)))

    def set_setting(self, setting_name: str, setting_value: bool) -> {}:
        """set a value for a TMSS setting.
        returns the setting value upon success, or raises."""
        return self.do_request_and_get_result_as_json_object('PATCH', self.get_full_url_for_path('/setting/%s/' % (setting_name,)), json_data={'value': setting_value})['value']

    def post_template(self, template_path:str, name: str, description: str, version: int, schema: str=None, template: str=None, **kwargs):
        '''post a template at <BASE_URL>/<template_path> with the given name, description and version'''
        return self._upload_template('POST', template_path=template_path, name=name, description=description, version=version, schema=schema, template=template, **kwargs)

    def put_template(self, template_path:str, id:int, name: str, description: str, version: int, schema: str=None, template: str=None, **kwargs):
        '''update a template at <BASE_URL>/<template_path>/<id> with the given name, description and version'''
        return self._upload_template('PUT', template_path='%s/%s'%(template_path,id), name=name, description=description, version=version, schema=schema, template=template, **kwargs)
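
    # Example sketch: upload a new (versioned) template via post_template. The name, version and
    # schema content are hypothetical; extra keyword arguments (e.g. state) are passed through to
    # the REST API as-is.
    #
    #   client.post_template('task_template',
    #                        name='my observation template',
    #                        description='example template',
    #                        version=1,
    #                        schema='{"type": "object", "properties": {}}')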

    def _upload_template(self, method: str, template_path:str, name: str, description: str, version: int, schema: str=None, template: str=None, **kwargs):
        '''upload a template with 'method' POST/PUT/PATCH at <BASE_URL>/<template_path> with the given name, description and version'''
        with self._lock:
            json_data = {'name': name,
                         'description': description,
                         'version': version}
            if schema is not None:
                json_data['schema'] = json.loads(schema) if isinstance(schema, str) else schema
            if template is not None:
                json_data['template'] = json.loads(template) if isinstance(template, str) else template
            json_data.update(**kwargs)

            if json_data.get('state', '').rstrip('/').endswith('obsolete'):
                logger.info("Skipping upload of template with name='%s' version='%s' because its state is 'obsolete'", json_data['name'], json_data['version'])
                return

            response = self.session.request(method=method.upper(), url=self.get_full_url_for_path(template_path), json=json_data)
            self._log_response(response)

            if response.status_code in (200, 201):
                result_json_template = json.loads(response.text)
                logger.info("%s template with name=%s version=%s %s", "created new" if response.status_code==201 else "updated",
                                                                      result_json_template['name'],
                                                                      result_json_template['version'],
                                                                      result_json_template['url'])
            else:
                raise Exception("Could not %s template with name=%s version=%s: %s" % (method.upper(), name, version, response.text))

    def process_feedback_and_set_to_finished_if_complete(self, subtask_id: int, feedback: str) -> {}:
        '''Process the feedback_doc (which can cover one, several, or all dataproducts) into json feedback per dataproduct. Sets the subtask to finished once all dataproducts have been processed, which may require multiple postings of partial feedback docs.
        Return the updated subtask, or raise an error.'''
        with self._lock:
            if 'csrftoken' in self.session.cookies:
                headers = {'Referer': self.host_url, 'X-CSRFToken': self.session.cookies['csrftoken']}
            else:
                headers = None
            response = self.session.post(url=self.get_full_url_for_path('/subtask/%s/process_feedback_and_set_to_finished_if_complete' % (subtask_id,)),
                                         data=feedback, headers=headers)

            if response.status_code >= 200 and response.status_code < 300:
                return json.loads(response.content.decode('utf-8'))

            content = response.content.decode('utf-8')
            raise Exception("Could not process feedback with url %s - %s %s - %s" % (
            response.request.url, response.status_code, responses.get(response.status_code), content))

    def get_scheduling_unit_observing_strategy_templates(self) -> list:
        '''get a list of all scheduling_unit_observing_strategy_template's as list of dicts'''
        return self.get_path_as_json_object('scheduling_unit_observing_strategy_template')

    def get_scheduling_unit_observing_strategy_template(self, name: str, version: int=None) -> dict:
        '''get the scheduling_unit_observing_strategy_template as dict for the given name (and version)'''
        return self._get_strategy_template('scheduling_unit_observing_strategy_template', name, version)

    def get_reservation_strategy_template(self, name: str, version: int=None) -> dict:
        '''get the reservation_strategy_template as dict for the given name (and version)'''
        return self._get_strategy_template('reservation_strategy_template', name, version)

    def create_reservation_for_strategy_template(self, start_time: datetime, stop_time: datetime=None,
                                                 strategy_name: str='', strategy_version: int = None,
                                                 project: str=None,
                                                 reservation_name: str=None, reservation_description: str=None) -> dict:
        '''create a reservation based on the reservation_strategy_template given by the strategy_name (and strategy_version),
        as dict for the given name (and version)'''
        template = self._get_strategy_template('reservation_strategy_template', strategy_name, strategy_version)
        params = {'start_time': start_time.strftime('%Y-%m-%dT%H:%M:%S')}
        if stop_time:
            params['stop_time'] = stop_time.strftime('%Y-%m-%dT%H:%M:%S')
        if project:
            params['project_id'] = project
        if reservation_name:
            params['name'] = reservation_name
        if reservation_description:
            params['description'] = reservation_description
        return self.do_request_and_get_result_as_json_object('POST', template['url'].rstrip('/')+'/create_reservation', params=params)
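
    # Example sketch: create a 2-hour reservation from a reservation strategy template. The
    # strategy/reservation names are hypothetical; project and naming parameters are optional.
    #
    #   from datetime import datetime, timedelta
    #   start = datetime.utcnow() + timedelta(days=1)
    #   client.create_reservation_for_strategy_template(start_time=start,
    #                                                   stop_time=start + timedelta(hours=2),
    #                                                   strategy_name='maintenance',
    #                                                   reservation_name='station maintenance')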

    def _get_strategy_template(self, template_type_name:str, name: str, version: int=None) -> dict:
        '''get the strategy_template as dict for the given name (and version)'''
        clauses = {}
        if name is not None:
            clauses["name"] = name
        if version is not None:
            clauses["version"] = version
        else:
            # try to determine the latest version
            if name is not None:
                try:
                    templates = self.get_path_as_json_object(template_type_name, params={'name':name})
                    if templates:
                        templates = sorted(templates, key=lambda t: t['version'])
                        clauses["version"] = templates[-1]['version']
                except Exception:
                    # could not determine latest version
                    pass

        result = self.get_path_as_json_object(template_type_name, clauses)
        if isinstance(result, list):
            if len(result) > 1:
                raise ValueError("Found more then one %s's for clauses: %s" % (template_type_name, clauses))
            elif len(result) == 1:
                return result[0]
            raise ValueError("Could not find any template of type='%s' and clauses='%s'" % (template_type_name, clauses))
        return result

    def get_scheduling_unit_observing_strategy_template_default_specification(self, name: str, version: int=None) -> dict:
        '''get the scheduling_unit_observing_strategy_template as dict for the given name (and version), completely filled in with all defaults'''
        strategy_template = self.get_scheduling_unit_observing_strategy_template(name=name, version=version)
        return self.get_url_as_json_object(strategy_template['url'].rstrip('/')+"/template_doc_complete_with_defaults")

    def get_scheduling_unit_observing_strategy_template_specification_with_just_the_parameters(self, name: str, version: int=None) -> dict:
        '''get the scheduling_unit_observing_strategy_template as dict for the given name (and version), reduced to just the exposed parameters'''
        strategy_template = self.get_scheduling_unit_observing_strategy_template(name=name, version=version)
        return self.get_url_as_json_object(strategy_template['url'].rstrip('/')+"/template_doc_with_just_the_parameters")

    def get_trigger_specification_doc_for_scheduling_unit_observing_strategy_template(self, name: str, version: int=None) -> dict:
        '''get a trigger specification document for the scheduling_unit_observing_strategy_template with the given name (and version).
        You can edit this document to your needs, and submit it via method trigger_scheduling_unit'''
        strategy_template = self.get_scheduling_unit_observing_strategy_template(name, version)
        return self.get_url_as_json_object(strategy_template['url'].rstrip('/')+'/trigger_doc')

    def submit_trigger(self, trigger_doc:dict, retry_count: int=0) -> dict:
        """create a scheduling_unit_draft with the interrupts_telescope flag set.
        When mode=='run' trigger_doc also create schedulable/runnable blueprints and subtasks, and trigger the dynamic scheduler to try to cancel to current observation and put this triggered observation in place.
        Returns the created scheduling_unit_draft/blueprint upon success, or raises.
        Use the method get_trigger_specification_doc_for_scheduling_unit_observing_strategy_template to get a valid trigger json doc, edit it, and sumbit using this method."""
        return self.post_to_path_and_get_result_as_json_object('/submit_trigger', json_data=trigger_doc, retry_count=retry_count)
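
    # Example sketch of the trigger workflow: fetch a trigger doc for a strategy template
    # (the template name here is hypothetical), adapt it, and submit it:
    #
    #   trigger_doc = client.get_trigger_specification_doc_for_scheduling_unit_observing_strategy_template('Responsive Telescope')
    #   trigger_doc['mode'] = 'run'   # 'run' mode also creates blueprints/subtasks, see docstring above
    #   result = client.submit_trigger(trigger_doc)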

    def set_scheduling_unit_blueprint_pinning(self, scheduling_unit_blueprint_id: int, pinned: bool, retry_count: int=DEFAULT_RETRY_COUNT) -> {}:
        '''pin/unpin the data for the given scheduling_unit_blueprint_id'''
        json_doc = {'output_pinned': pinned}
        return self.do_request_and_get_result_as_json_object('PATCH',
                                                             '%s/scheduling_unit_blueprint/%s' % (self.api_url, scheduling_unit_blueprint_id),
                                                             json_data=json_doc,
                                                             retry_count=retry_count)

    def get_subtask_l2stationspecs(self, subtask_id, retry_count: int=DEFAULT_RETRY_COUNT) -> dict:
        '''get the lofar 2.0 station specifications (as json) for the given subtask'''
        return self.get_path_as_json_object('/subtask/%s/l2stationspecs' % (subtask_id,), retry_count=retry_count)