Skip to content
Snippets Groups Projects
Commit 6d426c4f authored by Yan Grange's avatar Yan Grange :wave:
Browse files

Merge branch 'tickets/128' into 'master'

Adding staging functionality to the Rucio connector

See merge request astron-sdc/esap-userprofile-python-client!13
parents 23d1565e 3c02ae01
No related branches found
No related tags found
1 merge request!13Adding staging functionality to the Rucio connector
from tokenize import Single
from .baseConnector import BaseConnector
from typing import Union, List
import os
from warnings import warn
import datetime
import json
import time
### The connector can be used outside of the DLaaS notebook, but we want
### to make sure that the commodity functions for staging are used inside
### the DLaaS. So we do some try-except here:
try:
from rucio_jupyterlab.kernels.ipython.types import SingleItemDID, MultipleItemDID
from rucio import client as rucio_base_client
from rucio.rse.protocols import protocol
except ModuleNotFoundError:
warn(
"Not using the Rucio connector inside the DLaaS environment. Staging functionality will not work"
)
running_in_dlaas = False
else:
running_in_dlaas = True
class NotInDLaaSError(RuntimeError):
"""Exception raised when functions are executed
outside the Data Lake as a Service environment.
Inherits from RuntimeError.
"""
pass
def check_for_dlaas() -> None:
"""Check if the imports of the DLaaS and Rucio libraries have
failed and therfore the connector runs outside of the dlaas
notebook. This check is executed in every function in the
connector object that has to do with Rucio.
Raises
------
NotInDLaaSError
If the DLaaS libraries have failed to load before, this exception
is raised.
"""
if not running_in_dlaas:
raise NotInDLaaSError(
"Cannot execute function because you are not running in a DLaaS environment."
)
class Rucio(BaseConnector):
"""Rucio connector. Inherits the functionality from the BaseConnector.
Implements functionality to stage data that was obtained from the
ESAP shopping basket. Apart from the name and archive, which are used
by the BaseConnector, this object will have several variables if
it is executed inside a DLaaS environment. Those are:
active_rules: a list of rules managed by the class
active_dids: a list of dids managed by the class
lifetime: the lifetime of the rules that put the files on the local RSE
(this is set to the value of the
RUCIO_REPLICATION_RULE_LIFETIME_DAYS environment variable)
local_RSE: The RSE connected to the DLaaS environment,
read from the RUCIO_DESTINATION_RSE environment variable.
local_basepath: The path at which this RSE is fuse mounted
read from the RUCIO_RSE_MOUNT_PATH environment variable.
rucio_client: A rucio client instance, used to create rules and
manage DIDs
Basic usage example:
# assuming that the shopping basket client has been installed
# in the notebook. Also assuming you have a shopping basket with
# Rucio files in it (and maybe others; we will be filtering anyway)
esap_api_host = "https://sdc-dev.astron.nl"
from esap_client import shopping_client
from esap_client.connectors import Rucio as RucioConnector
rucicon = RucioConnector()
sc = shopping_client.ShoppingClient(host=esap_api_host, connectors=[rucicon])
basket = sc.get_basket(reload=True, filter_archives=True)
rucicon.retrieve(basket, wait=True)
dids = rucicon.getDIDs()
"""
name = "rucio"
archive = "rucio"
if running_in_dlaas:
active_rules = set()
active_dids = set()
lifetime = (
os.environ["RUCIO_REPLICATION_RULE_LIFETIME_DAYS"] * 24 * 60 * 60
) # in seconds
local_RSE = os.environ["RUCIO_DESTINATION_RSE"]
local_basepath = os.environ["RUCIO_RSE_MOUNT_PATH"]
rucio_client = rucio_base_client.Client()
def retrieve(self, items: Union[dict, list], wait: bool = False) -> None:
"""Retrieve DIDs that come out of the shopping basket by
creating rules in Rucio to bring the files to the RSE that
is connected to the DLaaS instance.
Parameters
----------
items : Union[dict, list]
Items to retrieve. This can be a single shopping basket entry (dict)
or a whole basket, which is a list.
wait : bool, optional
If True, block until all data has been staged (or rules are stuck),
in case of large volumes, this can take a while. By default False
"""
check_for_dlaas()
if type(items) is dict:
items = [items]
for raw_item in items:
item = json.loads(raw_item["item_data"])["record"]
self.active_dids.add(json.dumps(item))
if not self._check_locality_fix_lifetime_store_rule(item):
self._bring_online(item)
if wait:
self.block_till_staged()
def get_single_item_did(self, replica_data: dict) -> SingleItemDID:
"""Parse a dictionary generated by the Rucio replica client
to a SingleItemDID object. This function takes the first pfn
from the list that exists on the local RSE. The fuse mounted path
is computed based on the rucio path translation functionality.
Parameters
----------
replica_data : dict
A single replica dict, as returned by the rucio replica client.
Returns
-------
SingleItemDID
DLaaS SingleitemDID object corresponding to the DID.
"""
for pfn in replica_data["pfns"]:
if replica_data["pfns"][pfn]["rse"] == self.local_RSE:
break
path_translator = protocol.RSEDeterministicTranslation(rse=self.local_RSE)
path = path_translator.path(
scope=replica_data["scope"], name=replica_data["name"]
)
return SingleItemDID(pfn=pfn, path=os.path.join(self.local_basepath, path))
def getDIDs(self) -> List[Union[SingleItemDID, MultipleItemDID]]:
"""Get the list of DIDs that are managed by this object so that
those can be further processed in the DLaaS supporting service.
Returns
-------
List[Union[SingleItemDID, MultipleItemDID]]
List of did's, either SingleItemDID objects (single files) or
MultipleItemDID (data sets or containers), which are essentially
a list of SingleItemDID objects.
"""
check_for_dlaas()
didlist = list()
all_states = self.get_staging_status()
for did_json in self.active_dids:
did_full_dic = json.loads(did_json)
name = did_full_dic["name"]
scope = did_full_dic["scope"]
did_dic = {"scope": scope, "name": name}
replicas = [didrep for didrep in self.rucio_client.list_replicas([did_dic])]
did_state = all_states[f"{scope}:{name}"]["state"]
if did_state != "OK":
warn(
f"{scope}:{name} has staging status {did_state}. "
+ "Since the status is not OK, no DID object will be returned. "
)
elif len(replicas) == 1:
didlist.append(self.get_single_item_did(replicas[0]))
elif len(replicas) != 1:
item_list = list()
for singledid in replicas:
item_list.append(self.get_single_item_did(singledid))
didlist.append(
MultipleItemDID(items=item_list, did_available="available")
)
return didlist
def block_till_staged(self) -> None:
"""
This function will poll the status of the rules that are managed
by this class every minute and returns as soon as none of those are
"REPLICATING" anymore. If this function returns, this does not
necessarily mean the replications have been correctly executed
and have status "OK", the status can also be "STUCK".
"""
while True:
states = [a["state"] for a in self.get_staging_status().values()]
if all(value != "REPLICATING" for value in states):
break
time.sleep(60)
def get_staging_status(self) -> dict:
"""Get status of all the rules managed by this object.
Returns
-------
dict
A dict of dicts with keys SCOPE:FILENAME where each dict contains:
scope: scope of the DID managed by the rule
name: name of the DID managed by the rule
state: state of the rule (OK, REPLICATING, STUCK, etc.)
expires_at: If the rule has an expiration date set, the
expiration datetime object, else None.
"""
check_for_dlaas()
total_status = dict()
for rule_id in self.active_rules:
rule_data = self.rucio_client.get_replication_rule(rule_id)
status_dict = {
"scope": rule_data["scope"],
"name": rule_data["name"],
"state": rule_data["state"],
}
try:
status_dict["expires_at"] = rule_data["expires_at"]
except KeyError:
status_dict["expires_at"] = None
scope = rule_data["scope"]
name = rule_data["name"]
total_status[f"{scope}:{name}"] = status_dict
return total_status
def _check_locality_fix_lifetime_store_rule(self, did_dict: dict) -> bool:
"""Check if there is already a rule that puts a DID in the
local RSE. If so check if the lifetime of that rule is more
than the predefined lifetime (see class docs). If the lifetime
of the rule is shorter, extend the rule lifetime to match the
value set in the class settings. If rules were already present
the rule ID is added to the list of rules managed by this class.
Parameters
----------
did_dict : dict
dict object with at least the keys:
scope: scope of the DID
name: name of the DID
(the dicts from the ESAP shopping basket fit in this model)
Returns
-------
bool
True if there was a rule (whether or not it was extended),
False if not.
"""
did_rules = self.rucio_client.list_did_rules(
did_dict["scope"], did_dict["name"]
)
for did_rule in did_rules:
if did_rule["rse_expression"] == self.local_RSE:
self.active_rules.add(did_rule["id"])
if (
did_rule["expires_at"] - datetime.datetime.now()
).seconds >= self.lifetime:
return True
else:
return self._extend_time(did_rule["id"])
return False
def _bring_online(self, did_dict: dict) -> None:
"""Create a rule to make a replica of the DID in did_dict on the RSE
that is mounted to the DLaaS instance.
Parameters
----------
did_dict : dict
dict object with at least the keys:
scope: scope of the DID
name: name of the DID
(the dicts from the ESAP shopping basket fit in this model)
"""
self.active_rules.add(
self.rucio_client.add_replication_rule(
[did_dict], 1, self.local_RSE, lifetime=self.lifetime
)[0]
)
def _extend_time(self, ruleid: str) -> bool:
"""Extend the lifetime of an existing rule.
Parameters
----------
ruleid : str
The Rucio rule ID of the rule to extend.
Returns
-------
bool
The return value of the rucio client that updated the rule.
"""
return self.rucio_client.update_replication_rule(
ruleid, {"lifetime": self.lifetime}
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment