Skip to content
Snippets Groups Projects
Select Git revision
  • e8904548d9cbb953ba44b39c0c60ce10238aeda3
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

views.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    tmss_http_rest_client.py 19.66 KiB
    import logging
    logger = logging.getLogger(__name__)
    
    import requests
    from http.client import responses
    import os
    import json
    from datetime import datetime, timedelta
    from lofar.common.datetimeutils import formatDatetime
    from lofar.common.dbcredentials import DBCredentials
    
    
    # usage example:
    #
    # with TMSSsession('paulus', 'pauluspass', 'localhost', 8000) as tmsssession:
    #    response = tmsssession.session.get(url='http://localhost/api/task_draft/')
    #    print(response)
    
    
    #TODO: add unittests!
    class TMSSsession(object):
    
        OPENID = "openid"
        BASICAUTH = "basicauth"
    
        def __init__(self, username, password, host, port: int=8000, authentication_method=OPENID):
            self.session = requests.session()
            self.username = username
            self.password = password
            self.host_url = "http://%s:%d" % (host, port)
            self.api_url = "%s/api" % (self.host_url,)
            self.authentication_method = authentication_method
    
        @staticmethod
        def create_from_dbcreds_for_ldap(dbcreds_name: str=None) -> 'TMSSsession':
            '''Factory method to create a TMSSSession object which uses the credentials in the ~/.lofar/dbcredentials/<dbcreds_name>.ini file
               (mis)use the DBCredentials to get a url with user/pass for tmss
               the contents below are used to contruct a url like this: http://localhost:8000/api
               [database:TMSS]
                host=localhost
                user=<username>
                password=<password>
                type=http
                port=8000
             '''
            if dbcreds_name is None:
                dbcreds_name = os.environ.get("TMSS_CLIENT_DBCREDENTIALS", "TMSSClient")
    
            dbcreds = DBCredentials().get(dbcreds_name)
            return TMSSsession.create_from_dbcreds(dbcreds)
    
        @staticmethod
        def create_from_dbcreds(dbcreds: DBCredentials) -> 'TMSSsession':
            '''Factory method to create a TMSSSession object which uses the credentials in the dbcreds object.
            See also: create_from_dbcreds_for_ldap
             '''
            return TMSSsession(username=dbcreds.user, password=dbcreds.password,
                               host=dbcreds.host,
                               port=dbcreds.port,
                               authentication_method=TMSSsession.BASICAUTH)
    
        def __enter__(self):
            self.open()
    
            # return the request session for use within the context
            return self
    
        def __exit__(self, type, value, traceback):
            self.close()
    
        def open(self):
            '''open the request session and login'''
            self.session.__enter__()
            self.session.verify = False
            self.session.headers['Accept'] = 'application/json'
    
            if self.authentication_method == self.OPENID:
                # get authentication page of OIDC through TMSS redirect
                response = self.session.get(self.api_url.replace('/api', '/oidc/authenticate/'), allow_redirects=True)
                csrftoken = self.session.cookies['csrftoken']
    
                # post user credentials to login page, also pass csrf token
                data = {'username': self.username, 'password': self.password, 'csrfmiddlewaretoken': csrftoken}
                response = self.session.post(url=response.url, data=data, allow_redirects=True)
    
                # raise when sth went wrong
                if "The username and/or password you specified are not correct" in response.content.decode('utf8'):
                    raise ValueError("The username and/or password you specified are not correct")
                if response.status_code != 200:
                    raise ConnectionError(response.content.decode('utf8'))
    
            if self.authentication_method == self.BASICAUTH:
                self.session.auth = (self.username, self.password)
    
        def close(self):
            '''close the request session and logout'''
            try:
                # logout user
                self.session.get(self.api_url + '/logout/', allow_redirects=True)
                self.session.close()
            except:
                pass
    
        def set_subtask_status(self, subtask_id: int, status: str) -> {}:
            '''set the status for the given subtask, and return the subtask with its new state, or raise on error'''
            json_doc = {'state': "%s/subtask_state/%s/" % (self.api_url, status)}
            if status == 'finishing' or status == 'cancelling':
                json_doc['stop_time'] = datetime.utcnow().isoformat()
                if status == 'cancelling':
                    json_doc['do_cancel'] = json_doc['stop_time']
    
            response = self.session.patch(url='%s/subtask/%s/' % (self.api_url, subtask_id),
                                          json=json_doc,
                                          params={'format':'json'})
    
            if response.status_code >= 200 and response.status_code < 300:
                return json.loads(response.content.decode('utf-8'))
    
            content = response.content.decode('utf-8')
            raise Exception("Could not set status with url %s - %s %s - %s" % (response.request.url, response.status_code, responses.get(response.status_code), content))
    
        def get_subtask_parset(self, subtask_id) -> str:
            '''get the lofar parameterset (as text) for the given subtask'''
            result = self.session.get(url=self.get_full_url_for_path('/subtask/%s/parset' % (subtask_id,)),
                                      headers={'Accept': 'text/plain'})
            if result.status_code >= 200 and result.status_code < 300:
                return result.content.decode('utf-8')
            raise Exception("Could not get parameterset for subtask %s.\nResponse: %s" % (subtask_id, result))
    
        def get_subtask_predecessors(self, subtask_id: int, state: str=None) -> list:
            '''get the subtask's predecessors as list of dict for the given subtask'''
    
            clauses = {}
            if state is not None:
                clauses["state__value"] = state
    
            path = 'subtask/%s/predecessors' % (subtask_id,)
            return self.get_path_as_json_object(path, clauses)
    
        def get_subtask_successors(self, subtask_id: int, state: str=None) -> list:
            '''get the subtask's successors as list of dict for the given subtask'''
            clauses = {}
            if state is not None:
                clauses["state__value"] = state
    
            path = 'subtask/%s/successors' % (subtask_id,)
            return self.get_path_as_json_object(path, clauses)
    
        def get_subtask(self, subtask_id: int) -> dict:
            '''get the subtask as dict for the given subtask'''
            path = 'subtask/%s' % (subtask_id,)
            return self.get_path_as_json_object(path)
    
        def get_subtasks(self, state: str=None,
                         cluster: str=None,
                         start_time_less_then: datetime=None, start_time_greater_then: datetime=None,
                         stop_time_less_then: datetime = None, stop_time_greater_then: datetime = None) -> list:
            '''get subtasks (as list of dicts) filtered by the given parameters'''
            clauses = {}
            if state is not None:
                clauses["state__value"] = state
            if cluster is not None:
                clauses["cluster__name"] = cluster
            if start_time_less_then is not None:
                clauses["start_time__lt="] = formatDatetime(start_time_less_then)
            if start_time_greater_then is not None:
                clauses["start_time__gt"] = formatDatetime(start_time_greater_then)
            if stop_time_less_then is not None:
                clauses["stop_time__lt"] = formatDatetime(stop_time_less_then)
            if stop_time_greater_then is not None:
                clauses["stop_time__gt"] = formatDatetime(stop_time_greater_then)
    
            return self.get_path_as_json_object("subtask", clauses)
    
        def get_full_url_for_path(self, path: str) -> str:
            '''get the full URL for the given path'''
            return '%s/%s' % (self.api_url, path.strip('/'))
    
        def get_path_as_json_object(self, path: str, params={}) -> object:
            '''get resource at the given path, interpret it as json, and return it as as native object (usually a dict or a list of dicts)'''
            return self.get_url_as_json_object(self.get_full_url_for_path(path=path), params=params)
    
        def get_path_as_string(self, path: str, params={}) -> str:
            '''get resource at the given path, and return it as as plain text'''
            return self.get_url_as_string(self.get_full_url_for_path(path=path), params=params)
    
        def get_url_as_string(self, full_url: str, params={}) -> str:
            '''get resource at the given full url (including http://<base_url>, interpret it as json, and return it as as plain text'''
            response = self.session.get(url=full_url, params=params, timeout=100000)
            logger.info("%s %s %s in %.1fms%s on %s", response.request.method.upper(), response.status_code, responses.get(response.status_code),
                                                      response.elapsed.total_seconds()*1000, ' SLOW!' if response.elapsed > timedelta(seconds=1) else '',
                                                      response.request.url)
    
            if response.status_code >= 200 and response.status_code < 300:
                result = response.content.decode('utf-8')
                return result
    
            # ugly error message parsing
            content = response.text
            try:
                error_msg = content.split('\n')[1] # magic! error message is at 2nd line of response...
            except:
                error_msg= content
    
            raise Exception("Could not get %s - %s %s - %s" % (full_url, response.status_code, responses.get(response.status_code), error_msg))
    
        def get_url_as_json_object(self, full_url: str, params={}) -> object:
            '''get resource at the given full url (including http://<base_url>, interpret it as json, and return it as as native object (usually a dict or a list of dicts)'''
            result = self.get_url_as_string(full_url, params)
            result = json.loads(result)
            if isinstance(result, dict):
                result_object = result.get('results', result) # return the 'results' list if any, or else just the object itself
    
                if result.get('next'):
                    # recurse, get the 'next' url, and return a concatenation of the results
                    return result_object + self.get_url_as_json_object(result['next'])
                return result_object
            return result
    
        def _get_template(self, template_type_name: str, name: str, version: int=None) -> dict:
            '''get the template of the given type as dict for the given name (and version)'''
            clauses = {}
            if name is not None:
                clauses["name"] = name
            if version is not None:
                clauses["version"] = version
            result = self.get_path_as_json_object(template_type_name, clauses)
            if isinstance(result, list):
                if len(result) > 1:
                    raise ValueError("Found more then one %s for clauses: %s" % (template_type_name, clauses))
                elif len(result) == 1:
                    return result[0]
                return None
            return result
    
        def get_schedulingunit_template(self, name: str, version: int=None) -> dict:
            '''get the schedulingunit_template as dict for the given name (and version)'''
            return self._get_template('scheduling_unit_template', name, version)
    
        def get_task_template(self, name: str, version: int=None) -> dict:
            '''get the task_template as dict for the given name (and version)'''
            return self._get_template('task_template', name, version)
    
        def get_subtask_template(self, name: str, version: int=None) -> dict:
            '''get the subtask_template as dict for the given name (and version)'''
            return self._get_template('subtask_template', name, version)
    
        def get_schedulingunit_template_default_specification(self, name: str, version: int=None) -> dict:
            template = self.get_schedulingunit_template(name=name, version=version)
            return self.get_url_as_json_object(template['url']+"/default")
    
        def get_task_template_default_specification(self, name: str, version: int=None) -> dict:
            template = self.get_task_template(name=name, version=version)
            return self.get_url_as_json_object(template['url']+"/default")
    
        def get_subtask_template_default_specification(self, name: str, version: int=None) -> dict:
            template = self.get_subtask_template(name=name, version=version)
            return self.get_url_as_json_object(template['url']+"/default")
    
        def get_subtask_output_dataproducts(self, subtask_id: int) -> []:
            '''get the output dataproducts of the subtask with the given subtask_id'''
            return self.get_path_as_json_object('subtask/%s/output_dataproducts' % subtask_id)
    
        def get_subtask_input_dataproducts(self, subtask_id: int) -> []:
            '''get the input dataproducts of the subtask with the given subtask_id'''
            return self.get_path_as_json_object('subtask/%s/input_dataproducts' % subtask_id)
    
        def get_dataproduct_SIP(self, dataproduct_id: int) -> str:
            '''get the SIP for the dataproduct with the given dataproduct_id as an XML string'''
            return self.get_path_as_string('dataproduct/%s/sip' % dataproduct_id)
    
        def get_subtask_transformed_output_dataproduct(self, subtask_id: int, input_dataproduct_id: int) -> {}:
            '''get the transformed output dataproduct of the subtask with the given subtask_id and input_dataproduct_id'''
            return self.get_path_as_json_object('subtask/%s/transformed_output_dataproduct?input_dataproduct_id=%s' % (subtask_id, input_dataproduct_id))
    
        def post_dataproduct_archive_information(self, dataproduct_id: int, storage_ticket: str,
                                                 srm_url: str, file_size: int,
                                                 md5_checksum: str = None, adler32_checksum: str = None) -> {}:
            json_data={ 'storage_ticket': storage_ticket,
                        'srm_url': srm_url,
                        'file_size': file_size }
            if md5_checksum:
                json_data['md5_checksum'] = md5_checksum
            if adler32_checksum:
                json_data['adler32_checksum'] = adler32_checksum
    
            response = self.session.post(url=self.get_full_url_for_path('dataproduct/%s/post_archive_information' % (dataproduct_id,)), json=json_data)
            logger.info("post_dataproduct_archive_information: json_doc: %s response: %s", json_data, response.text)
            if response.status_code == 201:
                logger.info("created new template: %s", json.loads(response.text)['url'])
    
        def specify_observation_task(self, task_id: int) -> requests.Response:
            """specify observation for the given draft task by just doing a REST API call """
            result = self.session.get(url=self.get_full_url_for_path('/task/%s/specify_observation' % (task_id,)))
            if result.status_code >= 200 and result.status_code < 300:
                return result.content.decode('utf-8')
            raise Exception("Could not specify observation for task %s.\nResponse: %s" % (task_id, result))
    
        def create_blueprints_and_subtasks_from_scheduling_unit_draft(self, scheduling_unit_draft_id: int) -> {}:
            """create a scheduling_unit_blueprint, its specified taskblueprints and subtasks for the given scheduling_unit_draft_id.
            returns the scheduled subtask upon success, or raises."""
            return self.get_path_as_json_object('scheduling_unit_draft/%s/create_blueprints_and_subtasks' % scheduling_unit_draft_id)
    
        def schedule_subtask(self, subtask_id: int) -> {}:
            """schedule the subtask for the given subtask_id.
            returns the scheduled subtask upon success, or raises."""
            return self.get_path_as_json_object('subtask/%s/schedule' % subtask_id)
    
        def get_subtask_progress(self, subtask_id: int) -> {}:
            """get the progress [0.0, 1.0] of a running subtask.
            returns a dict with the 'id' and 'progress', or raises."""
            return self.get_path_as_json_object('subtask/%s/get_progress' % subtask_id)
    
        def get_setting(self, setting_name: str) -> {}:
            """get the value of a TMSS setting.
            returns the setting value upon success, or raises."""
            response = self.session.get(url=self.get_full_url_for_path('/setting/%s/' % (setting_name,)),
                                        params={'format': 'json'})
    
            if response.status_code >= 200 and response.status_code < 300:
                return json.loads(response.content.decode('utf-8'))['value']
    
            content = response.content.decode('utf-8')
            raise Exception("Could not get setting with name %s.\nResponse: %s" % (setting_name, content))
    
        def set_setting(self, setting_name: str, setting_value: bool) -> {}:
            """set a value for a TMSS setting.
            returns the setting value upon success, or raises."""
            response = self.session.patch(url=self.get_full_url_for_path('/setting/%s/' % (setting_name,)),
                                          json={'value': setting_value})
    
            if response.status_code >= 200 and response.status_code < 300:
                return json.loads(response.content.decode('utf-8'))['value']
    
            content = response.content.decode('utf-8')
            raise Exception("Could not set status with url %s - %s %s - %s" % (response.request.url, response.status_code, responses.get(response.status_code), content))
    
        def post_template(self, template_path:str, name: str, description: str, version: int, schema: str=None, template: str=None, **kwargs):
            '''POST a template at <BASE_URL>/<template_path> with the given name, description and version'''
            json_data = {'name': name,
                         'description': description,
                         'version': version}
            if schema is not None:
                json_data['schema'] = json.loads(schema) if isinstance(schema, str) else schema
            if template is not None:
                json_data['template'] = json.loads(template) if isinstance(template, str) else template
            json_data.update(**kwargs)
    
            response = self.session.post(url=self.get_full_url_for_path(template_path), json=json_data)
            if response.status_code == 201:
                logger.info("created new template: %s", json.loads(response.text)['url'])
            else:
                raise Exception("Could not POST template: " + response.text)
    
        def append_to_subtask_raw_feedback(self, subtask_id: int, feedback: str) -> {}:
            '''append the raw_feedback for the given subtask, and return the subtask with its new state, or raise an error'''
            existing_feedback = self.get_path_as_json_object('/subtask/%s/' % (subtask_id))['raw_feedback']
            if existing_feedback is None or existing_feedback is "":
                new_feedback = feedback
            else:
                new_feedback = "%s\n%s" % (existing_feedback, feedback)
            response = self.session.patch(url=self.get_full_url_for_path('/subtask/%s/' % (subtask_id,)),
                                          json={'raw_feedback': new_feedback},
                                          params={'format': 'json'})
    
            if response.status_code >= 200 and response.status_code < 300:
                return json.loads(response.content.decode('utf-8'))
    
            content = response.content.decode('utf-8')
            raise Exception("Could not append feedback to subtask with url %s - %s %s - %s" % (
            response.request.url, response.status_code, responses.get(response.status_code), content))
    
        def process_subtask_feedback_and_set_finished(self, subtask_id: int) -> {}:
            '''process the raw_feedback of a given subtask and set the subtask to finished on succes. Return the subtask
            with its new state, or raise an error'''
            response = self.session.post(url=self.get_full_url_for_path('/subtask/%s/process_feedback_and_set_finished' % (subtask_id,)),
                                         params={'format': 'json'})
    
            if response.status_code >= 200 and response.status_code < 300:
                return json.loads(response.content.decode('utf-8'))
    
            content = response.content.decode('utf-8')
            raise Exception("Could not process feedback with url %s - %s %s - %s" % (
            response.request.url, response.status_code, responses.get(response.status_code), content))