Skip to content
Snippets Groups Projects
Select Git revision
  • b7038f981b7a997ab1ea285f49e71c3f0635e9d7
  • master default protected
  • MAM-56-prepare-update-for-sip-version-3
  • TMSS-1777
  • SDC-545_update_SIP
  • lofar_repo
  • 2.7.1
  • 2.8.0
8 results

feedback.py

Blame
  • query.py 6.25 KiB
    # This module allows querying MoM / the catalog for SIPs of related dataproducts that can be added with the full history to a new SIP.
    # This is preliminary, for use by the pilot user. Should be cleaned up / replaced by some alternative method
    
    import urllib.request, urllib.parse, urllib.error
    import requests
    from os.path import expanduser, exists
    import os
    import xmlrpc.client
    from configparser import ConfigParser
    import re
    import logging
    
    
    def _hide_password(message):
        url_regex = r'^(\w*://\w*:)\w*(@\w*\.\w*.\w*)'
        return re.sub(url_regex, '\g<1>*****@\g<2>', message)
    
    
    class LTAClientExistsAlready(Exception):
        pass
    
    
    class LTAClientError(Exception):
        pass
    
    class LTAIDAlreadyExists(Exception):
        pass
    
    def lta_on_error_print(func):
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except xmlrpc.client.Fault as error:
                logging.error('error sending request on LTA rpc with code %s: %s', error.faultCode,
                              _hide_password(error.faultString))
                raise LTAClientError()
            except Exception as e:
                raise e
        return wrapper
    
    
    def parse_config(path=None):
        default_path = expanduser("~/.siplibrc")
        config = ConfigParser()
        if path:
            config_path = path
        else:
            config_path = default_path
    
        found = config.read([config_path])
        if not found:
            raise SystemExit(f'Cannot find configuration files {default_path}: please create one')
        return config
    
    
    DEFAULT_CONFIG = parse_config()
    
    user = DEFAULT_CONFIG['DEFAULT']['user']
    passw = DEFAULT_CONFIG['DEFAULT']['passw']
    host = DEFAULT_CONFIG['DEFAULT']['host']
    
    login_data = {
        'j_username': user,
        'j_password': passw
    }
    
    
    class RequestsTransport(xmlrpc.client.SafeTransport):
        """
        Transport method which supports proxies
        """
        # change our user agent to reflect Requests
        user_agent = "Python XMLRPC with Requests (python-requests.org)"
    
        def __init__(self, use_https=True, cert=None, verify=None, *args, **kwargs):
            self.cert = cert
            self.verify = verify
            self.use_https = use_https
            xmlrpc.client.SafeTransport.__init__(self, *args, **kwargs)
    
        def request(self, host, handler, request_body, verbose=False):
            """
            Make an xmlrpc request.
            """
            headers = {'User-Agent': self.user_agent}
            url = self._build_url(host, handler)
            proxies = {}
            if 'http_proxy' in os.environ:
                proxies['http_proxy'] = os.environ['http_proxy']
            if 'https_proxy' in os.environ:
                proxies['https_proxy'] = os.environ['https_proxy']
            try:
                resp = requests.post(url, data=request_body, headers=headers,
                                     stream=True,
                                     cert=self.cert, verify=self.verify,
                                     proxies=proxies)
            except ValueError:
                raise
            except Exception:
                raise  # something went wrong
            else:
                try:
                    resp.raise_for_status()
                except requests.RequestException as e:
                    raise xmlrpc.client.ProtocolError(url, resp.status_code,
                                                      str(e), resp.headers)
                else:
                    self.verbose = verbose
                    return self.parse_response(resp.raw)
    
        def _build_url(self, host, handler):
            """
            Build a url for our request based on the host, handler and use_http
            property
            """
            scheme = 'https' if self.use_https else 'http'
            return '%s://%s/%s' % (scheme, host, handler.lstrip('/'))
    
    
    class SafeRequestsTransport(RequestsTransport):
        def __init__(self, *args, cert=None, verify=None, **kwargs):
            super(SafeRequestsTransport, self).__init__(*args, use_https=True, cert=cert, verify=verify, **kwargs)
    
    
    class UnSafeRequestsTransport(RequestsTransport):
        def __init__(self, *args, **kwargs):
            super(UnSafeRequestsTransport, self).__init__(*args, use_https=False, cert=None, verify=None, **kwargs)
    
    
    url = 'https://' + user + ':' + passw + '@' + host
    client = xmlrpc.client.ServerProxy(url, transport=SafeRequestsTransport())
    
    
    def _call_idservice(source, userlabel=None):
        if userlabel is not None:
            response = client.GetUniqueIDForLabel(source, userlabel)
        else:
            response = client.GetUniqueID(source)
        return response
    
    
    @lta_on_error_print
    def create_unique_id(source, userlabel=None):
        """
        Creates a new unique numeric identifier in the LTA catalog for the given source name.
        An optional userlabel can be assigned to later query the identifier based on this String.
        Throws an exception if the given label already exists for the given source.
        """
        response = _call_idservice(source, userlabel)
        if not response.get("result") == "ok":
            raise Exception('An identifier for this userlabel could not be created -> ' + str(response.get("error")))
        if not response.get("is_new"):
            raise LTAIDAlreadyExists('An identifier for this userlabel already exists -> ' + str(userlabel))
        return response.get('id')
    
    
    @lta_on_error_print
    def get_unique_id(source, userlabel):
        """
        Queries an existing numeric ID from the LTA catalog based on it's userlabel (which had
        to be assigned at the time the Identifier is created to allow this lookup to work).
        Throws an exception if the given label does not exist for the given source.
        """
        response = _call_idservice(source, userlabel)
        if not response.get("result") == "ok":
            raise Exception('An identifier for this userlabel could not be retrieved -> ' + str(response.get("error")))
        if response.get("is_new"):
            raise Exception('An identifier for this userlabel does not exist -> ' + str(userlabel))
        return response.get('id')
    
    
    @lta_on_error_print
    def get_dataproduct_sip(projectname, dataproductid):
        response = client.GetSip(projectname, dataproductid)
        error = response.get('error')
        if error:
            raise LTAClientError(_hide_password(error))
        return response.get('sip')
    
    
    @lta_on_error_print
    def get_dataproduct_ids(projectname, sasid):
        response = client.GetDataProductIDS(projectname, sasid)
        error = response.get('error')
        if error:
            raise LTAClientError(_hide_password(error))
    
        return response.get("ids")