Select Git revision
transactions.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
transactions.py 10.34 KiB
# -*- coding: utf-8 -*-
"""This module provides the transaction logging mechanism."""
import json
import logging
import os
import inspect
from random import randint
import threading
from typing import Mapping, Optional, Text
from ska.skuid.client import SkuidClient
# Per-thread storage. `Transaction` writes the active transaction ID and the
# caller's frame here (in dicts keyed by thread ident) and
# `TransactionIDTagsFilter` reads them back when a log record is filtered.
thread_local_data = threading.local()
class TransactionIDTagsFilter(logging.Filter):
    """Log filter that tags records with the calling thread's transaction ID.

    For records emitted from ``__enter__``/``__exit__`` inside ``transactions.py``
    the filter also replaces the record's file name, line number and function
    name with those of the frame stored for the calling thread.
    """

    def get_transaction_id(self):
        """Return the transaction ID stored for the calling thread, else None."""
        try:
            ids_by_thread = thread_local_data.transaction_ids
        except AttributeError:
            # Nothing has been stored on this thread yet.
            return None
        return ids_by_thread.get(threading.get_ident(), None)

    def get_frame(self):
        """Return the frame stored for the calling thread, else None."""
        try:
            frames_by_thread = thread_local_data.frames
        except AttributeError:
            # Nothing has been stored on this thread yet.
            return None
        return frames_by_thread.get(threading.get_ident(), None)

    def filter(self, record):
        """Annotate ``record`` in place; the record is never rejected.

        Parameters
        ----------
        record : logging.LogRecord
            The record about to be emitted.

        Returns
        -------
        bool
            Always True.
        """
        # Add the transaction ID to the tags (appending to any existing tags).
        current_id = self.get_transaction_id()
        if current_id:
            existing_tags = getattr(record, "tags", None)
            if not existing_tags:
                record.tags = f"transaction_id:{current_id}"
            elif current_id not in existing_tags:
                record.tags = f"{existing_tags},transaction_id:{current_id}"

        # The Enter/Exit messages are logged from `Transaction.__enter__` and
        # `Transaction.__exit__`, so the record would point at `transactions.py`.
        # Point it at the stored frame instead so the origin of the transaction
        # can be found. From Python 3.8 we should rather use `stacklevel`.
        caller = self.get_frame()
        logged_from_context_manager = (
            caller
            and record.filename.startswith("transactions.py")
            and record.funcName in ("__enter__", "__exit__")
        )
        if logged_from_context_manager:
            record.filename = os.path.basename(caller.filename)
            record.lineno = caller.lineno
            record.funcName = caller.function

        return True
class Transaction:
    """Transaction context handler.

    Provides:
    - Transaction ID:
      - Re-use existing transaction ID, if available
      - If no transaction ID, or empty or None, then generate a new ID
      - context handler returns the transaction ID used
    - Log messages on entry, exit, and exception

    .. Examples::

        def command(self, parameter_json):
            parameters = json.loads(parameter_json)
            with ska.logging.transaction('My Command', parameters) as transaction_id:
                # ...
                parameters['transaction_id'] = transaction_id
                device.further_command(json.dumps(parameters))
                # ...

        def command(self, parameter_json):
            parameters = json.loads(parameter_json)
            with ska.logging.transaction(
                'My Command', parameters, transaction_id="123"
            ) as transaction_id:
                # ...
                parameters['transaction_id'] = transaction_id
                device.further_command(json.dumps(parameters))
                # ...

        def command(self, parameter_json):
            parameters = json.loads(parameter_json)
            # IDs must be non-empty strings; any other type is replaced by a new ID
            parameters["txn_id_key"] = "123"
            with ska.logging.transaction(
                'My Command', parameters, transaction_id_key="txn_id_key"
            ) as transaction_id:
                # ...
                parameters['transaction_id'] = transaction_id
                device.further_command(json.dumps(parameters))
                # ...

    Log message formats:
       On Entry:
          Transaction[id]: Enter[name] with parameters [arguments] marker[marker]
       On Exit:
          Transaction[id]: Exit[name] marker[marker]
       On exception:
          Transaction[id]: Exception[name] marker[marker]
          Stacktrace
    """

    def __init__(
        self,
        name: str,
        params: dict = {},
        transaction_id: str = "",
        transaction_id_key: str = "transaction_id",
        logger: Optional[logging.Logger] = None,  # pylint: disable=E1101
    ):
        """Create the transaction context handler.

        A new transaction ID is generated if none is passed in via `transaction_id` or
        in `params`.

        If there is a transaction ID in `params` and `transaction_id` is also passed in
        then the passed in `transaction_id` will take precedence.

        However, if both transaction IDs provided in `params` and `transaction_id` are
        deemed invalid (not a string or an empty string), then a new transaction ID
        will be generated.

        By default the key `transaction_id` will be used to get a transaction ID out of
        `params`. If a different key is required then `transaction_id_key` can be
        specified.

        Parameters
        ----------
        name : str
            A description for the context. This is usually the Tango device command.
        params : dict, optional
            The parameters will be logged and will be used to retrieve the transaction
            ID if `transaction_id` is not passed in, by default {}.
            The mapping is only read, never modified, by this class.
        transaction_id : str, optional
            The transaction ID to be used for the context, by default ""
        transaction_id_key : str, optional
            The key to use to get the transaction ID from params,
            by default "transaction_id"
        logger : logging.Logger, optional
            The logger to use for logging, by default None.
            If no logger is specified a new one named `ska.transaction` will be used.

        Raises
        ------
        TransactionParamsError
            If the `params` passed is not valid.
        """
        if not isinstance(params, Mapping):
            raise TransactionParamsError("params must be dict-like (Mapping)")

        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger("ska.transaction")  # pylint: disable=E1101

        self._name = name
        self._params = params
        self._transaction_id_key = transaction_id_key
        self._transaction_id = self._get_id_from_params_or_generate_new_id(transaction_id)

        # Must be taken directly in `__init__`: index 1 is the frame of whoever
        # constructed this object, which the log filter reports as the origin of
        # the Enter/Exit messages.
        self._frame = inspect.stack()[1]

        if transaction_id and params.get(self._transaction_id_key):
            self.logger.info(
                f"Received 2 transaction IDs {transaction_id} and"
                f" {params.get(transaction_id_key)}, using {self._transaction_id}"
            )

        # Used to match enter and exit when multiple devices calls the same command
        # on a shared device simultaneously
        self._random_marker = str(randint(0, 99999)).zfill(5)

        self._transaction_filter = TransactionIDTagsFilter()

        # Values found in the thread store when this transaction was entered;
        # they belong to an enclosing transaction on the same thread (if any).
        self._previous_transaction_id = None
        self._previous_frame = None

    def store_thread_data(self):
        """Publish this transaction's ID and caller frame for the current thread.

        Whatever was stored for the thread beforehand (by an enclosing
        transaction) is remembered so `clear_thread_data` can put it back.
        """
        thread_id = threading.get_ident()
        if not hasattr(thread_local_data, "transaction_ids"):
            thread_local_data.transaction_ids = {}
        if not hasattr(thread_local_data, "frames"):
            thread_local_data.frames = {}

        self._previous_transaction_id = thread_local_data.transaction_ids.get(thread_id)
        self._previous_frame = thread_local_data.frames.get(thread_id)

        thread_local_data.transaction_ids[thread_id] = self._transaction_id
        thread_local_data.frames[thread_id] = self._frame

    def clear_thread_data(self):
        """Remove this transaction's data from the current thread's store.

        If an enclosing transaction's data was present when this transaction
        was entered, that data is restored instead of being dropped, so the
        outer transaction keeps its tags after a nested one finishes.
        """
        thread_id = threading.get_ident()
        saved = (
            ("transaction_ids", self._previous_transaction_id),
            ("frames", self._previous_frame),
        )
        for attribute, previous in saved:
            per_thread = getattr(thread_local_data, attribute, None)
            if per_thread is None:
                continue
            if previous is not None:
                per_thread[thread_id] = previous
            else:
                per_thread.pop(thread_id, None)

        self._previous_transaction_id = None
        self._previous_frame = None

    def __enter__(self):
        """Log the Enter message and return the transaction ID in use.

        Raises
        ------
        TypeError, ValueError
            Propagated from `json.dumps` if the parameters cannot be serialised.
        """
        # Serialise before registering anything: if this raises, `__exit__` is
        # never called, so no filter or thread data may be left behind.
        params_json = json.dumps(self._params)

        self.store_thread_data()
        self.logger.addFilter(self._transaction_filter)

        # Keep this call inside `__enter__`: the filter recognises the record
        # by its function name.
        self.logger.info(
            f"Transaction[{self._transaction_id}]: Enter[{self._name}] "
            f"with parameters [{params_json}] "
            f"marker[{self._random_marker}]"
        )
        return self._transaction_id

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log the Exception (if any) and Exit messages, then clean up.

        Returns
        -------
        bool
            Always False, so an exception raised inside the `with` block is
            propagated to the caller unchanged.
        """
        try:
            # Keep these calls inside `__exit__`: the filter recognises the
            # records by their function name.
            if exc_type:
                self.logger.exception(
                    f"Transaction[{self._transaction_id}]: Exception[{self._name}]"
                    f" marker[{self._random_marker}]",
                    # Explicit so the stacktrace is logged even when there is no
                    # "currently handled" exception in this thread.
                    exc_info=(exc_type, exc_val, exc_tb),
                )

            self.logger.info(
                f"Transaction[{self._transaction_id}]: Exit[{self._name}] "
                f"marker[{self._random_marker}]"
            )
        finally:
            self.logger.removeFilter(self._transaction_filter)
            self.clear_thread_data()

        return False

    def _get_id_from_params_or_generate_new_id(self, transaction_id):
        """At first use the transaction_id passed or use the transaction_id_key to get the
        transaction ID from the parameters or generate a new one if it's not there.

        Parameters
        ----------
        transaction_id : [String]
            [The transaction ID]

        Returns
        -------
        [String]
            [transaction ID]
        """
        if transaction_id:
            candidate = transaction_id
        else:
            candidate = self._params.get(self._transaction_id_key)

        if self._is_valid_id(candidate):
            return candidate

        new_id = self._generate_new_id()
        self.logger.info(f"Generated transaction ID {new_id}")
        return new_id

    def _is_valid_id(self, transaction_id):
        """Check if the ID is valid, i.e. a string with non-whitespace content.

        Parameters
        ----------
        transaction_id : [String]
            [The transaction ID]

        Returns
        -------
        [bool]
            [Whether the ID is valid or not]
        """
        return isinstance(transaction_id, Text) and bool(transaction_id.strip())

    def _generate_new_id(self):
        """Use TransactionIdGenerator to generate a new transaction ID

        Returns
        -------
        [String]
            [The newly generated transaction ID]
        """
        id_generator = TransactionIdGenerator()
        return id_generator.next()  # pylint: disable=E1102
class TransactionIdGenerator:
    """Source of new transaction IDs, backed by skuid.

    If the ``SKUID_URL`` environment variable is set to a non-empty value, IDs
    are obtained through a `SkuidClient` created for that URL
    (`fetch_transaction_id`); otherwise `SkuidClient.get_local_transaction_id`
    is used.
    """

    def __init__(self):
        skuid_url = os.environ.get("SKUID_URL")
        if skuid_url:
            self._get_id = SkuidClient(skuid_url).fetch_transaction_id
        else:
            self._get_id = SkuidClient.get_local_transaction_id

    def next(self):
        """Return the next transaction ID from the configured source."""
        return self._get_id()
class TransactionParamsError(TypeError):
    """Raised when the transaction parameters are not a dict-like Mapping."""