Skip to content
Snippets Groups Projects
Commit c7b39007 authored by Hugh Dickinson's avatar Hugh Dickinson
Browse files

Initial rucio connection.

parent 2a55802e
Branches
No related tags found
2 merge requests!43Esap gateway query,!42Esap gateway rucio
import requests
import os
import json
# Base address of the Rucio server (scheme and host only); the functions
# below append a port and a path to it for each request.
rucio_url = "https://escape-rucio.cern.ch"

# Ports appended to ``rucio_url``: AUTH_PORT is used for the "auth/validate"
# request, STANDARD_PORT for the "scopes" request.
AUTH_PORT, STANDARD_PORT = 32301, 32300

# Value sent in the "X-Rucio-Auth-Token" header of every request.
# NOTE(review): this credential is committed in source — consider loading it
# from deployment configuration instead.
RUCIO_AUTH_TOKEN = "grange-/DC=org/DC=terena/DC=tcs/C=NL/O=ASTRON/CN=Robot - Yan Grange 1086@astron.nl-unknown-809a62ca07bb471cac3012b6af752c86"
def validate():
    """Check whether ``RUCIO_AUTH_TOKEN`` is accepted by the Rucio server.

    Sends a GET request to ``<rucio_url>:<AUTH_PORT>/auth/validate`` with the
    token in the ``X-Rucio-Auth-Token`` header.

    Returns:
        bool: ``True`` when the response status indicates success
        (``response.ok``), ``False`` otherwise.

    Exceptions raised by ``requests.get`` (for example on connection failure)
    are not caught here and propagate to the caller.
    """
    # Build the URL with plain string formatting: os.path.join uses the
    # platform path separator and would produce backslashes on Windows.
    url = f"{rucio_url}:{AUTH_PORT}/auth/validate"
    # NOTE(review): verify=False disables TLS certificate verification.
    response = requests.get(
        url, headers={"X-Rucio-Auth-Token": RUCIO_AUTH_TOKEN}, verify=False
    )
    return response.ok
def get_scope_names():
    """Return the scope names reported by the Rucio server.

    The token is checked first through ``validate()``; when that succeeds a
    GET request is sent to ``<rucio_url>:<STANDARD_PORT>/scopes/``.

    Returns:
        The JSON-decoded response body when the scopes request succeeds
        (presumably a list of scope-name strings — confirm against the Rucio
        REST API). Otherwise a single-element list holding a marker string:
        ``["not validated"]`` when token validation fails, or
        ``["validated but failed query"]`` when the scopes request returns an
        unsuccessful status.

    Exceptions raised by ``requests.get`` or ``json.loads`` are not caught
    here and propagate to the caller.
    """
    if not validate():
        return ["not validated"]

    # Build the URL with plain string formatting: os.path.join uses the
    # platform path separator and would produce backslashes on Windows.
    url = f"{rucio_url}:{STANDARD_PORT}/scopes/"
    # NOTE(review): verify=False disables TLS certificate verification.
    response = requests.get(
        url, headers={"X-Rucio-Auth-Token": RUCIO_AUTH_TOKEN}, verify=False
    )
    if not response.ok:
        return ["validated but failed query"]
    return json.loads(response.content)
# Title and logo URL for this configuration.
title = "Rucio"
logo = "http://rucio.cern.ch/images/wide_logo2.png"

# URL location of the frontend application. Having it configurable makes it
# possible to install several instances in different directories on the
# webserver, each with its own urls such as
# 'http://esap.astron.nl/esap-gui-dev/queries'.
frontend_basename = "esap-rucio"

# Navigation bar: one entry per item, each with a title and a route.
nav1 = dict(title="Archives", route="/archives")
nav2 = dict(title="Query", route="/query")
navbar = [nav1, nav2]
# if datasets_enabled is set, then only these datasets are visible to the GUI
# datasets_enabled = ['apertif-observations','astron.ivoa.obscore']
# if datasets_disabled is set, then all datasets except these are returned to the GUI
# datasets_disabled = ['nancay.ivoa.obscore']
# Definition of the query.
#
# The scope names are fetched once and reused for both "enum" and
# "enumNames": calling get_scope_names() separately for each key repeats the
# HTTP requests at import time, and the two lists could disagree if the
# responses differ.
_scope_names = get_scope_names()

query_schema = {
    "name": "rucio",
    "title": "Rucio Query",
    "type": "object",
    "properties": {
        # Scope to query; the choices come from the Rucio server (or are a
        # single marker string when the lookup failed).
        "scope": {
            "type": "string",
            "title": "Scope",
            "enum": _scope_names,
            "enumNames": _scope_names,
        },
        # Which kind of resource to list; defaults to "dids".
        "resource_category": {
            "type": "string",
            "title": "Category",
            "enum": ["files", "dids"],
            "enumNames": ["Files", "DIDs"],
            "default": "dids",
        },
        # Single fixed catalog value; hidden in the form through ui_schema.
        "catalog": {
            "type": "string",
            "enum": ["esap_rucio_entities"],
            "enumNames": ["esap_rucio_entities"],
            "default": "esap_rucio_entities",
        },
    },
}

# UI hints for the schema above: the "catalog" field uses the hidden widget.
ui_schema = {"catalog": {"ui:widget": "hidden"}}
No preview for this file type
No preview for this file type
...@@ -5,9 +5,10 @@ ...@@ -5,9 +5,10 @@
import json import json
import logging import logging
from inspect import currentframe, getframeinfo
from . import alta from . import alta
from . import vo, helio, vo_reg, zooniverse, lofar from . import vo, helio, vo_reg, zooniverse, lofar, rucio
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -34,6 +35,9 @@ def instantiate_connector(dataset): ...@@ -34,6 +35,9 @@ def instantiate_connector(dataset):
elif service_module.upper() == 'LOFAR': elif service_module.upper() == 'LOFAR':
connector_class = getattr(lofar, service_connector) connector_class = getattr(lofar, service_connector)
elif service_module.upper() == 'RUCIO':
connector_class = getattr(rucio, service_connector)
url = str(dataset.dataset_catalog.url) url = str(dataset.dataset_catalog.url)
connector = connector_class(url) connector = connector_class(url)
return connector return connector
...@@ -64,6 +68,8 @@ def create_query(datasets, query_params, override_resource=None, connector=None, ...@@ -64,6 +68,8 @@ def create_query(datasets, query_params, override_resource=None, connector=None,
# institute is valid, continue # institute is valid, continue
# build a result json structure for the input query # build a result json structure for the input query
result = {} result = {}
result['query'] = "empty"
result['error'] = []
result['query_id'] = dataset.uri result['query_id'] = dataset.uri
result['dataset'] = dataset.uri result['dataset'] = dataset.uri
result['dataset_name'] = dataset.name result['dataset_name'] = dataset.name
...@@ -100,13 +106,13 @@ def create_query(datasets, query_params, override_resource=None, connector=None, ...@@ -100,13 +106,13 @@ def create_query(datasets, query_params, override_resource=None, connector=None,
result['query'] = query result['query'] = query
result['where'] = where result['where'] = where
if errors is not None: if errors is not None and len(errors):
result['error'] = str(errors) result['error'].append(f"{getframeinfo(currentframe()).lineno}, {errors}")
except Exception as error: except Exception as error:
# connector not found. # connector not found.
# store the error in the result and continue # store the error in the result and continue
result["error"] = str(error) result["error"].append(f"{getframeinfo(currentframe()).filename}, {getframeinfo(currentframe()).lineno}, {type(error)}, {error}")
# usually, the returned result in 'query' is a single query. # usually, the returned result in 'query' is a single query.
# occasionally, it is a structure of queries that was created by iterating over a registry
...@@ -119,7 +125,7 @@ def create_query(datasets, query_params, override_resource=None, connector=None, ...@@ -119,7 +125,7 @@ def create_query(datasets, query_params, override_resource=None, connector=None,
except Exception as error: except Exception as error:
# store the error in the result and continue # store the error in the result and continue
result["error"] = str(error) result["error"].append(f"{getframeinfo(currentframe()).lineno}, {type(error)}, {error}")
input_results.append(result) input_results.append(result)
except Exception as error: except Exception as error:
......
...@@ -9,50 +9,24 @@ from .query_base import query_base ...@@ -9,50 +9,24 @@ from .query_base import query_base
import requests import requests
import json import json
import logging import logging
import string
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
AMP_REPLACEMENT = '_and_' AMP_REPLACEMENT = "_and_"
# The request header # The request header
RUCIO_HOST = "https://escape-rucio.cern.ch:32300" RUCIO_HOST = "https://escape-rucio.cern.ch:32300"
RUCIO_PORT = 32300
RUCIO_AUTH_TOKEN = "grange-/DC=org/DC=terena/DC=tcs/C=NL/O=ASTRON/CN=Robot - Yan Grange 1086@astron.nl-unknown-809a62ca07bb471cac3012b6af752c86"
# -------------------------------------------------------------------------------------------------------------------- URLPATTERNS = dict(
scope="{host}/scopes/",
dids="{host}/dids/{scope}/",
def get_data_from_rucio(query): files="{host}/dids/{scope}/files/",
""" use Rucio REST API to query the data lake """ )
# authenticate user using X509 certificates # --------------------------------------------------------------------------------------------------------------------
# curl --insecure -i --cert ~/.globus/client.crt -i --key ~/.globus/client.key -i -H "X-Rucio-Account: meyer" -X GET "https://escape-rucio.cern.ch:32301/auth/x509"
# export RUCIO_AUTH_TOKEN="meyer-/DC=org/DC=terena/DC=tcs/C=NL/O=ASTRON/CN=meyer 1775@astron.nl-unknown-*"
# validate user
# curl --insecure -X GET -H "X-Rucio-Auth-Token: $RUCIO_AUTH_TOKEN" https://escape-rucio.cern.ch:32301/auth/validate
# query DIDs with scope-name LOFAR_ASTRON_GRANGE
# curl --insecure -X GET -H "X-Rucio-Auth-Token: $RUCIO_AUTH_TOKEN" https://escape-rucio.cern.ch:32300/dids/LOFAR_ASTRON_GRANGE/
# list of scope names
# ESCAPE_CERN_TEAM-noise
# CMS_INFN_DCIANGOT
# SKA_SKAO_COLLINSON
# ESCAPE_DESY_TEAM-testing
# FAIR_GSI_SZUBA
# SKA_SKAO_JOSHI-testing
# CTA_LAPP_FREDERIC
# SKA_SKAO_BARNSLEY-testing
# ESCAPE_CERN_TEAM
# VIRGO_EGO_CHANIAL
# ESCAPE_CERN_TEAM-testing
# LSST_CCIN2P3_GOUNON
# ATLAS_LAPP_JEZEQUEL
# SKA_SKAO_COLL-testing
# MAGIC_PIC_BRUZZESE
# LOFAR_ASTRON_GRANGE
logger.info(results)
return list(results)
class rucio_connector(query_base): class rucio_connector(query_base):
...@@ -64,87 +38,123 @@ class rucio_connector(query_base): ...@@ -64,87 +38,123 @@ class rucio_connector(query_base):
def __init__(self, url): def __init__(self, url):
self.url = url self.url = url
# construct a query for this type of service # construct a query for the Rucio REST API
def construct_query(
def construct_query(self, dataset, esap_query_params, translation_parameters, equinox): self, dataset, esap_query_params, translation_parameters, equinox
):
where = '' query = {}
where = {}
errors = [] errors = []
# translate the esap_parameters to specific catalog parameters query = dict(
for esap_param in esap_query_params: resource_category=esap_query_params.pop("resource_category", ["dids"])[0]
esap_key = esap_param )
value = esap_query_params[esap_key][0]
url_pattern = URLPATTERNS.get(
query["resource_category"], URLPATTERNS.get("dids")
)
url_pattern_fields = [
field[1] for field in string.Formatter().parse(url_pattern)
]
try: try:
dataset_key = translation_parameters[esap_key] url_params = {
field: esap_query_params.pop(field, "Missing")[0]
# because '&' has a special meaning in urls (specifying a parameter) replace it with for field in url_pattern_fields
# something harmless during serialization. if field is not None and field != "host"
where = where + dataset_key + '=' + value + AMP_REPLACEMENT }
except Exception as error: # translate the remianing esap_parameters to specific catalog parameters
# if the parameter could not be translateget_data_from_lofard not translating key " + where = {
esap_key + ' ' + str(error)+', using it raw.') translation_parameters.get(key, key): value[0]
# errors.append("ERROR: translating key " + esap_key + ' ' + str(error)) for key, value in esap_query_params.items()
if key not in ["catalog"]
# if query ends with a separation character then cut it off }
if where.endswith(AMP_REPLACEMENT): query = dict(
where=where[:-len(AMP_REPLACEMENT)] query_info=dict(
url_pattern=url_pattern, url_params=url_params, where=where
# Zheng, this is where you could change the format of the Rucio query. )
# this is not required, you can also leave it like this. )
# The 'query' variable that is returned is already translated with the Rucio parameter_mapping except Exception as e:
# here. I only used some example paramters, so you may still want to change the parameter_mapping. errors.append(f"Rucio Connector {type(e)} {e}")
# construct the query url
# for now simply like: 'https://escape-rucio.cern.ch:32300/dids/LOFAR_ASTRON_GRANGE/'
query=self.url + '?' + where
logger.info('construct_query: '+query)
return query, where, errors return query, where, errors
def run_query(self, dataset, dataset_name, query, override_access_url = None, override_service_type = None): def _get_data_from_rucio(self, query):
""" use Rucio REST API to query the data lake """
query_info = query["query_info"]
url = query_info["url_pattern"].format(
host=f"{self.url}:{RUCIO_PORT}", **query_info["url_params"]
)
response = requests.get(
url,
query_info["where"],
headers={"X-Rucio-Auth-Token": RUCIO_AUTH_TOKEN},
verify=False,
)
results = []
if response.ok and len(response.content.strip()):
results = [
json.loads(element)
for element in response.content.decode("utf-8").strip().split("\n")
]
return results
def run_query(
self,
dataset,
dataset_name,
query,
override_access_url=None,
override_service_type=None,
):
""" """
:param dataset: the dataset object that must be queried :param dataset: the dataset object that must be queried
:param query_params: the incoming esap query parameters) :param query_params: the incoming esap query parameters)
:return: results: an array of dicts with the following structure; :return: results: an array of dicts with the following structure;
""" """
logger.info('query:'+query) logger.info("query:" + str(query))
results = [] results = []
# call the helper that reads the data from Rucio
# rucio_results = get_data_from_rucio(query) rucio_results = self._get_data_from_rucio(query)
try: return rucio_results
for rucio_result in rucio_results:
record={}
record['name']=rucio_result['name']
record['parent']=rucio_result['parent']
record['level']=rucio_result['level']
record['bytes']=rucio_result['bytes']
record['scope']=rucio_result['scope']
record['type']=rucio_result['type']
results.append(record) # custom serializer for the 'query' endpoint
except Exception as error: class TypeToSerializerMap:
return "ERROR: " + str(error)
return results map = {
type(float): serializers.FloatField(),
type(int): serializers.IntegerField(),
type(str): serializers.CharField(),
type(dict): serializers.DictField(),
type(list): serializers.ListField(),
}
# custom serializer for the 'query' endpoint @classmethod
def getFieldForType(cls, value):
return cls.map.get(type(value), serializers.JSONField())
class CreateAndRunQuerySerializer(serializers.Serializer): class CreateAndRunQuerySerializer(serializers.Serializer):
"""
Custom serializer classes implement dynamic field definition based on
the contents of the query passed to it.
"""
def __init__(self, *args, **kwargs):
# Zheng: this defines the structure of the response to /esap/query/query for Rucio self.example_result = kwargs.get("instance", [])[0]
# the fields should be the same as in run-query
name=serializers.CharField() super().__init__(*args, **kwargs)
parent=serializers.CharField()
level=serializers.IntegerField()
size_in_bytes=serializers.IntegerField()
scope=serializers.CharField()
result_type=serializers.CharField()
class Meta: self.fields.update(
fields='__all__' {
key: rucio_connector.TypeToSerializerMap.getFieldForType(value)
for key, value in self.example_result.items()
}
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment