# Source code for structlog_sentry_logger._config

from __future__ import annotations

import dataclasses
import datetime
import inspect
import json
import logging
import logging.config
import os
import pathlib
import tempfile
import warnings
from types import FrameType
from typing import Any, Callable

try:
    import git
except ImportError:  # Namely, when a git executable is not found
    git = None  # type: ignore

import orjson  # type: ignore
import structlog

from structlog_sentry_logger import _feature_flags

try:
    from structlog_sentry_logger.structlog_sentry import SentryBreadcrumbJsonProcessor
except ImportError:
    SentryBreadcrumbJsonProcessor = None  # type: ignore


@dataclasses.dataclass
class Config:
    """Mutable, module-wide settings consulted by the logger configuration helpers."""

    # When True the serializers call `orjson.dumps`; otherwise the stdlib `json.dumps`.
    use_orjson: bool = True
    # Bit flags handed to the JSON renderer as its `option` keyword argument.
    orjson_configs: int = orjson.OPT_NON_STR_KEYS
    # Set once `get_logger` has applied the stdlib `logging` dictConfig.
    stdlib_logging_config_already_configured: bool = False
    # Passed as `sort_keys` to `json.dumps` on the stdlib JSON path.
    stdlib_logging_sort_keys: bool = False


def get_root_dir() -> pathlib.Path:
    """Infers the application root directory used for module namespacing.

    Returns:
        The git repository root when the `git` module is importable and the
        current working directory is inside a repository; the path reported by
        `git.InvalidGitRepositoryError` when it is not inside a repository; or
        the filesystem root of the current working directory when the `git`
        module could not be imported at all.
    """
    if git is None:
        # If we couldn't import the git module, we have no way of automatically
        # determining the application root directory. In that case, fallback to the
        # system root directory for module namespacing
        return pathlib.Path(pathlib.Path.cwd().resolve().root)

    try:
        return get_git_root()
    except git.InvalidGitRepositoryError as err:
        # the __str__() method on err returns the root descendant path, e.g., `/app`
        return pathlib.Path(str(err)).resolve(strict=True)


def get_git_root() -> pathlib.Path:  # Gratuitous indirection for testing
    """Returns the working directory of the git repository enclosing the cwd.

    Parent directories of the current working directory are searched for the
    repository.
    """
    repo = git.Repo(pathlib.Path.cwd(), search_parent_directories=True)
    working_dir = repo.working_dir
    if not working_dir:
        # Note: `working_dir` should never be None, but mypy complains and there may
        # be some unknown corner case, so falling back to the system root directory
        # for module namespacing (this might be better merged with the logic in
        # `get_root_dir`).
        working_dir = pathlib.Path.cwd().resolve().root
    return pathlib.Path(working_dir)


# Log level taken from the `LOG_LEVEL` environment variable (upper-cased before
# lookup; "DEBUG" when unset) and converted with `logging.getLevelName`.
_LOG_LEVEL = logging.getLevelName(os.environ.get("LOG_LEVEL", "DEBUG").upper())
# Level handed to `SentryBreadcrumbJsonProcessor`, taken from the
# `STRUCTLOG_SENTRY_LOGGER_SENTRY_LOG_LEVEL` environment variable ("ERROR" when unset).
_SENTRY_LOG_LEVEL = logging.getLevelName(os.environ.get("STRUCTLOG_SENTRY_LOGGER_SENTRY_LOG_LEVEL", "ERROR").upper())

# Computed once at import time; module names and the local `.logs` directory are
# derived from this path.
ROOT_DIR = get_root_dir()
# Single timestamp processor (ISO format, UTC) shared by every processor chain below.
_TIMESTAMPER = structlog.processors.TimeStamper(fmt="iso", utc=True)
# Module-wide mutable settings; mutated by `_toggle_json_library` and `get_logger`.
_CONFIGS = Config(
    use_orjson=True,
    orjson_configs=orjson.OPT_NON_STR_KEYS,
    stdlib_logging_config_already_configured=False,
    stdlib_logging_sort_keys=False,
)


def _toggle_json_library(use_orjson: bool = True) -> None:
    """Selects which JSON library the serializers use.

    Args:
        use_orjson: stored on the module-wide `_CONFIGS`; True selects `orjson`,
            False selects the stdlib `json` module.
    """
    _CONFIGS.use_orjson = use_orjson


def get_config_dict() -> dict:
    """Convenience function to get the local logging configuration dictionary.

    Useful, e.g., to help configure loggers from other libraries.

    Returns:
        The logging configuration dictionary that would be used to configure the
        Python logging library component of the logger
    """
    return get_logging_config(get_caller_name_from_frames())
def get_logger(name: str | None = None) -> Any:
    """Convenience function that returns a logger.

    On first use this also applies the stdlib `logging` configuration and, if
    structlog has not been configured yet, one of the two structlog
    configurations defined in this module.

    Args:
        name: discarded; the logger name is derived from the calling module.

    Returns:
        A proxy that creates a correctly configured logger bound to the __name__
        of the calling module
    """
    del name
    caller_name = get_caller_name_from_frames()

    # Conditionally enable key sorting (off by default due to performance penalty)
    if _feature_flags.is_log_key_sorting_requested():
        _CONFIGS.stdlib_logging_sort_keys = True
        _CONFIGS.orjson_configs |= orjson.OPT_SORT_KEYS

    # The stdlib logging dictConfig is applied only once per process.
    if not _CONFIGS.stdlib_logging_config_already_configured:
        set_logging_config(caller_name)
        _CONFIGS.stdlib_logging_config_already_configured = True

    if not structlog.is_configured():
        stdlib_based_requested = (
            _feature_flags.is_prettified_output_formatting_requested()
            or _feature_flags.is_stdlib_based_structlog_configuration_requested()
        )
        if stdlib_based_requested:
            set_stdlib_based_structlog_config()
        else:
            set_optimized_structlog_config()

    bound_logger = structlog.get_logger(caller_name).bind(logger=caller_name)
    if hasattr(bound_logger, "setLevel"):  # stdlib-based logger
        bound_logger.setLevel(_LOG_LEVEL)
    return bound_logger
getLogger = get_logger
""" CamelCase alias for `structlog_sentry_logger.get_logger`. """


def get_caller_name_from_frames() -> str:
    """Returns the name of the first calling module outside this library.

    When the caller is running as `__main__`, the name is instead derived from
    the caller's source file path via `get_namespaced_module_name`.
    """
    frame, name = _get_caller_stack_frame_and_name()
    if not is_caller_main(name):
        return name
    return get_namespaced_module_name(inspect.getfile(frame))


def _get_caller_stack_frame_and_name() -> tuple[FrameType, str]:
    """Finds the first stack frame (and its module name) not in `structlog_sentry_logger`."""
    find_app_frame = structlog._frames._find_first_app_frame_and_name  # pylint:disable=protected-access
    return find_app_frame(additional_ignores=["structlog_sentry_logger"])


def is_caller_main(caller_name: str) -> bool:  # Gratuitous indirection for testing
    """Tells whether `caller_name` is the `__main__` module name."""
    return "__main__" == caller_name
def get_namespaced_module_name(__file__: pathlib.Path | str) -> str:
    """Converts a source file path into a dotted, module-style name.

    The path is resolved and made relative to `ROOT_DIR` when the file lives
    underneath it; otherwise it is made relative to the path's own filesystem
    anchor (e.g. `/`). The file suffix is dropped and the remaining path parts
    are joined with dots.

    Args:
        __file__: path of the source file to namespace.

    Returns:
        The dotted name, e.g. `pkg.mod` for `<ROOT_DIR>/pkg/mod.py`.
    """
    fully_qualified_path = pathlib.Path(__file__).resolve()
    try:
        # A true ancestry check: a plain substring test would wrongly match sibling
        # directories such as `/application` against a root of `/app`.
        relative_path = fully_qualified_path.relative_to(ROOT_DIR)
    except ValueError:
        # Not under the application root; namespace from the filesystem anchor.
        relative_path = fully_qualified_path.relative_to(fully_qualified_path.anchor)
    return ".".join(relative_path.with_suffix("").parts)
def set_logging_config(module_name: str) -> None:
    """Builds the stdlib logging configuration and applies it with `dictConfig`.

    Args:
        module_name: name of the calling module, forwarded to `get_logging_config`.
    """
    config_dict = get_logging_config(module_name)
    logging.config.dictConfig(config_dict)


def get_logging_config(module_name: str) -> dict:
    """Assembles the `logging.config.dictConfig` dictionary.

    Args:
        module_name: name of the calling module, forwarded to `get_handlers`.

    Returns:
        A version-1 logging configuration whose root logger (level WARNING) uses
        every handler returned by `get_handlers`.
    """
    handlers = get_handlers(module_name)
    return {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": get_formatters(),
        "handlers": handlers,
        "loggers": {
            "": {
                "handlers": list(handlers),
                "level": "WARNING",
                "propagate": True,
            }
        },
    }


def get_handlers(module_name: str) -> dict:
    """Builds the handler section of the logging configuration.

    A stdout stream handler is always present. When prettified output is
    requested it uses the "colored" formatter and, if a log directory could be
    prepared, a rotating logfile handler is added as well; otherwise the stream
    handler uses the "plain" formatter.

    Args:
        module_name: name of the calling module, used in the logfile name.

    Returns:
        Mapping of handler name to handler configuration.
    """
    default_key = "default"
    base_handlers = {
        default_key: {
            "level": "DEBUG",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        }
    }
    default_handler = base_handlers[default_key]
    if _feature_flags.is_prettified_output_formatting_requested():
        # Add logfile handler
        filename_handler = get_dev_local_filename_handler(module_name)
        if filename_handler is not None:
            base_handlers["filename"] = filename_handler
        # Prettify stdout/stderr streams
        default_handler["formatter"] = "colored"
    else:
        default_handler["formatter"] = "plain"
    return base_handlers


def _make_fallback_logs_root() -> pathlib.Path | None:
    """Creates a fresh temp directory to hold fallback logs; None if that fails."""
    try:
        return pathlib.Path(tempfile.mkdtemp(prefix=ROOT_DIR.name))
    except OSError as err:
        __LOGGER.warning(
            "fallback logs directory creation failed",
            exc_info=err,
        )
        return None


def get_dev_local_filename_handler(module_name: str) -> dict | None:
    """Builds logfile handler configs.

    Before building the logfile handler configurations, this function attempts to
    initialize the log directory in the (inferred) application root directory.
    If this fails (for example, if the directory is read-only), it will fall back
    to a platform-specific temp directory. If this too fails, it will exit without
    creating the logfile handler configuration.

    The temp directory is only created when the application root directory cannot
    be used, so no stray temp directories are left behind on the common path.

    Args:
        module_name: the name of the calling module which will be incorporated in
            the logfile file name to provide better log provenance.

    Returns:
        logfile handler configurations if log directories are writeable, else None
    """
    # Naive UTC timestamp (same text format as the deprecated `utcnow()`), with
    # colons replaced so the value is usable in file names.
    utc_now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    file_timestamp = utc_now.isoformat().replace(":", "-")
    log_file_name = f"{file_timestamp}_{module_name}.jsonl"

    log_data_dir = ROOT_DIR / ".logs"
    if not mkdir_logs_dir(log_data_dir):
        fallback_log_data_root_dir = _make_fallback_logs_root()
        if fallback_log_data_root_dir is None:
            return None
        log_data_dir = fallback_log_data_root_dir / ".logs"
        if not mkdir_logs_dir(log_data_dir):
            return None

    __LOGGER.debug(
        "saving JSON logs to local log directory",
        log_dir=str(log_data_dir),
    )
    log_file_path = log_data_dir / log_file_name
    return {
        "level": "DEBUG",
        "class": "logging.handlers.RotatingFileHandler",
        "filename": str(log_file_path),
        # 1 MB
        "maxBytes": 1 << 20,  # type: ignore[dict-item]
        "backupCount": 3,  # type: ignore[dict-item]
        "formatter": "plain",
    }


def mkdir_logs_dir(log_data_dir: pathlib.Path) -> bool:
    """Creates `log_data_dir` if it does not exist yet.

    Args:
        log_data_dir: the directory to create (parents are not created).

    Returns:
        True if the directory exists afterwards, False if creation raised an
        `OSError` (which is logged as a warning rather than propagated).
    """
    try:
        log_data_dir.mkdir(exist_ok=True)
    except OSError as err:
        __LOGGER.warning(
            "logs directory creation failed",
            log_dir=str(log_data_dir),
            exc_info=err,
        )
        return False
    return True


def get_formatters() -> dict:
    """Builds the formatter section of the logging configuration.

    Returns:
        A "plain" formatter rendering JSON via `serializer` and a "colored"
        formatter rendering with structlog's console renderer.
    """
    pre_chain = [
        # Add the log level and a timestamp to the event_dict if the log
        # entry is not from structlog.
        structlog.stdlib.add_log_level,
        _TIMESTAMPER,
        structlog.stdlib.add_logger_name,
    ]
    return {
        "plain": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processor": structlog.processors.JSONRenderer(
                serializer=serializer,
                option=_CONFIGS.orjson_configs,
            ),
            "foreign_pre_chain": pre_chain,
        },
        "colored": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processor": structlog.dev.ConsoleRenderer(colors=True),
            "format": "%(message)s [in %(funcName)s]",
            "foreign_pre_chain": pre_chain,
        },
    }


def serializer(
    *args: Any,
    default: Callable[[Any], Any] | None = None,
    option: int | None = None,
) -> str:
    """Serializes to a JSON string with `orjson` or the stdlib `json` module.

    Args:
        *args: positional arguments forwarded to the underlying `dumps`.
        default: fallback callable for objects the encoder cannot serialize;
            forwarded on both the `orjson` and the stdlib `json` path.
        option: `orjson` option flags; ignored on the stdlib `json` path.

    Returns:
        The JSON document as `str`.
    """
    if _CONFIGS.use_orjson:
        return orjson.dumps(*args, default=default, option=option).decode()  # type: ignore[misc]
    return json.dumps(*args, default=default, sort_keys=_CONFIGS.stdlib_logging_sort_keys)


def set_stdlib_based_structlog_config() -> None:
    """Configures structlog to log through the stdlib `logging` module."""
    structlog_processors = [
        _TIMESTAMPER,
        structlog.processors.StackInfoRenderer(),
        add_line_number_and_func_name,
    ]
    if SentryBreadcrumbJsonProcessor is not None and _feature_flags.is_sentry_integration_mode_requested():
        structlog_processors.append(SentryBreadcrumbJsonProcessor(level=_SENTRY_LOG_LEVEL, tag_keys="__all__"))
    if (
        _feature_flags.is_cloud_logging_compatibility_mode_requested()
        or _feature_flags.is_probably_in_cloud_environment()
    ):
        structlog_processors.append(add_severity_field_from_level_if_in_cloud_environment)
    stdlib_log_compatibility_processors = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
    ]
    # Note: MUST come last!
    format_wrapper_processer = [structlog.stdlib.ProcessorFormatter.wrap_for_formatter]
    structlog.configure(
        processors=(
            stdlib_log_compatibility_processors  # type: ignore[arg-type]
            + structlog_processors
            + format_wrapper_processer  # type: ignore[arg-type,operator]
        ),
        # See [Performance](https://www.structlog.org/en/stable/performance.html)
        # for an in-depth explanation of the below settings
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )


def set_optimized_structlog_config() -> None:
    """Configures structlog to render JSON bytes directly, bypassing stdlib `logging`."""
    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        add_line_number_and_func_name,
        _TIMESTAMPER,
    ]
    if SentryBreadcrumbJsonProcessor is not None and _feature_flags.is_sentry_integration_mode_requested():
        processors.append(SentryBreadcrumbJsonProcessor(level=_SENTRY_LOG_LEVEL, tag_keys="__all__"))
    if (
        _feature_flags.is_cloud_logging_compatibility_mode_requested()
        or _feature_flags.is_probably_in_cloud_environment()
    ):
        processors.append(add_severity_field_from_level_if_in_cloud_environment)
    # Note: MUST come last!
    processors.append(
        structlog.processors.JSONRenderer(
            serializer=serializer_bytes,
            option=_CONFIGS.orjson_configs,
        )
    )
    structlog.configure(
        processors=processors,  # type: ignore[arg-type]
        wrapper_class=structlog.make_filtering_bound_logger(_LOG_LEVEL),
        logger_factory=structlog.BytesLoggerFactory(),
        cache_logger_on_first_use=True,
    )


def serializer_bytes(
    *args: Any,
    default: Callable[[Any], Any] | None = None,
    option: int | None = None,
) -> bytes:
    """Serializes to JSON bytes with `orjson` or the stdlib `json` module.

    Args:
        *args: positional arguments forwarded to the underlying `dumps`.
        default: fallback callable for objects the encoder cannot serialize;
            forwarded on both the `orjson` and the stdlib `json` path.
        option: `orjson` option flags; ignored on the stdlib `json` path.

    Returns:
        The JSON document as UTF-8 encoded `bytes`.
    """
    if _CONFIGS.use_orjson:
        return orjson.dumps(*args, default=default, option=option)  # type: ignore[misc]
    # pylint: disable=no-value-for-parameter
    return json.dumps(*args, default=default, sort_keys=_CONFIGS.stdlib_logging_sort_keys).encode("utf-8")
    # pylint: enable=no-value-for-parameter


def add_line_number_and_func_name(
    logger: Any,  # pylint: disable=unused-argument
    method: str,  # pylint: disable=unused-argument
    event_dict: structlog.types.EventDict,
) -> structlog.types.EventDict:
    """A structlog processor adding the caller's `lineno` and `funcName` to the event."""
    caller_frame, _ = _get_caller_stack_frame_and_name()
    event_dict["lineno"] = caller_frame.f_lineno
    event_dict["funcName"] = caller_frame.f_code.co_name
    return event_dict


def add_severity_field_from_level_if_in_cloud_environment(
    logger: Any,  # pylint: disable=unused-argument
    method: str,  # pylint: disable=unused-argument
    event_dict: structlog.types.EventDict,
) -> structlog.types.EventDict:
    """A custom processor for structlog for Cloud Logging compatibility.

    Since Cloud Logging infers log levels from the `severity` key, simply duplicates
    `level` to the `severity` field in the logger's event dictionary.

    If the event already carries a `severity` value, it is overwritten and the
    overwrite is reported both as a `RuntimeWarning` and as a warning log entry.
    """
    cloud_logging_log_level_key, python_log_level_key = "severity", "level"
    if cloud_logging_log_level_key in event_dict:
        # Warn users that they should fix their application code
        warnings.warn(
            f"Existing field "
            f"{cloud_logging_log_level_key}={event_dict[cloud_logging_log_level_key]} "
            "being overwritten by log level "
            f"({python_log_level_key}={event_dict[python_log_level_key]})",
            RuntimeWarning,
        )
        # Also, redundantly log this warning to users.
        #
        # While best practice is to only log warnings which the user isn't expected
        # to fix (https://docs.python.org/2/howto/logging.html#when-to-use-logging),
        # many users rely on automated log parsing tools for their alerting and
        # audit trails.
        __LOGGER.warning(
            "Existing log value being overwritten",
            src_key=python_log_level_key,
            dest_key=cloud_logging_log_level_key,
            old_value=event_dict[cloud_logging_log_level_key],
            new_value=event_dict[python_log_level_key],
            # `logger` is only bound by `get_logger`; events from loggers created
            # elsewhere may lack it, and reporting must not raise because of that.
            logger_that_used_reserved_key=event_dict.get("logger"),
        )
    event_dict[cloud_logging_log_level_key] = event_dict[python_log_level_key]
    return event_dict


def __get_meta_logger() -> Any:
    """Meta-logger to emit messages generated during logger configuration."""
    if _feature_flags.is_prettified_output_formatting_requested():
        # The following line is covered by unit tests but coverage doesn't get picked up
        # for some reason
        set_stdlib_based_structlog_config()  # pragma: no cover
    else:
        set_optimized_structlog_config()
    logger_name = "structlog_sentry_logger._config"
    logger = structlog.get_logger(logger_name).bind(logger=logger_name)
    # Undo the temporary structlog configuration so `get_logger` still sees an
    # unconfigured structlog on first use.
    structlog.reset_defaults()
    return logger


__LOGGER = __get_meta_logger()