Commit 3640cf4e authored by Nico Vermaas

Merge branch 'esap-gateway-query' into 'master'

Esap gateway query

See merge request astron-sdc/esap-api-gateway!43
parents 29ce95dd 92be2fe0
......@@ -36,6 +36,51 @@ class EsapShoppingItemSerializer(serializers.HyperlinkedModelSerializer):
class EsapUserProfileSerializer(serializers.HyperlinkedModelSerializer):
shopping_cart = EsapShoppingItemSerializer(
many=True,
# view_name="shopping-items",
read_only=False,
# queryset=EsapShoppingItem.objects.all(),
)
def update(self, instance, validated_data):
    """Apply ``validated_data`` to ``instance``, save it and return it.

    ``user_name`` is dropped from the payload. For each many-to-many field
    (``software_repositories``, ``compute_resources``, ``shopping_cart``) a
    new related object is created per supplied entry, storing
    ``str(dict(entry))`` as ``item_data``, and the new objects are added to
    the existing relation. Every remaining key is set as a plain attribute.

    Args:
        instance: the profile object being updated.
        validated_data: mapping of field names to values; it is consumed
            (keys are popped) while being applied.

    Returns:
        The saved ``instance``.

    Raises:
        RuntimeError: if the first entry of a supplied many-to-many list is
            empty.
    """
    # Do not allow the user name to be updated - it is the primary key
    validated_data.pop("user_name", None)

    for m2m_field in ("software_repositories", "compute_resources", "shopping_cart"):
        field_data = validated_data.pop(m2m_field, None)
        # None means "field not supplied" and an empty list means "nothing to
        # add"; skipping both keeps field_data[0] below from raising
        # IndexError on an empty list.
        if not field_data:
            continue
        if len(field_data[0]) == 0:
            raise RuntimeError(f"WTF! {validated_data}")
        related_manager = getattr(instance, m2m_field)
        related_manager.add(
            *[
                related_manager.model.objects.create(item_data=str(dict(field_datum)))
                for field_datum in field_data
            ]
        )

    for key, value in validated_data.items():
        setattr(instance, key, value)
    instance.save()
    return instance
def to_internal_value(self, data):
    """Run the parent validation, then pass raw many-to-many payloads through.

    For ``software_repositories``, ``compute_resources`` and
    ``shopping_cart``, a non-None value found in the incoming ``data``
    replaces whatever the parent class produced for that key, so the raw
    request payload for those fields is what ends up in the result.
    """
    validated = super().to_internal_value(data)
    for name in ("software_repositories", "compute_resources", "shopping_cart"):
        raw_value = data.get(name, None)
        if raw_value is None:
            continue
        validated.update({name: raw_value})
    return validated
class Meta:
model = EsapUserProfile
fields = [
......
from rest_framework import viewsets
from rest_framework import permissions
from .serializers import *
from ..models import *
from django.contrib import auth
class EsapQuerySchemaViewSet(viewsets.ModelViewSet):
"""
......@@ -10,7 +13,7 @@ class EsapQuerySchemaViewSet(viewsets.ModelViewSet):
queryset = EsapQuerySchema.objects.all().order_by("schema_name")
serializer_class = EsapQuerySchemaSerializer
permission_classes = []
permission_classes = [permissions.AllowAny]
class EsapComputeResourceViewSet(viewsets.ModelViewSet):
......@@ -20,7 +23,7 @@ class EsapComputeResourceViewSet(viewsets.ModelViewSet):
queryset = EsapComputeResource.objects.all().order_by("resource_name")
serializer_class = EsapComputeResourceSerializer
permission_classes = []
permission_classes = [permissions.AllowAny]
class EsapSoftwareRepositoryViewSet(viewsets.ModelViewSet):
......@@ -30,7 +33,7 @@ class EsapSoftwareRepositoryViewSet(viewsets.ModelViewSet):
queryset = EsapSoftwareRepository.objects.all().order_by("repository_name")
serializer_class = EsapSoftwareRepositorySerializer
permission_classes = []
permission_classes = [permissions.AllowAny]
class EsapShoppingItemViewSet(viewsets.ModelViewSet):
......@@ -40,7 +43,7 @@ class EsapShoppingItemViewSet(viewsets.ModelViewSet):
queryset = EsapShoppingItem.objects.all()
serializer_class = EsapShoppingItemSerializer
permission_classes = []
permission_classes = [permissions.AllowAny]
class EsapUserProfileViewSet(viewsets.ModelViewSet):
......@@ -50,9 +53,13 @@ class EsapUserProfileViewSet(viewsets.ModelViewSet):
queryset = EsapUserProfile.objects.all().order_by("user_name")
serializer_class = EsapUserProfileSerializer
permission_classes = []
permission_classes = [permissions.AllowAny]
def get_queryset(self):
    """Return the profiles selected by the current request.

    An authenticated user gets the profile(s) whose ``user_email`` equals
    the user's email. Any other request falls back to filtering on the
    ``user_name`` query parameter.
    """
    user = auth.get_user(self.request)
    # django.contrib.auth.get_user() hands back an AnonymousUser rather than
    # None for unauthenticated requests, and AnonymousUser does not define
    # ``email``; test is_authenticated so the fallback branch is reachable.
    if user is None or not user.is_authenticated:
        # Returns nothing if no user_name supplied instead of all
        user_name = self.request.query_params.get("user_name", None)
        return EsapUserProfile.objects.filter(user_name=user_name)
    return EsapUserProfile.objects.filter(user_email=user.email)
......@@ -69,7 +69,7 @@ class EsapShoppingItem(models.Model):
class EsapUserProfile(models.Model):
user_name = models.CharField("Username", max_length=50)
user_name = models.CharField("Username", max_length=50, primary_key=True)
full_name = models.CharField("Full Name", max_length=100, null=True)
user_email = models.EmailField("User Email")
query_schema = models.ForeignKey(
......@@ -86,7 +86,7 @@ class EsapUserProfile(models.Model):
to=EsapComputeResource, verbose_name="Compute Resources", blank=True
)
shopping_cart = models.ManyToManyField(
to=EsapShoppingItem, verbose_name="Shopping Cart", blank=True
to=EsapShoppingItem, verbose_name="Shopping Cart", blank=True,
)
def __unicode__(self):
......
......@@ -12,12 +12,23 @@ services:
esap_api:
container_name: esap_api_query
image: esap_api_query:latest
ports:
- 5555:8000
# ports:
# - 5558:8000
expose:
- 8000
networks:
- traefik_proxy
- esap_network
labels:
- "traefik.enable=true"
- "traefik.http.routers.esap-api.entryPoints=esap-api-query"
- "traefik.http.routers.esap-api.service=esap-api"
- "traefik.http.routers.esap-api.rule=Host(`sdc.astron.nl`) && PathPrefix(`/esap-api`)"
- "traefik.http.services.esap-api.loadbalancer.server.port=8000"
- "traefik.http.routers.esap-qstatic.entryPoints=esap-api-query"
- "traefik.http.routers.esap-qstatic.service=esap-qstatic"
- "traefik.http.routers.esap-qstatic.rule=Host(`sdc.astron.nl`) && PathPrefix(`/static`)"
- "traefik.http.services.esap-qstatic.loadbalancer.server.port=8000"
env_file:
- $HOME/shared/oidc
restart: always
......@@ -44,11 +55,35 @@ services:
container_name: esap_nginx
image: esap_nginx:latest
networks:
- traefik_proxy
- esap_network
ports:
- 80:80
labels:
- "traefik.enable=true"
- "traefik.http.routers.esap-gui.entryPoints=websecure"
- "traefik.http.routers.esap-gui.service=esap-gui"
- "traefik.http.routers.esap-gui.rule=Host(`sdc.astron.nl`) && PathPrefix(`/esap-gui`)"
- "traefik.http.services.esap-gui.loadbalancer.server.port=80"
- "traefik.http.routers.esap-static.entryPoints=websecure"
- "traefik.http.routers.esap-static.service=esap-static"
- "traefik.http.routers.esap-static.rule=Host(`sdc.astron.nl`) && PathPrefix(`/static`)"
- "traefik.http.services.esap-static.loadbalancer.server.port=80"
- "traefik.http.routers.esap-static_esap.entryPoints=websecure"
- "traefik.http.routers.esap-static_esap.service=esap-static_esap"
- "traefik.http.routers.esap-static_esap.rule=Host(`sdc.astron.nl`) && PathPrefix(`/static_esap`)"
- "traefik.http.services.esap-static_esap.loadbalancer.server.port=80"
- "traefik.http.routers.esap-gui-api.entryPoints=websecure"
- "traefik.http.routers.esap-gui-api.service=esap-gui-api"
- "traefik.http.routers.esap-gui-api.rule=Host(`sdc.astron.nl`) && PathPrefix(`/esap-api`)"
- "traefik.http.services.esap-gui-api.loadbalancer.server.port=80"
- "traefik.http.routers.esap-oidc.entryPoints=websecure"
- "traefik.http.routers.esap-oidc.service=esap-oidc"
- "traefik.http.routers.esap-oidc.rule=Host(`sdc.astron.nl`) && PathPrefix(`/oidc`)"
- "traefik.http.services.esap-oidc.loadbalancer.server.port=80"
# ports:
# - 80:80
volumes:
- $HOME/shared:/shared
- $HOME/shared:/etc/nginx/conf.d
- $HOME/shared/static:/static
restart: always
import requests
import os
import json
rucio_url = "https://escape-rucio.cern.ch"
AUTH_PORT = 32301
STANDARD_PORT = 32300
RUCIO_AUTH_TOKEN = "<REDACTED>"
def validate(timeout=10):
    """Ask the Rucio auth endpoint whether RUCIO_AUTH_TOKEN is accepted.

    Args:
        timeout: seconds to wait for the server before giving up.

    Returns:
        True when ``auth/validate`` answers with a successful status, False
        otherwise, including when the request cannot be completed.
    """
    # Plain string formatting instead of os.path.join: that helper is meant
    # for filesystem paths and inserts a backslash on Windows.
    url = f"{rucio_url}:{AUTH_PORT}/auth/validate"
    try:
        response = requests.get(
            url,
            headers={"X-Rucio-Auth-Token": RUCIO_AUTH_TOKEN},
            # NOTE(review): TLS certificate verification is switched off, as
            # it was before this change; confirm that is still required.
            verify=False,
            timeout=timeout,
        )
    except requests.RequestException:
        # This runs while the configuration module is imported, so an
        # unreachable server must not abort the import.
        return False
    return response.ok
def get_scope_names(timeout=10):
    """Fetch the scopes known to the Rucio server.

    Args:
        timeout: seconds to wait for the ``scopes/`` request.

    Returns:
        The decoded JSON body of the ``scopes/`` endpoint on success.
        On failure a one-element list with a diagnostic string:
        ``["not validated"]`` when the token check fails, or
        ``["validated but failed query"]`` when the scopes request fails,
        is rejected, or does not return valid JSON.
    """
    try:
        validated = validate()
    except requests.RequestException:
        validated = False
    if not validated:
        return ["not validated"]

    failed_query = ["validated but failed query"]
    try:
        response = requests.get(
            # Plain formatting rather than os.path.join, which is meant for
            # filesystem paths and is platform dependent.
            f"{rucio_url}:{STANDARD_PORT}/scopes/",
            headers={"X-Rucio-Auth-Token": RUCIO_AUTH_TOKEN},
            # NOTE(review): certificate verification is disabled, as before.
            verify=False,
            timeout=timeout,
        )
    except requests.RequestException:
        return failed_query
    if not response.ok:
        return failed_query
    try:
        return json.loads(response.content)
    except ValueError:
        # A successful status with a body that is not JSON is reported the
        # same way as any other failed query.
        return failed_query
title = "Rucio"
logo = "http://rucio.cern.ch/images/wide_logo2.png"

# The URL location of the frontend application. This makes it possible to
# install multiple instances in different directories on the webserver that
# all have their own urls, like 'http://esap.astron.nl/esap-gui-dev/queries'.
frontend_basename = "esap-rucio"

# definition of the navigation bar
nav1 = {"title": "Archives", "route": "/archives"}
nav2 = {"title": "Query", "route": "/query"}
navbar = [nav1, nav2]

# if datasets_enabled is set, then only these datasets are visible to the GUI
# datasets_enabled = ['apertif-observations','astron.ivoa.obscore']

# if datasets_disabled is set, then all datasets except these are returned to the GUI
# datasets_disabled = ['nancay.ivoa.obscore']

# Ask the server for the scopes once: every call performs network requests,
# and a single answer keeps "enum" and "enumNames" in agreement.
_scope_names = get_scope_names()

# definition of the query
query_schema = {
    "name": "rucio",
    "title": "Rucio Query",
    "type": "object",
    "properties": {
        "scope": {
            "type": "string",
            "title": "Scope",
            "enum": _scope_names,
            "enumNames": list(_scope_names),
        },
        "resource_category": {
            "type": "string",
            "title": "Category",
            "enum": ["files", "dids", "replicas"],
            "enumNames": ["Files", "DIDs", "Replicas"],
            "default": "dids",
        },
        "catalog": {
            "type": "string",
            "enum": ["esap_rucio_entities"],
            "enumNames": ["esap_rucio_entities"],
            "default": "esap_rucio_entities",
        },
    },
}

# keep the fixed "catalog" property out of the rendered form
ui_schema = {"catalog": {"ui:widget": "hidden"}}
......@@ -5,9 +5,10 @@
import json
import logging
from inspect import currentframe, getframeinfo
from . import alta
from . import vo, helio, vo_reg, zooniverse, lofar
from . import vo, helio, vo_reg, zooniverse, lofar, rucio
logger = logging.getLogger(__name__)
......@@ -34,6 +35,9 @@ def instantiate_connector(dataset):
elif service_module.upper() == 'LOFAR':
connector_class = getattr(lofar, service_connector)
elif service_module.upper() == 'RUCIO':
connector_class = getattr(rucio, service_connector)
url = str(dataset.dataset_catalog.url)
connector = connector_class(url)
return connector
......@@ -64,6 +68,8 @@ def create_query(datasets, query_params, override_resource=None, connector=None,
# institute is valid, continue
# build a result json structure for the input query
result = {}
result['query'] = "empty"
result['error'] = []
result['query_id'] = dataset.uri
result['dataset'] = dataset.uri
result['dataset_name'] = dataset.name
......@@ -100,13 +106,13 @@ def create_query(datasets, query_params, override_resource=None, connector=None,
result['query'] = query
result['where'] = where
if errors is not None:
result['error'] = str(errors)
if errors is not None and len(errors):
result['error'].append(f"{getframeinfo(currentframe()).lineno}, {errors}")
except Exception as error:
# connector not found.
# store the error in the result and continue
result["error"] = str(error)
result["error"].append(f"{getframeinfo(currentframe()).filename}, {getframeinfo(currentframe()).lineno}, {type(error)}, {error}")
# usually, the returned result in 'query' is a single query.
# occasionally, it is a structure of queries that was created by iterating over a registery
......@@ -119,7 +125,7 @@ def create_query(datasets, query_params, override_resource=None, connector=None,
except Exception as error:
# store the error in the result and continue
result["error"] = str(error)
result["error"].append(f"{getframeinfo(currentframe()).lineno}, {type(error)}, {error}")
input_results.append(result)
except Exception as error:
......
......@@ -9,50 +9,25 @@ from .query_base import query_base
import requests
import json
import logging
import string
logger = logging.getLogger(__name__)
AMP_REPLACEMENT = '_and_'
AMP_REPLACEMENT = "_and_"
# The request header
RUCIO_HOST = "https://escape-rucio.cern.ch:32300"
RUCIO_PORT = 32300
RUCIO_AUTH_TOKEN = "<REDACTED>"
# --------------------------------------------------------------------------------------------------------------------
def get_data_from_rucio(query):
""" use Rucio REST API to query the data lake """
# authenticate user using X509 certificates
# curl --insecure -i --cert ~/.globus/client.crt -i --key ~/.globus/client.key -i -H "X-Rucio-Account: meyer" -X GET "https://escape-rucio.cern.ch:32301/auth/x509"
# export RUCIO_AUTH_TOKEN="meyer-/DC=org/DC=terena/DC=tcs/C=NL/O=ASTRON/CN=meyer 1775@astron.nl-unknown-*"
# validate user
# curl --insecure -X GET -H "X-Rucio-Auth-Token: $RUCIO_AUTH_TOKEN" https://escape-rucio.cern.ch:32301/auth/validate
URLPATTERNS = dict(
scope="{host}/scopes/",
dids="{host}/dids/{scope}/",
files="{host}/dids/{scope}/files/",
replicas="{host}/replicas/{scope}/"
)
# query DIDs with scope-name LOFAR_ASTRON_GRANGE
# curl --insecure -X GET -H "X-Rucio-Auth-Token: $RUCIO_AUTH_TOKEN" https://escape-rucio.cern.ch:32300/dids/LOFAR_ASTRON_GRANGE/
# list of scope names
# ESCAPE_CERN_TEAM-noise
# CMS_INFN_DCIANGOT
# SKA_SKAO_COLLINSON
# ESCAPE_DESY_TEAM-testing
# FAIR_GSI_SZUBA
# SKA_SKAO_JOSHI-testing
# CTA_LAPP_FREDERIC
# SKA_SKAO_BARNSLEY-testing
# ESCAPE_CERN_TEAM
# VIRGO_EGO_CHANIAL
# ESCAPE_CERN_TEAM-testing
# LSST_CCIN2P3_GOUNON
# ATLAS_LAPP_JEZEQUEL
# SKA_SKAO_COLL-testing
# MAGIC_PIC_BRUZZESE
# LOFAR_ASTRON_GRANGE
logger.info(results)
return list(results)
# --------------------------------------------------------------------------------------------------------------------
class rucio_connector(query_base):
......@@ -64,87 +39,123 @@ class rucio_connector(query_base):
def __init__(self, url):
self.url = url
# construct a query for this type of service
# construct a query for the Rucio REST API
def construct_query(
self, dataset, esap_query_params, translation_parameters, equinox
):
def construct_query(self, dataset, esap_query_params, translation_parameters, equinox):
where = ''
query = {}
where = {}
errors = []
# translate the esap_parameters to specific catalog parameters
for esap_param in esap_query_params:
esap_key = esap_param
value = esap_query_params[esap_key][0]
query = dict(
resource_category=esap_query_params.pop("resource_category", ["dids"])[0]
)
try:
dataset_key = translation_parameters[esap_key]
url_pattern = URLPATTERNS.get(
query["resource_category"], URLPATTERNS.get("dids")
)
# because '&' has a special meaning in urls (specifying a parameter) replace it with
# something harmless during serialization.
where = where + dataset_key + '=' + value + AMP_REPLACEMENT
url_pattern_fields = [
field[1] for field in string.Formatter().parse(url_pattern)
]
except Exception as error:
# if the parameter could not be translateget_data_from_lofard not translating key " +
esap_key + ' ' + str(error)+', using it raw.')
# errors.append("ERROR: translating key " + esap_key + ' ' + str(error))
try:
# Query parameter values are lists (see the "resource_category" default and
# the value[0] lookups nearby), so the fallback has to be a list as well:
# indexing the bare string "Missing" with [0] would yield just "M".
url_params = {
    field: esap_query_params.pop(field, ["Missing"])[0]
    for field in url_pattern_fields
    if field is not None and field != "host"
}
# translate the remianing esap_parameters to specific catalog parameters
where = {
translation_parameters.get(key, key): value[0]
for key, value in esap_query_params.items()
if key not in ["catalog"]
}
query = dict(
query_info=dict(
url_pattern=url_pattern, url_params=url_params, where=where
)
)
except Exception as e:
errors.append(f"Rucio Connector {type(e)} {e}")
# if query ends with a separation character then cut it off
if where.endswith(AMP_REPLACEMENT):
where=where[:-len(AMP_REPLACEMENT)]
return query, where, errors
# Zheng, this is where you could change the format of the Rucio query.
# this is not required, you can also leave it like this.
# The 'query' variable that is returned is already translated with the Rucio parameter_mapping
# here. I only used some example paramters, so you may still want to change the parameter_mapping.
def _get_data_from_rucio(self, query):
""" use Rucio REST API to query the data lake """
query_info = query["query_info"]
url = query_info["url_pattern"].format(
host=f"{self.url}:{RUCIO_PORT}", **query_info["url_params"]
)
response = requests.get(
url,
query_info["where"],
headers={"X-Rucio-Auth-Token": RUCIO_AUTH_TOKEN},
verify=False,
)
results = []
if response.ok and len(response.content.strip()):
results = [
json.loads(element)
for element in response.content.decode("utf-8").strip().split("\n")
]
# construct the query url
# for now simply like: 'https://escape-rucio.cern.ch:32300/dids/LOFAR_ASTRON_GRANGE/'
query=self.url + '?' + where
logger.info('construct_query: '+query)
return query, where, errors
return results
def run_query(self, dataset, dataset_name, query, override_access_url = None, override_service_type = None):
def run_query(
self,
dataset,
dataset_name,
query,
override_access_url=None,
override_service_type=None,
):
"""
:param dataset: the dataset object that must be queried
:param query_params: the incoming esap query parameters)
:return: results: an array of dicts with the following structure;
"""
logger.info('query:'+query)
results=[]
logger.info("query:" + str(query))
results = []
# create a function that reads the data from lofar
# rucio_results = get_data_from_rucio(query)
rucio_results = self._get_data_from_rucio(query)
try:
for rucio_result in rucio_results:
record={}
record['name']=rucio_result['name']
record['parent']=rucio_result['parent']
record['level']=rucio_result['level']
record['bytes']=rucio_result['bytes']
record['scope']=rucio_result['scope']
record['type']=rucio_result['type']
return rucio_results
results.append(record)
# custom serializer for the 'query' endpoint
except Exception as error:
return "ERROR: " + str(error)
class TypeToSerializerMap:
return results
# Prototype field per Python type. The keys are the types themselves:
# type(float), type(int), ... all evaluate to ``type``, which would collapse
# the mapping into a single entry that no value ever matches.
map = {
    float: serializers.FloatField(),
    int: serializers.IntegerField(),
    str: serializers.CharField(),
    dict: serializers.DictField(),
    list: serializers.ListField(),
}
# custom serializer for the 'query' endpoint
@classmethod
def getFieldForType(cls, value):
    """Return a serializer field suited to the Python type of ``value``.

    Values whose exact type is not in ``map`` get a ``JSONField``.
    A fresh field object is returned on every call: a field is bound to the
    name it is registered under, so one shared instance could not serve
    several keys of the same type.
    """
    import copy

    prototype = cls.map.get(type(value))
    if prototype is None:
        return serializers.JSONField()
    return copy.deepcopy(prototype)
class CreateAndRunQuerySerializer(serializers.Serializer):
"""
Custom serializer classes implement dynamic field definition based on
the contents of the query passed to it.
"""
def __init__(self, *args, **kwargs):
# Zheng: this defines the structure of the response to /esap/query/query for Rucio
# the fields should be the same as in run-query
# The first result acts as the template for the dynamic fields. Accept the
# instance as keyword or first positional argument, and fall back to an
# empty template when there are no results instead of raising IndexError.
instance = kwargs.get("instance", args[0] if args else None)
self.example_result = instance[0] if instance else {}
name=serializers.CharField()
parent=serializers.CharField()
level=serializers.IntegerField()
size_in_bytes=serializers.IntegerField()
scope=serializers.CharField()
result_type=serializers.CharField()
super().__init__(*args, **kwargs)
class Meta:
fields='__all__'
self.fields.update(
{
key: rucio_connector.TypeToSerializerMap.getFieldForType(value)
for key, value in self.example_result.items()
}
)
......@@ -70,7 +70,7 @@
</div>
<p class="footer" small>ASTRON - version 29 oct 2020</p>
<p class="footer" small>ASTRON - version 5 nov 2020</p>
{% endblock %}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment