"""Communication-platform primitives for kiln-generated apps.
Sends typed messages over pluggable delivery methods (email, SMS,
push, ...) using pgqueuer as the transactional outbox. The pieces:
* :class:`CommType` -- a named, schema-validated communication.
Carries a Pydantic ``context_schema`` and a pair of template
strings (subject + body); :class:`CommRegistry` holds the set the
consumer's app supports.
* :class:`Renderer` -- a Protocol that turns a :class:`CommType`
plus a validated context into a rendered subject + body pair.
The default :class:`JinjaRenderer` is in-process; the same
surface fits an HTTP-call renderer that defers to a separate
template service (a node renderer, MJML compiler, anything)
without changing the platform's call site.
* :class:`MessageMixin` / :class:`RecipientMixin` /
:class:`NotificationPreferenceMixin` -- SQLAlchemy mixins
supplying the storage columns for the three tables every
comm-platform install needs. Same pgcraft-friendly idiom as
the file / rate-limit mixins: consumer owns the table, we own
the columns.
* :class:`Transport` -- a Protocol implemented per delivery
method. One adapter per supported method, e.g. ``email``,
``sms``. Implementations live in consumer code or third-party
adapters; this module ships :class:`LoggingTransport` for tests
and local development.
* :class:`PreferenceResolver` -- a Protocol that answers
"should this recipient receive this comm-type via this method?"
Looked up once per recipient inside :func:`send_communication`;
an opted-out recipient yields no row and no job (the message
row still records the attempt for audit).
* :func:`send_communication` -- the producer entry point. Validates
context against the :class:`CommType` schema, renders the
templates, inserts one message row and one recipient row per
delivery, then enqueues one pgqueuer job per recipient under
:data:`DISPATCH_ENTRYPOINT`. All writes ride the caller's
SQLAlchemy session and the pgqueuer ``Queries`` is bound to the
same connection (see :func:`fsh_lib.queue.get_queue`), so a single
``await session.commit()`` makes the message + recipients +
jobs durable atomically.
* :func:`make_dispatch_entrypoint` -- the worker-side counterpart.
Returns an async ``(job) -> None`` callable wired to the consumer's
session factory, transports, and mixin classes -- register it
against :data:`DISPATCH_ENTRYPOINT` on a pgqueuer ``PgQueuer``
instance.
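``jinja2`` is imported unconditionally (it is already pulled in by
``kiln-generator`` for codegen), so the core module needs no extras
gate. The one optional dependency is ``httpx``, required only by
:class:`HttpRenderer`; install ``fsh-lib[comms-http]`` to use it.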
"""
from __future__ import annotations
import dataclasses
import datetime as _dt
import enum
import uuid
from collections.abc import Awaitable
from typing import TYPE_CHECKING, Any, Literal, Protocol
from jinja2 import Template
from pydantic import BaseModel
from sqlalchemy import (
JSON,
Boolean,
DateTime,
String,
Text,
Uuid,
insert,
select,
update,
)
from sqlalchemy.orm import Mapped, mapped_column
from fsh_lib.queue import instrument_entrypoint
if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from pgqueuer import Queries
from pgqueuer.models import Job
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
# Used verbatim by both halves of this module: the producer enqueues jobs
# under it and ``make_dispatch_entrypoint`` wraps its handler with it.
# NOTE(review): presumably renaming it strands jobs already queued under
# the old name -- confirm before changing.
DISPATCH_ENTRYPOINT = "fsh_lib_comms_dispatch"
"""pgqueuer entrypoint name jobs are enqueued under.
The producer side (:func:`send_communication`) enqueues under this
name; the worker side must register its handler under the same name
(see :func:`make_dispatch_entrypoint`). Exposed so consumers don't
hard-code the literal in two places.
"""
[docs]
class DeliveryStatus(enum.StrEnum):
    """Lifecycle states of a single recipient's delivery attempt.

    The string values are what gets written to
    :attr:`RecipientMixin.status`. The producer inserts rows as
    ``PENDING``; the dispatch handler moves them to ``SENT`` or
    ``FAILED`` and ignores rows that are no longer ``PENDING``.
    """

    # Written by the producer when the recipient row is inserted.
    PENDING = "pending"
    """Row inserted, job enqueued, transport not yet called."""

    # Written by the dispatch handler together with ``sent_at``.
    SENT = "sent"
    """Transport returned without raising."""

    # Also used when the message row or the transport is missing.
    FAILED = "failed"
    """Transport raised; ``error`` carries the message."""
# -------------------------------------------------------------------
# CommType + registry
# -------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True)
class CommType[ContextT: BaseModel]:
    """A named communication: schema + templates + default methods.

    A ``CommType`` is the unit of design for the comm platform: it
    binds a Pydantic context schema to a pair of template strings
    plus the set of methods the consumer wants delivery on by
    default. The producer (:func:`send_communication`) looks the
    instance up by name, validates the context against it, and hands
    it to the renderer. The dispatch handler built by
    :func:`make_dispatch_entrypoint` does not render: it passes the
    already-persisted message row to the transport.

    Instances are frozen, so they can be shared freely once
    registered.

    Attributes:
        name: Stable identifier (e.g. ``"order_shipped"``). Used as
            the registry key and stored on the message row so the
            audit log survives template churn.
        context_schema: Pydantic model describing the fields the
            templates reference. ``send_communication`` validates
            dict contexts against this before rendering, so missing
            or mistyped fields fail fast at the call site instead of
            half-way through a template.
        subject_template: Source string for the subject line.
            Interpreted by the configured :class:`Renderer`; the
            default :class:`JinjaRenderer` treats it as Jinja2.
        body_template: Source string for the body. Same renderer
            treatment as :attr:`subject_template`.
        default_methods: Methods to deliver on when the caller
            doesn't pass an explicit recipient list. Empty tuple
            means "no default; caller must specify recipients".
            NOTE(review): ``send_communication`` requires an explicit
            ``recipients`` argument and does not read this field --
            confirm which caller consumes it.
    """

    name: str
    context_schema: type[ContextT]
    subject_template: str
    body_template: str
    default_methods: tuple[str, ...] = ()
[docs]
class CommRegistry:
    """Name-keyed, mutable collection of :class:`CommType` entries.

    Populate it once at app startup (or keep it as a module-level
    global) and hand it to :func:`send_communication`, which resolves
    the comm type by name. No locking is performed, so confine
    mutation to startup.
    """

    def __init__(self) -> None:
        """Start with no registered comm types."""
        self._types: dict[str, CommType[Any]] = {}

    def register(self, comm_type: CommType[Any]) -> None:
        """Store *comm_type* under its ``name``.

        Raises:
            ValueError: A type with the same name is already present.
                Duplicate registration nearly always means two
                modules both believe they own the name, so it is
                rejected rather than silently overwritten.
        """
        key = comm_type.name
        if key not in self._types:
            self._types[key] = comm_type
            return
        msg = f"comm_type {key!r} already registered"
        raise ValueError(msg)

    def get(self, name: str) -> CommType[Any]:
        """Look up the :class:`CommType` registered under *name*.

        Raises:
            KeyError: Nothing is registered under *name*. The message
                names the missing key.
        """
        found = self._types.get(name)
        if found is None:
            msg = f"unknown comm_type: {name!r}"
            # ``from None`` keeps any surrounding exception context out
            # of the traceback, matching a plain "unknown key" error.
            raise KeyError(msg) from None
        return found

    def names(self) -> tuple[str, ...]:
        """Return the registered names, oldest registration first."""
        return (*self._types,)
# -------------------------------------------------------------------
# Renderer
# -------------------------------------------------------------------
[docs]
@dataclasses.dataclass(frozen=True)
class RenderedMessage:
"""Output of a :class:`Renderer` -- the strings we persist.
Attributes:
subject: Subject line. Populated by every renderer.
body: Plain-text body. The legacy field
:class:`JinjaRenderer` produces and that transports
already consume; :class:`HttpRenderer` populates it
from the render service's ``text`` alternative for
email output, or leaves it ``""`` for PDF output.
html: HTML alternative body for email outputs, or
``None``. ``JinjaRenderer`` never sets it;
``HttpRenderer`` sets it when the render service
returns ``output: "email"``. Email transports should
send HTML + plain-text as ``multipart/alternative``.
pdf: Raw PDF bytes for PDF outputs, or ``None``.
``HttpRenderer`` sets it when the render service
returns ``output: "pdf"``; transports treat it as a
single attachment.
"""
subject: str
body: str
html: str | None = None
pdf: bytes | None = None
[docs]
class Renderer(Protocol):
    """Hook: turn a :class:`CommType` + context into rendered strings.

    The default :class:`JinjaRenderer` evaluates the template strings
    in-process. A microservice-based renderer (e.g. a node service
    that compiles MJML or runs a richer template language) implements
    the same single method and gets dropped in via the ``renderer``
    argument to :func:`send_communication` -- no other code changes.

    *context* is the validated Pydantic model; implementations call
    :meth:`~pydantic.BaseModel.model_dump` themselves so they can
    pick the dump mode (``json`` vs Python) that fits their wire
    format.
    """

    async def render(
        self,
        comm_type: CommType[Any],
        context: BaseModel,
    ) -> RenderedMessage:
        """Return the rendered subject + body for this comm.

        Args:
            comm_type: The registered type being sent; carries the
                template sources and the name.
            context: The validated context model.

        Returns:
            The rendered message. ``send_communication`` awaits this
            call and persists its ``subject`` and ``body``.
        """
        ...
[docs]
class JinjaRenderer:
    """Default, in-process renderer backed by Jinja2.

    Both :attr:`CommType.subject_template` and
    :attr:`CommType.body_template` are compiled as Jinja2 source on
    every call and rendered against ``context.model_dump(mode="json")``,
    so dates and UUIDs appear as the strings they would be on the
    wire.

    The only state held is the autoescape flag. It matters for HTML
    email bodies; callers producing HTML should construct the
    renderer with ``autoescape=True``.
    """

    def __init__(self, *, autoescape: bool = False) -> None:
        """Remember the autoescape choice.

        Args:
            autoescape: Passed straight to each Jinja ``Template``.
                Defaults to ``False`` because plain-text bodies
                (SMS, push) are the common case; enable it for HTML
                email bodies.
        """
        self._autoescape = autoescape

    async def render(
        self,
        comm_type: CommType[Any],
        context: BaseModel,
    ) -> RenderedMessage:
        """Render subject and body of *comm_type* against *context*.

        Returns:
            A :class:`RenderedMessage` with ``subject`` and ``body``
            set; ``html`` and ``pdf`` keep their ``None`` defaults.
        """
        variables = context.model_dump(mode="json")
        sources = (comm_type.subject_template, comm_type.body_template)
        # Subject first, then body -- the generator is consumed in order.
        rendered_subject, rendered_body = (
            Template(source, autoescape=self._autoescape).render(**variables)
            for source in sources
        )
        return RenderedMessage(subject=rendered_subject, body=rendered_body)
[docs]
class HttpRenderer:
    """Renderer that defers to an external render microservice.

    Drop-in replacement for :class:`JinjaRenderer` when the project
    runs a separate render service (the kiln-render scaffold or any
    HTTP equivalent). Posts ``{template, context, theme}`` to the
    service's ``/v1/render`` endpoint and maps the response onto a
    :class:`RenderedMessage`:

    * a response carrying ``pdf_base64`` ->
      ``RenderedMessage(subject=subject, body="", pdf=bytes)``.
    * otherwise a response carrying ``html`` ->
      ``RenderedMessage(subject=subject, body=text, html=html)``.

    The service is the source of truth for the output kind; the
    renderer inspects the response shape rather than carrying a copy
    of the build-time contract at runtime. Discrepancies between the
    project's declared :class:`~be.config.schema.CommTypeConfig`
    output and the service's response are surfaced by the generated
    ``render-contract.json`` + the render service's ``tsc`` step,
    not by this client.

    Args:
        base_url: Base URL of the render service, e.g.
            ``"http://render:8200"``. Typically pulled from a
            ``RENDER_URL`` env var by the consumer. Only used when
            this instance builds its own client; an explicit
            *client* must already know where to send ``/v1/render``.
        theme: Default theme payload merged into every request --
            ``{logoUrl, colors, typography, ...}``. Per-send
            overrides are not supported on this surface yet; supply
            the theme once at construction. ``None`` sends no
            theme block. A shallow copy is stored.
        client: An existing :class:`httpx.AsyncClient` (or any
            object with a compatible async ``post`` / ``aclose``)
            to use. ``None`` (default) creates and owns one. Only an
            owned client is closed by :meth:`aclose`.
        timeout: Per-request timeout in seconds. Ignored when an
            explicit *client* is supplied.

    Raises:
        ImportError: No *client* was supplied and ``httpx`` is not
            installed. Install ``fsh-lib[comms-http]`` to use this
            renderer.
    """

    _DEFAULT_TIMEOUT = 10.0

    def __init__(
        self,
        base_url: str,
        *,
        theme: dict[str, Any] | None = None,
        client: Any | None = None,
        timeout: float = _DEFAULT_TIMEOUT,
    ) -> None:
        """Store the theme and client; lazy-import httpx if needed."""
        self._theme = dict(theme) if theme is not None else None
        # Ownership and client selection use the same ``is None`` test
        # so a supplied client is never replaced or closed by us.
        self._owns_client = client is None
        if client is not None:
            self._client = client
            return
        try:
            import httpx  # noqa: PLC0415 -- optional dep, gate at construct
        except ImportError as exc:
            msg = (
                "fsh_lib.comms.HttpRenderer requires httpx; install "
                "the optional extra via ``pip install "
                "'fsh-lib[comms-http]'``"
            )
            raise ImportError(msg) from exc
        self._client = httpx.AsyncClient(
            base_url=base_url,
            timeout=timeout,
        )

    async def aclose(self) -> None:
        """Close the underlying client iff this instance owns it."""
        if self._owns_client:
            await self._client.aclose()

    async def render(
        self,
        comm_type: CommType[Any],
        context: BaseModel,
    ) -> RenderedMessage:
        """POST ``/v1/render`` and convert the response.

        The request envelope is::

            {
                "template": "<comm_type.name>",
                "context": <context.model_dump(mode="json")>,
                "theme": <self._theme or omitted>
            }

        A missing ``subject`` key is rendered as ``""``; a missing
        ``text`` key on an HTML response yields an empty ``body``.

        Raises:
            RuntimeError: The response body is not a JSON object, or
                it carries neither ``html`` nor ``pdf_base64``.
            Exception: Whatever ``response.raise_for_status()`` raises
                for a non-2xx response (``httpx.HTTPStatusError`` for
                httpx clients) propagates unchanged, as do transport
                errors from the ``post`` call.
        """
        document: dict[str, Any] = {
            "template": comm_type.name,
            "context": context.model_dump(mode="json"),
        }
        if self._theme is not None:
            document["theme"] = self._theme
        response = await self._client.post("/v1/render", json=document)
        response.raise_for_status()
        payload = response.json()
        if not isinstance(payload, dict):
            # A list / string / null body would otherwise blow up with
            # an AttributeError on ``.get`` below; report it as the
            # same malformed-response error as a missing key.
            msg = (
                f"render service response for template "
                f"{comm_type.name!r} is not a JSON object "
                f"(got {type(payload).__name__})"
            )
            raise RuntimeError(msg)
        subject = payload.get("subject", "")
        if "pdf_base64" in payload:
            import base64  # noqa: PLC0415 -- stdlib, branch-local

            return RenderedMessage(
                subject=subject,
                body="",
                pdf=base64.b64decode(payload["pdf_base64"]),
            )
        if "html" in payload:
            return RenderedMessage(
                subject=subject,
                body=payload.get("text", ""),
                html=payload["html"],
            )
        msg = (
            f"render service response for template "
            f"{comm_type.name!r} carries neither ``html`` nor "
            f"``pdf_base64`` (got keys: {sorted(payload)})"
        )
        raise RuntimeError(msg)
# -------------------------------------------------------------------
# Mixins
# -------------------------------------------------------------------
[docs]
class MessageMixin:
    """SQLAlchemy mixin for the message table.

    One row per ``send_communication`` call -- represents the
    *intent* to communicate. Per-method delivery state lives on the
    :class:`RecipientMixin` rows that point back here via
    :attr:`RecipientMixin.message_id`.

    Subclass on a regular ``Base`` to materialise the table:

    .. code-block:: python

        from fsh_lib.comms import MessageMixin

        class CommMessage(Base, MessageMixin):
            __tablename__ = "comm_messages"
            id: Mapped[uuid.UUID] = mapped_column(
                Uuid, primary_key=True, default=uuid.uuid4,
            )

    Storing the rendered ``subject``/``body`` (rather than re-rendering
    from ``context`` at send time) means template churn doesn't
    invalidate the audit log: the message row reflects what the
    recipient actually saw.

    The mixin owns no primary key -- declare ``id`` on the consumer
    class (typically via a pgcraft PK plugin or an explicit UUID
    column) so the consumer's PK convention wins. Same idiom as
    :class:`~fsh_lib.saved_views.SavedViewMixin`. The producer path
    in this module inserts rows with ``id=uuid.uuid4()``, so the
    consumer's ``id`` column must accept a UUID value.
    """

    if TYPE_CHECKING:
        # Type-only -- concrete column lives on the consumer class.
        # ``send_communication`` and the dispatch entrypoint read
        # ``message.id`` and need it typed.
        id: Mapped[Any]

    comm_type: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        index=True,
    )
    """Registry key (:attr:`CommType.name`). Indexed so the
    audit log can group by type cheaply."""

    context: Mapped[dict[str, Any]] = mapped_column(
        JSON,
        nullable=False,
        default=dict,
    )
    """JSON dump of the validated context, kept so the row can be
    re-rendered or replayed if templates change."""

    subject: Mapped[str | None] = mapped_column(
        String(1024),
        nullable=True,
    )
    """Rendered subject. Nullable because some methods (SMS, push)
    have only a body."""

    # NOTE(review): the "64 KiB Postgres varchar ceiling" below looks
    # inaccurate -- PostgreSQL's documented varchar limits are far larger;
    # a 64 KiB limit is a MySQL row-size constraint. ``Text`` is still the
    # right choice for unbounded bodies; confirm and reword.
    body: Mapped[str] = mapped_column(Text, nullable=False, default="")
    """Rendered body. ``Text`` rather than ``String`` because
    bodies (especially HTML email) routinely exceed the 64 KiB
    Postgres ``varchar`` ceiling."""

    # ``created_at`` / ``updated_at`` are pgcraft-managed -- the
    # consumer's factory injects them via :class:`TimestampPlugin`.
    # Declaring them here would collide at table-build time.
[docs]
class RecipientMixin:
    """SQLAlchemy mixin for the recipient table.

    One row per ``(message, method, address)`` triple. The pgqueuer
    job carries the recipient id, so the dispatch path is:
    job -> recipient -> message -> transport lookup.

    The mixin deliberately doesn't declare a foreign key to the
    consumer's :class:`MessageMixin` table -- the consumer names
    that table, so the FK has to come from their own subclass via
    a ``sqlalchemy.ForeignKey`` on the ``message_id`` column.
    Keeping the mixin FK-free means the same class works regardless
    of where the consumer mounts the message table.

    Like :class:`MessageMixin`, the PK is left to the consumer to
    declare so pgcraft / consumer conventions own the column. The
    producer path in this module inserts rows with
    ``id=uuid.uuid4()`` and the worker parses the job payload back
    into a ``uuid.UUID``, so the consumer's ``id`` column must be
    UUID-compatible.
    """

    if TYPE_CHECKING:
        # Type-only -- concrete column lives on the consumer class.
        # The dispatch entrypoint reads ``recipient.id`` and the
        # pgqueuer payload encodes it.
        id: Mapped[Any]

    message_id: Mapped[uuid.UUID] = mapped_column(
        Uuid,
        nullable=False,
        index=True,
    )
    """Points back at the :class:`MessageMixin` row. Consumers
    typically add an explicit ``ForeignKey`` in their subclass."""

    method: Mapped[str] = mapped_column(
        String(64),
        nullable=False,
    )
    """Delivery method (``"email"``, ``"sms"``, ...). Used to look
    up the right transport at dispatch time."""

    address: Mapped[str] = mapped_column(
        String(512),
        nullable=False,
    )
    """Method-specific destination (email address, phone number,
    push token, ...). Opaque to this module."""

    # Width 8 bounds any future addressing kind to eight characters.
    kind: Mapped[str] = mapped_column(
        String(8),
        nullable=False,
        default="to",
    )
    """One of :data:`RecipientKind` (``"to"`` / ``"cc"`` / ``"bcc"``).
    Stored as a string so adding a new addressing kind doesn't need a
    migration. Email transports read this off the row to lay out the
    RFC 5322 headers; non-email transports typically ignore it."""

    status: Mapped[str] = mapped_column(
        String(32),
        nullable=False,
        default=DeliveryStatus.PENDING.value,
    )
    """One of :class:`DeliveryStatus`'s values. Stored as a string
    (not a SQL enum) so adding a new state doesn't require a
    migration."""

    sent_at: Mapped[_dt.datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )
    """When the transport returned successfully. ``None`` while
    pending or failed."""

    # Width must stay in sync with ``_ERROR_COLUMN_LIMIT`` (1024), which
    # the dispatch path uses to clip messages before writing them here.
    error: Mapped[str | None] = mapped_column(
        String(1024),
        nullable=True,
    )
    """Truncated exception message from a failed delivery. ``None``
    until the dispatch path catches an error."""
[docs]
class NotificationPreferenceMixin:
    """SQLAlchemy mixin for the per-recipient preference table.

    One row per ``(subject_key, comm_type, method)`` triple records
    whether that recipient wants that comm type via that method.
    :func:`send_communication` looks the row up via the
    :class:`PreferenceResolver` protocol -- this mixin just supplies
    the columns; the resolver implementation lives in consumer code
    (or in a generated helper).

    ``subject_key`` is intentionally a string rather than a typed
    foreign key: a comm platform routinely targets users, accounts,
    org-level addresses, or external identifiers, and a typed FK
    would lock the table to one of those.

    Like the rest of the comms mixins, the PK is left to the consumer
    to declare so pgcraft / consumer conventions own the column.

    NOTE(review): no unique constraint on the triple is declared
    here; presumably the consumer's subclass adds one -- confirm.
    """

    if TYPE_CHECKING:
        # Type-only -- concrete column lives on the consumer class.
        id: Mapped[Any]

    subject_key: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        index=True,
    )
    """Identifier of the recipient whose preferences this row
    captures (typically a user id formatted as a string)."""

    comm_type: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
    )
    """Registry key (:attr:`CommType.name`)."""

    method: Mapped[str] = mapped_column(
        String(64),
        nullable=False,
    )
    """Delivery method this preference scopes."""

    # The column default only affects rows that are inserted without an
    # explicit value. How an *absent* row is interpreted is decided by the
    # PreferenceResolver implementation, not by this column.
    enabled: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=True,
    )
    """``True`` when the recipient consents to this
    ``(comm_type, method)`` combination. Default ``True`` so an
    absent row reads as opt-in by default; flip per consumer policy
    if your default is opt-out."""
# -------------------------------------------------------------------
# Recipients + preferences + transports
# -------------------------------------------------------------------
# Persisted in ``RecipientMixin.kind``, a ``String(8)`` column, so any
# value added here must be at most eight characters long.
RecipientKind = Literal["to", "cc", "bcc"]
"""How a recipient is addressed in the outbound message.
For email: maps directly to RFC 5322 ``To:`` / ``Cc:`` / ``Bcc:``
headers; transports lay the headers out themselves so cc recipients
see the to/cc lists and bcc recipients don't. For methods without
header semantics (SMS, push) this is informational -- the dispatch
path delivers each row regardless of kind.
"""
[docs]
@dataclasses.dataclass(frozen=True)
class RecipientSpec:
    """Single recipient handed to :func:`send_communication`.

    The producer writes ``method``, ``address`` and ``kind`` onto the
    recipient row. ``subject_key`` is used only for the preference
    lookup and is not stored on that row.

    Attributes:
        method: Delivery method (must match a transport key).
        address: Method-specific destination (email, phone, ...).
        kind: To / Cc / Bcc -- gives transports the information they
            need to lay the message out correctly. Defaults to
            ``"to"`` so call sites that don't care don't need to set
            it.
        subject_key: Identifier whose preferences gate delivery.
            ``None`` skips the preference check (e.g. transactional
            sends to non-user addresses like a billing inbox).
    """

    method: str
    address: str
    kind: RecipientKind = "to"
    subject_key: str | None = None
[docs]
class PreferenceResolver(Protocol):
    """Hook: gate delivery on the recipient's per-method opt-in.

    The producer path consults the resolver once per recipient, and
    only for recipients whose ``subject_key`` is not ``None``. A
    ``False`` answer means that recipient gets no row and no job.
    """

    async def is_enabled(
        self,
        *,
        subject_key: str,
        comm_type: str,
        method: str,
    ) -> bool:
        """Return ``True`` to deliver, ``False`` to suppress.

        Args:
            subject_key: The recipient's preference identity, taken
                from :attr:`RecipientSpec.subject_key`.
            comm_type: Registry key of the comm being sent.
            method: Delivery method of the recipient being checked.
        """
        ...
[docs]
class Transport(Protocol):
    """Method-specific delivery adapter.

    Implementations are free to do whatever the method requires
    (SMTP send, Twilio API call, FCM push, ...). Raise to mark the
    delivery failed; return normally to mark it sent. The dispatch
    path stamps :attr:`RecipientMixin.status` and
    :attr:`RecipientMixin.sent_at` based on which path you take.

    On a raise, the dispatch handler stores ``str(exc)`` (clipped to
    the error column width) in :attr:`RecipientMixin.error`, so
    exception messages should be safe to persist.
    """

    async def send(
        self,
        *,
        message: MessageMixin,
        recipient: RecipientMixin,
    ) -> None:
        """Deliver *message* to *recipient* or raise.

        Args:
            message: The persisted message row (rendered subject and
                body).
            recipient: The recipient row being delivered to; carries
                the method, address and kind.
        """
        ...
[docs]
class LoggingTransport:
    """In-memory transport for tests and local development.

    Every :meth:`send` call is appended to ``self.sent`` instead of
    being delivered anywhere, which gives unit tests something to
    assert on and lets local setups run without real credentials.
    The log lives only on this instance, so it is not shared between
    processes. Real transports belong in consumer code or
    third-party packages.
    """

    def __init__(self) -> None:
        """Create the transport with nothing recorded yet."""
        self.sent: list[tuple[uuid.UUID, str, str]] = []

    async def send(
        self,
        *,
        message: MessageMixin,
        recipient: RecipientMixin,
    ) -> None:
        """Append ``(message_id, address, body)`` to ``self.sent``."""
        entry = (message.id, recipient.address, message.body)
        self.sent.append(entry)
# -------------------------------------------------------------------
# Producer side -- the entry point a request handler calls.
# -------------------------------------------------------------------
[docs]
async def send_communication(
    *,
    session: AsyncSession,
    queue: Queries,
    registry: CommRegistry,
    comm_type: str,
    context: BaseModel | dict[str, Any],
    recipients: Sequence[RecipientSpec],
    message_cls: type[MessageMixin],
    recipient_cls: type[RecipientMixin],
    renderer: Renderer | None = None,
    preferences: PreferenceResolver | None = None,
) -> uuid.UUID:
    """Validate, render, persist, and enqueue a communication.

    The transactional-outbox guarantee:

    1. Validate *context* against the comm-type's schema.
    2. Render the templates with *renderer* (defaults to
       :class:`JinjaRenderer`).
    3. Insert one :class:`MessageMixin` row.
    4. For each recipient, consult *preferences* (if supplied);
       insert a :class:`RecipientMixin` row for each one that
       passes.
    5. Enqueue one pgqueuer job per surviving recipient under
       :data:`DISPATCH_ENTRYPOINT`, payload = recipient id (UTF-8).

    Steps 3-5 all ride *session*'s transaction (see
    :func:`fsh_lib.queue.get_queue` for how *queue* is bound to the
    same connection). Commit the session and the message,
    recipients, and jobs all become durable atomically; roll back
    and the communication never happened. This function does not
    commit.

    Only the rendered ``subject`` and ``body`` are persisted; any
    ``html`` / ``pdf`` the renderer produced is not forwarded by this
    function.

    Args:
        session: The async SQLAlchemy session running the caller's
            transaction. All inserts ride it.
        queue: A pgqueuer ``Queries`` bound to *session*'s
            connection. Build it with
            :func:`fsh_lib.queue.get_queue` immediately before this
            call.
        registry: The :class:`CommRegistry` containing the type
            named by *comm_type*.
        comm_type: Registry key for the comm to send.
        context: An instance of the type's
            :attr:`CommType.context_schema` (used as-is), a dict, or
            a different Pydantic model; the latter two are validated
            against the schema.
        recipients: Per-delivery specs (method + address +
            optional subject_key for preference lookup).
        message_cls: Consumer's concrete :class:`MessageMixin`
            subclass.
        recipient_cls: Consumer's concrete :class:`RecipientMixin`
            subclass.
        renderer: Override for the template renderer. Defaults to
            :class:`JinjaRenderer` -- swap in an HTTP-call renderer
            to defer rendering to a separate service.
        preferences: Optional preference resolver. When omitted,
            every recipient is delivered to (subject to
            ``subject_key`` semantics on :class:`RecipientSpec`).

    Returns:
        The id of the inserted :class:`MessageMixin` row.

    Raises:
        KeyError: *comm_type* is not registered.
        pydantic.ValidationError: *context* does not satisfy the
            comm type's schema.
    """
    spec = registry.get(comm_type)
    schema = spec.context_schema
    if isinstance(context, schema):
        # Already an instance of the declared schema (or a subclass):
        # Pydantic validated it at construction, so use it directly.
        validated: BaseModel = context
    elif isinstance(context, BaseModel):
        # Some other model class. Accepting it unchecked would skip the
        # fail-fast schema validation this function promises, so
        # round-trip its fields through the declared schema.
        validated = schema.model_validate(context.model_dump())
    else:
        validated = schema.model_validate(context)
    # ``is None`` rather than truthiness: a renderer object is used
    # whenever one is supplied, whatever its ``__bool__`` says.
    used_renderer = renderer if renderer is not None else JinjaRenderer()
    rendered = await used_renderer.render(spec, validated)
    return await _persist_and_enqueue(
        session=session,
        queue=queue,
        comm_type=comm_type,
        context=validated.model_dump(mode="json"),
        subject=rendered.subject,
        body=rendered.body,
        recipients=recipients,
        message_cls=message_cls,
        recipient_cls=recipient_cls,
        preferences=preferences,
    )
async def _persist_and_enqueue(
    *,
    session: AsyncSession,
    queue: Queries,
    comm_type: str,
    context: dict[str, Any],
    subject: str,
    body: str,
    recipients: Sequence[RecipientSpec],
    message_cls: type[MessageMixin],
    recipient_cls: type[RecipientMixin],
    preferences: PreferenceResolver | None,
) -> uuid.UUID:
    """Write the message and recipient rows, then enqueue the jobs.

    This is the persist + enqueue tail of the producer path. It is
    kept separate from :func:`send_communication` so other producer
    entry points (the original notes :func:`send_user_template`,
    presumably defined later in this module) can render however they
    like and still share one outbox implementation.

    Nothing here commits. The inserts go through *session* and the
    enqueue goes through *queue*; provided *queue* is bound to the
    same connection, the caller's single ``await session.commit()``
    makes the message, recipients and jobs durable together.

    A recipient is skipped -- no row, no job -- when *preferences* is
    supplied, the recipient has a ``subject_key``, and the resolver
    answers ``False``. The message row is written regardless, so the
    attempt is still auditable.

    Returns:
        The freshly generated id of the message row.
    """
    new_message_id = uuid.uuid4()
    message_insert = insert(message_cls).values(
        id=new_message_id,
        comm_type=comm_type,
        context=context,
        subject=subject,
        body=body,
    )
    await session.execute(message_insert)

    enqueued_ids: list[uuid.UUID] = []
    for spec in recipients:
        # The resolver is only asked when there is both a resolver and
        # an identity to ask about.
        gated = preferences is not None and spec.subject_key is not None
        if gated and not await preferences.is_enabled(
            subject_key=spec.subject_key,
            comm_type=comm_type,
            method=spec.method,
        ):
            continue
        recipient_id = uuid.uuid4()
        recipient_insert = insert(recipient_cls).values(
            id=recipient_id,
            message_id=new_message_id,
            method=spec.method,
            address=spec.address,
            kind=spec.kind,
            status=DeliveryStatus.PENDING.value,
        )
        await session.execute(recipient_insert)
        enqueued_ids.append(recipient_id)

    if not enqueued_ids:
        # Everyone was suppressed (or nobody was listed): nothing to queue.
        return new_message_id

    job_count = len(enqueued_ids)
    # One batched call: parallel lists of entrypoint, payload, priority.
    await queue.enqueue(
        [DISPATCH_ENTRYPOINT] * job_count,
        [str(rid).encode("utf-8") for rid in enqueued_ids],
        [0] * job_count,
    )
    return new_message_id
# -------------------------------------------------------------------
# Worker side -- the pgqueuer entrypoint factory.
# -------------------------------------------------------------------
[docs]
def make_dispatch_entrypoint(
    *,
    session_factory: async_sessionmaker[AsyncSession],
    transports: dict[str, Transport],
    message_cls: type[MessageMixin],
    recipient_cls: type[RecipientMixin],
) -> Callable[[Job], Awaitable[None]]:
    """Build the worker-side handler for :data:`DISPATCH_ENTRYPOINT`.

    The returned ``async (job) -> None`` callable is meant for
    ``pgqueuer.PgQueuer.entrypoint``. For each job it:

    1. Returns immediately if ``job.payload`` is ``None``; otherwise
       decodes the payload as a UTF-8 UUID string (the recipient id).
    2. Opens a session from *session_factory* and selects the
       recipient row ``FOR UPDATE SKIP LOCKED``.
    3. Returns without writing if the row is missing, locked by
       another worker, or no longer :attr:`DeliveryStatus.PENDING`.
    4. Loads the message row and the transport for the recipient's
       method; if either is missing the row is marked failed.
    5. Calls ``transport.send`` -- the row is marked sent (with
       ``sent_at``) on success, or failed with the clipped exception
       text on raise.

    Every path that writes commits before returning.

    Args:
        session_factory: Async sessionmaker; each job opens a
            short-lived session of its own.
        transports: Method -> :class:`Transport` lookup.
        message_cls: Consumer's concrete :class:`MessageMixin`.
        recipient_cls: Consumer's concrete :class:`RecipientMixin`.

    Returns:
        The handler wrapped by
        :func:`fsh_lib.queue.instrument_entrypoint` under
        :data:`DISPATCH_ENTRYPOINT`. Per the original documentation
        that wrapper emits an OpenTelemetry consumer span per job
        when the ``fsh-lib[opentelemetry]`` extra is installed and is
        a pass-through otherwise.
    """

    async def record_failure(
        session: AsyncSession,
        recipient_id: uuid.UUID,
        reason: str,
    ) -> None:
        # Stamp the row FAILED and make it durable straight away; every
        # failure path in the handler ends with exactly this pair.
        await _mark_failed(
            session,
            recipient_cls,
            recipient_id,
            error=reason,
        )
        await session.commit()

    async def handler(job: Job) -> None:
        raw_payload = job.payload
        if raw_payload is None:
            # No payload means no recipient to look up. Returning (not
            # raising) avoids a retry that would hit the same wall.
            return
        recipient_id = uuid.UUID(raw_payload.decode("utf-8"))

        async with session_factory() as session:
            # Claim the row with ``FOR UPDATE SKIP LOCKED``. If the same
            # job reaches two workers, the loser sees no row and bails
            # instead of blocking, so only one of them calls
            # ``transport.send`` and the recipient is not notified twice.
            claim = (
                select(recipient_cls)
                .where(recipient_cls.id == recipient_id)
                .with_for_update(skip_locked=True)
            )
            claimed = await session.execute(claim)
            recipient = claimed.scalar_one_or_none()
            if recipient is None:
                return
            if recipient.status != DeliveryStatus.PENDING.value:
                # Already reached a terminal state on an earlier run;
                # leave it untouched so the audit trail stays accurate.
                return

            message = await session.get(message_cls, recipient.message_id)
            if message is None:
                await record_failure(
                    session,
                    recipient_id,
                    "message row missing",
                )
                return

            transport = transports.get(recipient.method)
            if transport is None:
                await record_failure(
                    session,
                    recipient_id,
                    f"no transport for method {recipient.method!r}",
                )
                return

            try:
                await transport.send(message=message, recipient=recipient)
            except Exception as exc:  # noqa: BLE001 -- transport-agnostic
                await record_failure(
                    session,
                    recipient_id,
                    _truncate(str(exc)),
                )
                return

            mark_sent = (
                update(recipient_cls)
                .where(recipient_cls.id == recipient_id)
                .values(
                    status=DeliveryStatus.SENT.value,
                    sent_at=_dt.datetime.now(tz=_dt.UTC),
                    error=None,
                )
            )
            await session.execute(mark_sent)
            await session.commit()

    return instrument_entrypoint(DISPATCH_ENTRYPOINT, handler)
_ERROR_COLUMN_LIMIT = 1024
"""Length cap matching :attr:`RecipientMixin.error`'s column type."""


def _truncate(text: str) -> str:
    """Shorten *text* so it fits :attr:`RecipientMixin.error`.

    Text at or under the limit is returned unchanged. Longer text is
    cut to one character short of the limit and an ellipsis is
    appended, so the result is exactly ``_ERROR_COLUMN_LIMIT``
    characters long.
    """
    overflows = len(text) > _ERROR_COLUMN_LIMIT
    return text[: _ERROR_COLUMN_LIMIT - 1] + "…" if overflows else text
async def _mark_failed(
    session: AsyncSession,
    recipient_cls: type[RecipientMixin],
    recipient_id: uuid.UUID,
    *,
    error: str,
) -> None:
    """Set one recipient row to :attr:`DeliveryStatus.FAILED`.

    Issues an ``UPDATE`` through *session* that writes the failed
    status and the clipped *error* text onto the row whose id is
    *recipient_id*. The caller is responsible for committing.
    """
    failed_status = DeliveryStatus.FAILED.value
    clipped_error = _truncate(error)
    stamp = (
        update(recipient_cls)
        .where(recipient_cls.id == recipient_id)
        .values(status=failed_status, error=clipped_error)
    )
    await session.execute(stamp)
# -------------------------------------------------------------------
# Convenience: load the audit row
# -------------------------------------------------------------------
[docs]
async def load_recipients(
    session: AsyncSession,
    recipient_cls: type[RecipientMixin],
    message_id: uuid.UUID,
) -> list[RecipientMixin]:
    """Fetch every recipient row that belongs to *message_id*.

    Small read helper for audit endpoints, so generated route handlers
    don't have to rebuild the ``message_id ==`` filter themselves.

    The query carries no ``ORDER BY``, so rows come back in whatever
    order the database returns them.

    Args:
        session: Async SQLAlchemy session to query on.
        recipient_cls: Consumer's concrete :class:`RecipientMixin`.
        message_id: Id of the parent message row.

    Returns:
        The matching recipient rows as a list (possibly empty).
    """
    stmt = select(recipient_cls).where(
        recipient_cls.message_id == message_id,
    )
    rows = await session.execute(stmt)
    return list(rows.scalars())
# -------------------------------------------------------------------
# User-defined templates
# -------------------------------------------------------------------
#
# The platform above is "developer-defined comms": types live in code
# as :class:`CommType` instances, templates are inlined at build time,
# and senders pick a type by name plus a typed context. This section
# adds the parallel "user-defined comms" surface -- a template row a
# user authored at runtime, optionally scoped to a resource so the
# editor can fill the body with that resource's fields, rendered with
# a Slate-friendly token format and dispatched through the same
# outbox + worker.
#
# The two surfaces share storage (:class:`MessageMixin` /
# :class:`RecipientMixin`) and the dispatch path -- the producer path
# diverges to render tokens and resolve recipient slots, then funnels
# back into :func:`_persist_and_enqueue`.
[docs]
class UserCommTemplateMixin:
    """Column set for the table that stores user-authored templates.

    Each saved template is one row. Mix this into a regular ``Base``
    subclass to materialise the table:

    .. code-block:: python

        from fsh_lib.comms import UserCommTemplateMixin

        class UserCommTemplate(Base, UserCommTemplateMixin):
            __tablename__ = "user_comm_templates"

    The columns follow the editing UI: an optional target-resource slug
    (``None`` means free-form -- no variables and no per-object
    recipient resolvers), a delivery method, tokenized subject / body /
    recipient lists, attachments, and bookkeeping columns. Token lists
    are stored as JSON so the FE can round-trip its Slate document
    without a separate serialization layer.

    Token formats (every one is a JSON array):

    * Subject / body -- ``{"kind": "text", "value": str}`` entries
      interleaved with ``{"kind": "var", "path": str}`` entries, where
      ``path`` is a dot-walk into the variable dump (for example
      ``"owner.email"``).
    * Recipients / cc / bcc -- either
      ``{"kind": "literal", "method": str, "address": str,
      "subject_key": str | null}`` (a concrete address saved with the
      template) or ``{"kind": "resolver", "resolver": str}`` (a named
      slot expanded at send time).
    * Attachments -- ``{"kind": "file", ...}`` entries are copied to
      the rendered output as-is; ``{"kind": "resolver",
      "resolver": str}`` entries are replaced by the file a
      project-wide attachment group resolves to. Any other ``kind`` is
      rejected when the template is rendered.

    No primary key is declared here -- put ``id`` on the consumer class
    (typically via a pgcraft PK plugin or an explicit
    ``Mapped[uuid.UUID]`` column) so the consumer's PK convention wins.
    Same idiom as :class:`~fsh_lib.saved_views.SavedViewMixin`.
    """

    if TYPE_CHECKING:
        # Annotation only: the real column is declared by the consumer.
        # Having it here lets generated code use ``template.id`` with
        # a type, without this mixin fixing the column's shape.
        id: Mapped[Any]

    name: Mapped[str] = mapped_column(String(255), nullable=False)
    """Human-readable title; this is what the picker UI lists."""

    description: Mapped[str | None] = mapped_column(
        String(1024),
        nullable=True,
    )
    """Free-text author note for template-management screens."""

    target_resource: Mapped[str | None] = mapped_column(
        String(128),
        nullable=True,
        index=True,
    )
    """Slug of the resource the template is scoped to; ``None`` marks
    a free-form template (no target object, no variables). The index
    keeps "templates for <resource>" listings cheap."""

    method: Mapped[str] = mapped_column(String(64), nullable=False)
    """Delivery method, matching a transport key. Set once per
    template; recipients use it unless a literal recipient token
    carries its own ``method``."""

    subject: Mapped[list[dict[str, Any]]] = mapped_column(
        JSON,
        nullable=False,
        default=list,
    )
    """Subject as a token list. Left empty for methods that have no
    subject (SMS, push)."""

    body: Mapped[list[dict[str, Any]]] = mapped_column(
        JSON,
        nullable=False,
        default=list,
    )
    """Body as a token list."""

    recipients: Mapped[list[dict[str, Any]]] = mapped_column(
        JSON,
        nullable=False,
        default=list,
    )
    """To list as recipient tokens."""

    cc: Mapped[list[dict[str, Any]]] = mapped_column(
        JSON,
        nullable=False,
        default=list,
    )
    """Cc list as recipient tokens."""

    bcc: Mapped[list[dict[str, Any]]] = mapped_column(
        JSON,
        nullable=False,
        default=list,
    )
    """Bcc list as recipient tokens."""

    attachments: Mapped[list[dict[str, Any]]] = mapped_column(
        JSON,
        nullable=False,
        default=list,
    )
    """Attachment tokens. What to do with the rendered entries is the
    transport's job."""

    # ``created_at`` / ``updated_at`` are pgcraft-managed -- the
    # consumer's factory injects them via :class:`TimestampPlugin`.

    created_by: Mapped[str | None] = mapped_column(
        String(255),
        nullable=True,
        index=True,
    )
    """Author identifier -- typically the session subject id. Nullable
    so seed templates can be inserted out-of-band."""
[docs]
class RecipientResolver(Protocol):
    """Callable that expands one named recipient slot for a target.

    A resource supplies one implementation per slot it declares
    (``"assignee"``, ``"watchers"``, ...). The call receives the async
    session the render is running on, the loaded model instance, and
    the template's delivery method, and hands back zero or more
    concrete :class:`RecipientSpec` rows.

    Whatever ``kind`` the returned rows carry is overwritten on the
    render path with the list (to / cc / bcc) the resolver token was
    placed in; the ``method`` on each row is left as returned.
    """

    async def __call__(
        self,
        *,
        session: AsyncSession,
        instance: Any,
        method: str,
    ) -> Sequence[RecipientSpec]:
        """Produce the concrete recipients for this slot."""
        ...
[docs]
@dataclasses.dataclass(frozen=True)
class RecipientSuggestionResolver:
"""Late-bound recipient group surfaced in the editor combobox.
Maps to a registered :class:`RecipientResolver`; the FE author
drops one of these in to fan out at send time.
Attributes:
slot: Resolver key registered on the target (e.g.
``"watchers"``). Stored on the resulting resolver-style
recipient token in the editor's saved JSON.
label: Optional display string; the FE defaults to
``"@<slot>"`` when omitted.
description: Optional secondary line shown beneath *label*
in the combobox row (e.g. ``"everyone watching the
task"``).
"""
slot: str
label: str | None = None
description: str | None = None
[docs]
@dataclasses.dataclass(frozen=True)
class RecipientSuggestionLiteral:
"""Concrete recipient the BE has handy at describe-options time.
Lets a resource expose directory entries (account-system users,
a curated mailing list, etc.) right next to the resolver groups
so the author picks a specific person without retyping the
address.
Attributes:
address: Method-specific destination (email, phone, ...).
Stored on the resulting literal recipient token in the
editor's saved JSON.
label: Optional display string (typically the person's
name); the FE defaults to *address* when omitted.
description: Optional secondary line (e.g. the address
itself when *label* is the name).
"""
address: str
label: str | None = None
description: str | None = None
# Union of the two row shapes a recipient-suggestions builder may
# return: a late-bound resolver slot or a concrete literal address.
RecipientSuggestion = RecipientSuggestionResolver | RecipientSuggestionLiteral
[docs]
@dataclasses.dataclass(frozen=True)
class AttachmentSuggestionResolver:
"""Late-bound attachment group (resolver-style)."""
slot: str
label: str | None = None
description: str | None = None
[docs]
@dataclasses.dataclass(frozen=True)
class AttachmentSuggestionFile:
"""Concrete file the BE has handy at describe-options time."""
name: str
ref: str | None = None
label: str | None = None
description: str | None = None
# Union of the two row shapes an attachment-suggestions builder may
# return: a late-bound resolver slot or a concrete file.
AttachmentSuggestion = AttachmentSuggestionResolver | AttachmentSuggestionFile
[docs]
@dataclasses.dataclass(frozen=True)
class UserTemplateTarget:
    """Wiring that lets one resource act as a user-template target.

    One instance is registered per resource that opts in.
    ``representation_serializer`` yields the variable dump -- a
    Pydantic model whose fields an author can pill into the subject /
    body -- and ``recipient_resolvers`` names the slots an author can
    drop into the to / cc / bcc lists.

    Attributes:
        resource_slug: User-facing slug the FE scopes a template with
            (for example ``"asset"``). Stored on
            :attr:`UserCommTemplateMixin.target_resource`.
        load: Async ``(session, target_id) -> instance | None``
            loader. A ``None`` result during a render surfaces as a
            :class:`LookupError`.
        representation_serializer: ``(instance, session) -> BaseModel``,
            either sync or async. Both are accepted so sync
            fields-driven serializers and async user-supplied builders
            (which may load related rows) work without adapter glue.
        representation_class: Pydantic class that
            :attr:`representation_serializer` returns. Exposed so the
            FE options endpoint can surface the field schema without
            a second round-trip.
        recipient_resolvers: Slot name -> :class:`RecipientResolver`.
        recipient_suggestions: Optional async builder for the
            :data:`RecipientSuggestion` rows shown in the editor's
            combobox. When unset, route handlers fall back to
            :func:`default_recipient_suggestions`. Supply one to mix
            directory entries in with the resolver rows.
        attachment_suggestions: Optional async builder for the
            :data:`AttachmentSuggestion` rows of the attachment lane.
    """

    resource_slug: str
    load: Callable[[AsyncSession, Any], Awaitable[Any | None]]
    representation_serializer: Callable[
        [Any, AsyncSession], BaseModel | Awaitable[BaseModel]
    ]
    representation_class: type[BaseModel]
    recipient_resolvers: dict[str, RecipientResolver]
    recipient_suggestions: (
        Callable[[AsyncSession], Awaitable[Sequence[RecipientSuggestion]]] | None
    ) = None
    attachment_suggestions: (
        Callable[[AsyncSession], Awaitable[Sequence[AttachmentSuggestion]]] | None
    ) = None
[docs]
async def default_recipient_suggestions(
    target: UserTemplateTarget,
    _session: AsyncSession,
    *,
    groups: GroupRegistry | None = None,
) -> list[RecipientSuggestion]:
    """Derive the suggestion rows from *target*'s slots and project groups.

    Fallback for route handlers when the consumer supplied no custom
    :attr:`UserTemplateTarget.recipient_suggestions` builder. The
    per-target slots come first (slot name only), followed by one row
    per project-wide group in *groups* (slot, label and description),
    so the editor's picker shows both flavours in one combobox.

    Args:
        target: Target whose ``recipient_resolvers`` keys become rows.
        _session: Unused.
        groups: Optional project-wide registry to append rows from.

    Returns:
        Resolver-style suggestion rows, per-target slots first.
    """
    per_target: list[RecipientSuggestion] = [
        RecipientSuggestionResolver(slot=slot_name)
        for slot_name in target.recipient_resolvers
    ]
    if groups is None:
        return per_target
    project_wide = [
        RecipientSuggestionResolver(
            slot=entry.slot,
            label=entry.label,
            description=entry.description,
        )
        for entry in groups.recipient_groups()
    ]
    return [*per_target, *project_wide]
[docs]
async def default_attachment_suggestions(
    _target: UserTemplateTarget | None,
    _session: AsyncSession,
    *,
    groups: GroupRegistry | None = None,
) -> list[AttachmentSuggestion]:
    """Derive the attachment suggestion rows from the project groups.

    :class:`UserTemplateTarget` has no notion of per-resource
    attachment groups, so only the project-wide ones in *groups*
    contribute: one resolver-style row per registered attachment
    group.

    Args:
        _target: Unused.
        _session: Unused.
        groups: Optional project-wide registry.

    Returns:
        One row per attachment group, or an empty list when *groups*
        is ``None``.
    """
    registered = () if groups is None else groups.attachment_groups()
    return [
        AttachmentSuggestionResolver(
            slot=entry.slot,
            label=entry.label,
            description=entry.description,
        )
        for entry in registered
    ]
[docs]
class UserTemplateTargetRegistry:
    """Slug-keyed collection of :class:`UserTemplateTarget` entries.

    Filled once at app startup and then consulted by the user-template
    route handlers and by :func:`send_user_template` to find the
    variable serializer and recipient slots for a resource.

    Not thread-safe; mutate only during startup.
    """

    def __init__(self) -> None:
        """Start with no registered targets."""
        self._targets: dict[str, UserTemplateTarget] = {}

    def register(self, target: UserTemplateTarget) -> None:
        """Store *target* under its ``resource_slug``.

        Raises:
            ValueError: When that slug already has a target.
        """
        slug = target.resource_slug
        if slug in self._targets:
            msg = f"user-template target {slug!r} already registered"
            raise ValueError(msg)
        self._targets[slug] = target

    def get(self, slug: str) -> UserTemplateTarget:
        """Look up the target stored under *slug*.

        Raises:
            KeyError: If nothing is registered under *slug*.
        """
        if slug not in self._targets:
            msg = f"unknown user-template target: {slug!r}"
            raise KeyError(msg) from None
        return self._targets[slug]

    def slugs(self) -> tuple[str, ...]:
        """List the registered slugs, oldest registration first."""
        return tuple(self._targets.keys())
# -------------------------------------------------------------------
# Project-wide group registry.
#
# Resolver / attachment slots that aren't tied to a single resource.
# An author scoping a template against any target -- or no target at
# all -- can drop these in: "@all-staff" works on a task email, a
# project email, and an unscoped broadcast alike. The send path
# falls through to this registry whenever a resolver token's slot
# isn't owned by the per-target registry.
# -------------------------------------------------------------------
[docs]
class GroupRecipientResolver(Protocol):
    """Callable that expands a project-wide recipient group.

    Unlike :class:`RecipientResolver` there is no *instance* argument:
    the slot is not tied to a particular target object. The session is
    still passed in so an implementation can query a directory or
    membership table.
    """

    async def __call__(
        self,
        *,
        session: AsyncSession,
        method: str,
    ) -> Sequence[RecipientSpec]:
        """Produce the concrete recipients for this group."""
        ...
[docs]
class GroupAttachmentResolver(Protocol):
    """Callable that expands a project-wide attachment group.

    Yields the metadata of one concrete file. As with
    :class:`GroupRecipientResolver`, no *instance* is passed -- the
    file does not depend on a particular target object.
    """

    async def __call__(
        self,
        *,
        session: AsyncSession,
    ) -> AttachmentSuggestionFile:
        """Produce the file metadata this group resolves to."""
        ...
[docs]
@dataclasses.dataclass(frozen=True)
class RecipientGroup:
    """Definition of one project-wide recipient slot.

    Registered on a :class:`GroupRegistry` so that any template,
    whatever its scope, can place ``@<slot>`` in to / cc / bcc; the
    send path calls *resolver* to expand it at send time.

    Attributes:
        slot: Stable identifier stored in the recipient token's
            ``resolver`` field; same shape as a per-target slot.
        resolver: Async callable returning concrete
            :class:`RecipientSpec` rows.
        label: Optional display text for the editor's recipient
            picker. Defaults to ``"@<slot>"``.
        description: Optional secondary line below *label*.
    """

    slot: str
    resolver: GroupRecipientResolver
    label: str | None = None
    description: str | None = None
[docs]
@dataclasses.dataclass(frozen=True)
class AttachmentGroup:
    """Definition of one project-wide attachment slot.

    Attachment-lane counterpart of :class:`RecipientGroup`. At send
    time the slot resolves to a single
    :class:`AttachmentSuggestionFile`.

    Attributes:
        slot: Stable identifier stored in the attachment token's
            ``resolver`` field.
        resolver: Async callable returning the file metadata.
        label: Optional display text.
        description: Optional secondary line.
    """

    slot: str
    resolver: GroupAttachmentResolver
    label: str | None = None
    description: str | None = None
[docs]
class GroupRegistry:
    """Registry of project-wide recipient and attachment groups.

    One instance per app, filled at startup. The user-template route
    handlers read it to surface groups in the editor's suggestion
    popover, and :func:`render_user_template` reads it to expand
    resolver tokens whose slot no per-target resolver owns.

    Recipient and attachment groups live in separate namespaces, so
    the same slot name may be used once in each.

    Not thread-safe -- mutate only during startup.
    """

    def __init__(self) -> None:
        """Start with no registered groups."""
        self._recipients: dict[str, RecipientGroup] = {}
        self._attachments: dict[str, AttachmentGroup] = {}

    def register_recipient(self, group: RecipientGroup) -> None:
        """Store a recipient group under its slot.

        Raises:
            ValueError: When the slot already has a recipient group.
        """
        slot = group.slot
        if slot in self._recipients:
            msg = f"recipient group {slot!r} already registered"
            raise ValueError(msg)
        self._recipients[slot] = group

    def register_attachment(self, group: AttachmentGroup) -> None:
        """Store an attachment group under its slot.

        Raises:
            ValueError: When the slot already has an attachment group.
        """
        slot = group.slot
        if slot in self._attachments:
            msg = f"attachment group {slot!r} already registered"
            raise ValueError(msg)
        self._attachments[slot] = group

    def recipient(self, slot: str) -> RecipientGroup | None:
        """Look up the recipient group for *slot*; ``None`` if absent."""
        return self._recipients.get(slot)

    def attachment(self, slot: str) -> AttachmentGroup | None:
        """Look up the attachment group for *slot*; ``None`` if absent."""
        return self._attachments.get(slot)

    def recipient_groups(self) -> tuple[RecipientGroup, ...]:
        """List the recipient groups, oldest registration first."""
        return (*self._recipients.values(),)

    def attachment_groups(self) -> tuple[AttachmentGroup, ...]:
        """List the attachment groups, oldest registration first."""
        return (*self._attachments.values(),)
def _walk_path(data: Any, path: str) -> Any:
"""Walk a dotted *path* into the variable dump.
Returns ``None`` for a missing segment rather than raising so a
partially-populated representation still renders. Callers
stringify the result themselves.
"""
cursor: Any = data
for segment in path.split("."):
if cursor is None:
return None
if isinstance(cursor, dict):
cursor = cursor.get(segment)
continue
cursor = getattr(cursor, segment, None)
return cursor
[docs]
def render_text_field(
    tokens: Sequence[dict[str, Any]],
    variables: dict[str, Any],
) -> str:
    """Concatenate tokenized *tokens* into a plain string.

    ``text`` tokens emit their ``value``; ``var`` tokens resolve
    through dotted-path lookup against *variables*. Both are
    stringified with ``str``. Missing or ``None`` values render as the
    empty string for either kind -- an unresolved variable (or a text
    token saved with a null value) at send time should produce a
    sendable message and let the FE surface a warning, rather than
    blow up the dispatch path or leak the literal ``"None"`` into the
    message.

    Args:
        tokens: Slate-encoded token list.
        variables: Variable dump -- the dict form of the resource's
            representation Pydantic model.

    Returns:
        Rendered field as a single string.

    Raises:
        ValueError: When a token's ``kind`` is unrecognised -- a sign
            of a client encoding tokens the platform doesn't know
            about, which is a real bug rather than something to
            silently swallow.
    """
    pieces: list[str] = []
    for token in tokens:
        kind = token.get("kind")
        if kind == "text":
            value = token.get("value")
        elif kind == "var":
            path = token.get("path", "")
            # An empty / absent path cannot resolve to anything.
            value = _walk_path(variables, path) if path else None
        else:
            msg = f"unknown subject/body token kind: {kind!r}"
            raise ValueError(msg)
        # One rule for both kinds: nothing to show renders as "".
        pieces.append("" if value is None else str(value))
    return "".join(pieces)
async def _resolve_recipient_tokens(
tokens: Sequence[dict[str, Any]],
*,
kind: RecipientKind,
target: UserTemplateTarget | None,
instance: Any | None,
session: AsyncSession,
default_method: str,
groups: GroupRegistry | None,
) -> list[RecipientSpec]:
"""Expand a templated recipient list into concrete :class:`RecipientSpec` s.
Each returned spec inherits *kind* (``"to"`` / ``"cc"`` /
``"bcc"``) so all three lists from a template flatten into one
persist-time recipient sequence with the correct addressing
semantics preserved.
``literal`` tokens pass through with their declared method /
address; ``resolver`` tokens look up the slot on *target* first,
then fall through to the project-wide *groups* registry so a
template scoped to any (or no) target can drop a project-wide
group like ``@all-staff``. A resolver that returns multiple
rows fans out into multiple specs -- the watcher case.
"""
out: list[RecipientSpec] = []
for token in tokens:
token_kind = token.get("kind")
if token_kind == _LITERAL_TOKEN_KIND:
out.append(
RecipientSpec(
method=str(token.get("method", default_method)),
address=str(token["address"]),
kind=kind,
subject_key=token.get("subject_key"),
),
)
continue
if token_kind == _RESOLVER_TOKEN_KIND:
slot = str(token["resolver"])
resolved = await _invoke_recipient_resolver(
slot=slot,
target=target,
instance=instance,
session=session,
default_method=default_method,
groups=groups,
)
# Override the kind to match the slot the author dropped
# the resolver into -- a resolver doesn't get to decide
# whether it's the To list or the Bcc list. Method is
# left to the resolver in case a slot picks a method-
# specific address (the assignee's SMS number rather
# than their email).
out.extend(
dataclasses.replace(spec, kind=kind) for spec in resolved
)
continue
msg = f"unknown recipient token kind: {token_kind!r}"
raise ValueError(msg)
return out
async def _invoke_recipient_resolver(
*,
slot: str,
target: UserTemplateTarget | None,
instance: Any | None,
session: AsyncSession,
default_method: str,
groups: GroupRegistry | None,
) -> Sequence[RecipientSpec]:
"""Look up and invoke the resolver for *slot*.
Per-target resolvers win when the template is scoped and the
slot is registered against the target; otherwise the project-
wide :class:`GroupRegistry` is consulted. Raises a
:class:`ValueError` when neither owns the slot.
"""
if target is not None and instance is not None:
per_target = target.recipient_resolvers.get(slot)
if per_target is not None:
return await per_target(
session=session,
instance=instance,
method=default_method,
)
if groups is not None:
group = groups.recipient(slot)
if group is not None:
return await group.resolver(
session=session,
method=default_method,
)
if target is None:
msg = (
f"unknown recipient resolver {slot!r}: "
f"no per-target target and no project group registered"
)
else:
msg = (
f"unknown recipient resolver {slot!r}: "
f"not on {target.resource_slug!r} and no project group "
f"registered"
)
raise ValueError(msg)
[docs]
@dataclasses.dataclass(frozen=True)
class RenderedUserTemplate:
    """Result of :func:`render_user_template` -- the pre-send view.

    The FE preview endpoint returns it directly; the send path stores
    it as one :class:`MessageMixin` row plus a fan-out of
    :class:`RecipientMixin` rows.

    Attributes:
        subject: Rendered subject text.
        body: Rendered body text.
        recipients: Concrete To recipients.
        cc: Concrete Cc recipients.
        bcc: Concrete Bcc recipients.
        attachments: Resolved attachment entries.
    """

    subject: str
    body: str
    recipients: list[RecipientSpec]
    cc: list[RecipientSpec]
    bcc: list[RecipientSpec]
    attachments: list[dict[str, Any]]
[docs]
async def render_user_template(
    template: UserCommTemplateMixin,
    *,
    session: AsyncSession,
    targets: UserTemplateTargetRegistry,
    target_id: Any | None = None,
    groups: GroupRegistry | None = None,
) -> RenderedUserTemplate:
    """Render *template* for the target object named by *target_id*.

    Unscoped templates (``target_resource is None``) render against an
    empty variable dump and require ``target_id`` to be ``None`` too;
    their recipient tokens may still be ``literal`` or point at a
    project-wide *groups* slot.

    Scoped templates load the target, run its representation
    serializer to obtain the variable dump, and expand resolver
    recipient tokens against the loaded instance -- per-target
    resolvers first, then the project-wide group registry.
    Attachment tokens with ``kind: "resolver"`` go through that same
    group registry.

    Args:
        template: The template row to render.
        session: Async session passed to loaders and resolvers.
        targets: Registry mapping resource slugs to target wiring.
        target_id: Target-object key; ``None`` for unscoped templates.
        groups: Optional project-wide group registry.

    Returns:
        The rendered subject, body, recipient lists and attachments.

    Raises:
        ValueError: When scope and *target_id* disagree, or a token
            is malformed or names a slot nobody registered.
        KeyError: When the template's ``target_resource`` slug is not
            registered in *targets*.
        LookupError: When the target object can't be loaded.
    """
    scope = template.target_resource
    target: UserTemplateTarget | None = None
    instance: Any | None = None
    variables: dict[str, Any] = {}

    if scope is None:
        if target_id is not None:
            msg = "template is unscoped but a target_id was supplied"
            raise ValueError(msg)
    else:
        if target_id is None:
            msg = f"template is scoped to {scope!r} but no target_id was supplied"
            raise ValueError(msg)
        target = targets.get(scope)
        instance = await target.load(session, target_id)
        if instance is None:
            msg = f"{scope!r} target_id {target_id!r} not found"
            raise LookupError(msg)
        representation = await _maybe_await(
            target.representation_serializer(instance, session),
        )
        variables = representation.model_dump(mode="json")

    subject = render_text_field(template.subject, variables)
    body = render_text_field(template.body, variables)

    # The three address lists differ only in their tokens and kind.
    shared = {
        "target": target,
        "instance": instance,
        "session": session,
        "default_method": template.method,
        "groups": groups,
    }
    to_list = await _resolve_recipient_tokens(
        template.recipients, kind="to", **shared
    )
    cc_list = await _resolve_recipient_tokens(
        template.cc, kind="cc", **shared
    )
    bcc_list = await _resolve_recipient_tokens(
        template.bcc, kind="bcc", **shared
    )
    attachments = await _resolve_attachment_tokens(
        template.attachments,
        session=session,
        groups=groups,
    )
    return RenderedUserTemplate(
        subject=subject,
        body=body,
        recipients=to_list,
        cc=cc_list,
        bcc=bcc_list,
        attachments=attachments,
    )
async def _resolve_attachment_tokens(
tokens: Sequence[dict[str, Any]],
*,
session: AsyncSession,
groups: GroupRegistry | None,
) -> list[dict[str, Any]]:
"""Expand attachment tokens into concrete file metadata.
``file`` tokens pass through verbatim (they already carry
name/ref). ``resolver`` tokens look up the slot on the project
*groups* registry and substitute the resolved file's metadata.
Without a registered group, raises -- callers catch and surface
as 422 at the route boundary.
"""
out: list[dict[str, Any]] = []
for token in tokens:
token_kind = token.get("kind")
if token_kind == _FILE_TOKEN_KIND:
out.append(dict(token))
continue
if token_kind == _RESOLVER_TOKEN_KIND:
slot = str(token["resolver"])
if groups is None:
msg = (
f"resolver attachment token {slot!r} "
f"requires a project group registry"
)
raise ValueError(msg)
group = groups.attachment(slot)
if group is None:
msg = f"unknown attachment resolver {slot!r}"
raise ValueError(msg)
resolved = await group.resolver(session=session)
entry: dict[str, Any] = {"kind": "file", "name": resolved.name}
if resolved.ref is not None:
entry["ref"] = resolved.ref
out.append(entry)
continue
msg = f"unknown attachment token kind: {token_kind!r}"
raise ValueError(msg)
return out
# Values of a token's ``kind`` field that the recipient and attachment
# token resolvers in this module compare against.
_LITERAL_TOKEN_KIND = "literal"  # noqa: S105 -- token discriminator, not a secret
_RESOLVER_TOKEN_KIND = "resolver"  # noqa: S105 -- token discriminator, not a secret
_FILE_TOKEN_KIND = "file"  # noqa: S105 -- token discriminator, not a secret
async def _maybe_await[T](result: T | Awaitable[T]) -> T:
"""Await *result* if it's an awaitable; otherwise pass through.
Lets :func:`render_user_template` accept both sync representation
serializers (kiln's fields-driven default — no IO, no need for
async) and async ones (user-supplied builders that may query
related rows) without forcing the call site to fork.
"""
if isinstance(result, Awaitable):
return await result
return result
[docs]
@dataclasses.dataclass(frozen=True)
class ResolvedRecipientAddress:
"""Concrete recipient row produced by a resolver, with display label."""
address: str
label: str | None = None
[docs]
@dataclasses.dataclass(frozen=True)
class TargetResolutions:
    """Concrete values for one target object, shaped for FE display.

    :func:`resolve_target` returns this when the editor renders
    against a *known* target; the FE drops the values into pills and
    chips so the author sees what ``@watchers`` and ``{title}`` would
    expand to right now.

    Attributes:
        variables: Dump of the target's representation Pydantic
            model -- the same dict :func:`render_user_template` walks
            for variable tokens.
        recipient_slots: Slot name → addresses that slot's registered
            recipient resolver returned for *this* target. A slot
            that resolved to nothing maps to an empty list.
    """

    variables: dict[str, Any]
    recipient_slots: dict[str, list[ResolvedRecipientAddress]]
[docs]
async def resolve_target(
    *,
    session: AsyncSession,
    target: UserTemplateTarget,
    instance: Any,
    method: str,
) -> TargetResolutions:
    """Build the variable dump and per-slot recipients for *instance*.

    *instance* must already be loaded -- callers fetch it with
    :attr:`UserTemplateTarget.load` beforehand, so a missing target
    becomes a ``LookupError`` at the route boundary rather than here.
    *method* is forwarded to every resolver, for slots that pick a
    method-specific address; templates carry ``method`` on the row
    itself.

    The result mirrors what the FE editor's ``resolutions`` prop
    expects: one variables dict and one recipient-slot map. Only the
    ``address`` of each resolved spec is kept. Exceptions raised by a
    resolver propagate; the route boundary turns them into a 422.

    Args:
        session: Async session handed to the serializer and resolvers.
        target: Wiring for the resource *instance* belongs to.
        instance: The loaded target object.
        method: Delivery method forwarded to each resolver.

    Returns:
        The variable dump plus the resolved addresses per slot.
    """
    representation = await _maybe_await(
        target.representation_serializer(instance, session),
    )
    # Resolvers are awaited one at a time, in registration order.
    addresses_by_slot: dict[str, list[ResolvedRecipientAddress]] = {
        slot_name: [
            ResolvedRecipientAddress(address=found.address)
            for found in await resolve(
                session=session,
                instance=instance,
                method=method,
            )
        ]
        for slot_name, resolve in target.recipient_resolvers.items()
    }
    return TargetResolutions(
        variables=representation.model_dump(mode="json"),
        recipient_slots=addresses_by_slot,
    )
# Passed as ``comm_type`` by :func:`send_user_template`. The context it
# writes alongside also holds ``template_name``, ``target_resource``
# and the rendered ``attachments``.
USER_TEMPLATE_COMM_TYPE = "_user_template"
"""Sentinel comm-type stamped on :attr:`MessageMixin.comm_type` for
sends originating from a user-defined template. The audit row carries
the template id + target id in :attr:`MessageMixin.context` so the
audit log can distinguish developer-defined sends from user-template
sends without a separate column."""
[docs]
async def send_user_template(
    *,
    session: AsyncSession,
    queue: Queries,
    template: UserCommTemplateMixin,
    targets: UserTemplateTargetRegistry,
    message_cls: type[MessageMixin],
    recipient_cls: type[RecipientMixin],
    target_id: Any | None = None,
    preferences: PreferenceResolver | None = None,
    groups: GroupRegistry | None = None,
) -> uuid.UUID:
    """Render *template* for *target_id* and hand it to the outbox.

    Counterpart of :func:`send_communication` without the registry and
    Jinja steps: :func:`render_user_template` renders the tokenized
    fields in-process and the result goes into the same
    :class:`MessageMixin` / :class:`RecipientMixin` tables. The
    template's to, cc and bcc lists are concatenated (in that order)
    into a single persist call, so cc and bcc rows are delivered like
    any other recipient, each carrying its own ``kind``.

    The message context records ``template_id``, ``template_name``,
    ``target_resource`` and ``target_id`` (ids stringified) so a
    re-render or replay can re-fetch the same inputs, plus the
    rendered ``attachments`` for transports that need them.

    Args:
        session: Async SQLAlchemy session.
        queue: pgqueuer ``Queries`` bound to *session*'s connection.
        template: The :class:`UserCommTemplateMixin` row to send.
        targets: Registry binding resource slugs to target wiring.
        message_cls: Consumer's concrete :class:`MessageMixin`.
        recipient_cls: Consumer's concrete :class:`RecipientMixin`.
        target_id: Target-object PK. ``None`` for unscoped templates.
        preferences: Optional preference resolver, forwarded to the
            persist step.
        groups: Optional project-wide :class:`GroupRegistry` for
            recipient and attachment slots that aren't per-target.
            Forwarded to :func:`render_user_template`.

    Returns:
        Id of the inserted message row.
    """
    rendered = await render_user_template(
        template,
        session=session,
        targets=targets,
        target_id=target_id,
        groups=groups,
    )
    audit_context = {
        "template_id": str(template.id),
        "template_name": template.name,
        "target_resource": template.target_resource,
        "target_id": str(target_id) if target_id is not None else None,
        "attachments": rendered.attachments,
    }
    # To, then Cc, then Bcc -- one flat list for the persist step.
    everyone = [*rendered.recipients, *rendered.cc, *rendered.bcc]
    return await _persist_and_enqueue(
        session=session,
        queue=queue,
        comm_type=USER_TEMPLATE_COMM_TYPE,
        context=audit_context,
        subject=rendered.subject,
        body=rendered.body,
        recipients=everyone,
        message_cls=message_cls,
        recipient_cls=recipient_cls,
        preferences=preferences,
    )