Source code for fsh_lib.comms

"""Communication-platform primitives for kiln-generated apps.

Sends typed messages over pluggable delivery methods (email, SMS,
push, ...) using pgqueuer as the transactional outbox.  The pieces:

* :class:`CommType` -- a named, schema-validated communication.
  Carries a Pydantic ``context_schema`` and a pair of template
  strings (subject + body); :class:`CommRegistry` holds the set the
  consumer's app supports.

* :class:`Renderer` -- a Protocol that turns a :class:`CommType`
  plus a validated context into a rendered subject + body pair.
  The default :class:`JinjaRenderer` is in-process; the same
  surface fits an HTTP-call renderer that defers to a separate
  template service (a node renderer, MJML compiler, anything)
  without changing the platform's call site.

* :class:`MessageMixin` / :class:`RecipientMixin` /
  :class:`NotificationPreferenceMixin` -- SQLAlchemy mixins
  supplying the storage columns for the three tables every
  comm-platform install needs.  Same pgcraft-friendly idiom as
  the file / rate-limit mixins: consumer owns the table, we own
  the columns.

* :class:`Transport` -- a Protocol implemented per delivery
  method.  One adapter per supported method, e.g. ``email``,
  ``sms``.  Implementations live in consumer code or third-party
  adapters; this module ships :class:`LoggingTransport` for tests
  and local development.

* :class:`PreferenceResolver` -- a Protocol that answers
  "should this recipient receive this comm-type via this method?"
  Looked up once per recipient inside :func:`send_communication`;
  an opted-out recipient yields no row and no job (the message
  row still records the attempt for audit).

* :func:`send_communication` -- the producer entry point.  Validates
  context against the :class:`CommType` schema, renders the
  templates, inserts one message row and one recipient row per
  delivery, then enqueues one pgqueuer job per recipient under
  :data:`DISPATCH_ENTRYPOINT`.  All writes ride the caller's
  SQLAlchemy session and the pgqueuer ``Queries`` is bound to the
  same connection (see :func:`fsh_lib.queue.get_queue`), so a single
  ``await session.commit()`` makes the message + recipients +
  jobs durable atomically.

* :func:`make_dispatch_entrypoint` -- the worker-side counterpart.
  Returns an async ``(job) -> None`` callable wired to the consumer's
  session factory, transports, and mixin classes -- register it
  against :data:`DISPATCH_ENTRYPOINT` on a pgqueuer ``PgQueuer``
  instance.

The module's only optional dependency is ``jinja2`` (already pulled
in by ``kiln-generator`` for codegen); no extras gate is needed.
"""

from __future__ import annotations

import dataclasses
import datetime as _dt
import enum
import uuid
from collections.abc import Awaitable
from typing import TYPE_CHECKING, Any, Literal, Protocol

from jinja2 import Template
from pydantic import BaseModel
from sqlalchemy import (
    JSON,
    Boolean,
    DateTime,
    String,
    Text,
    Uuid,
    insert,
    select,
    update,
)
from sqlalchemy.orm import Mapped, mapped_column

from fsh_lib.queue import instrument_entrypoint

if TYPE_CHECKING:
    from collections.abc import Callable, Sequence

    from pgqueuer import Queries
    from pgqueuer.models import Job
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker


DISPATCH_ENTRYPOINT = "fsh_lib_comms_dispatch"
"""pgqueuer entrypoint name jobs are enqueued under.

The producer side (:func:`send_communication`) enqueues under this
name; the worker side must register its handler under the same name
(see :func:`make_dispatch_entrypoint`).  Exposed so consumers don't
hard-code the literal in two places.
"""


[docs] class DeliveryStatus(enum.StrEnum): """Lifecycle states of a single recipient's delivery attempt.""" PENDING = "pending" """Row inserted, job enqueued, transport not yet called.""" SENT = "sent" """Transport returned without raising.""" FAILED = "failed" """Transport raised; ``error`` carries the message."""
# ------------------------------------------------------------------- # CommType + registry # -------------------------------------------------------------------
[docs] @dataclasses.dataclass(frozen=True) class CommType[ContextT: BaseModel]: """A named communication: schema + templates + default methods. A ``CommType`` is the unit of design for the comm platform: it binds a Pydantic context schema to a pair of template strings plus the set of methods the consumer wants delivery on by default. The same instance is shared by the producer (which validates and renders) and the worker (which renders too if the rendered body wasn't persisted). Attributes: name: Stable identifier (e.g. ``"order_shipped"``). Used as the registry key and stored on the message row so the audit log survives template churn. context_schema: Pydantic model describing the fields the templates reference. ``send_communication`` validates the caller-supplied context against this before rendering, so missing or mistyped fields fail fast at the call site instead of half-way through a template. subject_template: Source string for the subject line. Interpreted by the configured :class:`Renderer`; the default :class:`JinjaRenderer` treats it as Jinja2. body_template: Source string for the body. Same renderer treatment as :attr:`subject_template`. default_methods: Methods to deliver on when the caller doesn't pass an explicit recipient list. Empty tuple means "no default; caller must specify recipients". """ name: str context_schema: type[ContextT] subject_template: str body_template: str default_methods: tuple[str, ...] = ()
[docs] class CommRegistry: """Mutable registry of :class:`CommType` entries by name. Built once at app startup (or as a module-level global if the consumer prefers) and passed into :func:`send_communication` and :func:`make_dispatch_entrypoint`. Not thread-safe -- mutate it only during startup. """ def __init__(self) -> None: """Build an empty registry.""" self._types: dict[str, CommType[Any]] = {}
[docs] def register(self, comm_type: CommType[Any]) -> None: """Add *comm_type* to the registry. Raises: ValueError: If a type with the same name is already registered. Re-registration is almost always a bug (two modules thought they owned the same name); force callers to deregister first if they really want it. """ if comm_type.name in self._types: msg = f"comm_type {comm_type.name!r} already registered" raise ValueError(msg) self._types[comm_type.name] = comm_type
[docs] def get(self, name: str) -> CommType[Any]: """Return the :class:`CommType` registered under *name*. Raises: KeyError: If *name* is not registered. """ try: return self._types[name] except KeyError: msg = f"unknown comm_type: {name!r}" raise KeyError(msg) from None
[docs] def names(self) -> tuple[str, ...]: """Return registered comm-type names in registration order.""" return tuple(self._types)
# ------------------------------------------------------------------- # Renderer # -------------------------------------------------------------------
[docs] @dataclasses.dataclass(frozen=True) class RenderedMessage: """Output of a :class:`Renderer` -- the strings we persist. Attributes: subject: Subject line. Populated by every renderer. body: Plain-text body. The legacy field :class:`JinjaRenderer` produces and that transports already consume; :class:`HttpRenderer` populates it from the render service's ``text`` alternative for email output, or leaves it ``""`` for PDF output. html: HTML alternative body for email outputs, or ``None``. ``JinjaRenderer`` never sets it; ``HttpRenderer`` sets it when the render service returns ``output: "email"``. Email transports should send HTML + plain-text as ``multipart/alternative``. pdf: Raw PDF bytes for PDF outputs, or ``None``. ``HttpRenderer`` sets it when the render service returns ``output: "pdf"``; transports treat it as a single attachment. """ subject: str body: str html: str | None = None pdf: bytes | None = None
[docs] class Renderer(Protocol): """Hook: turn a :class:`CommType` + context into rendered strings. The default :class:`JinjaRenderer` evaluates the template strings in-process. A microservice-based renderer (e.g. a node service that compiles MJML or runs a richer template language) implements the same single method and gets dropped in via the ``renderer`` argument to :func:`send_communication` -- no other code changes. *context* is the validated Pydantic model; implementations call :meth:`~pydantic.BaseModel.model_dump` themselves so they can pick the dump mode (``json`` vs Python) that fits their wire format. """
[docs] async def render( self, comm_type: CommType[Any], context: BaseModel, ) -> RenderedMessage: """Return the rendered subject + body for this comm.""" ...
[docs] class JinjaRenderer: """In-process Jinja2 renderer. The default for :func:`send_communication`. Treats :attr:`CommType.subject_template` and :attr:`CommType.body_template` as Jinja2 source strings and renders them against the context dump (``model_dump(mode="json")`` so dates/uuids stringify the same way they would over the wire). Trivial enough to construct inline; instances hold no state beyond the autoescape choice, which only matters for the body of HTML emails (callers building HTML bodies should pass ``autoescape=True``). """ def __init__(self, *, autoescape: bool = False) -> None: """Build a renderer. Args: autoescape: When ``True``, Jinja autoescapes HTML in the rendered output. Off by default because plain-text bodies (SMS, push) are the more common case; consumers building HTML email bodies should opt in. """ self._autoescape = autoescape
[docs] async def render( self, comm_type: CommType[Any], context: BaseModel, ) -> RenderedMessage: """Render the comm's templates against *context*.""" data = context.model_dump(mode="json") subject = Template( comm_type.subject_template, autoescape=self._autoescape, ).render(**data) body = Template( comm_type.body_template, autoescape=self._autoescape, ).render(**data) return RenderedMessage(subject=subject, body=body)
[docs] class HttpRenderer: """Renderer that defers to an external render microservice. Drop-in replacement for :class:`JinjaRenderer` when the project runs a separate render service (the kiln-render scaffold or any HTTP equivalent). Posts ``{template, context, theme}`` to the service's ``/v1/render`` endpoint and maps the response onto a :class:`RenderedMessage`: * ``output: "email"`` -> ``{subject, html, text}`` -> ``RenderedMessage(subject=subject, body=text, html=html)``. * ``output: "pdf"`` -> ``{subject, pdf_base64}`` -> ``RenderedMessage(subject=subject, body="", pdf=bytes)``. The service is the source of truth for ``output``; the renderer inspects the response shape rather than carrying a copy of the build-time contract at runtime. Discrepancies between the project's declared :class:`~be.config.schema.CommTypeConfig` output and the service's response are surfaced by the generated ``render-contract.json`` + the render service's ``tsc`` step, not by this client. Args: base_url: Base URL of the render service, e.g. ``"http://render:8200"``. Typically pulled from a ``RENDER_URL`` env var by the consumer. theme: Default theme payload merged into every request -- ``{logoUrl, colors, typography, ...}``. Per-send overrides are not supported on this surface yet; supply the theme once at construction. ``None`` sends no theme block. client: An existing :class:`httpx.AsyncClient` to use. ``None`` (default) creates and owns one. The owned client is closed by :meth:`aclose`. timeout: Per-request timeout in seconds. Ignored when an explicit *client* is supplied. Raises: ImportError: ``httpx`` is not installed. Install ``fsh-lib[comms-http]`` to use this renderer. """ _DEFAULT_TIMEOUT = 10.0 def __init__( self, base_url: str, *, theme: dict[str, Any] | None = None, client: Any | None = None, timeout: float = _DEFAULT_TIMEOUT, ) -> None: """Store the endpoint + theme; lazy-import httpx.""" try: import httpx # noqa: PLC0415 -- optional dep, gate at construct except ImportError as exc: msg = ( "fsh_lib.comms.HttpRenderer requires httpx; install " "the optional extra via ``pip install " "'fsh-lib[comms-http]'``" ) raise ImportError(msg) from exc self._theme = dict(theme) if theme is not None else None self._owns_client = client is None self._client = client or httpx.AsyncClient( base_url=base_url, timeout=timeout, )
[docs] async def aclose(self) -> None: """Close the underlying client iff this instance owns it.""" if self._owns_client: await self._client.aclose()
[docs] async def render( self, comm_type: CommType[Any], context: BaseModel, ) -> RenderedMessage: """POST ``/v1/render`` and convert the response. The request envelope is:: { "template": "<comm_type.name>", "context": <context.model_dump(mode="json")>, "theme": <self._theme or omitted> } The response is one of the two shapes documented on the class. A non-2xx response or a response missing the expected keys raises :class:`RuntimeError` -- the caller sees the same surface as a Jinja syntax error would produce. """ document: dict[str, Any] = { "template": comm_type.name, "context": context.model_dump(mode="json"), } if self._theme is not None: document["theme"] = self._theme response = await self._client.post("/v1/render", json=document) response.raise_for_status() payload = response.json() subject = payload.get("subject", "") if "pdf_base64" in payload: import base64 # noqa: PLC0415 -- stdlib, branch-local return RenderedMessage( subject=subject, body="", pdf=base64.b64decode(payload["pdf_base64"]), ) if "html" in payload: return RenderedMessage( subject=subject, body=payload.get("text", ""), html=payload["html"], ) msg = ( f"render service response for template " f"{comm_type.name!r} carries neither ``html`` nor " f"``pdf_base64`` (got keys: {sorted(payload)})" ) raise RuntimeError(msg)
# ------------------------------------------------------------------- # Mixins # -------------------------------------------------------------------
[docs] class MessageMixin: """SQLAlchemy mixin for the message table. One row per ``send_communication`` call -- represents the *intent* to communicate. Per-method delivery state lives on the :class:`RecipientMixin` rows that point back here via :attr:`RecipientMixin.message_id`. Subclass on a regular ``Base`` to materialise the table: .. code-block:: python from fsh_lib.comms import MessageMixin class CommMessage(Base, MessageMixin): __tablename__ = "comm_messages" id: Mapped[uuid.UUID] = mapped_column( Uuid, primary_key=True, default=uuid.uuid4, ) Storing the rendered ``subject``/``body`` (rather than re-rendering from ``context`` at send time) means template churn doesn't invalidate the audit log: the message row reflects what the recipient actually saw. The mixin owns no primary key -- declare ``id`` on the consumer class (typically via a pgcraft PK plugin or an explicit UUID column) so the consumer's PK convention wins. Same idiom as :class:`~fsh_lib.saved_views.SavedViewMixin`. """ if TYPE_CHECKING: # Type-only -- concrete column lives on the consumer class. # ``send_communication`` and the dispatch entrypoint read # ``message.id`` and need it typed. id: Mapped[Any] comm_type: Mapped[str] = mapped_column( String(255), nullable=False, index=True, ) """Registry key (:attr:`CommType.name`). Indexed so the audit log can group by type cheaply.""" context: Mapped[dict[str, Any]] = mapped_column( JSON, nullable=False, default=dict, ) """JSON dump of the validated context, kept so the row can be re-rendered or replayed if templates change.""" subject: Mapped[str | None] = mapped_column( String(1024), nullable=True, ) """Rendered subject. Nullable because some methods (SMS, push) have only a body.""" body: Mapped[str] = mapped_column(Text, nullable=False, default="") """Rendered body. ``Text`` rather than ``String`` because bodies (especially HTML email) routinely exceed the 64 KiB Postgres ``varchar`` ceiling."""
# ``created_at`` / ``updated_at`` are pgcraft-managed -- the # consumer's factory injects them via :class:`TimestampPlugin`. # Declaring them here would collide at table-build time.
[docs] class RecipientMixin: """SQLAlchemy mixin for the recipient table. One row per ``(message, method, address)`` triple. The pgqueuer job carries the recipient id, so the dispatch path is: job -> recipient -> message -> transport lookup. The mixin deliberately doesn't declare a foreign key to the consumer's :class:`MessageMixin` table -- the consumer names that table, so the FK has to come from their own subclass via a ``sqlalchemy.ForeignKey`` on the ``message_id`` column. Keeping the mixin FK-free means the same class works regardless of where the consumer mounts the message table. Like :class:`MessageMixin`, the PK is left to the consumer to declare so pgcraft / consumer conventions own the column. """ if TYPE_CHECKING: # Type-only -- concrete column lives on the consumer class. # The dispatch entrypoint reads ``recipient.id`` and the # pgqueuer payload encodes it. id: Mapped[Any] message_id: Mapped[uuid.UUID] = mapped_column( Uuid, nullable=False, index=True, ) """Points back at the :class:`MessageMixin` row. Consumers typically add an explicit ``ForeignKey`` in their subclass.""" method: Mapped[str] = mapped_column( String(64), nullable=False, ) """Delivery method (``"email"``, ``"sms"``, ...). Used to look up the right transport at dispatch time.""" address: Mapped[str] = mapped_column( String(512), nullable=False, ) """Method-specific destination (email address, phone number, push token, ...). Opaque to this module.""" kind: Mapped[str] = mapped_column( String(8), nullable=False, default="to", ) """One of :data:`RecipientKind` (``"to"`` / ``"cc"`` / ``"bcc"``). Stored as a string so adding a new addressing kind doesn't need a migration. Email transports read this off the row to lay out the RFC 5322 headers; non-email transports typically ignore it.""" status: Mapped[str] = mapped_column( String(32), nullable=False, default=DeliveryStatus.PENDING.value, ) """One of :class:`DeliveryStatus`'s values. Stored as a string (not a SQL enum) so adding a new state doesn't require a migration.""" sent_at: Mapped[_dt.datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) """When the transport returned successfully. ``None`` while pending or failed.""" error: Mapped[str | None] = mapped_column( String(1024), nullable=True, ) """Truncated exception message from a failed delivery. ``None`` until the dispatch path catches an error."""
[docs] class NotificationPreferenceMixin: """SQLAlchemy mixin for the per-recipient preference table. One row per ``(subject_key, comm_type, method)`` triple records whether that recipient wants that comm type via that method. :func:`send_communication` looks the row up via the :class:`PreferenceResolver` protocol -- this mixin just supplies the columns; the resolver implementation lives in consumer code (or in a generated helper). ``subject_key`` is intentionally a string rather than a typed foreign key: a comm platform routinely targets users, accounts, org-level addresses, or external identifiers, and a typed FK would lock the table to one of those. Like the rest of the comms mixins, the PK is left to the consumer to declare so pgcraft / consumer conventions own the column. """ if TYPE_CHECKING: id: Mapped[Any] subject_key: Mapped[str] = mapped_column( String(255), nullable=False, index=True, ) """Identifier of the recipient whose preferences this row captures (typically a user id formatted as a string).""" comm_type: Mapped[str] = mapped_column( String(255), nullable=False, ) """Registry key (:attr:`CommType.name`).""" method: Mapped[str] = mapped_column( String(64), nullable=False, ) """Delivery method this preference scopes.""" enabled: Mapped[bool] = mapped_column( Boolean, nullable=False, default=True, ) """``True`` when the recipient consents to this ``(comm_type, method)`` combination. Default ``True`` so an absent row reads as opt-in by default; flip per consumer policy if your default is opt-out."""
# ------------------------------------------------------------------- # Recipients + preferences + transports # ------------------------------------------------------------------- RecipientKind = Literal["to", "cc", "bcc"] """How a recipient is addressed in the outbound message. For email: maps directly to RFC 5322 ``To:`` / ``Cc:`` / ``Bcc:`` headers; transports lay the headers out themselves so cc recipients see the to/cc lists and bcc recipients don't. For methods without header semantics (SMS, push) this is informational -- the dispatch path delivers each row regardless of kind. """
[docs] @dataclasses.dataclass(frozen=True) class RecipientSpec: """Single recipient handed to :func:`send_communication`. Attributes: method: Delivery method (must match a transport key). address: Method-specific destination (email, phone, ...). kind: To / Cc / Bcc -- gives transports the information they need to lay the message out correctly. Defaults to ``"to"`` so call sites that don't care don't need to set it. subject_key: Identifier whose preferences gate delivery. ``None`` skips the preference check (e.g. transactional sends to non-user addresses like a billing inbox). """ method: str address: str kind: RecipientKind = "to" subject_key: str | None = None
[docs] class PreferenceResolver(Protocol): """Hook: gate delivery on the recipient's per-method opt-in."""
[docs] async def is_enabled( self, *, subject_key: str, comm_type: str, method: str, ) -> bool: """Return ``True`` to deliver, ``False`` to suppress.""" ...
[docs] class Transport(Protocol): """Method-specific delivery adapter. Implementations are free to do whatever the method requires (SMTP send, Twilio API call, FCM push, ...). Raise to mark the delivery failed; return normally to mark it sent. The dispatch path stamps :attr:`RecipientMixin.status` and :attr:`RecipientMixin.sent_at` based on which path you take. """
[docs] async def send( self, *, message: MessageMixin, recipient: RecipientMixin, ) -> None: """Deliver *message* to *recipient* or raise.""" ...
[docs] class LoggingTransport: """Test/dev transport: records every send into an in-memory list. Not async-safe across processes, obviously; the point is to give unit tests something to assert against and to give local development a no-credentials fallback. Production transports live in consumer code or third-party packages. """ def __init__(self) -> None: """Build a transport with an empty ``self.sent`` log.""" self.sent: list[tuple[uuid.UUID, str, str]] = []
[docs] async def send( self, *, message: MessageMixin, recipient: RecipientMixin, ) -> None: """Record ``(message_id, address, body)`` into ``self.sent``.""" self.sent.append((message.id, recipient.address, message.body))
# ------------------------------------------------------------------- # Producer side -- the entry point a request handler calls. # -------------------------------------------------------------------
[docs] async def send_communication( *, session: AsyncSession, queue: Queries, registry: CommRegistry, comm_type: str, context: BaseModel | dict[str, Any], recipients: Sequence[RecipientSpec], message_cls: type[MessageMixin], recipient_cls: type[RecipientMixin], renderer: Renderer | None = None, preferences: PreferenceResolver | None = None, ) -> uuid.UUID: """Validate, render, persist, and enqueue a communication. The transactional-outbox guarantee: 1. Validate *context* against the comm-type's schema. 2. Render the templates with *renderer* (defaults to :class:`JinjaRenderer`). 3. Insert one :class:`MessageMixin` row. 4. For each recipient, consult *preferences* (if supplied); insert a :class:`RecipientMixin` row for each one that passes. 5. Enqueue one pgqueuer job per surviving recipient under :data:`DISPATCH_ENTRYPOINT`, payload = recipient id (UTF-8). Steps 3-5 all ride *session*'s transaction (see :func:`fsh_lib.queue.get_queue` for how *queue* is bound to the same connection). Commit the session and the message, recipients, and jobs all become durable atomically; roll back and the communication never happened. Args: session: The async SQLAlchemy session running the caller's transaction. All inserts ride it. queue: A pgqueuer ``Queries`` bound to *session*'s connection. Build it with :func:`fsh_lib.queue.get_queue` immediately before this call. registry: The :class:`CommRegistry` containing the type named by *comm_type*. comm_type: Registry key for the comm to send. context: Either an instance of the type's :attr:`CommType.context_schema` or a dict that will be validated against it. recipients: Per-delivery specs (method + address + optional subject_key for preference lookup). message_cls: Consumer's concrete :class:`MessageMixin` subclass. recipient_cls: Consumer's concrete :class:`RecipientMixin` subclass. renderer: Override for the template renderer. Defaults to :class:`JinjaRenderer` -- swap in an HTTP-call renderer to defer rendering to a separate service. preferences: Optional preference resolver. When omitted, every recipient is delivered to (subject to ``subject_key`` semantics on :class:`RecipientSpec`). Returns: The id of the inserted :class:`MessageMixin` row. """ spec = registry.get(comm_type) if isinstance(context, BaseModel): validated: BaseModel = context else: validated = spec.context_schema.model_validate(context) used_renderer = renderer or JinjaRenderer() rendered = await used_renderer.render(spec, validated) return await _persist_and_enqueue( session=session, queue=queue, comm_type=comm_type, context=validated.model_dump(mode="json"), subject=rendered.subject, body=rendered.body, recipients=recipients, message_cls=message_cls, recipient_cls=recipient_cls, preferences=preferences, )
async def _persist_and_enqueue( *, session: AsyncSession, queue: Queries, comm_type: str, context: dict[str, Any], subject: str, body: str, recipients: Sequence[RecipientSpec], message_cls: type[MessageMixin], recipient_cls: type[RecipientMixin], preferences: PreferenceResolver | None, ) -> uuid.UUID: """Insert the message + recipient rows and enqueue per-recipient jobs. The transactional-outbox tail of the producer path: separated out so :func:`send_communication` (which renders Jinja templates from a registered :class:`CommType`) and :func:`send_user_template` (which renders tokenized fields from a saved row) share one persist + enqueue implementation. All inserts ride *session*'s transaction and the enqueue rides *queue*'s connection binding -- a single ``await session.commit()`` by the caller makes the message, recipients, and jobs durable atomically. Recipients suppressed by *preferences* leave no recipient row and no job (the message row still records the attempt for audit). """ message_id = uuid.uuid4() await session.execute( insert(message_cls).values( id=message_id, comm_type=comm_type, context=context, subject=subject, body=body, ), ) delivered_ids: list[uuid.UUID] = [] for recipient in recipients: if ( preferences is not None and recipient.subject_key is not None and not await preferences.is_enabled( subject_key=recipient.subject_key, comm_type=comm_type, method=recipient.method, ) ): continue rid = uuid.uuid4() await session.execute( insert(recipient_cls).values( id=rid, message_id=message_id, method=recipient.method, address=recipient.address, kind=recipient.kind, status=DeliveryStatus.PENDING.value, ), ) delivered_ids.append(rid) if delivered_ids: await queue.enqueue( [DISPATCH_ENTRYPOINT] * len(delivered_ids), [str(rid).encode("utf-8") for rid in delivered_ids], [0] * len(delivered_ids), ) return message_id # ------------------------------------------------------------------- # Worker side -- the pgqueuer entrypoint factory. # -------------------------------------------------------------------
[docs] def make_dispatch_entrypoint( *, session_factory: async_sessionmaker[AsyncSession], transports: dict[str, Transport], message_cls: type[MessageMixin], recipient_cls: type[RecipientMixin], ) -> Callable[[Job], Awaitable[None]]: """Build the worker-side handler for :data:`DISPATCH_ENTRYPOINT`. Returns an ``async (job) -> None`` callable suitable for ``pgqueuer.PgQueuer.entrypoint``. Per job: 1. Decode the recipient id from ``job.payload``. 2. Open a session from *session_factory*; load the recipient and the matching message. 3. Skip if the recipient is missing or already advanced past :attr:`DeliveryStatus.PENDING` (job retried after a previous success / explicit failure). 4. Look up the transport for the recipient's method; mark the row failed if no transport is registered. 5. Call ``transport.send`` -- mark the row sent on success or failed (with the error message) on raise. Args: session_factory: Async sessionmaker; each job opens a short-lived session of its own. transports: Method -> :class:`Transport` lookup. message_cls: Consumer's concrete :class:`MessageMixin`. recipient_cls: Consumer's concrete :class:`RecipientMixin`. Returns: Async handler the consumer registers under :data:`DISPATCH_ENTRYPOINT`. Wrapped via :func:`fsh_lib.queue.instrument_entrypoint`, so it emits an OpenTelemetry consumer span per job when the ``fsh-lib[opentelemetry]`` extra is installed and runs unwrapped otherwise. """ async def handler(job: Job) -> None: if job.payload is None: # An empty payload can't identify a recipient. Drop the # job rather than raising -- pgqueuer's retry path would # just hit the same condition again. return recipient_id = uuid.UUID(job.payload.decode("utf-8")) async with session_factory() as session: # ``SELECT ... FOR UPDATE SKIP LOCKED`` claims the row for # this worker. pgqueuer can fan a job out to multiple # workers (concurrent consumers, retry on the same worker # while another instance is mid-send); without the lock, # two workers see ``status == PENDING`` and both call # ``transport.send`` -- the user gets duplicate # notifications. ``skip_locked`` makes the second worker # see no row rather than block, so it bails fast and # leaves the in-flight worker to finish. result = await session.execute( select(recipient_cls) .where(recipient_cls.id == recipient_id) .with_for_update(skip_locked=True), ) recipient = result.scalar_one_or_none() if recipient is None: return if recipient.status != DeliveryStatus.PENDING.value: # Job re-fire after a previous terminal outcome -- # leave the row alone so audit history stays honest. return message = await session.get(message_cls, recipient.message_id) if message is None: await _mark_failed( session, recipient_cls, recipient_id, error="message row missing", ) await session.commit() return transport = transports.get(recipient.method) if transport is None: await _mark_failed( session, recipient_cls, recipient_id, error=f"no transport for method {recipient.method!r}", ) await session.commit() return try: await transport.send(message=message, recipient=recipient) except Exception as exc: # noqa: BLE001 -- transport-agnostic await _mark_failed( session, recipient_cls, recipient_id, error=_truncate(str(exc)), ) await session.commit() return await session.execute( update(recipient_cls) .where(recipient_cls.id == recipient_id) .values( status=DeliveryStatus.SENT.value, sent_at=_dt.datetime.now(tz=_dt.UTC), error=None, ), ) await session.commit() return instrument_entrypoint(DISPATCH_ENTRYPOINT, handler)
_ERROR_COLUMN_LIMIT = 1024 """Length cap matching :attr:`RecipientMixin.error`'s column type.""" def _truncate(text: str) -> str: """Clip *text* to fit :attr:`RecipientMixin.error`.""" if len(text) <= _ERROR_COLUMN_LIMIT: return text return text[: _ERROR_COLUMN_LIMIT - 1] + "…" async def _mark_failed( session: AsyncSession, recipient_cls: type[RecipientMixin], recipient_id: uuid.UUID, *, error: str, ) -> None: """Stamp a recipient row as :attr:`DeliveryStatus.FAILED`.""" await session.execute( update(recipient_cls) .where(recipient_cls.id == recipient_id) .values( status=DeliveryStatus.FAILED.value, error=_truncate(error), ), ) # ------------------------------------------------------------------- # Convenience: load the audit row # -------------------------------------------------------------------
[docs] async def load_recipients( session: AsyncSession, recipient_cls: type[RecipientMixin], message_id: uuid.UUID, ) -> list[RecipientMixin]: """Return every recipient row for *message_id*, in insertion order. Lightweight read helper for an audit endpoint -- generated route handlers can call this without re-deriving the ``message_id ==`` filter. """ result = await session.execute( select(recipient_cls).where(recipient_cls.message_id == message_id), ) return list(result.scalars())
# ------------------------------------------------------------------- # User-defined templates # ------------------------------------------------------------------- # # The platform above is "developer-defined comms": types live in code # as :class:`CommType` instances, templates are inlined at build time, # and senders pick a type by name plus a typed context. This section # adds the parallel "user-defined comms" surface -- a template row a # user authored at runtime, optionally scoped to a resource so the # editor can fill the body with that resource's fields, rendered with # a Slate-friendly token format and dispatched through the same # outbox + worker. # # The two surfaces share storage (:class:`MessageMixin` / # :class:`RecipientMixin`) and the dispatch path -- the producer path # diverges to render tokens and resolve recipient slots, then funnels # back into :func:`_persist_and_enqueue`.
[docs] class UserCommTemplateMixin: """SQLAlchemy mixin for the user-authored template table. One row per saved template. Subclass on a regular ``Base`` to materialise the table: .. code-block:: python from fsh_lib.comms import UserCommTemplateMixin class UserCommTemplate(Base, UserCommTemplateMixin): __tablename__ = "user_comm_templates" The schema mirrors the editing UI: an optional target-resource slug (``None`` = free-form, no variables and no per-object recipient resolvers), a delivery method, tokenized subject / body / recipient lists, attachments, and bookkeeping columns. Tokens are persisted as JSON so the FE can round-trip its Slate document without a separate serialization layer. Token formats (all stored as JSON arrays): * Subject / body -- ``[{"kind": "text", "value": str}, ...]`` with ``var`` tokens of shape ``{"kind": "var", "path": str}`` mixed in. ``path`` is a dot-walk into the variable representation dump (e.g. ``"owner.email"``). * Recipients / cc / bcc -- one of ``{"kind": "literal", "method": str, "address": str, "subject_key": str | null}`` (concrete address hard-coded into the template) or ``{"kind": "resolver", "resolver": str}`` (a named slot resolved at send time against the loaded target object). * Attachments -- consumer-defined dict shape, passed through to the rendered output unchanged so transports can decide what to do with it. The mixin owns no primary key -- declare ``id`` on the consumer class (typically via a pgcraft PK plugin or an explicit ``Mapped[uuid.UUID]`` column) so the consumer's PK convention wins. Same idiom as :class:`~fsh_lib.saved_views.SavedViewMixin`. """ if TYPE_CHECKING: # Type-only -- concrete column lives on the consumer class. # Kept here so generated code can rely on ``template.id`` # being typed without committing to a column shape the # consumer may name differently. id: Mapped[Any] name: Mapped[str] = mapped_column( String(255), nullable=False, ) """Human-friendly title -- shown in the picker UI.""" description: Mapped[str | None] = mapped_column( String(1024), nullable=True, ) """Optional author note. Shown in template-management screens.""" target_resource: Mapped[str | None] = mapped_column( String(128), nullable=True, index=True, ) """Resource slug this template is scoped to, or ``None`` for a free-form template (no target object, no variables). Indexed so "templates for asset" lists are cheap.""" method: Mapped[str] = mapped_column( String(64), nullable=False, ) """Delivery method (matches a transport key). Stored once on the row -- the FE chooses the method at template-creation time and every recipient inherits it (literal recipient tokens may override per-row).""" subject: Mapped[list[dict[str, Any]]] = mapped_column( JSON, nullable=False, default=list, ) """Tokenized subject. Empty list for methods without subjects (SMS, push).""" body: Mapped[list[dict[str, Any]]] = mapped_column( JSON, nullable=False, default=list, ) """Tokenized body.""" recipients: Mapped[list[dict[str, Any]]] = mapped_column( JSON, nullable=False, default=list, ) """Tokenized To list.""" cc: Mapped[list[dict[str, Any]]] = mapped_column( JSON, nullable=False, default=list, ) """Tokenized Cc list.""" bcc: Mapped[list[dict[str, Any]]] = mapped_column( JSON, nullable=False, default=list, ) """Tokenized Bcc list.""" attachments: Mapped[list[dict[str, Any]]] = mapped_column( JSON, nullable=False, default=list, ) """Consumer-defined attachment refs. Passed through to the rendered output unchanged; rendering them is the transport's job.""" # ``created_at`` / ``updated_at`` are pgcraft-managed -- the # consumer's factory injects them via :class:`TimestampPlugin`. created_by: Mapped[str | None] = mapped_column( String(255), nullable=True, index=True, ) """Identifier of the author -- typically the session subject id. Nullable so seed templates can be inserted out-of-band."""
[docs] class RecipientResolver(Protocol): """Hook: resolve a named recipient slot against the target object. One implementation per slot a resource declares (``"assignee"``, ``"watchers"``, ...). The signature mirrors a route handler: the resolver gets the same async session the template render is running on, plus the loaded model instance and the template's method, and returns zero or more concrete :class:`RecipientSpec` rows. ``kind`` on the returned rows is preserved so a resolver can opt to bcc itself if it needs to. """ async def __call__( self, *, session: AsyncSession, instance: Any, method: str, ) -> Sequence[RecipientSpec]: """Return concrete recipients for this slot.""" ...
[docs] @dataclasses.dataclass(frozen=True) class RecipientSuggestionResolver: """Late-bound recipient group surfaced in the editor combobox. Maps to a registered :class:`RecipientResolver`; the FE author drops one of these in to fan out at send time. Attributes: slot: Resolver key registered on the target (e.g. ``"watchers"``). Stored on the resulting resolver-style recipient token in the editor's saved JSON. label: Optional display string; the FE defaults to ``"@<slot>"`` when omitted. description: Optional secondary line shown beneath *label* in the combobox row (e.g. ``"everyone watching the task"``). """ slot: str label: str | None = None description: str | None = None
[docs] @dataclasses.dataclass(frozen=True) class RecipientSuggestionLiteral: """Concrete recipient the BE has handy at describe-options time. Lets a resource expose directory entries (account-system users, a curated mailing list, etc.) right next to the resolver groups so the author picks a specific person without retyping the address. Attributes: address: Method-specific destination (email, phone, ...). Stored on the resulting literal recipient token in the editor's saved JSON. label: Optional display string (typically the person's name); the FE defaults to *address* when omitted. description: Optional secondary line (e.g. the address itself when *label* is the name). """ address: str label: str | None = None description: str | None = None
RecipientSuggestion = RecipientSuggestionResolver | RecipientSuggestionLiteral
[docs] @dataclasses.dataclass(frozen=True) class AttachmentSuggestionResolver: """Late-bound attachment group (resolver-style).""" slot: str label: str | None = None description: str | None = None
[docs] @dataclasses.dataclass(frozen=True) class AttachmentSuggestionFile: """Concrete file the BE has handy at describe-options time.""" name: str ref: str | None = None label: str | None = None description: str | None = None
AttachmentSuggestion = AttachmentSuggestionResolver | AttachmentSuggestionFile
[docs] @dataclasses.dataclass(frozen=True) class UserTemplateTarget: """Per-resource wiring for user-template renders. Registered once per resource that opts into being a template target. ``representation_serializer`` returns the variable dump -- a Pydantic model whose fields the author can pill into the subject / body. ``recipient_resolvers`` maps the named slots the author can drop into the to / cc / bcc lists. Attributes: resource_slug: User-facing slug the FE uses to scope a template (e.g. ``"asset"``). Stored on :attr:`UserCommTemplateMixin.target_resource`. load: Async ``(session, target_id) -> instance | None`` loader. Returning ``None`` from a send call surfaces as a :class:`LookupError`. representation_serializer: ``(instance, session) -> BaseModel`` (sync) or the async equivalent that produces the variable dump for *instance*. Both shapes are supported so kiln's fields-driven serializers (sync, no IO) and user-supplied async builders (may load related rows) drop in without adapter glue. representation_class: The Pydantic class returned by :attr:`representation_serializer`. Exposed so the FE options endpoint can surface the field schema without a second round-trip. recipient_resolvers: Slot name -> :class:`RecipientResolver` implementation. recipient_suggestions: Optional async builder that returns the list of :data:`RecipientSuggestion` rows the editor's combobox surfaces. Empty / unset falls back to :func:`default_recipient_suggestions` -- one resolver row per :attr:`recipient_resolvers` entry. Set this to mix in directory entries (concrete addresses pulled from the BE's user table, etc.) alongside the resolvers. attachment_suggestions: Optional async builder that returns the list of :data:`AttachmentSuggestion` rows for the attachment lane. Unset means the lane only shows the consumer-supplied upload affordance (or nothing). """ resource_slug: str load: Callable[[AsyncSession, Any], Awaitable[Any | None]] representation_serializer: Callable[ [Any, AsyncSession], BaseModel | Awaitable[BaseModel], ] representation_class: type[BaseModel] recipient_resolvers: dict[str, RecipientResolver] recipient_suggestions: ( Callable[[AsyncSession], Awaitable[Sequence[RecipientSuggestion]]] | None ) = None attachment_suggestions: ( Callable[[AsyncSession], Awaitable[Sequence[AttachmentSuggestion]]] | None ) = None
[docs] async def default_recipient_suggestions( target: UserTemplateTarget, _session: AsyncSession, *, groups: GroupRegistry | None = None, ) -> list[RecipientSuggestion]: """Build a suggestions list from *target*'s resolvers + project groups. Used by route handlers when the consumer didn't supply a custom :attr:`UserTemplateTarget.recipient_suggestions` builder. Emits one resolver row per per-target slot first, then one per project-wide group from *groups* so the editor's picker surfaces both flavours in a single combobox. """ rows: list[RecipientSuggestion] = [ RecipientSuggestionResolver(slot=name) for name in target.recipient_resolvers ] if groups is not None: rows.extend( RecipientSuggestionResolver( slot=group.slot, label=group.label, description=group.description, ) for group in groups.recipient_groups() ) return rows
[docs] async def default_attachment_suggestions( _target: UserTemplateTarget | None, _session: AsyncSession, *, groups: GroupRegistry | None = None, ) -> list[AttachmentSuggestion]: """Build the attachment suggestions list from project groups. Per-resource attachment groups aren't a concept on :class:`UserTemplateTarget` -- the platform only supports project-wide attachment groups via *groups*. Returns one resolver-style suggestion per registered attachment group. """ if groups is None: return [] return [ AttachmentSuggestionResolver( slot=group.slot, label=group.label, description=group.description, ) for group in groups.attachment_groups() ]
[docs] class UserTemplateTargetRegistry: """Registry of :class:`UserTemplateTarget` entries by resource slug. Built once at app startup -- typically populated from generated code that mirrors the project's :attr:`~be.config.schema.ResourceConfig.comms_target` declarations. Looked up by user-template route handlers and by :func:`send_user_template` to resolve variables and recipient slots. Not thread-safe; mutate only during startup. """ def __init__(self) -> None: """Build an empty registry.""" self._targets: dict[str, UserTemplateTarget] = {}
[docs] def register(self, target: UserTemplateTarget) -> None: """Add *target* to the registry. Raises: ValueError: When the slug is already registered. """ if target.resource_slug in self._targets: msg = ( f"user-template target {target.resource_slug!r} " f"already registered" ) raise ValueError(msg) self._targets[target.resource_slug] = target
[docs] def get(self, slug: str) -> UserTemplateTarget: """Return the target registered under *slug*. Raises: KeyError: If *slug* isn't registered. """ try: return self._targets[slug] except KeyError: msg = f"unknown user-template target: {slug!r}" raise KeyError(msg) from None
[docs] def slugs(self) -> tuple[str, ...]: """Return the registered slugs in insertion order.""" return tuple(self._targets)
# ------------------------------------------------------------------- # Project-wide group registry. # # Resolver / attachment slots that aren't tied to a single resource. # An author scoping a template against any target -- or no target at # all -- can drop these in: "@all-staff" works on a task email, a # project email, and an unscoped broadcast alike. The send path # falls through to this registry whenever a resolver token's slot # isn't owned by the per-target registry. # -------------------------------------------------------------------
[docs] class GroupRecipientResolver(Protocol): """Resolve a project-wide recipient group. Signature differs from :class:`RecipientResolver` -- no *instance*, since the slot isn't anchored to a specific target object. The session is still threaded through so a resolver can query a directory / membership table. """ async def __call__( self, *, session: AsyncSession, method: str, ) -> Sequence[RecipientSpec]: """Return concrete recipients for this group.""" ...
[docs] class GroupAttachmentResolver(Protocol): """Resolve a project-wide attachment group. Returns a single concrete file's metadata. Like :class:`GroupRecipientResolver`, no *instance* -- the file doesn't depend on a specific target object. """ async def __call__( self, *, session: AsyncSession, ) -> AttachmentSuggestionFile: """Return the resolved file metadata for this group.""" ...
[docs] @dataclasses.dataclass(frozen=True) class RecipientGroup: """Project-wide recipient slot definition. Registered against a :class:`GroupRegistry` so any template (regardless of scope) can drop ``@<slot>`` into to / cc / bcc and the send path resolves it via *resolver* at send time. Attributes: slot: Stable identifier stored on the recipient token's ``resolver`` field; same shape as a per-target slot. resolver: Async callable returning concrete :class:`RecipientSpec` rows. label: Optional display string surfaced in the editor's recipient picker. Defaults to ``"@<slot>"``. description: Optional secondary line below *label*. """ slot: str resolver: GroupRecipientResolver label: str | None = None description: str | None = None
[docs] @dataclasses.dataclass(frozen=True) class AttachmentGroup: """Project-wide attachment slot definition. Mirrors :class:`RecipientGroup` for the attachment lane. The slot resolves to a single :class:`AttachmentSuggestionFile` at send time. """ slot: str resolver: GroupAttachmentResolver label: str | None = None description: str | None = None
[docs] class GroupRegistry: """Project-wide recipient / attachment group registry. One instance per app, populated at startup -- typically from generated code that mirrors the project's ``recipient_groups`` / ``attachment_groups`` jsonnet declarations. Looked up by the user-template route handlers (to surface groups in the editor's suggestion popover) and by :func:`render_user_template` (to expand resolver tokens whose slot isn't owned by any per-target target). Not thread-safe -- mutate only during startup. """ def __init__(self) -> None: """Build an empty registry.""" self._recipients: dict[str, RecipientGroup] = {} self._attachments: dict[str, AttachmentGroup] = {}
[docs] def register_recipient(self, group: RecipientGroup) -> None: """Add a recipient group. Raises: ValueError: When *slot* is already registered. """ if group.slot in self._recipients: msg = f"recipient group {group.slot!r} already registered" raise ValueError(msg) self._recipients[group.slot] = group
[docs] def register_attachment(self, group: AttachmentGroup) -> None: """Add an attachment group. Raises: ValueError: When *slot* is already registered. """ if group.slot in self._attachments: msg = f"attachment group {group.slot!r} already registered" raise ValueError(msg) self._attachments[group.slot] = group
[docs] def recipient(self, slot: str) -> RecipientGroup | None: """Return the recipient group for *slot*, or ``None``.""" return self._recipients.get(slot)
[docs] def attachment(self, slot: str) -> AttachmentGroup | None: """Return the attachment group for *slot*, or ``None``.""" return self._attachments.get(slot)
[docs] def recipient_groups(self) -> tuple[RecipientGroup, ...]: """Return registered recipient groups in insertion order.""" return tuple(self._recipients.values())
[docs] def attachment_groups(self) -> tuple[AttachmentGroup, ...]: """Return registered attachment groups in insertion order.""" return tuple(self._attachments.values())
def _walk_path(data: Any, path: str) -> Any: """Walk a dotted *path* into the variable dump. Returns ``None`` for a missing segment rather than raising so a partially-populated representation still renders. Callers stringify the result themselves. """ cursor: Any = data for segment in path.split("."): if cursor is None: return None if isinstance(cursor, dict): cursor = cursor.get(segment) continue cursor = getattr(cursor, segment, None) return cursor
[docs] def render_text_field( tokens: Sequence[dict[str, Any]], variables: dict[str, Any], ) -> str: """Concatenate tokenized *tokens* into a plain string. ``text`` tokens emit their ``value`` verbatim; ``var`` tokens resolve through dotted-path lookup against *variables* and stringify. Missing or ``None`` values render as the empty string -- an unresolved variable at send time should produce a sendable message and let the FE surface a "this pill won't fill in" warning rather than blow up the dispatch path. Args: tokens: Slate-encoded token list. variables: Variable dump -- the dict form of the resource's representation Pydantic model. Returns: Rendered field as a single string. Raises: ValueError: When a token's ``kind`` is unrecognised -- a sign of an old client encoding tokens the platform doesn't know about, which is a real bug rather than something to silently swallow. """ pieces: list[str] = [] for token in tokens: kind = token.get("kind") if kind == "text": pieces.append(str(token.get("value", ""))) continue if kind == "var": path = token.get("path", "") value = _walk_path(variables, path) if path else None pieces.append("" if value is None else str(value)) continue msg = f"unknown subject/body token kind: {kind!r}" raise ValueError(msg) return "".join(pieces)
async def _resolve_recipient_tokens( tokens: Sequence[dict[str, Any]], *, kind: RecipientKind, target: UserTemplateTarget | None, instance: Any | None, session: AsyncSession, default_method: str, groups: GroupRegistry | None, ) -> list[RecipientSpec]: """Expand a templated recipient list into concrete :class:`RecipientSpec` s. Each returned spec inherits *kind* (``"to"`` / ``"cc"`` / ``"bcc"``) so all three lists from a template flatten into one persist-time recipient sequence with the correct addressing semantics preserved. ``literal`` tokens pass through with their declared method / address; ``resolver`` tokens look up the slot on *target* first, then fall through to the project-wide *groups* registry so a template scoped to any (or no) target can drop a project-wide group like ``@all-staff``. A resolver that returns multiple rows fans out into multiple specs -- the watcher case. """ out: list[RecipientSpec] = [] for token in tokens: token_kind = token.get("kind") if token_kind == _LITERAL_TOKEN_KIND: out.append( RecipientSpec( method=str(token.get("method", default_method)), address=str(token["address"]), kind=kind, subject_key=token.get("subject_key"), ), ) continue if token_kind == _RESOLVER_TOKEN_KIND: slot = str(token["resolver"]) resolved = await _invoke_recipient_resolver( slot=slot, target=target, instance=instance, session=session, default_method=default_method, groups=groups, ) # Override the kind to match the slot the author dropped # the resolver into -- a resolver doesn't get to decide # whether it's the To list or the Bcc list. Method is # left to the resolver in case a slot picks a method- # specific address (the assignee's SMS number rather # than their email). out.extend( dataclasses.replace(spec, kind=kind) for spec in resolved ) continue msg = f"unknown recipient token kind: {token_kind!r}" raise ValueError(msg) return out async def _invoke_recipient_resolver( *, slot: str, target: UserTemplateTarget | None, instance: Any | None, session: AsyncSession, default_method: str, groups: GroupRegistry | None, ) -> Sequence[RecipientSpec]: """Look up and invoke the resolver for *slot*. Per-target resolvers win when the template is scoped and the slot is registered against the target; otherwise the project- wide :class:`GroupRegistry` is consulted. Raises a :class:`ValueError` when neither owns the slot. """ if target is not None and instance is not None: per_target = target.recipient_resolvers.get(slot) if per_target is not None: return await per_target( session=session, instance=instance, method=default_method, ) if groups is not None: group = groups.recipient(slot) if group is not None: return await group.resolver( session=session, method=default_method, ) if target is None: msg = ( f"unknown recipient resolver {slot!r}: " f"no per-target target and no project group registered" ) else: msg = ( f"unknown recipient resolver {slot!r}: " f"not on {target.resource_slug!r} and no project group " f"registered" ) raise ValueError(msg)
[docs] @dataclasses.dataclass(frozen=True) class RenderedUserTemplate: """Output of :func:`render_user_template` -- pre-send view. Returned to the FE preview endpoint directly; the send path persists it as a :class:`MessageMixin` row + a fan-out of :class:`RecipientMixin` rows. """ subject: str body: str recipients: list[RecipientSpec] cc: list[RecipientSpec] bcc: list[RecipientSpec] attachments: list[dict[str, Any]]
[docs] async def render_user_template( template: UserCommTemplateMixin, *, session: AsyncSession, targets: UserTemplateTargetRegistry, target_id: Any | None = None, groups: GroupRegistry | None = None, ) -> RenderedUserTemplate: """Render *template* against the target object identified by *target_id*. For an unscoped template (``target_resource is None``) the variable dump is empty and ``target_id`` must also be ``None``; recipient tokens may still be ``literal`` or reference a project-wide *groups* slot. For a scoped template, the target is loaded, the representation serializer runs to produce the variable dump, and resolver recipient tokens fire against the loaded instance (per-target resolvers win) or fall through to the project-wide group registry. Attachment tokens with ``kind: "resolver"`` resolve via the same group registry. Raises: ValueError: When the scope / target_id combination is inconsistent or a resolver token references a slot that isn't registered on the target or in the project group registry. LookupError: When the target object can't be loaded. """ if template.target_resource is None: if target_id is not None: msg = "template is unscoped but a target_id was supplied" raise ValueError(msg) target: UserTemplateTarget | None = None instance: Any | None = None variables: dict[str, Any] = {} else: if target_id is None: msg = ( f"template is scoped to {template.target_resource!r} " f"but no target_id was supplied" ) raise ValueError(msg) target = targets.get(template.target_resource) instance = await target.load(session, target_id) if instance is None: msg = ( f"{template.target_resource!r} target_id " f"{target_id!r} not found" ) raise LookupError(msg) rep = await _maybe_await( target.representation_serializer(instance, session), ) variables = rep.model_dump(mode="json") subject = render_text_field(template.subject, variables) body = render_text_field(template.body, variables) recipients = await _resolve_recipient_tokens( template.recipients, kind="to", target=target, instance=instance, session=session, default_method=template.method, groups=groups, ) cc = await _resolve_recipient_tokens( template.cc, kind="cc", target=target, instance=instance, session=session, default_method=template.method, groups=groups, ) bcc = await _resolve_recipient_tokens( template.bcc, kind="bcc", target=target, instance=instance, session=session, default_method=template.method, groups=groups, ) attachments = await _resolve_attachment_tokens( template.attachments, session=session, groups=groups, ) return RenderedUserTemplate( subject=subject, body=body, recipients=recipients, cc=cc, bcc=bcc, attachments=attachments, )
async def _resolve_attachment_tokens( tokens: Sequence[dict[str, Any]], *, session: AsyncSession, groups: GroupRegistry | None, ) -> list[dict[str, Any]]: """Expand attachment tokens into concrete file metadata. ``file`` tokens pass through verbatim (they already carry name/ref). ``resolver`` tokens look up the slot on the project *groups* registry and substitute the resolved file's metadata. Without a registered group, raises -- callers catch and surface as 422 at the route boundary. """ out: list[dict[str, Any]] = [] for token in tokens: token_kind = token.get("kind") if token_kind == _FILE_TOKEN_KIND: out.append(dict(token)) continue if token_kind == _RESOLVER_TOKEN_KIND: slot = str(token["resolver"]) if groups is None: msg = ( f"resolver attachment token {slot!r} " f"requires a project group registry" ) raise ValueError(msg) group = groups.attachment(slot) if group is None: msg = f"unknown attachment resolver {slot!r}" raise ValueError(msg) resolved = await group.resolver(session=session) entry: dict[str, Any] = {"kind": "file", "name": resolved.name} if resolved.ref is not None: entry["ref"] = resolved.ref out.append(entry) continue msg = f"unknown attachment token kind: {token_kind!r}" raise ValueError(msg) return out _LITERAL_TOKEN_KIND = "literal" # noqa: S105 -- token discriminator, not a secret _RESOLVER_TOKEN_KIND = "resolver" # noqa: S105 -- token discriminator, not a secret _FILE_TOKEN_KIND = "file" # noqa: S105 -- token discriminator, not a secret async def _maybe_await[T](result: T | Awaitable[T]) -> T: """Await *result* if it's an awaitable; otherwise pass through. Lets :func:`render_user_template` accept both sync representation serializers (kiln's fields-driven default — no IO, no need for async) and async ones (user-supplied builders that may query related rows) without forcing the call site to fork. """ if isinstance(result, Awaitable): return await result return result
[docs] @dataclasses.dataclass(frozen=True) class ResolvedRecipientAddress: """Concrete recipient row produced by a resolver, with display label.""" address: str label: str | None = None
[docs] @dataclasses.dataclass(frozen=True) class TargetResolutions: """Concrete values for a target object, ready for FE display. Returned by :func:`resolve_target` when the editor renders against a *known* target -- the FE drops these into pills and chips so the author sees the values that ``@watchers`` and ``{title}`` would actually expand to right now. Attributes: variables: Dump of the target's representation Pydantic model. Same dict :func:`render_user_template` walks for variable tokens; keys are top-level field names + nested objects. recipient_slots: Slot name → addresses each registered recipient resolver returns for *this* target. Empty list for slots that resolved to nothing. """ variables: dict[str, Any] recipient_slots: dict[str, list[ResolvedRecipientAddress]]
[docs] async def resolve_target( *, session: AsyncSession, target: UserTemplateTarget, instance: Any, method: str, ) -> TargetResolutions: """Compute the variable dump + per-slot resolved recipients. *instance* is the already-loaded target object -- callers should use :attr:`UserTemplateTarget.load` upstream to fetch it (so a missing target maps to ``LookupError`` at the route boundary, not here). *method* drives any resolver that picks a method-specific address; templates carry ``method`` on the row itself. The resolution maps mirror what the FE editor's ``resolutions`` prop expects -- one variables dict, one recipient-slot map. Resolvers that raise propagate; the route boundary turns them into a 422. """ rep = await _maybe_await( target.representation_serializer(instance, session), ) variables = rep.model_dump(mode="json") recipient_slots: dict[str, list[ResolvedRecipientAddress]] = {} for slot, resolver in target.recipient_resolvers.items(): specs = await resolver( session=session, instance=instance, method=method, ) recipient_slots[slot] = [ ResolvedRecipientAddress(address=spec.address) for spec in specs ] return TargetResolutions( variables=variables, recipient_slots=recipient_slots, )
USER_TEMPLATE_COMM_TYPE = "_user_template" """Sentinel comm-type stamped on :attr:`MessageMixin.comm_type` for sends originating from a user-defined template. The audit row carries the template id + target id in :attr:`MessageMixin.context` so the audit log can distinguish developer-defined sends from user-template sends without a separate column."""
[docs] async def send_user_template( *, session: AsyncSession, queue: Queries, template: UserCommTemplateMixin, targets: UserTemplateTargetRegistry, message_cls: type[MessageMixin], recipient_cls: type[RecipientMixin], target_id: Any | None = None, preferences: PreferenceResolver | None = None, groups: GroupRegistry | None = None, ) -> uuid.UUID: """Render *template* against *target_id* and dispatch via the outbox. Mirrors :func:`send_communication` but skips the registry + Jinja steps -- the template's tokenized fields are rendered in-process by :func:`render_user_template` and the result is persisted into the same :class:`MessageMixin` / :class:`RecipientMixin` tables. All three of the template's recipient lists (to / cc / bcc) flow through one persist call so the worker delivers cc and bcc rows like any other recipient -- with their :attr:`RecipientMixin.kind` set so an email transport can lay out the headers correctly. The audit row's :attr:`MessageMixin.context` carries ``template_id``, ``template_name``, ``target_resource``, and ``target_id`` so a re-render or replay can re-fetch the same inputs. Attachment refs come along on ``context.attachments`` for transports that need them. Args: session: Async SQLAlchemy session. queue: pgqueuer ``Queries`` bound to *session*'s connection. template: The :class:`UserCommTemplateMixin` row to send. targets: Registry binding resource slugs to target wiring. message_cls: Consumer's concrete :class:`MessageMixin`. recipient_cls: Consumer's concrete :class:`RecipientMixin`. target_id: Target-object PK. ``None`` for unscoped templates. preferences: Optional preference resolver -- gates per-method opt-in just like :func:`send_communication`. groups: Optional project-wide :class:`GroupRegistry` for recipient + attachment slots that aren't per-target. Forwarded to :func:`render_user_template`. Returns: Id of the inserted message row. """ rendered = await render_user_template( template, session=session, targets=targets, target_id=target_id, groups=groups, ) return await _persist_and_enqueue( session=session, queue=queue, comm_type=USER_TEMPLATE_COMM_TYPE, context={ "template_id": str(template.id), "template_name": template.name, "target_resource": template.target_resource, "target_id": None if target_id is None else str(target_id), "attachments": rendered.attachments, }, subject=rendered.subject, body=rendered.body, recipients=[ *rendered.recipients, *rendered.cc, *rendered.bcc, ], message_cls=message_cls, recipient_cls=recipient_cls, preferences=preferences, )