diff --git a/esap_client/connectors/rucio.py b/esap_client/connectors/rucio.py index a364ebb8c9f6a4ef1d906ae23729dc03e0d2f9a7..dd7a90366ea93ba9221890544664b6f37192d7c1 100644 --- a/esap_client/connectors/rucio.py +++ b/esap_client/connectors/rucio.py @@ -1,5 +1,303 @@ +from tokenize import Single from .baseConnector import BaseConnector +from typing import Union, List +import os +from warnings import warn +import datetime +import json +import time + +### The connector can be used outside of the DLaaS notebook, but we want +### to make sure that the commodity functions for staging are used inside +### the DLaaS. So we do some try-except here: +try: + from rucio_jupyterlab.kernels.ipython.types import SingleItemDID, MultipleItemDID + from rucio import client as rucio_base_client + from rucio.rse.protocols import protocol +except ModuleNotFoundError: + warn( + "Not using the Rucio connector inside the DLaaS environment. Staging functionality will not work" + ) + running_in_dlaas = False +else: + running_in_dlaas = True + + +class NotInDLaaSError(RuntimeError): + """Exception raised when functions are executed + outside the Data Lake as a Service environment. + Inherits from RuntimeError. + """ + + pass + + +def check_for_dlaas() -> None: + """Check if the imports of the DLaaS and Rucio libraries have + failed and therfore the connector runs outside of the dlaas + notebook. This check is executed in every function in the + connector object that has to do with Rucio. + + Raises + ------ + NotInDLaaSError + If the DLaaS libraries have failed to load before, this exception + is raised. + """ + if not running_in_dlaas: + raise NotInDLaaSError( + "Cannot execute function because you are not running in a DLaaS environment." + ) + class Rucio(BaseConnector): + """Rucio connector. Inherits the functionality from the BaseConnector. + Implements functionality to stage data that was obtained from the + ESAP shopping basket. Apart from the name and archive, which are used + by the BaseConnector, this object will have several variables if + it is executed inside a DLaaS environment. Those are: + active_rules: a list of rules managed by the class + active_dids: a list of dids managed by the class + lifetime: the lifetime of the rules that put the files on the local RSE + (this is set to the value of the + RUCIO_REPLICATION_RULE_LIFETIME_DAYS environment variable) + local_RSE: The RSE connected to the DLaaS environment, + read from the RUCIO_DESTINATION_RSE environment variable. + local_basepath: The path at which this RSE is fuse mounted + read from the RUCIO_RSE_MOUNT_PATH environment variable. + rucio_client: A rucio client instance, used to create rules and + manage DIDs + + Basic usage example: + # assuming that the shopping basket client has been installed + # in the notebook. Also assuming you have a shopping basket with + # Rucio files in it (and maybe others; we will be filtering anyway) + + esap_api_host = "https://sdc-dev.astron.nl" + from esap_client import shopping_client + from esap_client.connectors import Rucio as RucioConnector + + rucicon = RucioConnector() + sc = shopping_client.ShoppingClient(host=esap_api_host, connectors=[rucicon]) + basket = sc.get_basket(reload=True, filter_archives=True) + + rucicon.retrieve(basket, wait=True) + dids = rucicon.getDIDs() + """ + name = "rucio" archive = "rucio" + if running_in_dlaas: + active_rules = set() + active_dids = set() + lifetime = ( + os.environ["RUCIO_REPLICATION_RULE_LIFETIME_DAYS"] * 24 * 60 * 60 + ) # in seconds + local_RSE = os.environ["RUCIO_DESTINATION_RSE"] + local_basepath = os.environ["RUCIO_RSE_MOUNT_PATH"] + rucio_client = rucio_base_client.Client() + + def retrieve(self, items: Union[dict, list], wait: bool = False) -> None: + """Retrieve DIDs that come out of the shopping basket by + creating rules in Rucio to bring the files to the RSE that + is connected to the DLaaS instance. + + Parameters + ---------- + items : Union[dict, list] + Items to retrieve. This can be a single shopping basket entry (dict) + or a whole basket, which is a list. + wait : bool, optional + If True, block until all data has been staged (or rules are stuck), + in case of large volumes, this can take a while. By default False + """ + check_for_dlaas() + if type(items) is dict: + items = [items] + for raw_item in items: + item = json.loads(raw_item["item_data"])["record"] + self.active_dids.add(json.dumps(item)) + if not self._check_locality_fix_lifetime_store_rule(item): + self._bring_online(item) + if wait: + self.block_till_staged() + + def get_single_item_did(self, replica_data: dict) -> SingleItemDID: + """Parse a dictionary generated by the Rucio replica client + to a SingleItemDID object. This function takes the first pfn + from the list that exists on the local RSE. The fuse mounted path + is computed based on the rucio path translation functionality. + + Parameters + ---------- + replica_data : dict + A single replica dict, as returned by the rucio replica client. + + Returns + ------- + SingleItemDID + DLaaS SingleitemDID object corresponding to the DID. + """ + for pfn in replica_data["pfns"]: + if replica_data["pfns"][pfn]["rse"] == self.local_RSE: + break + + path_translator = protocol.RSEDeterministicTranslation(rse=self.local_RSE) + path = path_translator.path( + scope=replica_data["scope"], name=replica_data["name"] + ) + return SingleItemDID(pfn=pfn, path=os.path.join(self.local_basepath, path)) + + def getDIDs(self) -> List[Union[SingleItemDID, MultipleItemDID]]: + """Get the list of DIDs that are managed by this object so that + those can be further processed in the DLaaS supporting service. + + Returns + ------- + List[Union[SingleItemDID, MultipleItemDID]] + List of did's, either SingleItemDID objects (single files) or + MultipleItemDID (data sets or containers), which are essentially + a list of SingleItemDID objects. + """ + check_for_dlaas() + didlist = list() + all_states = self.get_staging_status() + for did_json in self.active_dids: + did_full_dic = json.loads(did_json) + name = did_full_dic["name"] + scope = did_full_dic["scope"] + did_dic = {"scope": scope, "name": name} + replicas = [didrep for didrep in self.rucio_client.list_replicas([did_dic])] + did_state = all_states[f"{scope}:{name}"]["state"] + if did_state != "OK": + warn( + f"{scope}:{name} has staging status {did_state}. " + + "Since the status is not OK, no DID object will be returned. " + ) + elif len(replicas) == 1: + didlist.append(self.get_single_item_did(replicas[0])) + elif len(replicas) != 1: + item_list = list() + for singledid in replicas: + item_list.append(self.get_single_item_did(singledid)) + didlist.append( + MultipleItemDID(items=item_list, did_available="available") + ) + return didlist + + def block_till_staged(self) -> None: + """ + This function will poll the status of the rules that are managed + by this class every minute and returns as soon as none of those are + "REPLICATING" anymore. If this function returns, this does not + necessarily mean the replications have been correctly executed + and have status "OK", the status can also be "STUCK". + """ + while True: + states = [a["state"] for a in self.get_staging_status().values()] + if all(value != "REPLICATING" for value in states): + break + time.sleep(60) + + def get_staging_status(self) -> dict: + """Get status of all the rules managed by this object. + + Returns + ------- + dict + A dict of dicts with keys SCOPE:FILENAME where each dict contains: + scope: scope of the DID managed by the rule + name: name of the DID managed by the rule + state: state of the rule (OK, REPLICATING, STUCK, etc.) + expires_at: If the rule has an expiration date set, the + expiration datetime object, else None. + """ + check_for_dlaas() + total_status = dict() + for rule_id in self.active_rules: + rule_data = self.rucio_client.get_replication_rule(rule_id) + status_dict = { + "scope": rule_data["scope"], + "name": rule_data["name"], + "state": rule_data["state"], + } + try: + status_dict["expires_at"] = rule_data["expires_at"] + except KeyError: + status_dict["expires_at"] = None + scope = rule_data["scope"] + name = rule_data["name"] + + total_status[f"{scope}:{name}"] = status_dict + return total_status + + def _check_locality_fix_lifetime_store_rule(self, did_dict: dict) -> bool: + """Check if there is already a rule that puts a DID in the + local RSE. If so check if the lifetime of that rule is more + than the predefined lifetime (see class docs). If the lifetime + of the rule is shorter, extend the rule lifetime to match the + value set in the class settings. If rules were already present + the rule ID is added to the list of rules managed by this class. + + Parameters + ---------- + did_dict : dict + dict object with at least the keys: + scope: scope of the DID + name: name of the DID + (the dicts from the ESAP shopping basket fit in this model) + + Returns + ------- + bool + True if there was a rule (whether or not it was extended), + False if not. + """ + did_rules = self.rucio_client.list_did_rules( + did_dict["scope"], did_dict["name"] + ) + for did_rule in did_rules: + if did_rule["rse_expression"] == self.local_RSE: + self.active_rules.add(did_rule["id"]) + if ( + did_rule["expires_at"] - datetime.datetime.now() + ).seconds >= self.lifetime: + return True + else: + return self._extend_time(did_rule["id"]) + return False + + def _bring_online(self, did_dict: dict) -> None: + """Create a rule to make a replica of the DID in did_dict on the RSE + that is mounted to the DLaaS instance. + + Parameters + ---------- + did_dict : dict + dict object with at least the keys: + scope: scope of the DID + name: name of the DID + (the dicts from the ESAP shopping basket fit in this model) + """ + self.active_rules.add( + self.rucio_client.add_replication_rule( + [did_dict], 1, self.local_RSE, lifetime=self.lifetime + )[0] + ) + + def _extend_time(self, ruleid: str) -> bool: + """Extend the lifetime of an existing rule. + + Parameters + ---------- + ruleid : str + The Rucio rule ID of the rule to extend. + + Returns + ------- + bool + The return value of the rucio client that updated the rule. + """ + return self.rucio_client.update_replication_rule( + ruleid, {"lifetime": self.lifetime} + )