Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tmss_http_rest_client.py 19.66 KiB
import logging
logger = logging.getLogger(__name__)
import requests
from http.client import responses
import os
import json
from datetime import datetime, timedelta
from lofar.common.datetimeutils import formatDatetime
from lofar.common.dbcredentials import DBCredentials
# usage example:
#
# with TMSSsession('paulus', 'pauluspass', 'localhost', 8000) as tmsssession:
#     response = tmsssession.session.get(url='http://localhost:8000/api/task_draft/')
#     print(response)
#
# TODO: add unittests!
class TMSSsession(object):
    '''Client session for the TMSS REST API at http://<host>:<port>/api.

    Wraps a requests session. open() logs in (via OpenID Connect or HTTP basic auth) and close() logs out.
    Use it as a context manager: `with TMSSsession(...) as tmsssession: ...`
    '''
    OPENID = "openid"
    BASICAUTH = "basicauth"

    def __init__(self, username, password, host, port: int=8000, authentication_method=OPENID):
        '''Create a (not yet opened) session for the TMSS instance at http://<host>:<port>.
        :param authentication_method: either TMSSsession.OPENID (default) or TMSSsession.BASICAUTH
        '''
        self.session = requests.session()
        self.username = username
        self.password = password
        # int() also accepts a port given as a numeric string, which '%d' alone would reject
        self.host_url = "http://%s:%d" % (host, int(port))
        self.api_url = "%s/api" % (self.host_url,)
        self.authentication_method = authentication_method

    @staticmethod
    def create_from_dbcreds_for_ldap(dbcreds_name: str=None) -> 'TMSSsession':
        '''Factory method to create a TMSSSession object which uses the credentials in the ~/.lofar/dbcredentials/<dbcreds_name>.ini file
        (mis)use the DBCredentials to get a url with user/pass for tmss
        the contents below are used to construct a url like this: http://localhost:8000/api
            [database:TMSS]
            host=localhost
            user=<username>
            password=<password>
            type=http
            port=8000
        When no dbcreds_name is given, the TMSS_CLIENT_DBCREDENTIALS environment variable is used, falling back to "TMSSClient".
        '''
        if dbcreds_name is None:
            dbcreds_name = os.environ.get("TMSS_CLIENT_DBCREDENTIALS", "TMSSClient")
        dbcreds = DBCredentials().get(dbcreds_name)
        return TMSSsession.create_from_dbcreds(dbcreds)

    @staticmethod
    def create_from_dbcreds(dbcreds: 'DBCredentials') -> 'TMSSsession':
        '''Factory method to create a TMSSSession object (using basic authentication) with the credentials in the dbcreds object.
        See also: create_from_dbcreds_for_ldap
        '''
        return TMSSsession(username=dbcreds.user, password=dbcreds.password,
                           host=dbcreds.host,
                           port=dbcreds.port,
                           authentication_method=TMSSsession.BASICAUTH)

    def __enter__(self):
        '''open the session (login) and return this TMSSsession for use within the context'''
        try:
            self.open()
        except Exception:
            # __exit__ is not called when __enter__ raises, so release the session's resources here
            self.session.close()
            raise
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()

    def open(self):
        '''open the request session and login.
        :raises ValueError: when the OpenID login page reports wrong credentials
        :raises ConnectionError: when the OpenID login does not end in a 200 response
        '''
        self.session.__enter__()
        # disable TLS certificate verification for this session
        self.session.verify = False
        self.session.headers['Accept'] = 'application/json'

        if self.authentication_method == self.OPENID:
            # get authentication page of OIDC through TMSS redirect.
            # Built from host_url: replacing '/api' in api_url would also hit hosts like 'api.example.com'.
            response = self.session.get(self.host_url + '/oidc/authenticate/', allow_redirects=True)
            csrftoken = self.session.cookies['csrftoken']

            # post user credentials to login page, also pass csrf token
            data = {'username': self.username, 'password': self.password, 'csrfmiddlewaretoken': csrftoken}
            response = self.session.post(url=response.url, data=data, allow_redirects=True)

            # raise when sth went wrong
            content = response.content.decode('utf8')
            if "The username and/or password you specified are not correct" in content:
                raise ValueError("The username and/or password you specified are not correct")
            if response.status_code != 200:
                raise ConnectionError(content)
        elif self.authentication_method == self.BASICAUTH:
            self.session.auth = (self.username, self.password)

    def close(self):
        '''logout the user (best effort) and close the request session'''
        try:
            self.session.get(self.api_url + '/logout/', allow_redirects=True)
        except Exception as e:
            # logging out is best effort; a failure must not prevent closing the session below
            logger.warning("Could not logout from %s: %s", self.api_url, e)
        finally:
            self.session.close()

    @staticmethod
    def _json_or_raise(response, action: str):
        '''return the json-decoded body of a 2xx response,
        or raise an Exception "Could not <action> with url <url> - <status> <reason> - <content>"'''
        content = response.content.decode('utf-8')
        if 200 <= response.status_code < 300:
            return json.loads(content)
        raise Exception("Could not %s with url %s - %s %s - %s" % (action, response.request.url, response.status_code,
                                                                  responses.get(response.status_code), content))

    def set_subtask_status(self, subtask_id: int, status: str) -> dict:
        '''set the status for the given subtask, and return the subtask with its new state, or raise on error.
        For 'finishing' and 'cancelling' the stop_time is set to now (utc); 'cancelling' also sets do_cancel to that time.'''
        json_doc = {'state': "%s/subtask_state/%s/" % (self.api_url, status)}
        if status in ('finishing', 'cancelling'):
            json_doc['stop_time'] = datetime.utcnow().isoformat()
        if status == 'cancelling':
            json_doc['do_cancel'] = json_doc['stop_time']

        response = self.session.patch(url='%s/subtask/%s/' % (self.api_url, subtask_id),
                                      json=json_doc,
                                      params={'format': 'json'})
        return self._json_or_raise(response, "set status")

    def get_subtask_parset(self, subtask_id) -> str:
        '''get the lofar parameterset (as text) for the given subtask, or raise on error'''
        response = self.session.get(url=self.get_full_url_for_path('/subtask/%s/parset' % (subtask_id,)),
                                    headers={'Accept': 'text/plain'})
        if 200 <= response.status_code < 300:
            return response.content.decode('utf-8')
        raise Exception("Could not get parameterset for subtask %s.\nResponse: %s %s - %s" % (
            subtask_id, response.status_code, responses.get(response.status_code), response.content.decode('utf-8')))

    def get_subtask_predecessors(self, subtask_id: int, state: str=None) -> list:
        '''get the subtask's predecessors as list of dict for the given subtask, optionally filtered by state'''
        clauses = {}
        if state is not None:
            clauses["state__value"] = state
        path = 'subtask/%s/predecessors' % (subtask_id,)
        return self.get_path_as_json_object(path, clauses)

    def get_subtask_successors(self, subtask_id: int, state: str=None) -> list:
        '''get the subtask's successors as list of dict for the given subtask, optionally filtered by state'''
        clauses = {}
        if state is not None:
            clauses["state__value"] = state
        path = 'subtask/%s/successors' % (subtask_id,)
        return self.get_path_as_json_object(path, clauses)

    def get_subtask(self, subtask_id: int) -> dict:
        '''get the subtask as dict for the given subtask'''
        path = 'subtask/%s' % (subtask_id,)
        return self.get_path_as_json_object(path)

    def get_subtasks(self, state: str=None,
                     cluster: str=None,
                     start_time_less_then: datetime=None, start_time_greater_then: datetime=None,
                     stop_time_less_then: datetime = None, stop_time_greater_then: datetime = None) -> list:
        '''get subtasks (as list of dicts) filtered by the given parameters.
        Each non-None parameter becomes a query filter clause; datetimes are formatted with formatDatetime.'''
        clauses = {}
        if state is not None:
            clauses["state__value"] = state
        if cluster is not None:
            clauses["cluster__name"] = cluster
        if start_time_less_then is not None:
            clauses["start_time__lt"] = formatDatetime(start_time_less_then)
        if start_time_greater_then is not None:
            clauses["start_time__gt"] = formatDatetime(start_time_greater_then)
        if stop_time_less_then is not None:
            clauses["stop_time__lt"] = formatDatetime(stop_time_less_then)
        if stop_time_greater_then is not None:
            clauses["stop_time__gt"] = formatDatetime(stop_time_greater_then)

        return self.get_path_as_json_object("subtask", clauses)

    def get_full_url_for_path(self, path: str) -> str:
        '''get the full URL for the given path (leading/trailing slashes of path are stripped)'''
        return '%s/%s' % (self.api_url, path.strip('/'))

    def get_path_as_json_object(self, path: str, params: dict=None) -> object:
        '''get resource at the given path, interpret it as json, and return it as native object (usually a dict or a list of dicts)'''
        return self.get_url_as_json_object(self.get_full_url_for_path(path=path), params=params)

    def get_path_as_string(self, path: str, params: dict=None) -> str:
        '''get resource at the given path, and return it as plain text'''
        return self.get_url_as_string(self.get_full_url_for_path(path=path), params=params)

    def get_url_as_string(self, full_url: str, params: dict=None) -> str:
        '''get resource at the given full url (including http://<base_url>), and return it as plain text.
        :raises Exception: upon a non-2xx response, mentioning (the 2nd line of) the server's error response
        '''
        # requests interprets the timeout in seconds, so this is a very generous timeout
        response = self.session.get(url=full_url, params=params, timeout=100000)
        logger.info("%s %s %s in %.1fms%s on %s", response.request.method.upper(), response.status_code, responses.get(response.status_code),
                                                   response.elapsed.total_seconds()*1000, ' SLOW!' if response.elapsed > timedelta(seconds=1) else '',
                                                   response.request.url)

        if 200 <= response.status_code < 300:
            return response.content.decode('utf-8')

        # ugly error message parsing
        content = response.text
        lines = content.split('\n')
        # magic! error message is at 2nd line of response... fall back to the full content when there is no 2nd line
        error_msg = lines[1] if len(lines) > 1 else content
        raise Exception("Could not get %s - %s %s - %s" % (full_url, response.status_code, responses.get(response.status_code), error_msg))

    def get_url_as_json_object(self, full_url: str, params: dict=None) -> object:
        '''get resource at the given full url (including http://<base_url>), interpret it as json, and return it as native object (usually a dict or a list of dicts).
        Paginated responses (dicts with 'results' and 'next') are followed page by page and their 'results' are concatenated.'''
        result = json.loads(self.get_url_as_string(full_url, params))
        if not isinstance(result, dict):
            return result

        # return the 'results' list if any, or else just the object itself
        collected = result.get('results', result)
        next_url = result.get('next')
        # iterate (instead of recurse) over the 'next' pages, so many pages cannot exhaust the recursion limit
        while next_url:
            page = json.loads(self.get_url_as_string(next_url))
            if not isinstance(page, dict):
                collected = collected + page
                break
            collected = collected + page.get('results', page)
            next_url = page.get('next')
        return collected

    def _get_template(self, template_type_name: str, name: str, version: int=None) -> dict:
        '''get the template of the given type as dict for the given name (and version).
        Returns None when no template matches; raises ValueError when more than one matches.'''
        clauses = {}
        if name is not None:
            clauses["name"] = name
        if version is not None:
            clauses["version"] = version
        result = self.get_path_as_json_object(template_type_name, clauses)
        if isinstance(result, list):
            if len(result) > 1:
                raise ValueError("Found more then one %s for clauses: %s" % (template_type_name, clauses))
            elif len(result) == 1:
                return result[0]
            return None
        return result

    def _get_default_specification(self, template: dict, template_type_name: str, name: str, version: int=None) -> dict:
        '''get the default specification of the given template (as returned by _get_template).
        :raises ValueError: when no template was found'''
        if template is None:
            raise ValueError("Could not find %s with name=%s version=%s" % (template_type_name, name, version))
        return self.get_url_as_json_object(template['url']+"/default")

    def get_schedulingunit_template(self, name: str, version: int=None) -> dict:
        '''get the schedulingunit_template as dict for the given name (and version)'''
        return self._get_template('scheduling_unit_template', name, version)

    def get_task_template(self, name: str, version: int=None) -> dict:
        '''get the task_template as dict for the given name (and version)'''
        return self._get_template('task_template', name, version)

    def get_subtask_template(self, name: str, version: int=None) -> dict:
        '''get the subtask_template as dict for the given name (and version)'''
        return self._get_template('subtask_template', name, version)

    def get_schedulingunit_template_default_specification(self, name: str, version: int=None) -> dict:
        '''get the default specification of the schedulingunit_template with the given name (and version)'''
        template = self.get_schedulingunit_template(name=name, version=version)
        return self._get_default_specification(template, 'scheduling_unit_template', name, version)

    def get_task_template_default_specification(self, name: str, version: int=None) -> dict:
        '''get the default specification of the task_template with the given name (and version)'''
        template = self.get_task_template(name=name, version=version)
        return self._get_default_specification(template, 'task_template', name, version)

    def get_subtask_template_default_specification(self, name: str, version: int=None) -> dict:
        '''get the default specification of the subtask_template with the given name (and version)'''
        template = self.get_subtask_template(name=name, version=version)
        return self._get_default_specification(template, 'subtask_template', name, version)

    def get_subtask_output_dataproducts(self, subtask_id: int) -> list:
        '''get the output dataproducts of the subtask with the given subtask_id'''
        return self.get_path_as_json_object('subtask/%s/output_dataproducts' % subtask_id)

    def get_subtask_input_dataproducts(self, subtask_id: int) -> list:
        '''get the input dataproducts of the subtask with the given subtask_id'''
        return self.get_path_as_json_object('subtask/%s/input_dataproducts' % subtask_id)

    def get_dataproduct_SIP(self, dataproduct_id: int) -> str:
        '''get the SIP for the dataproduct with the given dataproduct_id as an XML string'''
        return self.get_path_as_string('dataproduct/%s/sip' % dataproduct_id)

    def get_subtask_transformed_output_dataproduct(self, subtask_id: int, input_dataproduct_id: int) -> dict:
        '''get the transformed output dataproduct of the subtask with the given subtask_id and input_dataproduct_id'''
        # pass the query parameter via params, so requests takes care of building/encoding the query string
        return self.get_path_as_json_object('subtask/%s/transformed_output_dataproduct' % (subtask_id,),
                                            {'input_dataproduct_id': input_dataproduct_id})

    def post_dataproduct_archive_information(self, dataproduct_id: int, storage_ticket: str,
                                             srm_url: str, file_size: int,
                                             md5_checksum: str = None, adler32_checksum: str = None) -> dict:
        '''post the archive information (storage ticket, srm url, file size and optional checksums) for the given dataproduct.
        Returns the json-decoded response upon success (None if the body is not json).
        Failures are logged, not raised.'''
        json_data = {'storage_ticket': storage_ticket,
                     'srm_url': srm_url,
                     'file_size': file_size}
        if md5_checksum:
            json_data['md5_checksum'] = md5_checksum
        if adler32_checksum:
            json_data['adler32_checksum'] = adler32_checksum

        response = self.session.post(url=self.get_full_url_for_path('dataproduct/%s/post_archive_information' % (dataproduct_id,)), json=json_data)
        logger.info("post_dataproduct_archive_information: json_doc: %s response: %s", json_data, response.text)
        if 200 <= response.status_code < 300:
            logger.info("posted archive information for dataproduct %s", dataproduct_id)
            try:
                return json.loads(response.text)
            except ValueError:
                return None

        logger.error("Could not post archive information for dataproduct %s - %s %s - %s",
                     dataproduct_id, response.status_code, responses.get(response.status_code), response.text)
        return None

    def specify_observation_task(self, task_id: int) -> str:
        """specify observation for the given draft task by just doing a REST API call.
        Returns the response body as text upon success, or raises."""
        response = self.session.get(url=self.get_full_url_for_path('/task/%s/specify_observation' % (task_id,)))
        if 200 <= response.status_code < 300:
            return response.content.decode('utf-8')
        raise Exception("Could not specify observation for task %s.\nResponse: %s %s - %s" % (
            task_id, response.status_code, responses.get(response.status_code), response.content.decode('utf-8')))

    def create_blueprints_and_subtasks_from_scheduling_unit_draft(self, scheduling_unit_draft_id: int) -> dict:
        """create a scheduling_unit_blueprint, its specified taskblueprints and subtasks for the given scheduling_unit_draft_id.
        returns the json-decoded server response upon success, or raises."""
        return self.get_path_as_json_object('scheduling_unit_draft/%s/create_blueprints_and_subtasks' % scheduling_unit_draft_id)

    def schedule_subtask(self, subtask_id: int) -> dict:
        """schedule the subtask for the given subtask_id.
        returns the scheduled subtask upon success, or raises."""
        return self.get_path_as_json_object('subtask/%s/schedule' % subtask_id)

    def get_subtask_progress(self, subtask_id: int) -> dict:
        """get the progress [0.0, 1.0] of a running subtask.
        returns a dict with the 'id' and 'progress', or raises."""
        return self.get_path_as_json_object('subtask/%s/get_progress' % subtask_id)

    def get_setting(self, setting_name: str) -> object:
        """get the value of a TMSS setting.
        returns the setting value upon success, or raises."""
        response = self.session.get(url=self.get_full_url_for_path('/setting/%s/' % (setting_name,)),
                                    params={'format': 'json'})
        if 200 <= response.status_code < 300:
            return json.loads(response.content.decode('utf-8'))['value']
        content = response.content.decode('utf-8')
        raise Exception("Could not get setting with name %s.\nResponse: %s" % (setting_name, content))

    def set_setting(self, setting_name: str, setting_value: bool) -> object:
        """set a value for a TMSS setting.
        returns the setting value upon success, or raises."""
        response = self.session.patch(url=self.get_full_url_for_path('/setting/%s/' % (setting_name,)),
                                      json={'value': setting_value})
        return self._json_or_raise(response, "set setting")['value']

    def post_template(self, template_path: str, name: str, description: str, version: int, schema: str=None, template: str=None, **kwargs):
        '''POST a template at <BASE_URL>/<template_path> with the given name, description and version.
        schema and template may be given as json strings or as already-parsed objects; extra kwargs are added to the posted json.
        :raises Exception: when the server does not respond with 201 (created)'''
        json_data = {'name': name,
                     'description': description,
                     'version': version}
        if schema is not None:
            json_data['schema'] = json.loads(schema) if isinstance(schema, str) else schema
        if template is not None:
            json_data['template'] = json.loads(template) if isinstance(template, str) else template
        json_data.update(**kwargs)

        response = self.session.post(url=self.get_full_url_for_path(template_path), json=json_data)
        if response.status_code == 201:
            logger.info("created new template: %s", json.loads(response.text)['url'])
        else:
            raise Exception("Could not POST template: " + response.text)

    def append_to_subtask_raw_feedback(self, subtask_id: int, feedback: str) -> dict:
        '''append the raw_feedback for the given subtask, and return the subtask with its new state, or raise an error'''
        existing_feedback = self.get_path_as_json_object('/subtask/%s/' % (subtask_id,))['raw_feedback']
        # no (None) or empty existing feedback: the new feedback is just the given feedback, without a leading newline
        new_feedback = "%s\n%s" % (existing_feedback, feedback) if existing_feedback else feedback

        response = self.session.patch(url=self.get_full_url_for_path('/subtask/%s/' % (subtask_id,)),
                                      json={'raw_feedback': new_feedback},
                                      params={'format': 'json'})
        return self._json_or_raise(response, "append feedback to subtask")

    def process_subtask_feedback_and_set_finished(self, subtask_id: int) -> dict:
        '''process the raw_feedback of a given subtask and set the subtask to finished on success.
        Return the subtask with its new state, or raise an error'''
        response = self.session.post(url=self.get_full_url_for_path('/subtask/%s/process_feedback_and_set_finished' % (subtask_id,)),
                                     params={'format': 'json'})
        return self._json_or_raise(response, "process feedback")