# SPDX-FileCopyrightText: 2020-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: firebird-base
# FILE: firebird/base/trace.py
# DESCRIPTION: Trace/audit for class instances
# CREATED: 5.6.2020
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2020 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
# ______________________________________
"""firebird-base - Trace/audit for class instances
"""
from __future__ import annotations
from typing import Any, Type, Hashable, List, Dict, Callable
import os
from inspect import signature, Signature, isfunction
from dataclasses import dataclass, field
from enum import IntFlag, auto
from functools import wraps, partial
from time import monotonic
from decimal import Decimal
from configparser import ConfigParser
from .types import Error, Distinct, DEFAULT, UNLIMITED, load
from .collections import Registry
from .strconv import convert_from_str
from .config import StrOption, IntOption, BoolOption, ListOption, FlagOption, EnumOption, \
ConfigListOption, Config
from .logging import LogLevel, FBLoggerAdapter, get_logger
class TraceFlag(IntFlag):
    """`LoggingManager` trace/audit flags.

    Members are single-bit values that can be combined with `|`.
    """
    #: No flag set.
    NONE = 0
    #: Trace is switched on.
    ACTIVE = auto()
    #: Log a message before the traced callable is executed.
    BEFORE = auto()
    #: Log a message after the traced callable returned.
    AFTER = auto()
    #: Log a message when the traced callable raised an exception.
    FAIL = auto()
@dataclass
class TracedItem(Distinct):
    """Class method trace specification.

    Holds everything needed to instrument one method: its name, the decorator
    to use and the arguments the decorator is created with.
    """
    #: Name of the traced method; also used as the `Distinct` key.
    method: str
    #: Decorator (callable returning the actual decorator) used for instrumentation.
    decorator: Callable
    #: Positional arguments passed to `decorator`.
    args: List = field(default_factory=list)
    #: Keyword arguments passed to `decorator`.
    kwargs: Dict = field(default_factory=dict)

    def get_key(self) -> Hashable:
        """Returns Distinct key for traced item [method]."""
        return self.method
@dataclass
class TracedClass(Distinct):
    """Traced class registry entry.

    Pairs a class with the registry of trace specifications for its methods.
    """
    #: The registered class; also used as the `Distinct` key.
    cls: Type
    #: Registry of trace specifications for methods of `cls`.
    traced: Registry = field(default_factory=Registry)

    def get_key(self) -> Hashable:
        """Returns Distinct key for traced item [cls]."""
        return self.cls
#: Module-level registry instance.
#: NOTE(review): nothing in the visible part of this module reads it (`TraceManager`
#: keeps its own `_traced` attribute) - confirm whether it is still needed.
_traced: Registry = Registry()
class TracedMeta(type):
    """Metaclass that instruments instances on creation.
    """
    def __call__(cls: Type, *args, **kwargs):
        """Create the instance and instrument it with `trace_object()`.

        The instance is built by the regular `type.__call__` machinery and then passed
        to the module-level `trace_object` with `strict=True`, so a class that is not
        registered for trace raises `TypeError` (when trace is not disabled).
        """
        return trace_object(super().__call__(*args, **kwargs), strict=True)
class TracedMixin(metaclass=TracedMeta):
    """Mixin class that automatically registers descendants for trace and instruments
    instances on creation.
    """
    def __init_subclass__(cls: Type, /, **kwargs) -> None:
        """Register every newly defined descendant class with `trace_manager`."""
        super().__init_subclass__(**kwargs)
        trace_manager.register(cls)
class traced: # pylint: disable=[R0902]
    """Base decorator for logging of callables, suitable for trace/audit.

    It's not applied on decorated function/method if `FBASE_TRACE` environment variable is
    set to False, or if `FBASE_TRACE` is not defined and `__debug__` is False (optimized
    Python code).

    Both positional and keyword arguments of decorated callable are available by name for
    f-string type message interpolation as `dict` passed to logger as positional argument.
    """
    def __init__(self, *, agent: Any=DEFAULT, context: Any=DEFAULT, topic: str='trace',
                 msg_before: str=DEFAULT, msg_after: str=DEFAULT, msg_failed: str=DEFAULT,
                 flags: TraceFlag=TraceFlag(0), level: LogLevel=LogLevel.DEBUG,
                 max_param_length: int=UNLIMITED, extra: Dict=None,
                 callback: Callable[[Any], bool]=None, has_result: bool=DEFAULT,
                 with_args: bool=True):
        """
        Arguments:
            agent: Agent identification
            context: Context identification
            topic: Trace/audit logging topic
            msg_before: Trace/audit message logged before decorated function
            msg_after: Trace/audit message logged after decorated function
            msg_failed: Trace/audit message logged when decorated function raises an exception
            flags: Trace flags override
            level: Logging level for trace/audit messages
            max_param_length: Max. length of parameters (longer will be trimmed)
            extra: Extra data for `LogRecord`
            callback: Callback function that gets the agent identification as argument,
                and must return True/False indicating whether trace is allowed.
            has_result: Indicator whether function has result value. If True, `_result_`
                is available for interpolation in `msg_after`. The `DEFAULT` value means,
                that value for this argument should be decided from function return value
                annotation.
            with_args: If True, function arguments are available for interpolation in
                `msg_before`.
        """
        #: Trace/audit message logged before decorated function
        self.msg_before: str = msg_before
        #: Trace/audit message logged after decorated function
        self.msg_after: str = msg_after
        #: Trace/audit message logged when decorated function raises an exception
        self.msg_failed: str = msg_failed
        #: Agent identification
        self.agent: Any = agent
        #: Context identification
        self.context: Any = context
        #: Trace/audit logging topic
        self.topic: str = topic
        #: Trace flags override
        self.flags: TraceFlag = flags
        #: Logging level for trace/audit messages
        self.level: LogLevel = level
        #: Max. length of parameters (longer will be trimmed)
        self.max_len: int = max_param_length
        #: Extra data for `LogRecord`
        self.extra: Dict = extra
        #: Callback function that gets the agent identification as argument,
        #: and must return True/False indicating whether trace is allowed.
        self.callback: Callable[[Any], bool] = self.__callback if callback is None else callback
        #: Indicator whether function has result value. If True, `_result_` is available
        #: for interpolation in `msg_after`.
        self.has_result: bool = has_result
        #: If True, function arguments are available for interpolation in `msg_before`
        self.with_args: bool = with_args

    def __callback(self, agent: Any) -> bool: # pylint: disable=[W0613]
        """Default callback, does nothing.
        """
        return True

    @staticmethod
    def _format_etime(seconds: float) -> str:
        """Return elapsed time as fixed-point text cut to at most 5 decimal places.

        `str(Decimal(x))` switches to scientific notation for values below 1e-6
        (e.g. ``4.76837158203125E-7``), so slicing that text would report the mantissa
        as if it were seconds. Formatting with ``'f'`` always yields plain notation.
        """
        whole, dot, fraction = format(Decimal(seconds), 'f').partition('.')
        return f'{whole}{dot}{fraction[:5]}'

    def set_before_msg(self, fn: Callable, sig: Signature) -> None:
        """Sets the DEFAULT before message f-string template.

        Arguments:
            fn: Decorated callable.
            sig: Signature of the decorated callable. All its parameters except `self`
                are listed in the template when `with_args` is True.
        """
        if self.with_args:
            self.msg_before = f">>> {fn.__name__}({', '.join(f'{{{x}=}}' for x in sig.parameters if x != 'self')})"
        else:
            self.msg_before = f">>> {fn.__name__}"

    def set_after_msg(self, fn: Callable, sig: Signature) -> None: # pylint: disable=[W0613]
        """Sets the DEFAULT after message f-string template.

        The template includes `_result_` only when `has_result` is true.
        """
        self.msg_after = f"<<< {fn.__name__}[{{_etime_}}] Result: {{_result_}}" \
            if self.has_result else f"<<< {fn.__name__}[{{_etime_}}]"

    def set_fail_msg(self, fn: Callable, sig: Signature) -> None: # pylint: disable=[W0613]
        """Sets the DEFAULT fail message f-string template.
        """
        self.msg_failed = f"<-- {fn.__name__}[{{_etime_}}] {{_exc_}}"

    def log_before(self, logger: FBLoggerAdapter, params: Dict) -> None:
        """Executed before decorated callable.
        """
        logger.log(self.level, self.msg_before, params, stacklevel=2)

    def log_after(self, logger: FBLoggerAdapter, params: Dict) -> None:
        """Executed after decorated callable.
        """
        logger.log(self.level, self.msg_after, params, stacklevel=2)

    def log_failed(self, logger: FBLoggerAdapter, params: Dict) -> None:
        """Executed when decorated callable raises an exception.
        """
        logger.log(self.level, self.msg_failed, params, stacklevel=2)

    def __call__(self, fn: Callable): # pylint: disable=[R0915]
        """Decorate `fn`.

        Returns `fn` unchanged when trace is disabled (see class description), otherwise
        a wrapper that logs before/after/failed messages according to the effective
        trace flags. Besides call arguments and `extra`, the interpolation dictionary
        contains `_fname_`, `_result_`, `_etime_` (after/failed) and `_exc_` (failed).
        """
        @wraps(fn)
        def wrapper(*args, **kwargs): # pylint: disable=[R0912]
            flags = trace_manager.flags | self.flags
            # ACTIVE has value 1, so "> 1" means at least one other flag is set as well.
            if enabled := ((TraceFlag.ACTIVE in flags) and int(flags) > 1): # pylint: disable=[R1702]
                params = {}
                bound = sig.bind_partial(*args, **kwargs)
                # If it's not a bound method, look for 'self'
                log = get_logger(bound.arguments.get('self', 'function') if self.agent is None
                                 else self.agent, self.context, self.topic)
                if enabled := (log.isEnabledFor(self.level) and self.callback(self.agent)):
                    if self.with_args:
                        bound.apply_defaults()
                        params.update(bound.arguments)
                        if self.max_len is not UNLIMITED:
                            # Only values of existing keys are replaced, so the dict
                            # size does not change while iterating.
                            for k, v in params.items():
                                s = str(v)
                                if (i := len(s)) > self.max_len:
                                    params[k] = f'{s[:self.max_len]}..[{i - self.max_len}]'
                    if self.extra is not None:
                        params.update(self.extra)
                    params['_fname_'] = fn.__name__
                    params['_result_'] = None
                    #
                    if TraceFlag.BEFORE in flags:
                        self.log_before(log, params)
            # `params` and `log` exist only when `enabled` is true; every use below
            # is guarded by it.
            result = None
            start = monotonic()
            try:
                result = fn(*args, **kwargs)
            except Exception as exc:
                if enabled and TraceFlag.FAIL | TraceFlag.ACTIVE in flags:
                    params['_etime_'] = self._format_etime(monotonic() - start)
                    params['_exc_'] = f'{exc.__class__.__qualname__}: {exc}'
                    self.log_failed(log, params)
                raise
            else:
                if enabled and TraceFlag.AFTER | TraceFlag.ACTIVE in flags:
                    params['_etime_'] = self._format_etime(monotonic() - start)
                    if self.has_result:
                        params['_result_'] = result
                        if self.max_len is not UNLIMITED:
                            s = str(result)
                            if (i := len(s)) > self.max_len:
                                params['_result_'] = f'{s[:self.max_len]}..[{i - self.max_len}]'
                    self.log_after(log, params)
            return result

        # Explicit FBASE_TRACE setting wins; without it optimized code is not traced.
        if (trace := os.getenv('FBASE_TRACE')) is not None:
            if not convert_from_str(bool, trace):
                return fn
        elif not __debug__:
            return fn
        if self.agent is DEFAULT:
            self.agent = getattr(fn, '__self__', None)
        sig = signature(fn)
        if self.has_result is DEFAULT:
            # The annotation is the string 'None' under postponed evaluation of
            # annotations and the `None` object otherwise; both mean "no result".
            annotation = sig.return_annotation
            self.has_result = not (annotation is None or annotation == 'None')
        if self.msg_before is DEFAULT:
            self.set_before_msg(fn, sig)
        if self.msg_after is DEFAULT:
            self.set_after_msg(fn, sig)
        if self.msg_failed is DEFAULT:
            self.set_fail_msg(fn, sig)
        return wrapper
class BaseTraceConfig(Config): # pylint: disable=[R0902]
    """Base configuration for trace.

    Defines one option for each keyword argument that `TraceManager.load_config()`
    forwards to the trace decorator.
    """
    def __init__(self, name: str):
        """
        Arguments:
            name: Configuration (section) name passed to `Config`.
        """
        super().__init__(name)
        #: Agent identification
        self.agent: StrOption = \
            StrOption('agent', "Agent identification")
        #: Context identification
        self.context: StrOption = \
            StrOption('context', "Context identification")
        #: Trace/audit logging topic
        self.topic: StrOption = \
            StrOption('topic', "Trace/audit logging topic")
        #: Trace/audit message logged before decorated function
        self.msg_before: StrOption = \
            StrOption('msg_before', "Trace/audit message logged before decorated function")
        #: Trace/audit message logged after decorated function
        self.msg_after: StrOption = \
            StrOption('msg_after', "Trace/audit message logged after decorated function")
        #: Trace/audit message logged when decorated function raises an exception
        self.msg_failed: StrOption = \
            StrOption('msg_failed', "Trace/audit message logged when decorated function raises an exception")
        #: Trace flags override
        self.flags: FlagOption = \
            FlagOption('flags', TraceFlag, "Trace flags override")
        #: Logging level for trace/audit messages
        self.level: EnumOption = \
            EnumOption('level', LogLevel, "Logging level for trace/audit messages")
        #: Max. length of parameters (longer will be trimmed)
        self.max_param_length: IntOption = \
            IntOption('max_param_length', "Max. length of parameters (longer will be trimmed)")
        #: Indicator whether function has result value
        self.has_result: BoolOption = \
            BoolOption('has_result', "Indicator whether function has result value")
        #: If True, function arguments are available for interpolation in `msg_before`
        self.with_args: BoolOption = \
            BoolOption('with_args',
                       "If True, function arguments are available for interpolation in `msg_before`")
class TracedMethodConfig(BaseTraceConfig):
    """Configuration of traced Python method.

    Extends the common trace options with the name of the method they apply to.
    """
    def __init__(self, name: str):
        """
        Arguments:
            name: Configuration (section) name passed to `Config`.
        """
        super().__init__(name)
        #: Class method name [required]
        self.method: StrOption = \
            StrOption('method', "Class method name", required=True)
class TracedClassConfig(BaseTraceConfig):
    """Configuration of traced Python class.

    Extends the common trace options with the class identification, the list of
    traced methods and optional per-method configurations.
    """
    def __init__(self, name: str):
        """
        Arguments:
            name: Configuration (section) name passed to `Config`.
        """
        super().__init__(name)
        #: Fully qualified class name [required]
        self.source: StrOption = \
            StrOption('source', "Fully qualified class name", required=True)
        #: Names of traced class methods
        self.methods: ListOption = \
            ListOption('methods', str, "Names of traced class methods")
        #: Configuration sections with extended config of traced class methods
        self.special: ConfigListOption = \
            ConfigListOption('special',
                             "Configuration sections with extended config of traced class methods",
                             TracedMethodConfig)
        #: Whether configuration should be applied also to all registered descendant classes [default: True].
        self.apply_to_descendants: BoolOption = \
            BoolOption('apply_to_descendants',
                       "Configuration should be applied also to all registered descendant classes",
                       default=True)
class TraceConfig(BaseTraceConfig):
    """Trace manager configuration.

    Extends the common trace options (used as global defaults) with the list of
    traced classes and the auto-registration switch.
    """
    def __init__(self, name: str):
        """
        Arguments:
            name: Configuration (section) name passed to `Config`.
        """
        super().__init__(name)
        #: When True, unregistered classes are registered automatically [default: True].
        self.autoregister: BoolOption = \
            BoolOption('autoregister',
                       "When True, unregistered classes are registered automatically",
                       default=True)
        #: Configuration sections with traced Python classes [required].
        self.classes: ConfigListOption = \
            ConfigListOption('classes',
                             "Configuration sections with traced Python classes",
                             TracedClassConfig, required=True)
class TraceManager:
    """Trace manager.

    Keeps the registry of traced classes with their per-method trace specifications,
    the global trace flags, and instruments objects according to that registry.
    """
    def __init__(self):
        """Initialize the manager and its flags from environment variables.

        `FBASE_TRACE` (default: value of `__debug__`) controls the `ACTIVE` flag,
        `FBASE_TRACE_BEFORE` and `FBASE_TRACE_AFTER` (default: no) and
        `FBASE_TRACE_FAIL` (default: yes) control the respective flags.
        """
        #: Decorator that should be used for trace instrumentation (via `add_trace`),
        #: default: `traced`.
        self.decorator: Callable = traced
        self._traced: Registry = Registry()
        self._flags: TraceFlag = TraceFlag.NONE
        self.trace_active = convert_from_str(bool, os.getenv('FBASE_TRACE', str(__debug__)))
        if convert_from_str(bool, os.getenv('FBASE_TRACE_BEFORE', 'no')): # pragma: no cover
            self.set_flag(TraceFlag.BEFORE)
        if convert_from_str(bool, os.getenv('FBASE_TRACE_AFTER', 'no')): # pragma: no cover
            self.set_flag(TraceFlag.AFTER)
        if convert_from_str(bool, os.getenv('FBASE_TRACE_FAIL', 'yes')):
            self.set_flag(TraceFlag.FAIL)

    def is_registered(self, cls: Type) -> bool:
        """Return True if class is registered.
        """
        return cls in self._traced

    def clear(self) -> None:
        """Removes all trace specifications.

        Registered classes stay registered, only their method specifications are dropped.
        """
        for entry in self._traced:
            entry.traced.clear()

    def register(self, cls: Type) -> None:
        """Register class for trace.

        Arguments:
            cls: Class to be registered.

        Does nothing if class is already registered.
        """
        if cls not in self._traced:
            self._traced.store(TracedClass(cls))

    def add_trace(self, cls: Type, method: str, / , *args, **kwargs) -> None:
        """Add/update trace specification for class method.

        Arguments:
            cls: Registered traced class
            method: Name of class method that should be instrumented for trace
            args: Positional arguments for decorator
            kwargs: Keyword arguments for decorator
        """
        self._traced[cls].traced.update(TracedItem(method, self.decorator, args, kwargs))

    def remove_trace(self, cls: Type, method: str) -> None:
        """Remove trace specification for class method.

        Arguments:
            cls: Registered traced class
            method: Name of class method
        """
        del self._traced[cls].traced[method]

    def trace_object(self, obj: Any, *, strict: bool=False) -> Any:
        """Instruments object's methods with decorator according to trace configuration.

        Arguments:
            obj: Object to be instrumented.
            strict: Determines the response if the object class is not registered for trace.
                Raises exception when True, or return the instance as is when False [default].

        Only methods registered with `.add_trace()` are instrumented. The object is
        returned untouched when trace is disabled (`FBASE_TRACE` set to false, or not
        defined in optimized Python code).

        Returns:
            Decorated instance.

        Raises:
            TypeError: When object class is not registered and `strict` is True.
        """
        if (trace := os.getenv('FBASE_TRACE')) is not None:
            if not convert_from_str(bool, trace):
                return obj
        elif not __debug__:
            return obj
        entry: TracedClass = self._traced.get(obj.__class__)
        if entry is None:
            if strict:
                raise TypeError(f"Class '{obj.__class__.__name__}' not registered for trace!")
            return obj
        for item in entry.traced:
            # Wrap the bound method and store the wrapper on the instance itself.
            setattr(obj, item.method, item.decorator(*item.args, **item.kwargs)(getattr(obj, item.method)))
        return obj

    def load_config(self, config: ConfigParser, section: str='trace') -> None:
        """Update trace from configuration.

        Arguments:
            config: ConfigParser instance with trace configuration.
            section: Name of ConfigParser section that should be used to get trace
                configuration.

        Uses `.TraceConfig`, `.TracedClassConfig` and `.TracedMethodConfig` to process
        the configuration.

        Raises:
            Error: When configured class is not registered and `autoregister` is False.

        Note:
            Does not `.clear()` existing trace specifications. Manager flags are
            replaced only when the section defines the `flags` option.
        """
        def build_kwargs(from_cfg: BaseTraceConfig) -> Dict[str, Any]:
            # Collect decorator keyword arguments from options that have a value.
            result = {}
            for item in ['agent', 'context', 'topic', 'msg_before', 'msg_after',
                         'msg_failed', 'flags', 'level', 'max_param_length',
                         'has_result', 'with_args']:
                if (value := getattr(from_cfg, item).value) is not None:
                    result[item] = value
            return result

        def apply_on(cls):
            # Uses `cls_cfg` and `cls_kwargs` of the class configuration being processed.
            if (items := cls_cfg.methods.value) is not None:
                if (len(items) == 1) and (items[0] == '*'):
                    # '*' means all public functions defined on the class.
                    items = [i for i in dir(cls) if not i.startswith('_') and isfunction(getattr(cls, i))]
                for item in items:
                    self.add_trace(cls, item, **cls_kwargs)
            if (items := cls_cfg.special.value) is not None:
                for mcfg in items:
                    # Method options override class-level ones.
                    kwargs = {**cls_kwargs, **build_kwargs(mcfg)}
                    self.add_trace(cls, mcfg.method.value, **kwargs)

        def with_name(name: str, obj: Any) -> bool:
            return f'{obj.cls.__module__}.{obj.cls.__name__}' == name

        cfg = TraceConfig('trace')
        cfg.load_config(config, section)
        # The `flags` option has no default; like `build_kwargs`, treat a None value as
        # "not configured" and keep current flags instead of failing in the setter.
        if (new_flags := cfg.flags.value) is not None:
            self.flags = new_flags
        global_kwargs = build_kwargs(cfg)
        for cls_cfg in cfg.classes.value:
            cls_name = cls_cfg.source.value
            # Class options override global ones.
            cls_kwargs = {**global_kwargs, **build_kwargs(cls_cfg)}
            if (cls_desc := self._traced.find(partial(with_name, cls_name))) is None:
                if cfg.autoregister.value:
                    # `load` takes 'module:name', the config uses dotted notation.
                    cls = load(':'.join(cls_name.rsplit('.', 1)))
                    self.register(cls)
                else:
                    raise Error(f"Class '{cls_name}' is not registered for trace.")
            else:
                cls = cls_desc.cls
            apply_on(cls)
            if cls_cfg.apply_to_descendants.value:
                for desc in self._traced.values():
                    if (desc.cls is not cls) and issubclass(desc.cls, cls):
                        apply_on(desc.cls)

    def set_flag(self, flag: TraceFlag) -> None:
        """Set flag specified by `flag` mask.
        """
        self._flags |= flag

    def clear_flag(self, flag: TraceFlag) -> None:
        """Clear flag specified by `flag` mask.
        """
        self._flags &= ~flag

    @property
    def flags(self) -> TraceFlag:
        """Trace flags.

        Values that are not `TraceFlag` instances are converted on assignment.
        """
        return self._flags

    @flags.setter
    def flags(self, value: TraceFlag) -> None:
        self._flags = value if isinstance(value, TraceFlag) else TraceFlag(value)

    @property
    def trace_active(self) -> bool:
        """True if trace is active.

        Assignment sets or clears the `TraceFlag.ACTIVE` flag.
        """
        return TraceFlag.ACTIVE in self._flags

    @trace_active.setter
    def trace_active(self, value: bool) -> None:
        if value:
            self._flags |= TraceFlag.ACTIVE
        else:
            self._flags &= ~TraceFlag.ACTIVE
#: Trace manager: the module-level `TraceManager` instance used by the `traced`
#: decorator (global flags) and by `TracedMixin` (class registration).
trace_manager: TraceManager = TraceManager()
#: shortcut for `trace_manager.add_trace()`
add_trace = trace_manager.add_trace
#: shortcut for `trace_manager.remove_trace()`
remove_trace = trace_manager.remove_trace
#: shortcut for `trace_manager.trace_object()`; also called by `TracedMeta.__call__`
trace_object = trace_manager.trace_object