Select Git revision
add_virtual_instrument.sql
-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
query.py 6.25 KiB
# This module allows querying MoM / the catalog for SIPs of related dataproducts that can be added with the full history to a new SIP.
# This is preliminary, for use by the pilot user. Should be cleaned up / replaced by some alternative method
import urllib.request, urllib.parse, urllib.error
import requests
from os.path import expanduser, exists
import os
import xmlrpc.client
from configparser import ConfigParser
import re
import logging
def _hide_password(message):
url_regex = r'^(\w*://\w*:)\w*(@\w*\.\w*.\w*)'
return re.sub(url_regex, '\g<1>*****@\g<2>', message)
class LTAClientExistsAlready(Exception):
pass
class LTAClientError(Exception):
pass
class LTAIDAlreadyExists(Exception):
pass
def lta_on_error_print(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except xmlrpc.client.Fault as error:
logging.error('error sending request on LTA rpc with code %s: %s', error.faultCode,
_hide_password(error.faultString))
raise LTAClientError()
except Exception as e:
raise e
return wrapper
def parse_config(path=None):
default_path = expanduser("~/.siplibrc")
config = ConfigParser()
if path:
config_path = path
else:
config_path = default_path
found = config.read([config_path])
if not found:
raise SystemExit(f'Cannot find configuration files {default_path}: please create one')
return config
DEFAULT_CONFIG = parse_config()
user = DEFAULT_CONFIG['DEFAULT']['user']
passw = DEFAULT_CONFIG['DEFAULT']['passw']
host = DEFAULT_CONFIG['DEFAULT']['host']
login_data = {
'j_username': user,
'j_password': passw
}
class RequestsTransport(xmlrpc.client.SafeTransport):
"""
Transport method which supports proxies
"""
# change our user agent to reflect Requests
user_agent = "Python XMLRPC with Requests (python-requests.org)"
def __init__(self, use_https=True, cert=None, verify=None, *args, **kwargs):
self.cert = cert
self.verify = verify
self.use_https = use_https
xmlrpc.client.SafeTransport.__init__(self, *args, **kwargs)
def request(self, host, handler, request_body, verbose=False):
"""
Make an xmlrpc request.
"""
headers = {'User-Agent': self.user_agent}
url = self._build_url(host, handler)
proxies = {}
if 'http_proxy' in os.environ:
proxies['http_proxy'] = os.environ['http_proxy']
if 'https_proxy' in os.environ:
proxies['https_proxy'] = os.environ['https_proxy']
try:
resp = requests.post(url, data=request_body, headers=headers,
stream=True,
cert=self.cert, verify=self.verify,
proxies=proxies)
except ValueError:
raise
except Exception:
raise # something went wrong
else:
try:
resp.raise_for_status()
except requests.RequestException as e:
raise xmlrpc.client.ProtocolError(url, resp.status_code,
str(e), resp.headers)
else:
self.verbose = verbose
return self.parse_response(resp.raw)
def _build_url(self, host, handler):
"""
Build a url for our request based on the host, handler and use_http
property
"""
scheme = 'https' if self.use_https else 'http'
return '%s://%s/%s' % (scheme, host, handler.lstrip('/'))
class SafeRequestsTransport(RequestsTransport):
def __init__(self, *args, cert=None, verify=None, **kwargs):
super(SafeRequestsTransport, self).__init__(*args, use_https=True, cert=cert, verify=verify, **kwargs)
class UnSafeRequestsTransport(RequestsTransport):
def __init__(self, *args, **kwargs):
super(UnSafeRequestsTransport, self).__init__(*args, use_https=False, cert=None, verify=None, **kwargs)
url = 'https://' + user + ':' + passw + '@' + host
client = xmlrpc.client.ServerProxy(url, transport=SafeRequestsTransport())
def _call_idservice(source, userlabel=None):
if userlabel is not None:
response = client.GetUniqueIDForLabel(source, userlabel)
else:
response = client.GetUniqueID(source)
return response
@lta_on_error_print
def create_unique_id(source, userlabel=None):
"""
Creates a new unique numeric identifier in the LTA catalog for the given source name.
An optional userlabel can be assigned to later query the identifier based on this String.
Throws an exception if the given label already exists for the given source.
"""
response = _call_idservice(source, userlabel)
if not response.get("result") == "ok":
raise Exception('An identifier for this userlabel could not be created -> ' + str(response.get("error")))
if not response.get("is_new"):
raise LTAIDAlreadyExists('An identifier for this userlabel already exists -> ' + str(userlabel))
return response.get('id')
@lta_on_error_print
def get_unique_id(source, userlabel):
"""
Queries an existing numeric ID from the LTA catalog based on it's userlabel (which had
to be assigned at the time the Identifier is created to allow this lookup to work).
Throws an exception if the given label does not exist for the given source.
"""
response = _call_idservice(source, userlabel)
if not response.get("result") == "ok":
raise Exception('An identifier for this userlabel could not be retrieved -> ' + str(response.get("error")))
if response.get("is_new"):
raise Exception('An identifier for this userlabel does not exist -> ' + str(userlabel))
return response.get('id')
@lta_on_error_print
def get_dataproduct_sip(projectname, dataproductid):
response = client.GetSip(projectname, dataproductid)
error = response.get('error')
if error:
raise LTAClientError(_hide_password(error))
return response.get('sip')
@lta_on_error_print
def get_dataproduct_ids(projectname, sasid):
response = client.GetDataProductIDS(projectname, sasid)
error = response.get('error')
if error:
raise LTAClientError(_hide_password(error))
return response.get("ids")