Source code for fsh_lib.telemetry

"""Runtime helpers for OpenTelemetry-instrumented be apps.

This module is imported by the ``telemetry/setup.py`` and
``telemetry/decorators.py`` files generated by
:class:`be.operations.telemetry.TelemetryScaffold`.  Generated
apps without telemetry never import this module.

OpenTelemetry is an *optional* runtime dependency of be.  Install
it via ``pip install kiln-generator[opentelemetry]``, or directly
from the pinned ``telemetry/requirements.txt`` that the scaffold
emits next to the generated app.  Importing this module without
OTel installed raises :class:`ImportError`.

The decorators here create internal spans named ``{resource}.{op}``
that survive ASGI middleware reordering and carry low-cardinality
``be.resource`` / ``be.op`` attributes for filtering.  They
compose cleanly with the request span emitted by
:class:`opentelemetry.instrumentation.fastapi.FastAPIInstrumentor`.
"""

import functools
import os
from typing import TYPE_CHECKING, Any, Literal, assert_never

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
    OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HttpSpanExporter,
)
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
    SpanExporter,
)
from opentelemetry.sdk.trace.sampling import (
    ALWAYS_OFF,
    ALWAYS_ON,
    ParentBased,
    Sampler,
    TraceIdRatioBased,
)

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable, Mapping

# -------------------------------------------------------------------
# Constants -- attribute keys used throughout, kept in one place so
# the docs page can link to them.
# -------------------------------------------------------------------

ATTR_RESOURCE = "be.resource"
"""Span attribute carrying the resource name (e.g. ``"article"``)."""
ATTR_OP = "be.op"
"""Span attribute carrying the operation name (e.g. ``"get"``)."""

# pgqueuer worker-span attribute keys.  The ``messaging.*`` keys are
# OpenTelemetry messaging semantic-convention names, so a generic OTel
# backend recognises an entrypoint span as a queue consumer without
# any be-specific dashboard wiring; ``pgqueuer.*`` is platform-
# specific.  Every key here is deliberately low-cardinality -- drawn
# from a small fixed set -- so the span is safe to group by in
# dashboards and span-derived metrics.  The unbounded per-job id is
# kept *off* the span for that reason (see :func:`traced_entrypoint`).
#
# Broker identity; entrypoint spans always set it to
# ``_MESSAGING_SYSTEM``.
_ATTR_MSG_SYSTEM = "messaging.system"
# Operation kind; entrypoint spans always record ``"process"``.
_ATTR_MSG_OPERATION = "messaging.operation.type"
# Queue name; entrypoint spans record the entrypoint name here.
_ATTR_MSG_DESTINATION = "messaging.destination.name"
# Priority copied from ``job.priority`` of the job being handled.
_ATTR_JOB_PRIORITY = "pgqueuer.job.priority"

_MESSAGING_SYSTEM = "pgqueuer"
"""Value of the ``messaging.system`` attribute on entrypoint spans --
identifies the broker so a backend can group every pgqueuer consumer
together regardless of which entrypoint emitted the span."""

# Written by :func:`scrub_current_span_attributes` over each key it is
# asked to scrub.
_SCRUB_PLACEHOLDER = "[scrubbed]"
"""Replacement string written in place of credential / session
attributes on the auth router's spans.  A placeholder rather than
the absent attribute so a missing-attribute alert doesn't mask a
real outage."""


# Only the two ``*traceidratio`` names consume a sampling ratio (see
# :func:`_build_sampler`); the ``parentbased_*`` names wrap the named
# sampler in ``ParentBased``.
SamplerName = Literal[
    "always_on",
    "always_off",
    "parentbased_always_on",
    "parentbased_always_off",
    "parentbased_traceidratio",
    "traceidratio",
]
"""Accepted values of the ``sampler`` parameter to
:func:`build_tracer_provider`.  Mirrors
:data:`be.config.schema.SamplerName` -- duplicated here rather
than imported because ``fsh_lib`` is a runtime package and must not
depend on ``be``."""


# ``"none"`` disables span export entirely.  ``"otlp_http"`` -- and
# ``None`` where the parameter is optional -- selects the
# HTTP/protobuf exporter (see :func:`_build_span_exporter`).
ExporterName = Literal["otlp_http", "otlp_grpc", "console", "none"]
"""Accepted values of the ``exporter`` parameter to
:func:`build_tracer_provider`.  Mirrors
:data:`be.config.schema.ExporterName`."""


# -------------------------------------------------------------------
# Provider builders
# -------------------------------------------------------------------


def _build_resource(
    *,
    service_name: str,
    service_version: str | None,
    environment_env: str | None,
    extra: Mapping[str, str],
) -> Resource:
    """Assemble the ``Resource`` every signal provider is built with.

    Traces, metrics, and logs all go through this one helper so their
    resource attributes cannot drift apart; a mismatch would quietly
    break cross-signal correlation for the service.

    Args:
        service_name: Value of the ``service.name`` attribute.
        service_version: Value of ``service.version``; omitted from
            the resource when ``None`` or empty.
        environment_env: *Name* of the environment variable that
            holds the deployment environment -- not the value.  It is
            resolved here, and ``deployment.environment.name`` is only
            set when the variable exists and is non-empty.
        extra: Additional attributes, applied last so they take
            precedence over the ones derived above.

    Returns:
        The ``Resource`` produced by ``Resource.create``.
    """
    resolved_env = (
        os.environ.get(environment_env) if environment_env else None
    )

    resource_attrs: dict[str, str] = {"service.name": service_name}
    if service_version:
        resource_attrs["service.version"] = service_version
    if resolved_env:
        resource_attrs["deployment.environment.name"] = resolved_env

    # Caller-supplied attributes go in last and therefore win.
    resource_attrs |= extra
    return Resource.create(resource_attrs)


def _build_sampler(*, name: SamplerName, ratio: float | None) -> Sampler:
    """Translate the schema's sampler name to an SDK sampler.

    Validation already constrained ``name`` to a known string and
    ratio to ``[0.0, 1.0]`` when applicable; this function trusts
    that and is meant to be a thin lookup.
    """
    if name == "always_on":
        return ALWAYS_ON

    if name == "always_off":
        return ALWAYS_OFF

    if name == "parentbased_always_on":
        return ParentBased(ALWAYS_ON)

    if name == "parentbased_always_off":
        return ParentBased(ALWAYS_OFF)

    if name == "traceidratio":
        assert ratio is not None  # noqa: S101 -- guaranteed by config validator
        return TraceIdRatioBased(ratio)

    if name == "parentbased_traceidratio":
        assert ratio is not None  # noqa: S101 -- guaranteed by config validator
        return ParentBased(TraceIdRatioBased(ratio))

    assert_never(name)


def _build_span_exporter(
    *,
    exporter: ExporterName | None,
) -> SpanExporter | None:
    """Turn the schema's exporter selection into a span exporter.

    Exporters are constructed without arguments; the OTel SDK picks
    up ``OTEL_EXPORTER_OTLP_*`` on its own, so none of that is
    re-implemented here.

    Args:
        exporter: The selected exporter name.  ``"none"`` means "do
            not export"; ``None`` and ``"otlp_http"`` both select the
            HTTP/protobuf transport.

    Returns:
        The exporter instance, or ``None`` for ``"none"`` so the
        caller can skip attaching a ``BatchSpanProcessor``.
    """
    if exporter == "otlp_grpc":
        # Imported on demand: the grpc / protobuf stack is far heavier
        # than the HTTP transport, and only consumers that installed
        # the ``kiln-generator[opentelemetry-grpc]`` extra have it.
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (  # type: ignore[import-not-found]  # noqa: PLC0415
            OTLPSpanExporter as GrpcSpanExporter,
        )

        return GrpcSpanExporter()

    if exporter == "console":
        return ConsoleSpanExporter()

    # ``"none"`` disables export; anything left over -- ``"otlp_http"``
    # and the ``None`` default -- falls through to HTTP/protobuf.
    return None if exporter == "none" else HttpSpanExporter()


def build_tracer_provider(
    *,
    service_name: str,
    service_version: str | None = None,
    environment_env: str | None = None,
    resource_attributes: Mapping[str, str] | None = None,
    sampler: SamplerName = "parentbased_always_on",
    sampler_ratio: float | None = None,
    exporter: ExporterName | None = None,
) -> TracerProvider:
    """Construct a configured ``TracerProvider`` without installing it.

    Installation via :func:`opentelemetry.trace.set_tracer_provider`
    is left to the caller; keeping construction separate means this
    helper can be exercised without touching the global provider.

    The OTLP endpoint and headers come from the standard
    ``OTEL_EXPORTER_OTLP_*`` environment variables, read by the SDK
    itself -- there is no kiln-side override for the variable names.

    Args:
        service_name: Value of the ``service.name`` resource attribute.
        service_version: Optional ``service.version`` attribute.
        environment_env: *Name* of the environment variable holding
            the deployment environment (e.g. ``"ENVIRONMENT"``), not
            its value -- the same generated artifact is deployed to
            dev / staging / prod, so it is resolved at startup.
            ``None``, or an unset / empty variable, skips the
            attribute.
        resource_attributes: Extra resource attributes; ``None`` is
            treated as empty.
        sampler: Name of the sampler to build.
        sampler_ratio: Ratio for the ratio-based samplers.
        exporter: Span exporter selection; ``"none"`` yields a
            provider with no span processor attached.

    Returns:
        The new, not-yet-installed ``TracerProvider``.
    """
    tracer_provider = TracerProvider(
        resource=_build_resource(
            service_name=service_name,
            service_version=service_version,
            environment_env=environment_env,
            extra=resource_attributes or {},
        ),
        sampler=_build_sampler(name=sampler, ratio=sampler_ratio),
    )

    selected = _build_span_exporter(exporter=exporter)
    if selected is not None:
        tracer_provider.add_span_processor(BatchSpanProcessor(selected))

    return tracer_provider
def build_meter_provider(
    *,
    service_name: str,
    service_version: str | None = None,
    environment_env: str | None = None,
    resource_attributes: Mapping[str, str] | None = None,
) -> MeterProvider:
    """Build a ``MeterProvider`` that exports metrics over OTLP/HTTP.

    The exporter is fixed: an HTTP ``OTLPMetricExporter`` behind a
    ``PeriodicExportingMetricReader``, both constructed with no
    arguments so the SDK reads the standard ``OTEL_EXPORTER_OTLP_*``
    environment variables itself.  This helper does *not* consult
    ``OTEL_METRICS_EXPORTER``, and there is no per-config override,
    because metrics deployments almost universally pair with the same
    OTLP endpoint as traces.

    Args:
        service_name: Value of the ``service.name`` resource attribute.
        service_version: Optional ``service.version`` attribute.
        environment_env: *Name* of the environment variable holding
            the deployment environment; ``None`` skips the lookup.
        resource_attributes: Extra resource attributes; ``None`` is
            treated as empty.

    Returns:
        A ``MeterProvider`` with a single periodic OTLP reader,
        sharing the resource shape used by the other signal builders.
    """
    resource = _build_resource(
        service_name=service_name,
        service_version=service_version,
        environment_env=environment_env,
        extra=resource_attributes or {},
    )
    reader = PeriodicExportingMetricReader(OTLPMetricExporter())
    return MeterProvider(resource=resource, metric_readers=[reader])
def build_logger_provider(
    *,
    service_name: str,
    service_version: str | None = None,
    environment_env: str | None = None,
    resource_attributes: Mapping[str, str] | None = None,
) -> LoggerProvider:
    """Construct a ``LoggerProvider`` that ships records over OTLP.

    Log export is off by default in
    :class:`~be.config.schema.TelemetryConfig`, so this is only called
    when the consumer opts in.  The provider alone does not capture
    anything: the generated ``init_telemetry`` also attaches a
    :class:`opentelemetry.sdk._logs.LoggingHandler` to the stdlib root
    logger so that ``logging.getLogger().info(...)`` calls reach OTLP.

    Args:
        service_name: Value of the ``service.name`` resource attribute.
        service_version: Optional ``service.version`` attribute.
        environment_env: *Name* of the environment variable holding
            the deployment environment; ``None`` skips the lookup.
        resource_attributes: Extra resource attributes; ``None`` is
            treated as empty.

    Returns:
        A ``LoggerProvider`` with one batching OTLP log processor.
    """
    logger_provider = LoggerProvider(
        resource=_build_resource(
            service_name=service_name,
            service_version=service_version,
            environment_env=environment_env,
            extra=resource_attributes or {},
        ),
    )
    # The exporter takes no arguments; it is configured through the
    # OTLP environment variables.
    otlp_exporter = OTLPLogExporter()
    logger_provider.add_log_record_processor(
        BatchLogRecordProcessor(otlp_exporter)
    )
    return logger_provider
# -------------------------------------------------------------------
# Decorators
# -------------------------------------------------------------------
def traced_handler(
    span_name: str,
    *,
    resource: str,
    op: str,
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
    """Return a decorator that runs an async handler inside a span.

    CRUD operations and user-defined actions share this decorator:
    an action name (``publish``, ``archive``, ...) travels through
    ``op`` exactly like a CRUD name (``get``, ``list``, ...) and lands
    on the same :data:`ATTR_OP` attribute.  Telling the two apart in a
    dashboard is done by value -- be's CRUD names are a small fixed
    set, everything else is user-defined.

    Exceptions raised by the handler are deliberately *not* recorded
    on this span and do not change its status.  The final HTTP
    outcome -- including FastAPI turning ``HTTPException`` into a
    4xx / 5xx response -- is captured by the surrounding
    ``FastAPIInstrumentor`` server span, which is the authoritative
    "did the request succeed" signal.  Recording here as well would
    count routine flow-control exceptions (a 404 from
    ``get_object_from_query_or_404``, a 401 from the auth dependency)
    as backend errors.

    Args:
        span_name: Span name, conventionally ``f"{resource}.{op}"``.
        resource: Low-cardinality resource label (e.g. ``"article"``),
            attached as :data:`ATTR_RESOURCE`.
        op: Low-cardinality operation label (e.g. ``"get"``,
            ``"publish"``), attached as :data:`ATTR_OP`.

    Returns:
        A decorator whose wrapper is built with
        :func:`functools.wraps`, so FastAPI's dependency-injection
        introspection still sees the original signature.
    """
    handler_tracer = trace.get_tracer("be.handler")

    def wrap(
        handler: Callable[..., Awaitable[Any]],
    ) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(handler)
        async def traced(*args: Any, **kwargs: Any) -> Any:
            # Both exception hooks are switched off on purpose; see
            # the docstring for why failures are left to the server
            # span.
            with handler_tracer.start_as_current_span(
                span_name,
                record_exception=False,
                set_status_on_exception=False,
            ) as active_span:
                active_span.set_attribute(ATTR_RESOURCE, resource)
                active_span.set_attribute(ATTR_OP, op)
                return await handler(*args, **kwargs)

        return traced

    return wrap
def traced_entrypoint(
    entrypoint: str,
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
    """Return a decorator that runs a pgqueuer entrypoint in a span.

    pgqueuer entrypoints are async ``(job) -> None`` coroutines that
    the ``pgq`` worker runs outside any HTTP request, so there is no
    ``FastAPIInstrumentor`` server span to parent them or to record
    their failures.  Wrapping each job in its own
    ``SpanKind.CONSUMER`` span puts worker activity in the same trace
    backend as the request path.

    In pgqueuer the entrypoint *is* the queue -- jobs are routed to
    handlers purely by entrypoint name -- so *entrypoint* is emitted
    as ``messaging.destination.name``, the key a backend groups
    consumer traffic by.  The span is deliberately *not* linked to the
    trace of the request that enqueued the job: pgqueuer payloads are
    opaque ``bytes`` with no envelope to carry a ``traceparent``, so
    producer and worker traces stay separate.

    Only low-cardinality attributes are set: the messaging system,
    the operation type, the entrypoint name, and the job priority.
    The unbounded per-job id is intentionally kept off the span; it
    is carried on the worker's log records instead, which share this
    span's trace id when log correlation is enabled.

    Unlike :func:`traced_handler`, the ``start_as_current_span``
    defaults are left in place, so an exception from the wrapped
    coroutine *is* recorded and the span status becomes ``ERROR``.  A
    raising entrypoint is pgqueuer's failure signal -- the job is
    retried or moved to the dead-letter log -- and with no server
    span around it, this span is the only place that failure shows up
    in a trace.

    Args:
        entrypoint: The pgqueuer entrypoint (== queue) name the
            handler is registered under, e.g.
            ``"fsh_lib_reports_dispatch"``.  It names the span and is
            attached as ``messaging.destination.name``.

    Returns:
        A decorator wrapping an async ``(job, ...) -> Any`` coroutine
        and preserving its signature, so ``PgQueuer.entrypoint`` still
        sees the original callable shape.
    """
    queue_tracer = trace.get_tracer("be.queue")
    consumer_span_name = f"{entrypoint} process"

    def wrap(
        handler: Callable[..., Awaitable[Any]],
    ) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(handler)
        async def traced(job: Any, *args: Any, **kwargs: Any) -> Any:
            with queue_tracer.start_as_current_span(
                consumer_span_name,
                kind=trace.SpanKind.CONSUMER,
            ) as active_span:
                active_span.set_attribute(_ATTR_MSG_SYSTEM, _MESSAGING_SYSTEM)
                active_span.set_attribute(_ATTR_MSG_OPERATION, "process")
                active_span.set_attribute(_ATTR_MSG_DESTINATION, entrypoint)
                active_span.set_attribute(_ATTR_JOB_PRIORITY, job.priority)
                return await handler(job, *args, **kwargs)

        return traced

    return wrap
def scrub_current_span_attributes(*keys: str) -> None:
    """Overwrite the named attributes on the current span.

    The auth router's login/logout handlers run inside a span that
    *would* otherwise carry the credentials request body and the
    session response body once ``capture_request_body`` /
    ``capture_response_body`` are enabled globally.  Those handlers
    call this helper to overwrite specific attribute keys (set by a
    future body-capture hook) with ``[scrubbed]`` -- a placeholder
    rather than removal, so a "missing http.request.body" alert does
    not mask an outage.

    The call is independent of the global capture toggles: the auth
    router invokes it even when body capture is off, so any
    third-party span processor that chooses to attach bodies sees the
    scrubbed value.

    Args:
        *keys: Attribute keys to overwrite.  With no keys the call
            sets nothing.
    """
    active_span = trace.get_current_span()
    for attribute_key in keys:
        active_span.set_attribute(attribute_key, _SCRUB_PLACEHOLDER)