Skip to content
Snippets Groups Projects
Select Git revision
  • 569def50aabc843d6609a44fbb0faa2d47decb46
  • SAR-150_add_transaction_id_in_all_logs default protected
  • master
  • sar-293_fix_lint_issue
  • sar-259/Testing-skallop-docs
  • sar-255/Publish-package-to-CAR
  • test-package
  • SAR-189-use-single-namespace-for-package-naming
  • revert-52d118c0
  • 0.4.1
  • 0.4.0
  • 0.3.0
  • 0.2.1
  • 0.2.0
  • 0.1.0
15 results

transactions.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    transactions.py 10.34 KiB
    # -*- coding: utf-8 -*-
    
    """This module provides the transaction logging mechanism."""
    import json
    import logging
    import os
    import inspect
    from random import randint
    import threading
    
    from typing import Mapping, Optional, Text
    
    from ska.skuid.client import SkuidClient
    
    thread_local_data = threading.local()
    
    
    class TransactionIDTagsFilter(logging.Filter):
        """Adds the transaction ID as a tag to the log. Updates module and line number
        for the Enter and Exit log messages.
        """
    
        def get_transaction_id(self):
            if hasattr(thread_local_data, "transaction_ids"):
                thread_id = threading.get_ident()
                return thread_local_data.transaction_ids.get(thread_id, None)
            return None
    
        def get_frame(self):
            if hasattr(thread_local_data, "frames"):
                thread_id = threading.get_ident()
                return thread_local_data.frames.get(thread_id, None)
            return None
    
        def filter(self, record):
            # Add the transaction ID to the tags
            transaction_id = self.get_transaction_id()
            if transaction_id:
                if hasattr(record, "tags") and record.tags:
                    if transaction_id not in record.tags:
                        record.tags = f"{record.tags},transaction_id:{transaction_id}"
                else:
                    record.tags = f"transaction_id:{transaction_id}"
    
            # Override the calling module and line number since the log would have logged
            # `transactions.py#X` on `__enter__` and `__exit__` of `Transaction`. This makes it
            # difficult to debug where the `Enter` and `Exit` of a transaction log message
            # originated.
            # From Python 3.8 we should rather use `stacklevel`
            frame = self.get_frame()
            if frame:
                if record.filename.startswith("transactions.py") and record.funcName in [
                    "__enter__",
                    "__exit__",
                ]:
                    record.filename = os.path.basename(frame.filename)
                    record.lineno = frame.lineno
                    record.funcName = frame.function
    
            return True
    
    
    class Transaction:
        """Transaction context handler.
    
        Provides:
        - Transaction ID:
          - Re-use existing transaction ID, if available
          - If no transaction ID, or empty or None, then generate a new ID
          - context handler returns the transaction ID used
        - Log messages on entry, exit, and exception
    
        .. Examples::
    
           def command(self, parameter_json):
               parameters = json.reads(parameter_json)
               with ska.logging.transaction('My Command', parameters) as transaction_id:
                   # ...
                   parameters['transaction_id'] = transaction_id
                   device.further_command(json.dumps(pars))
                   # ...
    
           def command(self, parameter_json):
               parameters = json.reads(parameter_json)
               with ska.logging.transaction('My Command', parameters, transaction_id="123") as transaction_id:
                   # ...
                   parameters['transaction_id'] = transaction_id
                   device.further_command(json.dumps(pars))
                   # ...
    
           def command(self, parameter_json):
               parameters = json.reads(parameter_json)
               parameters["txn_id_key"] = 123
               with ska.logging.transaction('My Command', parameters, transaction_id_key="txn_id_key") as transaction_id:
                   # ...
                   parameters['transaction_id'] = transaction_id
                   device.further_command(json.dumps(pars))
                   # ...
    
        Log message formats:
           On Entry:
               Transaction[id]: Enter [name] with parameters [arguments]
           On Exit:
               Transaction[id]: Exit [name]
           On exception:
               Transaction[id]: Exception [name]
               Stacktrace
        """
    
        def __init__(
            self,
            name: str,
            params: dict = {},
            transaction_id: str = "",
            transaction_id_key: str = "transaction_id",
            logger: Optional[logging.Logger] = None,  # pylint: disable=E1101
        ):
            """Create the transaction context handler.
    
            A new transaction ID is generated if none is passed in via `transaction_id` or
            in `params`.
    
            If there is a transaction ID in `params` and `transaction_id` is also passed in
            then the passed in `transaction_id` will take precedence.
    
            However, if both transaction IDs provided in `params` and `transaction_id` are deemed
            invalid (not a string or an empty string), then a new transaction ID will be generated.
    
            By default the key `transaction_id` will be used to get a transaction ID out of
            `params`. If a different key is required then `transaction_id_key` can be
            specified.
    
            Parameters
            ----------
            name : str
                A description for the context. This is usually the Tango device command.
            params : dict, optional
                The parameters will be logged and will be used to retrieve the transaction
                ID if `transaction_id` is not passed in, by default {}
            transaction_id : str, optional
                The transaction ID to be used for the context, by default ""
            transaction_id_key : str, optional
                The key to use to get the transaction ID from params,
                by default "transaction_id"
            logger : logging.Logger, optional
                The logger to use for logging, by default None.
                If no logger is specified a new one named `ska.transaction` will be used.
    
            Raises
            ------
            TransactionParamsError
                If the `params` passed is not valid.
            """
            if not isinstance(params, Mapping):
                raise TransactionParamsError("params must be dict-like (Mapping)")
    
            if logger:
                self.logger = logger
            else:
                self.logger = logging.getLogger("ska.transaction")  # pylint: disable=E1101
            self._name = name
            self._params = params
            self._transaction_id_key = transaction_id_key
    
            self._transaction_id = self._get_id_from_params_or_generate_new_id(transaction_id)
            self._frame = inspect.stack()[1]
    
            if transaction_id and params.get(self._transaction_id_key):
                self.logger.info(
                    f"Received 2 transaction IDs {transaction_id} and"
                    f" {params.get(transaction_id_key)}, using {self._transaction_id}"
                )
    
            # Used to match enter and exit when multiple devices calls the same command
            # on a shared device simultaneously
            self._random_marker = str(randint(0, 99999)).zfill(5)
            self._transaction_filter = TransactionIDTagsFilter()
    
        def store_thread_data(self):
            thread_id = threading.get_ident()
            if not hasattr(thread_local_data, "transaction_ids"):
                thread_local_data.transaction_ids = {}
            if not hasattr(thread_local_data, "frames"):
                thread_local_data.frames = {}
            thread_local_data.transaction_ids[thread_id] = self._transaction_id
            thread_local_data.frames[thread_id] = self._frame
    
        def clear_thread_data(self):
            thread_id = threading.get_ident()
            if hasattr(thread_local_data, "transaction_ids"):
                if thread_id in thread_local_data.transaction_ids:
                    del thread_local_data.transaction_ids[thread_id]
            if hasattr(thread_local_data, "frames"):
                if thread_id in thread_local_data.frames:
                    del thread_local_data.frames[thread_id]
    
        def __enter__(self):
            self.store_thread_data()
            self.logger.addFilter(self._transaction_filter)
            params_json = json.dumps(self._params)
    
            self.logger.info(
                f"Transaction[{self._transaction_id}]: Enter[{self._name}] "
                f"with parameters [{params_json}] "
                f"marker[{self._random_marker}]"
            )
    
            return self._transaction_id
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            if exc_type:
                self.logger.exception(
                    f"Transaction[{self._transaction_id}]: Exception[{self._name}] marker[{self._random_marker}]"
                )
    
            self.logger.info(
                f"Transaction[{self._transaction_id}]: Exit[{self._name}] "
                f"marker[{self._random_marker}]"
            )
    
            self.logger.removeFilter(self._transaction_filter)
            self.clear_thread_data()
    
            if exc_type:
                raise
    
        def _get_id_from_params_or_generate_new_id(self, transaction_id):
            """At first use the transaction_id passed or use the transaction_id_key to get the
            transaction ID from the parameters or generate a new one if it's not there.
    
            Parameters
            ----------
            transaction_id : [String]
                [The transaction ID]
    
            Returns
            -------
            [String]
                [transaction ID]
            """
            _transaction_id = (
                transaction_id if transaction_id else self._params.get(self._transaction_id_key)
            )
            if not self._is_valid_id(_transaction_id):
                _transaction_id = self._generate_new_id()
                self.logger.info(f"Generated transaction ID {_transaction_id}")
            return _transaction_id
    
        def _is_valid_id(self, transaction_id):
            """Check if the ID is valid
    
            Parameters
            ----------
            transaction_id : [String]
                [The transaction ID]
    
            Returns
            -------
            [bool]
                [Whether the ID is valid or not]
            """
            if isinstance(transaction_id, Text) and transaction_id.strip():
                return True
            return False
    
        def _generate_new_id(self):
            """Use TransactionIdGenerator to generate a new transaction ID
    
            Returns
            -------
            [String]
                [The newly generated transaction ID]
            """
            id_generator = TransactionIdGenerator()
            return id_generator.next()  # pylint: disable=E1102
    
    
    class TransactionIdGenerator:
        """
        TransactionIdGenerator retrieves a transaction id from skuid.
        Skuid may fetch the id from the service if the SKUID_URL is set or alternatively generate one.
        """
    
        def __init__(self):
            if os.environ.get("SKUID_URL"):
                client = SkuidClient(os.environ["SKUID_URL"])
                self._get_id = client.fetch_transaction_id
            else:
                self._get_id = SkuidClient.get_local_transaction_id
    
        def next(self):
            return self._get_id()
    
    
    class TransactionParamsError(TypeError):
        """Invalid data type for transaction parameters."""