API reference

Runtime helpers for kiln-generated FastAPI projects.

Each submodule groups related primitives:

  • fsh_lib.auth – JWT session auth (bearer + cookie transports).

  • fsh_lib.comms – communication-platform primitives: CommType registry, transport adapters, preference resolver, and a transactional-outbox send_communication() that rides the request session via pgqueuer.

  • fsh_lib.filesFileMixin (pgcraft- flavoured storage-column mixin) + S3 client + four ready-made action functions. Requires the files extra (pip install 'kiln-generator[files]') for boto3.

  • fsh_lib.filters – declarative filter expressions.

  • fsh_lib.identifiersautogenerated_identifier_field(): a create-form field default for server-assigned identifiers (pgcraft’s PGCraftAutogeneratedIdentifierColumn), marking them optional, read-only, and tagged with the x-autogenerated OpenAPI extension.

  • fsh_lib.invalidationQueryInvalidations request-scoped collector that writes X-Invalidate-Queries response headers consumed by the FE’s TanStack Query layer.

  • fsh_lib.links – short-link primitives: ShortLinkMixin storage columns, shorten() (explicit producer entry point with same-URL dedup and a configurable base62 code length), and resolve() for the redirect handler.

  • fsh_lib.openapiconstruct_openapi_extra() helper that builds the kiln-emitted openapi_extra dict (x-cache-key, x-resource, x-auth-role) every generated route uses.

  • fsh_lib.loadingapply_eager_loads(): eager-load options that reuse an ordering join via contains_eager instead of fetching the relationship twice.

  • fsh_lib.numericDecimalString (and the decimal_string() precision/scale factory): a Decimal that stays exact in Python and crosses the wire as a precision-safe JSON string, typed {"type": "string"} so the FE’s openapi-ts codegen renders it as a TypeScript string (parsed with a decimal library when the FE needs to compute) rather than a lossy number.

  • fsh_lib.geoCoordinateSchema: the wire form of a geographic coordinate, an object with DecimalString latitude / longitude parts (precision-safe strings, string in the generated TypeScript) – what DecimalString is for a single decimal, for a lat/long pair.

  • fsh_lib.orderingapply_ordering(): ORDER BY application, returning the relationships it LEFT-joined.

  • fsh_lib.pagination – keyset and offset pagination.

  • fsh_lib.opaOpaClient for a stateless Rego (OPA) permissions service: assembles the decision input (subject, action, resource, roles, bindings) and POSTs it for a Decision. Requires the opa extra (pip install 'kiln-generator[opa]') for httpx.

  • fsh_lib.queue – pgqueuer integration: get_queue() for transactional-outbox enqueue from a SQLAlchemy session, open_worker_driver() for the worker-side asyncpg connection, and instrument_entrypoint() to wrap an entrypoint coroutine in an OpenTelemetry consumer span.

  • fsh_lib.transformersbuild_patch_kwargs() for generated PATCH-builder one-liners that project model_fields_set onto a kwargs dict.

  • fsh_lib.utilsget_object_from_query_or_404 (the load-or-404 helper used by every read/mutate handler) and run_once().

Generated code imports from the owning submodule directly – from fsh_lib.files import FileMixin, from fsh_lib.auth import session_auth – so the package root is intentionally empty. This keeps the public surface organized by concern rather than as one flat namespace.

Everything here is pure Python – the be CLI knows to emit imports pointing at these submodules instead of scaffolding a utils.py into the generated app.

Action availability for kiln-generated FastAPI projects.

An action is anything you can do to (or with) a resource: the built-in CRUD ops (get, list, create, update, delete) plus any custom action endpoints declared in the spec. Every action carries a single guard callable –

async def can_<name>(resource, session) -> bool

– that decides two things at once: whether the current session may execute the action, and whether the action should appear in serialized responses so the frontend can show the corresponding button. Object-scope guards see the resource instance; collection- scope guards see None (there is no per-row resource yet).

Generated code emits one actions.py per app holding tuples of ActionSpec per resource (object and collection scopes kept separate). The route-handler templates call available_actions() against the right tuple to populate the actions field on response payloads, and call the matching can callable directly before executing each handler so the visibility predicate and the authorization gate can never drift.

class ActionRef(**data)[source]

One action exposed in a serialized response.

Carries the bare minimum the frontend needs to render a button: the action name (matches the operation name on the backend) and its scope. Kept Pydantic so the OpenAPI schema surfaces a stable shape; consumers downstream get a typed TS interface for free.

Generic over the name and scope types so generated code can subclass with a Literal union for name and a single-member scopeActionRef[FooAction, Literal["object"]] – and have the OpenAPI schema (and the TypeScript types openapi-ts derives from it) carry the enum / const rather than a bare str. Narrowing through the type parameters keeps the override compatible, so no field has to be redeclared.

name

Operation name (e.g. "publish", "update").

scope

"object" for per-row actions, "collection" for actions that target the resource as a whole.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ActionSpec(name, can, is_object_action)[source]

Generator-emitted descriptor for one action.

Lives in the per-app actions.py registry. The route handlers and serializers consume ActionSpec tuples via available_actions(); nothing outside generated code should construct these by hand.

name

Operation name.

can

Async guard returning True when the action is available. Bound to always_true() when the spec did not declare a can dotted path.

is_object_action

True for object-scope actions, False for collection-scope actions. Drives the ActionRef scope field and disambiguates which tuple a spec belongs in.

property scope: Literal['object', 'collection']

Match ActionRef.scope for this spec.

Scope

Whether an action targets a single resource or a collection.

Object-scope actions take (resource, session); collection- scope actions take (None, session). The frontend uses this to decide where to render the button – per row or once on the list page.

alias of Literal[‘object’, ‘collection’]

async always_true(_resource, _session)[source]

Default guard used when an action declares no can path.

Returning True unconditionally matches the historical behavior of generated handlers (auth handled at the route level, no per-action gating); opting in to action gating is additive.

Return type:

bool

async available_actions(resource, session, specs, ref_cls=<class 'fsh_lib.actions.ActionRef'>)[source]

Return the subset of specs whose guards pass for session.

The guard for each spec is awaited in declaration order; specs whose guard returns False are dropped. Order is preserved so the frontend can rely on a stable button layout driven by the spec.

ref_cls lets generated code substitute a per-resource typed ActionRef subclass – e.g. AssetObjectPermission whose name is a Literal["get", "update", "publish"] – so the OpenAPI schema (and the TypeScript types openapi-ts derives from it) surface the enum rather than a bare str. The default keeps the historical untyped shape for callers that don’t go through codegen.

Parameters:
  • resource (Any) – The SQLAlchemy instance for object-scope dumps, or None for collection-scope dumps.

  • session (Any) – Whatever the auth dep resolved – passed through to each guard untouched.

  • specs (Iterable[ActionSpec]) – Iterable of ActionSpec; typically a tuple literal from the generated per-app actions.py.

  • ref_cls (type[TypeVar(T, bound= BaseModel)]) – Pydantic model used to construct each ref. Defaults to ActionRef.

Return type:

list[TypeVar(T, bound= BaseModel)]

Returns:

List of ref_cls instances, one per spec whose guard returned True, in spec order.

find_can(specs, name)[source]

Return the guard for the action name in specs.

Generated handlers resolve their own row-level guard at module-import time – e.g. _CAN_LIST = find_can(...) – so the per-request path is a single attribute lookup rather than a tuple scan.

Parameters:
  • specs (Iterable[ActionSpec]) – An ActionSpec tuple from the per-app registry, typically the collection-scoped tuple for row-level filters or the object-scoped tuple for execution-time gates.

  • name (str) – Operation name to look up (e.g. "list", "publish").

Return type:

Callable[[Any, Any], Awaitable[bool]]

Returns:

The bound guard callable (a CanCallable).

Raises:

KeyError – If name is not present in specs – the generated code should never reach this branch since the registry is built from the same operation list.

JWT auth primitives for kiln-generated FastAPI projects.

A session is a Pydantic model dumped into JWT claims. Tokens travel over one or both of two sources:

  • "bearer"Authorization header; API clients.

  • "cookie"httpOnly cookie; browser frontends (out of reach of JS so XSS can’t steal it).

The signing secret lives in an env var (caller-named, typically JWT_SECRET) so generated source never embeds a key.

DEFAULT_TOKEN_TTL = datetime.timedelta(seconds=1800)

Default exp stamped on tokens when the caller doesn’t set one.

class LoginResponse(**data)[source]

OAuth2-shaped login body for the bearer case.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class OkResponse(**data)[source]

Minimal ack body for cookie-only login and every logout.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class SessionStore(*args, **kwargs)[source]

Hook pair for server-side session state (deny-list, sessions, …).

Turns the stateless-JWT flow stateful. The store receives the full session model so it can key on whatever identity claim the consumer picks (typically jti); fsh_lib.auth stays agnostic.

Both methods are async so the store can hit a database.

async is_revoked(session)[source]

Return True to reject the request with HTTP 401.

Return type:

bool

async revoke(session)[source]

Mark session dead. Must be idempotent.

Return type:

None

clear_session(response, *, sources, cookie_name=None, cookie_secure=True, cookie_samesite='lax')[source]

Delete the session cookie if configured; ack for bearer.

cookie_secure and cookie_samesite must match the values issue_session() used – browsers refuse to overwrite an existing cookie when either attribute differs.

Return type:

OkResponse

decode_jwt(token, *, secret_env, algorithm)[source]

Decode token and return its claims, or raise HTTP 401.

Return type:

dict[str, Any]

encode_jwt(payload, *, secret_env, algorithm, ttl=datetime.timedelta(seconds=1800))[source]

Sign payload as a JWT; stamps exp if absent. Never mutates.

Return type:

str

issue_session(response, session, *, sources, secret_env, algorithm, ttl=datetime.timedelta(seconds=1800), cookie_name=None, cookie_secure=True, cookie_samesite='lax')[source]

Mint a JWT and emit it to every configured transport.

Upstream validate_login returns a None session in the case of no user, a password that failed validation, etc.

Return type:

LoginResponse | OkResponse

session_auth(schema, sources, *, secret_env, algorithm, token_url=None, cookie_name=None, store=None)[source]

Build a FastAPI dep that yields a validated schema instance.

The returned callable takes one parameter per supported transport; configured sources plug in their real extractors, unconfigured ones get a no-token shim (returns None). The first non-None token wins. Claims parse through model_validate() so handlers get the full model, not a raw dict.

store, when supplied, turns every authenticated request into a deny-list check – avoids a wrapper dep on the consumer side.

Return type:

Callable[..., Awaitable[TypeVar(SessionT, bound= BaseModel)]]

Communication-platform primitives for kiln-generated apps.

Sends typed messages over pluggable delivery methods (email, SMS, push, …) using pgqueuer as the transactional outbox. The pieces:

  • CommType – a named, schema-validated communication. Carries a Pydantic context_schema and a pair of template strings (subject + body); CommRegistry holds the set the consumer’s app supports.

  • Renderer – a Protocol that turns a CommType plus a validated context into a rendered subject + body pair. The default JinjaRenderer is in-process; the same surface fits an HTTP-call renderer that defers to a separate template service (a node renderer, MJML compiler, anything) without changing the platform’s call site.

  • MessageMixin / RecipientMixin / NotificationPreferenceMixin – SQLAlchemy mixins supplying the storage columns for the three tables every comm-platform install needs. Same pgcraft-friendly idiom as the file / rate-limit mixins: consumer owns the table, we own the columns.

  • Transport – a Protocol implemented per delivery method. One adapter per supported method, e.g. email, sms. Implementations live in consumer code or third-party adapters; this module ships LoggingTransport for tests and local development.

  • PreferenceResolver – a Protocol that answers “should this recipient receive this comm-type via this method?” Looked up once per recipient inside send_communication(); an opted-out recipient yields no row and no job (the message row still records the attempt for audit).

  • send_communication() – the producer entry point. Validates context against the CommType schema, renders the templates, inserts one message row and one recipient row per delivery, then enqueues one pgqueuer job per recipient under DISPATCH_ENTRYPOINT. All writes ride the caller’s SQLAlchemy session and the pgqueuer Queries is bound to the same connection (see fsh_lib.queue.get_queue()), so a single await session.commit() makes the message + recipients + jobs durable atomically.

  • make_dispatch_entrypoint() – the worker-side counterpart. Returns an async (job) -> None callable wired to the consumer’s session factory, transports, and mixin classes – register it against DISPATCH_ENTRYPOINT on a pgqueuer PgQueuer instance.

The module’s only optional dependency is jinja2 (already pulled in by kiln-generator for codegen); no extras gate is needed.

class AttachmentGroup(slot, resolver, label=None, description=None)[source]

Project-wide attachment slot definition.

Mirrors RecipientGroup for the attachment lane. The slot resolves to a single AttachmentSuggestionFile at send time.

class AttachmentSuggestionFile(name, ref=None, label=None, description=None)[source]

Concrete file the BE has handy at describe-options time.

class AttachmentSuggestionResolver(slot, label=None, description=None)[source]

Late-bound attachment group (resolver-style).

class CommRegistry[source]

Mutable registry of CommType entries by name.

Built once at app startup (or as a module-level global if the consumer prefers) and passed into send_communication() and make_dispatch_entrypoint(). Not thread-safe – mutate it only during startup.

get(name)[source]

Return the CommType registered under name.

Raises:

KeyError – If name is not registered.

Return type:

CommType[Any]

names()[source]

Return registered comm-type names in registration order.

Return type:

tuple[str, ...]

register(comm_type)[source]

Add comm_type to the registry.

Raises:

ValueError – If a type with the same name is already registered. Re-registration is almost always a bug (two modules thought they owned the same name); force callers to deregister first if they really want it.

Return type:

None

class CommType(name, context_schema, subject_template, body_template, default_methods=())[source]

A named communication: schema + templates + default methods.

A CommType is the unit of design for the comm platform: it binds a Pydantic context schema to a pair of template strings plus the set of methods the consumer wants delivery on by default. The same instance is shared by the producer (which validates and renders) and the worker (which renders too if the rendered body wasn’t persisted).

name

Stable identifier (e.g. "order_shipped"). Used as the registry key and stored on the message row so the audit log survives template churn.

context_schema

Pydantic model describing the fields the templates reference. send_communication validates the caller-supplied context against this before rendering, so missing or mistyped fields fail fast at the call site instead of half-way through a template.

subject_template

Source string for the subject line. Interpreted by the configured Renderer; the default JinjaRenderer treats it as Jinja2.

body_template

Source string for the body. Same renderer treatment as subject_template.

default_methods

Methods to deliver on when the caller doesn’t pass an explicit recipient list. Empty tuple means “no default; caller must specify recipients”.

DISPATCH_ENTRYPOINT = 'fsh_lib_comms_dispatch'

pgqueuer entrypoint name jobs are enqueued under.

The producer side (send_communication()) enqueues under this name; the worker side must register its handler under the same name (see make_dispatch_entrypoint()). Exposed so consumers don’t hard-code the literal in two places.

class DeliveryStatus(*values)[source]

Lifecycle states of a single recipient’s delivery attempt.

FAILED = 'failed'

Transport raised; error carries the message.

PENDING = 'pending'

Row inserted, job enqueued, transport not yet called.

SENT = 'sent'

Transport returned without raising.

class GroupAttachmentResolver(*args, **kwargs)[source]

Resolve a project-wide attachment group.

Returns a single concrete file’s metadata. Like GroupRecipientResolver, no instance – the file doesn’t depend on a specific target object.

class GroupRecipientResolver(*args, **kwargs)[source]

Resolve a project-wide recipient group.

Signature differs from RecipientResolver – no instance, since the slot isn’t anchored to a specific target object. The session is still threaded through so a resolver can query a directory / membership table.

class GroupRegistry[source]

Project-wide recipient / attachment group registry.

One instance per app, populated at startup – typically from generated code that mirrors the project’s recipient_groups / attachment_groups jsonnet declarations. Looked up by the user-template route handlers (to surface groups in the editor’s suggestion popover) and by render_user_template() (to expand resolver tokens whose slot isn’t owned by any per-target target).

Not thread-safe – mutate only during startup.

attachment(slot)[source]

Return the attachment group for slot, or None.

Return type:

AttachmentGroup | None

attachment_groups()[source]

Return registered attachment groups in insertion order.

Return type:

tuple[AttachmentGroup, ...]

recipient(slot)[source]

Return the recipient group for slot, or None.

Return type:

RecipientGroup | None

recipient_groups()[source]

Return registered recipient groups in insertion order.

Return type:

tuple[RecipientGroup, ...]

register_attachment(group)[source]

Add an attachment group.

Raises:

ValueError – When slot is already registered.

Return type:

None

register_recipient(group)[source]

Add a recipient group.

Raises:

ValueError – When slot is already registered.

Return type:

None

class HttpRenderer(base_url, *, theme=None, client=None, timeout=10.0)[source]

Renderer that defers to an external render microservice.

Drop-in replacement for JinjaRenderer when the project runs a separate render service (the kiln-render scaffold or any HTTP equivalent). Posts {template, context, theme} to the service’s /v1/render endpoint and maps the response onto a RenderedMessage:

  • output: "email" -> {subject, html, text} -> RenderedMessage(subject=subject, body=text, html=html).

  • output: "pdf" -> {subject, pdf_base64} -> RenderedMessage(subject=subject, body="", pdf=bytes).

The service is the source of truth for output; the renderer inspects the response shape rather than carrying a copy of the build-time contract at runtime. Discrepancies between the project’s declared CommTypeConfig output and the service’s response are surfaced by the generated render-contract.json + the render service’s tsc step, not by this client.

Parameters:
  • base_url (str) – Base URL of the render service, e.g. "http://render:8200". Typically pulled from a RENDER_URL env var by the consumer.

  • theme (dict[str, Any] | None) – Default theme payload merged into every request – {logoUrl, colors, typography, ...}. Per-send overrides are not supported on this surface yet; supply the theme once at construction. None sends no theme block.

  • client (Any | None) – An existing httpx.AsyncClient to use. None (default) creates and owns one. The owned client is closed by aclose().

  • timeout (float) – Per-request timeout in seconds. Ignored when an explicit client is supplied.

Raises:

ImportErrorhttpx is not installed. Install fsh-lib[comms-http] to use this renderer.

async aclose()[source]

Close the underlying client iff this instance owns it.

Return type:

None

async render(comm_type, context)[source]

POST /v1/render and convert the response.

The request envelope is:

{
  "template": "<comm_type.name>",
  "context":  <context.model_dump(mode="json")>,
  "theme":    <self._theme or omitted>
}

The response is one of the two shapes documented on the class. A non-2xx response or a response missing the expected keys raises RuntimeError – the caller sees the same surface as a Jinja syntax error would produce.

Return type:

RenderedMessage

class JinjaRenderer(*, autoescape=False)[source]

In-process Jinja2 renderer. The default for send_communication().

Treats CommType.subject_template and CommType.body_template as Jinja2 source strings and renders them against the context dump (model_dump(mode="json") so dates/uuids stringify the same way they would over the wire).

Trivial enough to construct inline; instances hold no state beyond the autoescape choice, which only matters for the body of HTML emails (callers building HTML bodies should pass autoescape=True).

async render(comm_type, context)[source]

Render the comm’s templates against context.

Return type:

RenderedMessage

class LoggingTransport[source]

Test/dev transport: records every send into an in-memory list.

Not async-safe across processes, obviously; the point is to give unit tests something to assert against and to give local development a no-credentials fallback. Production transports live in consumer code or third-party packages.

async send(*, message, recipient)[source]

Record (message_id, address, body) into self.sent.

Return type:

None

class MessageMixin[source]

SQLAlchemy mixin for the message table.

One row per send_communication call – represents the intent to communicate. Per-method delivery state lives on the RecipientMixin rows that point back here via RecipientMixin.message_id.

Subclass on a regular Base to materialise the table:

from fsh_lib.comms import MessageMixin

class CommMessage(Base, MessageMixin):
    __tablename__ = "comm_messages"
    id: Mapped[uuid.UUID] = mapped_column(
        Uuid, primary_key=True, default=uuid.uuid4,
    )

Storing the rendered subject/body (rather than re-rendering from context at send time) means template churn doesn’t invalidate the audit log: the message row reflects what the recipient actually saw.

The mixin owns no primary key – declare id on the consumer class (typically via a pgcraft PK plugin or an explicit UUID column) so the consumer’s PK convention wins. Same idiom as SavedViewMixin.

body: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Rendered body. Text rather than String because bodies (especially HTML email) routinely exceed the 64 KiB Postgres varchar ceiling.

comm_type: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Registry key (CommType.name). Indexed so the audit log can group by type cheaply.

context: Mapped[dict[str, Any]] = <sqlalchemy.orm.properties.MappedColumn object>

JSON dump of the validated context, kept so the row can be re-rendered or replayed if templates change.

subject: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>

Rendered subject. Nullable because some methods (SMS, push) have only a body.

class NotificationPreferenceMixin[source]

SQLAlchemy mixin for the per-recipient preference table.

One row per (subject_key, comm_type, method) triple records whether that recipient wants that comm type via that method. send_communication() looks the row up via the PreferenceResolver protocol – this mixin just supplies the columns; the resolver implementation lives in consumer code (or in a generated helper).

subject_key is intentionally a string rather than a typed foreign key: a comm platform routinely targets users, accounts, org-level addresses, or external identifiers, and a typed FK would lock the table to one of those.

Like the rest of the comms mixins, the PK is left to the consumer to declare so pgcraft / consumer conventions own the column.

comm_type: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Registry key (CommType.name).

enabled: Mapped[bool] = <sqlalchemy.orm.properties.MappedColumn object>

True when the recipient consents to this (comm_type, method) combination. Default True so an absent row reads as opt-in by default; flip per consumer policy if your default is opt-out.

method: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Delivery method this preference scopes.

subject_key: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Identifier of the recipient whose preferences this row captures (typically a user id formatted as a string).

class PreferenceResolver(*args, **kwargs)[source]

Hook: gate delivery on the recipient’s per-method opt-in.

async is_enabled(*, subject_key, comm_type, method)[source]

Return True to deliver, False to suppress.

Return type:

bool

class RecipientGroup(slot, resolver, label=None, description=None)[source]

Project-wide recipient slot definition.

Registered against a GroupRegistry so any template (regardless of scope) can drop @<slot> into to / cc / bcc and the send path resolves it via resolver at send time.

slot

Stable identifier stored on the recipient token’s resolver field; same shape as a per-target slot.

resolver

Async callable returning concrete RecipientSpec rows.

label

Optional display string surfaced in the editor’s recipient picker. Defaults to "@<slot>".

description

Optional secondary line below label.

RecipientKind

How a recipient is addressed in the outbound message.

For email: maps directly to RFC 5322 To: / Cc: / Bcc: headers; transports lay the headers out themselves so cc recipients see the to/cc lists and bcc recipients don’t. For methods without header semantics (SMS, push) this is informational – the dispatch path delivers each row regardless of kind.

alias of Literal[‘to’, ‘cc’, ‘bcc’]

class RecipientMixin[source]

SQLAlchemy mixin for the recipient table.

One row per (message, method, address) triple. The pgqueuer job carries the recipient id, so the dispatch path is: job -> recipient -> message -> transport lookup.

The mixin deliberately doesn’t declare a foreign key to the consumer’s MessageMixin table – the consumer names that table, so the FK has to come from their own subclass via a sqlalchemy.ForeignKey on the message_id column. Keeping the mixin FK-free means the same class works regardless of where the consumer mounts the message table.

Like MessageMixin, the PK is left to the consumer to declare so pgcraft / consumer conventions own the column.

address: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Method-specific destination (email address, phone number, push token, …). Opaque to this module.

error: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>

Truncated exception message from a failed delivery. None until the dispatch path catches an error.

kind: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

One of RecipientKind ("to" / "cc" / "bcc"). Stored as a string so adding a new addressing kind doesn’t need a migration. Email transports read this off the row to lay out the RFC 5322 headers; non-email transports typically ignore it.

message_id: Mapped[UUID] = <sqlalchemy.orm.properties.MappedColumn object>

Points back at the MessageMixin row. Consumers typically add an explicit ForeignKey in their subclass.

method: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Delivery method ("email", "sms", …). Used to look up the right transport at dispatch time.

sent_at: Mapped[datetime | None] = <sqlalchemy.orm.properties.MappedColumn object>

When the transport returned successfully. None while pending or failed.

status: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

One of DeliveryStatus’s values. Stored as a string (not a SQL enum) so adding a new state doesn’t require a migration.

class RecipientResolver(*args, **kwargs)[source]

Hook: resolve a named recipient slot against the target object.

One implementation per slot a resource declares ("assignee", "watchers", …). The signature mirrors a route handler: the resolver gets the same async session the template render is running on, plus the loaded model instance and the template’s method, and returns zero or more concrete RecipientSpec rows. kind on the returned rows is preserved so a resolver can opt to bcc itself if it needs to.

class RecipientSpec(method, address, kind='to', subject_key=None)[source]

Single recipient handed to send_communication().

method

Delivery method (must match a transport key).

address

Method-specific destination (email, phone, …).

kind

To / Cc / Bcc – gives transports the information they need to lay the message out correctly. Defaults to "to" so call sites that don’t care don’t need to set it.

subject_key

Identifier whose preferences gate delivery. None skips the preference check (e.g. transactional sends to non-user addresses like a billing inbox).

class RecipientSuggestionLiteral(address, label=None, description=None)[source]

Concrete recipient the BE has handy at describe-options time.

Lets a resource expose directory entries (account-system users, a curated mailing list, etc.) right next to the resolver groups so the author picks a specific person without retyping the address.

address

Method-specific destination (email, phone, …). Stored on the resulting literal recipient token in the editor’s saved JSON.

label

Optional display string (typically the person’s name); the FE defaults to address when omitted.

description

Optional secondary line (e.g. the address itself when label is the name).

class RecipientSuggestionResolver(slot, label=None, description=None)[source]

Late-bound recipient group surfaced in the editor combobox.

Maps to a registered RecipientResolver; the FE author drops one of these in to fan out at send time.

slot

Resolver key registered on the target (e.g. "watchers"). Stored on the resulting resolver-style recipient token in the editor’s saved JSON.

label

Optional display string; the FE defaults to "@<slot>" when omitted.

description

Optional secondary line shown beneath label in the combobox row (e.g. "everyone watching the task").

class RenderedMessage(subject, body, html=None, pdf=None)[source]

Output of a Renderer – the strings we persist.

subject

Subject line. Populated by every renderer.

body

Plain-text body. The legacy field JinjaRenderer produces and that transports already consume; HttpRenderer populates it from the render service’s text alternative for email output, or leaves it "" for PDF output.

html

HTML alternative body for email outputs, or None. JinjaRenderer never sets it; HttpRenderer sets it when the render service returns output: "email". Email transports should send HTML + plain-text as multipart/alternative.

pdf

Raw PDF bytes for PDF outputs, or None. HttpRenderer sets it when the render service returns output: "pdf"; transports treat it as a single attachment.

class RenderedUserTemplate(subject, body, recipients, cc, bcc, attachments)[source]

Output of render_user_template() – pre-send view.

Returned to the FE preview endpoint directly; the send path persists it as a MessageMixin row + a fan-out of RecipientMixin rows.

class Renderer(*args, **kwargs)[source]

Hook: turn a CommType + context into rendered strings.

The default JinjaRenderer evaluates the template strings in-process. A microservice-based renderer (e.g. a node service that compiles MJML or runs a richer template language) implements the same single method and gets dropped in via the renderer argument to send_communication() – no other code changes.

context is the validated Pydantic model; implementations call model_dump() themselves so they can pick the dump mode (json vs Python) that fits their wire format.

async render(comm_type, context)[source]

Return the rendered subject + body for this comm.

Return type:

RenderedMessage

class ResolvedRecipientAddress(address, label=None)[source]

Concrete recipient row produced by a resolver, with display label.

class TargetResolutions(variables, recipient_slots)[source]

Concrete values for a target object, ready for FE display.

Returned by resolve_target() when the editor renders against a known target – the FE drops these into pills and chips so the author sees the values that @watchers and {title} would actually expand to right now.

variables

Dump of the target’s representation Pydantic model. Same dict render_user_template() walks for variable tokens; keys are top-level field names + nested objects.

recipient_slots

Slot name → addresses each registered recipient resolver returns for this target. Empty list for slots that resolved to nothing.

class Transport(*args, **kwargs)[source]

Method-specific delivery adapter.

Implementations are free to do whatever the method requires (SMTP send, Twilio API call, FCM push, …). Raise to mark the delivery failed; return normally to mark it sent. The dispatch path stamps RecipientMixin.status and RecipientMixin.sent_at based on which path you take.

async send(*, message, recipient)[source]

Deliver message to recipient or raise.

Return type:

None

USER_TEMPLATE_COMM_TYPE = '_user_template'

Sentinel comm-type stamped on MessageMixin.comm_type for sends originating from a user-defined template. The audit row carries the template id + target id in MessageMixin.context so the audit log can distinguish developer-defined sends from user-template sends without a separate column.

class UserCommTemplateMixin[source]

SQLAlchemy mixin for the user-authored template table.

One row per saved template. Subclass on a regular Base to materialise the table:

from fsh_lib.comms import UserCommTemplateMixin

class UserCommTemplate(Base, UserCommTemplateMixin):
    __tablename__ = "user_comm_templates"

The schema mirrors the editing UI: an optional target-resource slug (None = free-form, no variables and no per-object recipient resolvers), a delivery method, tokenized subject / body / recipient lists, attachments, and bookkeeping columns. Tokens are persisted as JSON so the FE can round-trip its Slate document without a separate serialization layer.

Token formats (all stored as JSON arrays):

  • Subject / body – [{"kind": "text", "value": str}, ...] with var tokens of shape {"kind": "var", "path": str} mixed in. path is a dot-walk into the variable representation dump (e.g. "owner.email").

  • Recipients / cc / bcc – one of ``{“kind”: “literal”, “method”: str, “address”: str,

    “subject_key”: str | null}``

    (concrete address hard-coded into the template) or {"kind": "resolver", "resolver": str} (a named slot resolved at send time against the loaded target object).

  • Attachments – consumer-defined dict shape, passed through to the rendered output unchanged so transports can decide what to do with it.

The mixin owns no primary key – declare id on the consumer class (typically via a pgcraft PK plugin or an explicit Mapped[uuid.UUID] column) so the consumer’s PK convention wins. Same idiom as SavedViewMixin.

attachments: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>

Consumer-defined attachment refs. Passed through to the rendered output unchanged; rendering them is the transport’s job.

bcc: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>

Tokenized Bcc list.

body: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>

Tokenized body.

cc: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>

Tokenized Cc list.

created_by: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>

Identifier of the author – typically the session subject id. Nullable so seed templates can be inserted out-of-band.

description: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>

Optional author note. Shown in template-management screens.

method: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Delivery method (matches a transport key). Stored once on the row – the FE chooses the method at template-creation time and every recipient inherits it (literal recipient tokens may override per-row).

name: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Human-friendly title – shown in the picker UI.

recipients: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>

Tokenized To list.

subject: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>

Tokenized subject. Empty list for methods without subjects (SMS, push).

target_resource: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>

Resource slug this template is scoped to, or None for a free-form template (no target object, no variables). Indexed so “templates for asset” lists are cheap.

class UserTemplateTarget(resource_slug, load, representation_serializer, representation_class, recipient_resolvers, recipient_suggestions=None, attachment_suggestions=None)[source]

Per-resource wiring for user-template renders.

Registered once per resource that opts into being a template target. representation_serializer returns the variable dump – a Pydantic model whose fields the author can pill into the subject / body. recipient_resolvers maps the named slots the author can drop into the to / cc / bcc lists.

resource_slug

User-facing slug the FE uses to scope a template (e.g. "asset"). Stored on UserCommTemplateMixin.target_resource.

load

Async (session, target_id) -> instance | None loader. Returning None from a send call surfaces as a LookupError.

representation_serializer

(instance, session) -> BaseModel (sync) or the async equivalent that produces the variable dump for instance. Both shapes are supported so kiln’s fields-driven serializers (sync, no IO) and user-supplied async builders (may load related rows) drop in without adapter glue.

representation_class

The Pydantic class returned by representation_serializer. Exposed so the FE options endpoint can surface the field schema without a second round-trip.

recipient_resolvers

Slot name -> RecipientResolver implementation.

recipient_suggestions

Optional async builder that returns the list of RecipientSuggestion rows the editor’s combobox surfaces. Empty / unset falls back to default_recipient_suggestions() – one resolver row per recipient_resolvers entry. Set this to mix in directory entries (concrete addresses pulled from the BE’s user table, etc.) alongside the resolvers.

attachment_suggestions

Optional async builder that returns the list of AttachmentSuggestion rows for the attachment lane. Unset means the lane only shows the consumer-supplied upload affordance (or nothing).

class UserTemplateTargetRegistry[source]

Registry of UserTemplateTarget entries by resource slug.

Built once at app startup – typically populated from generated code that mirrors the project’s comms_target declarations. Looked up by user-template route handlers and by send_user_template() to resolve variables and recipient slots.

Not thread-safe; mutate only during startup.

get(slug)[source]

Return the target registered under slug.

Raises:

KeyError – If slug isn’t registered.

Return type:

UserTemplateTarget

register(target)[source]

Add target to the registry.

Raises:

ValueError – When the slug is already registered.

Return type:

None

slugs()[source]

Return the registered slugs in insertion order.

Return type:

tuple[str, ...]

async default_attachment_suggestions(_target, _session, *, groups=None)[source]

Build the attachment suggestions list from project groups.

Per-resource attachment groups aren’t a concept on UserTemplateTarget – the platform only supports project-wide attachment groups via groups. Returns one resolver-style suggestion per registered attachment group.

Return type:

list[AttachmentSuggestionResolver | AttachmentSuggestionFile]

async default_recipient_suggestions(target, _session, *, groups=None)[source]

Build a suggestions list from target’s resolvers + project groups.

Used by route handlers when the consumer didn’t supply a custom UserTemplateTarget.recipient_suggestions builder. Emits one resolver row per per-target slot first, then one per project-wide group from groups so the editor’s picker surfaces both flavours in a single combobox.

Return type:

list[RecipientSuggestionResolver | RecipientSuggestionLiteral]

async load_recipients(session, recipient_cls, message_id)[source]

Return every recipient row for message_id, in insertion order.

Lightweight read helper for an audit endpoint – generated route handlers can call this without re-deriving the message_id == filter.

Return type:

list[RecipientMixin]

make_dispatch_entrypoint(*, session_factory, transports, message_cls, recipient_cls)[source]

Build the worker-side handler for DISPATCH_ENTRYPOINT.

Returns an async (job) -> None callable suitable for pgqueuer.PgQueuer.entrypoint. Per job:

  1. Decode the recipient id from job.payload.

  2. Open a session from session_factory; load the recipient and the matching message.

  3. Skip if the recipient is missing or already advanced past DeliveryStatus.PENDING (job retried after a previous success / explicit failure).

  4. Look up the transport for the recipient’s method; mark the row failed if no transport is registered.

  5. Call transport.send – mark the row sent on success or failed (with the error message) on raise.

Parameters:
Return type:

Callable[[Job], Awaitable[None]]

Returns:

Async handler the consumer registers under DISPATCH_ENTRYPOINT. Wrapped via fsh_lib.queue.instrument_entrypoint(), so it emits an OpenTelemetry consumer span per job when the fsh-lib[opentelemetry] extra is installed and runs unwrapped otherwise.

render_text_field(tokens, variables)[source]

Concatenate tokenized tokens into a plain string.

text tokens emit their value verbatim; var tokens resolve through dotted-path lookup against variables and stringify. Missing or None values render as the empty string – an unresolved variable at send time should produce a sendable message and let the FE surface a “this pill won’t fill in” warning rather than blow up the dispatch path.

Parameters:
  • tokens (Sequence[dict[str, Any]]) – Slate-encoded token list.

  • variables (dict[str, Any]) – Variable dump – the dict form of the resource’s representation Pydantic model.

Return type:

str

Returns:

Rendered field as a single string.

Raises:

ValueError – When a token’s kind is unrecognised – a sign of an old client encoding tokens the platform doesn’t know about, which is a real bug rather than something to silently swallow.

async render_user_template(template, *, session, targets, target_id=None, groups=None)[source]

Render template against the target object identified by target_id.

For an unscoped template (target_resource is None) the variable dump is empty and target_id must also be None; recipient tokens may still be literal or reference a project-wide groups slot.

For a scoped template, the target is loaded, the representation serializer runs to produce the variable dump, and resolver recipient tokens fire against the loaded instance (per-target resolvers win) or fall through to the project-wide group registry. Attachment tokens with kind: "resolver" resolve via the same group registry.

Raises:
  • ValueError – When the scope / target_id combination is inconsistent or a resolver token references a slot that isn’t registered on the target or in the project group registry.

  • LookupError – When the target object can’t be loaded.

Return type:

RenderedUserTemplate

async resolve_target(*, session, target, instance, method)[source]

Compute the variable dump + per-slot resolved recipients.

instance is the already-loaded target object – callers should use UserTemplateTarget.load upstream to fetch it (so a missing target maps to LookupError at the route boundary, not here). method drives any resolver that picks a method-specific address; templates carry method on the row itself.

The resolution maps mirror what the FE editor’s resolutions prop expects – one variables dict, one recipient-slot map. Resolvers that raise propagate; the route boundary turns them into a 422.

Return type:

TargetResolutions

async send_communication(*, session, queue, registry, comm_type, context, recipients, message_cls, recipient_cls, renderer=None, preferences=None)[source]

Validate, render, persist, and enqueue a communication.

The transactional-outbox guarantee:

  1. Validate context against the comm-type’s schema.

  2. Render the templates with renderer (defaults to JinjaRenderer).

  3. Insert one MessageMixin row.

  4. For each recipient, consult preferences (if supplied); insert a RecipientMixin row for each one that passes.

  5. Enqueue one pgqueuer job per surviving recipient under DISPATCH_ENTRYPOINT, payload = recipient id (UTF-8).

Steps 3-5 all ride session’s transaction (see fsh_lib.queue.get_queue() for how queue is bound to the same connection). Commit the session and the message, recipients, and jobs all become durable atomically; roll back and the communication never happened.

Parameters:
Return type:

UUID

Returns:

The id of the inserted MessageMixin row.

async send_user_template(*, session, queue, template, targets, message_cls, recipient_cls, target_id=None, preferences=None, groups=None)[source]

Render template against target_id and dispatch via the outbox.

Mirrors send_communication() but skips the registry + Jinja steps – the template’s tokenized fields are rendered in-process by render_user_template() and the result is persisted into the same MessageMixin / RecipientMixin tables. All three of the template’s recipient lists (to / cc / bcc) flow through one persist call so the worker delivers cc and bcc rows like any other recipient – with their RecipientMixin.kind set so an email transport can lay out the headers correctly.

The audit row’s MessageMixin.context carries template_id, template_name, target_resource, and target_id so a re-render or replay can re-fetch the same inputs. Attachment refs come along on context.attachments for transports that need them.

Parameters:
Return type:

UUID

Returns:

Id of the inserted message row.

RecipientSuggestion

Represent a union type

E.g. for int | str

AttachmentSuggestion

Represent a union type

E.g. for int | str

File storage primitives for kiln-generated FastAPI projects.

This module’s runtime dependency on boto3 is gated behind the files extra. Install with:

pip install 'kiln-generator[files]'
# or: uv add 'kiln-generator[files]'

Importing this module without the extra raises ModuleNotFoundError on import boto3 – so the gate is honest rather than lazy: either the dep is there and everything works, or it isn’t and the import surface fails fast.

A file is a binary blob (image, PDF, attachment) tracked by a metadata row in the consumer’s database and a corresponding object in S3-compatible storage. This module ships three pieces:

  • FileMixin – a pgcraft-compatible mixin supplying the six storage columns every file row needs (s3_key, content_type, size_bytes, original_filename, created_at, uploaded_at). Consumers subclass it on a pgcraft model and add a PK plugin (typically UUIDV4PKPlugin) for the id column.

  • S3Storage – a small wrapper around boto3 that exposes the three operations a presigned-upload flow actually needs: mint a presigned PUT URL, mint a presigned GET URL, delete an object. The constructor takes explicit config so it’s testable; default_storage() builds one from FSH_S3_* env vars for the common case.

  • Action functions – request_upload(), complete_upload(), download(), and delete_file(). These plug into be’s Action operation: the consumer points resource.action entries at them directly (no per-resource wrapper module). The FileMixin-typed parameters (instance for object actions, class for collection actions) match any concrete subclass via the introspector’s supertype check, so the same four functions serve every file resource.

DEFAULT_PRESIGN_TTL = 900

Presigned URL lifetime in seconds (15 min).

Long enough for a browser to PUT a multi-megabyte file over a slow connection; short enough that a leaked URL stops working before it shows up in logs anyone reads.

class DownloadResponse(**data)[source]

Response for the download action – a short-lived GET URL.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class FileMixin[source]

pgcraft mixin supplying the storage columns of a file record.

Subclass on a pgcraft-mapped model alongside a PK plugin (the plugin owns id):

from fsh_lib.files import FileMixin
from pgcraft.factory import PGCraftSimple
from pgcraft.plugins.pk import UUIDV4PKPlugin

class Attachment(Base, FileMixin):
    __tablename__ = "attachments"
    __table_args__ = {"schema": "public"}
    __factory__ = PGCraftSimple
    __plugins__ = [UUIDV4PKPlugin()]

The mixin deliberately doesn’t declare id – pgcraft’s idiom is that primary keys are plugin-owned, and declaring it on the mixin would collide with the plugin’s column at table-build time. The TYPE_CHECKING annotation below keeps file.id typed for the action helpers without committing to a column.

A row with uploaded_at is None represents a file the server has reserved a key for (and handed the client a presigned PUT URL) but whose upload hasn’t yet been confirmed. Consumers typically clear or expire these rows on a schedule.

content_type: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>

MIME type the client declared at upload time, when known.

original_filename: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>

Filename the client supplied; useful for Content-Disposition on download. Not used for storage – the canonical name is s3_key.

s3_key: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Object key in the storage bucket. Unique so a row maps to exactly one blob; collision is a programming error, not a race.

size_bytes: Mapped[int | None] = <sqlalchemy.orm.properties.MappedColumn object>

Object size in bytes after the upload completes. BigInteger because Integer tops out around 2 GiB and large media uploads blow past that.

uploaded_at: Mapped[datetime | None] = <sqlalchemy.orm.properties.MappedColumn object>

When the upload was confirmed. None means pending – metadata exists but the blob may or may not be in S3.

class S3Storage(bucket, region=None, endpoint_url=None, client_factory=<function client>)[source]

boto3-backed S3 client wrapper.

The constructor takes explicit config so tests can build an instance pointed at a stub or a localstack endpoint without setting env vars. default_storage() is the env-driven factory for production use.

client_factory is plumbed through so tests can inject a MagicMock instead of a real boto3.client.

property client: Any[source]

Lazily-built boto3 S3 client.

Cached so a single S3Storage instance reuses one connection pool across calls.

client_factory(**kwargs)

Create a low-level service client by name using the default session.

See boto3.session.Session.client().

delete(key)[source]

Delete the object at key.

S3’s DeleteObject is idempotent – deleting a missing key returns 204 the same as deleting an existing one – so callers don’t need to guard against double-delete races.

Return type:

None

presigned_get_url(key, *, expires_in=900)[source]

Mint a presigned GET URL for key.

Return type:

str

presigned_put_url(key, *, expires_in=900, content_type=None)[source]

Mint a presigned PUT URL for key.

When content_type is supplied, the client must send a matching Content-Type header on the PUT or S3 rejects the request – this binds the upload to the type the row was created for.

Return type:

str

class UploadRequest(**data)[source]

Body for the request-upload action.

Carries everything request_upload() needs to reserve a key and bind the presigned PUT URL to the right content type.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class UploadResponse(**data)[source]

Response for the request-upload action.

The client PUTs the file bytes to upload_url (it must send a matching Content-Type header), then calls the complete-upload action with id to flip the row out of pending state.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

async complete_upload(file, *, db)[source]

Mark file as uploaded.

Returns None so the action op emits 204 No Content – a completed upload has no useful body to return; the client already knows the id.

Issues a Core UPDATE rather than mutating the loaded ORM instance, so the persistence path is identical regardless of whether the caller’s session has autoflush quirks. Idempotent – calling twice just refreshes the timestamp.

Return type:

None

default_storage()[source]

Build an S3Storage from FSH_S3_* env vars.

Reads:

  • FSH_S3_BUCKET – bucket name (required).

  • FSH_S3_REGION – AWS region; optional, falls back to the boto3 default chain.

  • FSH_S3_ENDPOINT_URL – override for MinIO / localstack / non-AWS S3-compatible endpoints; optional.

Raises:

RuntimeError – When FSH_S3_BUCKET is not set.

Return type:

S3Storage

async delete_file(file, *, db)[source]

Cascade-delete file: remove the S3 object then the row.

Returns None so the action op emits 204 No Content – the client doesn’t need a body to know the row is gone.

S3 first because S3Storage.delete() is idempotent – a crash between the two steps leaves an orphan row, which the next delete attempt cleans up. Reversing the order would instead leak S3 objects, which are harder to find later.

Return type:

None

async download(file, *, db)[source]

Return a presigned GET URL for file.

Refuses with 404 when uploaded_at is None – the row exists but the client never confirmed the PUT, so the object may not be in S3 and a presigned URL would just 404 noisily.

Return type:

DownloadResponse

async request_upload(*, model_cls, db, body)[source]

Reserve a key and return a presigned PUT URL.

The row is created with uploaded_at=NULL; the client confirms the actual byte upload via complete_upload().

model_cls is supplied by the action handler, which detects the type[FileMixin] annotation and passes the resource’s mapped class. No per-resource factory binding needed – consumers point a resource’s action config at this function directly.

Return type:

UploadResponse

Body schema for the project-wide value-provider endpoint.

Carries the autocomplete query and an optional limit. The endpoint is intentionally single-page (no cursor): autocomplete UX narrows by typing more characters, not by paginating.

class FilterValuesRequest(**data)[source]

Common search params used by every value-provider runner.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

resolved_limit(req_limit)[source]

Clamp req_limit to [1, _MAX_LIMIT] with a sensible default.

Return type:

int

Filter-clause construction for typed Pydantic filter trees.

FilterOp

Operator keys accepted by a condition node’s op field.

Callers’ Pydantic condition models should declare op as a Literal over this set (or a subset of it).

alias of Literal[‘eq’, ‘neq’, ‘gt’, ‘gte’, ‘lt’, ‘lte’, ‘contains’, ‘starts_with’, ‘in’, ‘is_null’]

apply_filters(stmt, node, model)[source]

Build WHERE clauses from a typed filter expression.

Accepts a typed Pydantic filter model – either a single condition (with field, op, value) or a combiner (with and_ / or_ lists of nested conditions). Models that match none of these shapes are treated as a no-op and the statement is returned unchanged. None is also a no-op so call sites can invoke apply_filters(stmt, body.filter, Model) unconditionally without an outer if body.filter is not None branch.

Parameters:
  • stmt (Select) – The SQLAlchemy SELECT statement to filter.

  • node (BaseModel | None) – A Pydantic model representing the filter tree, or None to skip filtering entirely.

  • model (type) – The SQLAlchemy model class providing columns.

Return type:

Select

Returns:

The statement with WHERE clauses applied.

Filter stmt to rows where any of columns matches q.

Powers the list endpoint’s free-text search box – the typed counterpart to apply_filters(), driven by the resource’s configured search fields rather than an explicit filter tree. strategy is chosen per resource in the BE config (SearchConfig.strategy):

  • "trigram" – each column is matched two ways and OR’d: a pg_trgm similarity hit (the % operator) for fuzzy, typo-tolerant matches, plus a case-insensitive substring (ILIKE) so queries too short to form a trigram still match. Requires the pg_trgm extension. columns are the resource’s text columns.

  • "tsvector" – each column is a maintained tsvector column matched with @@ against websearch_to_tsquery. Word/lexeme search with stemming – right for prose columns. columns are tsvector column name(s); the english config here must match the one the column was built with.

q of None or blank is a no-op, and so is an empty columns, so call sites can invoke this unconditionally without an outer guard.

Parameters:
  • stmt (Select) – The SQLAlchemy SELECT statement to filter.

  • model (type) – The SQLAlchemy model class providing columns.

  • columns (Sequence[str]) – Model attribute names to match q against – text columns for trigram, tsvector columns for tsvector.

  • q (str | None) – The free-text query, or None to skip search.

  • strategy (Literal['trigram', 'tsvector']) – Matching strategy – "trigram" (default) or "tsvector".

Return type:

Select

Returns:

The statement with the search WHERE clause applied.

Create-form field helper for server-assigned identifiers.

A column whose value the database fills in – an autogenerated identifier like the 100001 in “Order #100001”, produced by pgcraft’s PGCraftAutogeneratedIdentifierColumn – must not appear as a required input on a create form. The client never supplies it; the BEFORE INSERT trigger does.

Generated create models declare such a field through autogenerated_identifier_field(). It marks the field optional and read-only and tags it with the x-autogenerated OpenAPI extension, mirroring the x-* convention in fsh_lib.openapi: the FE codegen layer reads the extension and renders the field as a server-assigned, non-editable value instead of a form input.

X_AUTOGENERATED = 'x-autogenerated'

OpenAPI schema-extension key carrying the pgcraft identifier name (the “enum” key) that produces a field’s value. Present on every field built by autogenerated_identifier_field().

autogenerated_identifier_field(*, name, description=None)[source]

Build a pydantic field for a server-assigned identifier.

The returned field is optional (defaults to None, so a create request may omit it) and read-only, and carries the X_AUTOGENERATED extension so FE codegen can label it with the originating counter and suppress the input control.

Use it as the default of a create-model field:

class OrderCreate(BaseModel):
    customer: str
    order_no: str | None = autogenerated_identifier_field(
        name="order",
    )
Parameters:
  • name (str) – The pgcraft identifier-counter name (the “enum” key) that produces this value. Surfaced under X_AUTOGENERATED so the FE can label the field.

  • description (str | None) – Optional human-readable description for the generated JSON schema.

Return type:

Any

Returns:

A pydantic FieldInfo – typed Any so it can sit directly as a model-field default – declaring the field optional, read-only, and tagged x-autogenerated.

Server-driven cache invalidation for kiln-generated apps.

Kiln-generated mutations don’t tell the FE what to invalidate – the BE does, via the X-Invalidate-Queries response header. Handlers grab a QueryInvalidations collector through FastAPI’s dependency system and call invalidations.add(key) for each TanStack queryKey the FE should drop after the call succeeds; the collector serializes the running list onto the header.

Generated CRUD handlers wire this dependency automatically and register the resource’s own cache_key, so the typical case needs no hand-coding. The helper is exposed for hand-written handlers that need to opt in or extend the auto-generated set:

from typing import Annotated

from fastapi import APIRouter, Depends
from fsh_lib.invalidation import QueryInvalidations

router = APIRouter(prefix="/projects")


@router.delete("/{id}")
async def delete_project(
    id: int,
    invalidations: Annotated[
        QueryInvalidations, Depends(QueryInvalidations)
    ],
) -> None:
    # ...delete the row...
    invalidations.add_all("projects")  # list + every detail
    # or, narrower: invalidations.add_one("projects", id)
class QueryInvalidations(response)[source]

Per-request collector for TanStack queryKeys to invalidate.

Construct via FastAPI dependency injection (Depends(QueryInvalidations)); the framework hands us the request’s mutable Response so each add_* call updates the response header eagerly.

Two scopes:

  • add_all() – “blow everything for this resource”. The FE side prefix-matches and drops list caches plus every per-id detail in one shot. Right for create / delete / anything that affects multiple list rows.

  • add_one() – “blow this specific id only”. Just the detail cache for that row; lists are left alone. Useful when you’ve changed one row’s representation but no list could be filtering on the changed field.

add_all(cache_key)[source]

Blow every cache rooted at cache_key.

Emits the queryKey [cache_key]; TanStack invalidates anything with that prefix – list caches, every [cache_key, id] detail cache, etc.

Return type:

None

add_one(cache_key, key_id)[source]

Blow only the [cache_key, key_id] detail cache.

Return type:

None

QueryKey = str | int | bool | float | None

One segment of a TanStack queryKey. TanStack accepts arbitrary JSON, but in practice keys are arrays of these scalars; we restrict the type so the header stays small and predictable.

A short link maps a compact, URL-safe code to a longer target_url. Useful for fitting links into SMS bodies (the 160-character single-segment limit is unforgiving) and for emitting click counts on outbound comms.

Shortening is explicit: the producer calls shorten() to swap a long URL for a short one before assembling the message body (e.g. just before send_communication()). There is intentionally no auto-applied Jinja filter – the row write happens at a predictable point, and shortened URLs that never get sent (because PreferenceResolver filters the recipient out, or the caller’s transaction rolls back) don’t leak into the table.

The module ships three primitives, following the same pgcraft- flavoured idiom as fsh_lib.files – consumer owns the table, we own the columns:

  • ShortLinkMixin – pgcraft-compatible mixin supplying the storage columns (code, target_url, click_count). created_at is managed by pgcraft’s TimestampPlugin, which PGCraftSimple auto-adds; id is plugin-owned (the consumer attaches e.g. UUIDV4PKPlugin), matching the FileMixin pattern.

  • shorten() – producer entry point. A single INSERT ... ON CONFLICT DO NOTHING RETURNING code does both the dedup (“shorten X twice, get the same code back” because target_url is UNIQUE) and the code-collision check in one round trip – no SELECT-then-INSERT race window. Returns the full short URL {base_url}/{code}.

  • resolve() – redirect-handler helper. Single UPDATE ... RETURNING atomically increments ShortLinkMixin.click_count and returns the row’s target_url (or None for unknown codes). Consumers wire it into a 5-line FastAPI route:

    from fastapi import APIRouter, HTTPException
    from fastapi.responses import RedirectResponse
    from fsh_lib.links import resolve
    
    router = APIRouter()
    
    @router.get("/l/{code}")
    async def follow(code: str, db: AsyncSession = Depends(get_db)):
        url = await resolve(model_cls=ShortLink, db=db, code=code)
        if url is None:
            raise HTTPException(status_code=404)
        return RedirectResponse(url, status_code=302)
    
DEFAULT_CODE_LENGTH = 7

Default short-code length.

Seven base26 (lowercase-letter) characters give 26**7 ≈ 8.0e9 possible codes. At a million rows the per-insert collision rate is ≈ 1.2e-4 – low enough that shorten()’s retry loop is overwhelmingly a no-op.

Tune via the code_length argument when shorter codes are needed (smaller keyspace, higher collision rate – a 5-char code with 100k rows collides ≈ 0.8% of the time, well within MAX_CODE_RETRIES) or when longer is acceptable.

MAX_CODE_RETRIES = 5

How many times shorten() retries a colliding code.

A retry triggers when INSERT ... ON CONFLICT DO NOTHING returns no row and the follow-up SELECT confirms it was the code collision (not the target_url dedup). With default-length codes the loop is virtually never entered; the bound exists so a saturated keyspace (very small code_length, many rows) raises loudly instead of looping forever.

class ShortLinkMixin[source]

pgcraft mixin supplying the storage columns of a short link.

Subclass on a pgcraft-mapped model alongside a PK plugin (the plugin owns id):

from fsh_lib.links import ShortLinkMixin
from pgcraft.factory import PGCraftSimple
from pgcraft.plugins.pk import UUIDV4PKPlugin

class ShortLink(Base, ShortLinkMixin):
    __tablename__ = "short_links"
    __factory__ = PGCraftSimple
    __plugins__ = [UUIDV4PKPlugin()]

Like fsh_lib.files.FileMixin, the mixin deliberately doesn’t declare id – the consumer’s PK plugin owns it. created_at is also pgcraft-owned: PGCraftSimple auto-adds TimestampPlugin, which injects created_at with server_default=now().

Both code and target_url are UNIQUE. The code uniqueness lets shorten() retry on the rare random-code collision; the target_url uniqueness collapses same-URL dedup into the same atomic INSERT (no SELECT-then- INSERT race).

click_count: Mapped[int] = <sqlalchemy.orm.properties.MappedColumn object>

Times resolve() has served this row. BigInteger so popular links don’t wrap in any realistic horizon.

code: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

The short code. UNIQUE so shorten() can catch the rare random-code collision via ON CONFLICT DO NOTHING and retry with a fresh code.

target_url: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

The URL the short link redirects to. UNIQUE so a repeat shorten() for the same URL bounces off the constraint and returns the existing code rather than racing to insert a duplicate row.

class ShortenRequest(**data)[source]

Request body for shorten_action().

Field shape matches the dotted-path pattern be’s introspector expects for a collection-scoped action body.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ShortenResponse(**data)[source]

Response from shorten_action().

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

default_base_url()[source]

Return the public origin + redirect prefix from env.

Reads LINK_BASE_URL (e.g. "https://l.example.com/l") – the value shorten() joins with the random code to form {base}/{code}. Mirrors fsh_lib.files.default_storage() in shape: the action’s resource lookup is env-driven so the same generated handler works across environments.

Raises:

RuntimeError – when LINK_BASE_URL is not set.

Return type:

str

generate_code(length=7)[source]

Return a random lowercase-letter code of length characters.

Uses secrets.choice() – code generation is security- adjacent (a guessable code lets an attacker enumerate link targets), so the CSPRNG matters even though the keyspace is large for default lengths.

Raises:

ValueError – When length is less than 1.

Return type:

str

async resolve(*, model_cls, db, code)[source]

Return the target URL for code, or None if unknown.

Atomically increments ShortLinkMixin.click_count and returns the row’s target_url in a single UPDATE ... RETURNING – the click counter and the lookup can’t drift even under concurrent redirects.

Parameters:
  • model_cls (type[ShortLinkMixin]) – The consumer’s short-link model class.

  • db (AsyncSession) – Async SQLAlchemy session. The caller commits – if the surrounding transaction rolls back, the click increment is rolled back with it, which is the right behaviour (a failed redirect didn’t actually serve the link).

  • code (str) – The short code from the request path.

Return type:

str | None

Returns:

The target URL on hit, None when code is unknown.

async shorten(*, model_cls, db, target_url, base_url, code_length=7)[source]

Return a short URL for target_url.

Reuses the existing row for target_url when one is found – “shorten X twice, get the same code back”. The dedup rides the UNIQUE (target_url) constraint so it’s race-free: two concurrent shortens of the same URL can’t both insert, and the loser’s INSERT bounces off the unique violation rather than racing past a stale SELECT.

Parameters:
  • model_cls (type[ShortLinkMixin]) – The consumer’s short-link model class (must mix in ShortLinkMixin).

  • db (AsyncSession) – Async SQLAlchemy session for the insert. The caller commits.

  • target_url (str) – Long URL to shorten.

  • base_url (str) – Base URL of the redirect endpoint, without a trailing slash. The returned short URL is {base_url}/{code}. Typically a short domain the redirect route is mounted under, e.g. "https://l.example.com".

  • code_length (int) – Number of base26 letters to generate when inserting a new row. Defaults to DEFAULT_CODE_LENGTH. Has no effect when the dedup hits – the existing row’s code is returned regardless.

Returns:

{base_url}/{code}.

Return type:

str

Raises:

RuntimeError – If MAX_CODE_RETRIES random codes in a row collided with existing rows – almost certainly means code_length is too small for the current row count.

async shorten_action(*, model_cls, db, body)[source]

Collection-scoped action wrapping shorten().

Wired into a generated POST route by be’s action codegen – the type[ShortLinkMixin] annotation matches any concrete subclass via the introspector’s supertype check, so the same function serves every consumer’s short-link model. Mirrors fsh_lib.files.request_upload()’s shape.

base_url comes from default_base_url() (i.e. the LINK_BASE_URL env var) so the same generated handler works across dev / staging / prod without scaffold-time configuration.

Return type:

ShortenResponse

Eager-load application for list endpoints.

apply_eager_loads() is the companion of fsh_lib.ordering.apply_ordering(): it turns a representation’s EagerLoad plan into select(...).options(...) loader options. The one piece of coordination is joined – the set of relationships apply_ordering already LEFT-joined for an ORDER BY. A top-level relationship in that set is loaded with contains_eager (reuse the existing join) instead of being fetched a second time by selectinload / joinedload.

Generated list handlers wire the two together explicitly:

stmt = select(Task)
stmt, joined = apply_ordering(stmt=stmt, sort_clauses=body.sort, ...)
stmt = apply_filters(stmt=stmt, node=body.filter, model=Task)
stmt = apply_eager_loads(
    stmt, [EagerLoad(Task.project, "joined")], joined=joined
)
class EagerLoad(attr, strategy=EagerStrategy.SELECTIN, children=())[source]

One relationship to eager-load, with its strategy.

attr

The relationship attribute, e.g. Task.project.

strategy

The EagerStrategy to load it with. Overridden by contains_eager for a top-level relationship a sort already joined.

children

Nested eager loads one level deeper, e.g. a project’s own owner. Their strategy is always honoured – contains_eager reuse only applies to a top-level relationship a sort actually joined.

class EagerStrategy(*values)[source]

Eager-loading strategy for an EagerLoad.

The three relationship loader strategies that make sense for a representation dump. Each member’s value is the string token SQLAlchemy’s relationship(lazy=...) accepts – a precise subset of SQLAlchemy’s (private) _LazyLoadArgumentType.

contains_eager is deliberately not a member: it isn’t a per-field choice but is applied automatically by apply_eager_loads() when a sort already joined the relationship.

JOINED = 'joined'

LEFT JOIN – right for scalar to-one relationships.

SELECTIN = 'selectin'

Separate WHERE ... IN query – safe for collections.

SUBQUERY = 'subquery'

Separate query correlated by subquery.

apply_eager_loads(stmt, loads, joined=None)[source]

Apply eager-load options to a SELECT, reusing ordering’s joins.

Parameters:
Return type:

Select

Returns:

The statement with .options(...) applied.

kiln’s openapi-extension keys + their runtime constructor.

Routes the BE generates carry two custom extensions consumed by the FE codegen layer:

  • X_CACHE_KEY ("x-cache-key") – string identifying the TanStack Query key root the FE should use for this route’s response cache. Only emitted on read routes (any GET, plus the POST search endpoints that are semantically reads). Its presence is the read signal: the FE generates useQuery / useSuspenseQuery wrappers for any GET or any route carrying a cache key, and treats everything else as a mutation – so no separate “this is a query” extension is needed.

  • X_RESOURCE ("x-resource") – the resource’s singular slug (the model’s snake name, e.g. "task" for the tasks resource). Stamped on every CRUD route (reads and writes alike, unlike X_CACHE_KEY). It’s the canonical resource identity: the FE groups a resource’s routes by it (to discover the CRUD SDK fns), and derives the saved-view resource_type discriminator + default singular display label from it. The plural cache namespace is the separate X_CACHE_KEY (e.g. "tasks"); a read route carries both, which is how the FE bridges its plural config key to this singular identity.

  • X_AUTH_ROLE ("x-auth-role") – "login" / "validate" / "logout" on the three auth-router routes. Lets the FE derive the auth SDK fns (and, off them, the session + credentials types) instead of naming them in fe.jsonnet.

construct_openapi_extra() builds a dict suitable for FastAPI’s openapi_extra= kwarg. Generated handlers call it inline so the extension keys live in exactly one place (this module). Hand-written handlers can use it the same way.

construct_openapi_extra(*, cache_key=None, resource=None, auth_role=None)[source]

Build the openapi_extra payload for a kiln-generated route.

Parameters:
  • cache_key (str | None) – TanStack Query key root for the route’s response cache. None skips the X_CACHE_KEY entry (right for write routes, which don’t seed any cache). Its presence doubles as the read signal – the FE treats any GET or any cache-keyed route as a query, so a read POST (a search endpoint) just sets this.

  • resource (str | None) – The resource’s singular slug (model snake name, e.g. "task"). None skips the X_RESOURCE entry. Stamped on both read and write routes; the canonical resource identity the FE groups routes by and derives the saved-view discriminator + label from.

  • auth_role (str | None) – "login" / "validate" / "logout" on an auth-router route. None skips the X_AUTH_ROLE entry. Lets the FE derive its auth SDK fns and the session / credentials types.

Return type:

dict[str, object]

Returns:

A dict suitable for @router.<method>(..., openapi_extra=...). Empty when no argument is set.

ORDER BY application from typed Pydantic sort clauses.

apply_ordering(stmt, sort_clauses, model, default_field, default_dir='asc')[source]

Apply one or more sort clauses to a SELECT statement.

Each clause is a Pydantic model with field (an enum whose .value is the sort field) and dir (SortDirection).

A sort field naming a column sorts by that column. A field naming a (to-one) relationship"vendor" on a model with a vendor many-to-one – is intuited as the related row’s representative column (vendor.name, falling back to its primary key), with the related table LEFT-joined in so rows with a null relationship still appear. The sort field stays the bare relationship name end to end – callers never see or send "vendor.name".

When sort_clauses is None or empty, the default field and direction are used.

Parameters:
  • stmt (Select) – The SQLAlchemy SELECT statement to sort.

  • sort_clauses (Sequence[BaseModel] | None) – List of sort clause models, or None.

  • model (type) – The SQLAlchemy model class providing columns.

  • default_field (str) – Field to sort by when no clauses are provided.

  • default_dir (Literal['asc', 'desc']) – Direction for the default sort.

Return type:

tuple[Select, set[RelationshipProperty]]

Returns:

(stmt, joined) – the statement with ORDER BY applied, and the set of relationships LEFT-joined to resolve a relationship sort field. Hand the joined set to fsh_lib.loading.apply_eager_loads() so an eager load of one of those relationships reuses the join (contains_eager) instead of fetching it again.

Keyset and offset pagination helpers.

apply_keyset_pagination(stmt, model, cursor, cursor_field, page_size, max_page_size)[source]

Apply keyset (cursor-based) pagination to a SELECT.

Adds a WHERE cursor_field > cursor clause when a cursor is provided, clamps page_size, and adds LIMIT page_size + 1 (the extra row detects whether more results exist).

Parameters:
  • stmt (Select) – The SQLAlchemy SELECT statement.

  • model (type) – The SQLAlchemy model class providing columns.

  • cursor (Any) – The cursor value (already cast to the correct type), or None.

  • cursor_field (str) – Name of the cursor column.

  • page_size (int) – Requested page size.

  • max_page_size (int) – Maximum allowed page size.

Return type:

tuple[Select, int]

Returns:

(paginated_stmt, effective_page_size) tuple. The caller is responsible for executing paginated_stmt.

apply_offset_pagination(stmt, offset, limit, max_page_size)[source]

Apply offset pagination to a SELECT.

Clamps limit to max_page_size, applies OFFSET / LIMIT, and builds a companion COUNT(*) statement so the caller can fetch the total alongside the page.

Parameters:
  • stmt (Select) – The SQLAlchemy SELECT statement (without offset/limit applied).

  • offset (int) – Number of rows to skip.

  • limit (int) – Requested page size.

  • max_page_size (int) – Hard ceiling on limit.

Return type:

tuple[Select, Select, int]

Returns:

(paginated_stmt, count_stmt, effective_limit) tuple. The caller is responsible for executing both statements.

Client for a Rego (OPA) permissions service.

A kiln permissions service is a stateless Open Policy Agent deployment: it carries the policy (the generic RBAC engine plus any custom rules) but no data. The roles and role bindings live in the backend’s own database; this module ships them – together with the subject, the action, and the resource – to OPA on every call.

Three query modes:

  • Point check – a decision about one resource (get / create / update / delete). OpaClient.check() POSTs the decision input to /v1/data/<opa_package>/decision and returns a Decision.

  • Bulk check – many (action, resource) decisions for one subject in a single round-trip (the actions-envelope case: every row of a list page x every action). OpaClient.check_many() POSTs to /v1/data/<opa_package>/decisions and returns one Decision per item.

  • List filter – a collection request where every candidate row must be checked. OpaClient.compile_filter() asks OPA to partially evaluate the allow rule with the resource left unknown (the /v1/compile API). OPA returns residual conditions on the resource; FilterResult.to_sqlalchemy() turns them into a SQLAlchemy WHERE clause the backend folds into the list query. One round-trip filters the whole collection – no per-row check.

Requires the opa extra (pip install 'kiln-generator[opa]') for httpx.

Example – point check:

client = OpaClient("http://opa:8181", opa_package="authz")
decision = await client.check(
    subject=Subject("user", "alice"),
    action="task:update",
    resource=ResourceRef("Task", "t-1", {"created_by": "alice"}),
    roles={"editor": {"permissions": ["task:read", "task:update"]}},
    bindings=[RoleBinding(Subject("user", "alice"), "editor")],
)
if not decision.permit:
    raise HTTPException(status_code=403)

Example – list filter:

result = await client.compile_filter(
    subject=Subject("user", "alice"),
    action="task:list",
    resource_type="Task",
    roles=roles,
    bindings=bindings,
)
stmt = select(Task).where(result.to_sqlalchemy({"id": Task.id}))
class Condition(field, op, value, negated=False)[source]

One residual constraint on a resource field.

A Condition is the translated form of a single comparison OPA left unresolved when it partially evaluated the policy – e.g. input.resource.id == "t-1" becomes Condition("id", "eq", "t-1").

field

The resource field the constraint is on – the path after input.resource., dotted for a nested field.

op

The comparison: "eq", "ne", "lt", "le", "gt", "ge", or "in".

value

The literal the field is compared against (a list for "in").

negated

True when OPA emitted the expression negated.

class Decision(permit, allow, deny, raw)[source]

The answer a point permissions check returns.

permit

The bottom line. True iff some grant applies and no veto overrides it – the value the backend gates on.

allow

True iff a grant applied (generic RBAC or a custom allow rule), before vetoes.

deny

True iff a custom deny rule vetoed the request.

raw

The full result object the service returned, for logging or richer custom decisions.

classmethod from_result(result)[source]

Build a Decision from a decision result object.

Missing keys default to the safe value (False): a service that answers with a partial document is treated as a denial rather than a silent allow.

Return type:

Decision

class FilterResult(always_allow, always_deny, conjunctions)[source]

A compiled list policy: which rows a subject may see.

The outcome of partially evaluating the allow rule with the resource unknown. It is one of three shapes:

  • always allow – no constraint; every row passes.

  • always deny – the policy can never hold; no row passes.

  • conditionalconjunctions is an OR of ANDs of Condition objects (disjunctive normal form, the shape OPA’s /v1/compile returns).

always_allow

Every row passes; to_sqlalchemy() returns a tautology.

always_deny

No row passes; to_sqlalchemy() returns a contradiction.

conjunctions

OR-of-ANDs residual. Each inner tuple is a conjunction of Condition objects; a row passes when it satisfies any conjunction.

to_sqlalchemy(columns)[source]

Render the residual as a SQLAlchemy boolean clause.

Parameters:

columns (Mapping[str, ColumnElement[Any]]) – Maps a resource field name (as it appears in Condition.field) to the SQLAlchemy column it constrains, e.g. {"id": Task.id}.

Returns:

a tautology when always_allow, a contradiction when always_deny, otherwise the OR-of-ANDs.

Return type:

ColumnElement[bool]

Raises:

OpaError – A residual constrains a field absent from columns.

class OpaClient(base_url, *, opa_package='authz', client=None, timeout=5.0)[source]

Async client for a kiln Rego permissions service.

Wraps an httpx.AsyncClient pointed at an OPA decision server. Construct one per application (it is cheap to keep open) and close it on shutdown via aclose(), or use it as an async context manager.

Parameters:
  • base_url (str) – Base URL of the OPA server, e.g. "http://opa:8181".

  • opa_package (str) – Rego package of the decision entrypoint. The point-check path is /v1/data/<opa_package>/decision; the list filter compiles data.<opa_package>.allow. Defaults to "authz".

  • client (AsyncClient | None) – An existing httpx.AsyncClient to use. When None (the default) the OpaClient creates and owns one, and aclose() closes it.

  • timeout (float) – Per-request timeout in seconds. Ignored when an explicit client is supplied.

async aclose()[source]

Close the underlying client iff this instance owns it.

Return type:

None

async check(*, subject, action, resource, roles, bindings)[source]

Ask the service whether subject may take action.

A point check – one decision about one resource.

Parameters:
  • subject (Subject) – The principal making the request.

  • action (str) – The permission string being checked, e.g. "task:update".

  • resource (ResourceRef) – The resource the action concerns.

  • roles (Mapping[str, Any]) – The role catalogue – {role_name: {"permissions": [...]}}. The backend loads this from its own store; only the roles relevant to bindings need be present.

  • bindings (Sequence[RoleBinding]) – The subject’s role bindings, loaded by the backend from its own store.

Return type:

Decision

Returns:

The service’s Decision.

Raises:

OpaError – The service was unreachable, timed out, or answered with a non-2xx status.

async check_many(*, subject, roles, bindings, items)[source]

Decide a batch of (action, resource) checks in one call.

Every item shares one subject, roles, and bindings – the actions-envelope case: one user, many row x action checks for a whole list page. The service evaluates the same per-one policy for each item, so a page costs a single OPA round-trip rather than one call per item.

Parameters:
Return type:

list[bool]

Returns:

One permit verdict per item, in the same order as items. The bulk entrypoint returns the bottom-line permit only – use check() for the full allow / deny breakdown.

Raises:

OpaError – The service was unreachable, timed out, or answered with a non-2xx status.

async compile_filter(*, subject, action, resource_type, roles, bindings, unknowns=('input.resource.id',))[source]

Compile the list policy into a row filter for resource_type.

Asks OPA to partially evaluate data.<opa_package>.allow with the resource left unknown, then translates the residual into a FilterResult. The resource type is supplied as a known value so type-scoped and global bindings resolve fully and only instance-level constraints survive into the residual.

Parameters:
  • subject (Subject) – The principal making the request.

  • action (str) – The collection action, e.g. "task:list".

  • resource_type (str) – The resource type being listed, e.g. "Task". Passed as a known value so the residual is purely about instance fields.

  • roles (Mapping[str, Any]) – The role catalogue (see check()).

  • bindings (Sequence[RoleBinding]) – The subject’s role bindings.

  • unknowns (Sequence[str]) – The input paths OPA treats as symbolic. Defaults to ("input.resource.id",) – enough for generic RBAC. Extend it (e.g. with "input.resource.created_by") when a custom rule constrains other resource fields.

Return type:

FilterResult

Returns:

The compiled FilterResult.

Raises:

OpaError – The service failed, or returned a residual the translator cannot represent as a SQL filter.

exception OpaError[source]

A permissions-service call failed.

Raised when the service is unreachable, times out, answers with a non-2xx status, or returns a residual the translator cannot turn into a SQL filter. The caller decides whether an error means “deny” (fail-closed, the safe default for authz) or “allow” (fail-open) – this module never silently picks one.

class ResourceRef(type, id=None, attributes=<factory>)[source]

The resource a decision concerns.

type

Resource type, e.g. "Task". Matched against the permission catalogue and against object-scoped bindings.

id

Resource instance id, or None for a collection-level decision (a list / create where no single instance exists yet).

attributes

Extra fields folded into input.resource so custom Rego rules can read them – e.g. {"created_by": "alice"} for an ownership rule. type and id always take precedence over a same-named attribute key.

as_input()[source]

Return the input.resource object the Rego engine reads.

Return type:

dict[str, Any]

class RoleBinding(subject, role, scope=None)[source]

A role bound to a subject, optionally scoped to a resource.

subject

The principal the role is granted to.

role

Name of the granted role. Must be a key of the roles catalogue passed to the query method.

scope

None for a global binding (applies to every resource); a ResourceRef with no id for a type-scoped binding (every instance of a type); a ResourceRef with an id for an instance-scoped binding.

as_input()[source]

Return the binding object one input.bindings entry expects.

Return type:

dict[str, Any]

class Subject(type, id)[source]

The principal a decision is made for.

type

"user" or "token". A binding for one kind never satisfies a request for the other – the RBAC engine matches on the pair.

id

Stable identifier of the principal (a user id, a token id / jti).

as_input()[source]

Return the {"type", "id"} pair the Rego engine reads.

Return type:

dict[str, str]

pgqueuer integration for kiln-generated FastAPI projects.

Three helpers — that’s the whole surface be contributes to the queue story. Everything else (worker run loop, @entrypoint, CLI) is pgqueuer’s own; use it directly per the upstream docs.

  • get_queue() (producer) — wraps the asyncpg connection underlying a SQLAlchemy AsyncSession so jobs enqueue inside the same transaction as the request’s other writes. The job becomes durable when the session commits; if the session rolls back, the job is gone. This is the transactional-outbox pattern, and it’s the one piece pgqueuer doesn’t ship.

  • open_worker_driver() (worker bootstrap) — opens a dedicated asyncpg connection from a DSN, coercing SQLAlchemy’s postgresql+asyncpg:// URL form to plain postgresql:// so the same env var works for both sides. Use as the outermost async with in your pgqueuer factory.

  • instrument_entrypoint() (worker observability) — wraps an entrypoint coroutine in an OpenTelemetry consumer span when the fsh-lib[opentelemetry] extra is installed, and returns it untouched otherwise. Safe to call unconditionally from a worker factory; the fsh_lib-supplied dispatch entrypoints apply it themselves.

The two connection helpers assume the database is reached through the asyncpg driver. Other drivers (psycopg, etc.) would need a parallel shim and are not supported today.

async get_queue(session)[source]

Return a pgqueuer.Queries bound to session’s connection.

Calls to await queue.enqueue(...) issue SQL on the same asyncpg connection the SQLAlchemy session is using, so they join the session’s transaction. Commit the session and the job is durable; roll back and it never existed.

Parameters:

session (AsyncSession) – A SQLAlchemy async session backed by the asyncpg driver. The session is checked out from a connection so the underlying asyncpg.Connection can be unwrapped.

Return type:

Queries

Returns:

A pgqueuer.Queries whose driver wraps the session’s asyncpg connection.

instrument_entrypoint(entrypoint, handler)[source]

Wrap a pgqueuer entrypoint handler in an OpenTelemetry span.

Returns handler decorated with fsh_lib.telemetry.traced_entrypoint() when OpenTelemetry is importable, and handler unchanged when it is not — so a worker factory can call this unconditionally without caring whether the consumer installed the fsh-lib[opentelemetry] extra.

Importable is not the same as configured: when OTel is installed but the app never builds a TracerProvider, the wrapped handler emits no-op spans at negligible cost. Real spans start appearing once the consumer’s init_telemetry runs — so it is safe to instrument an entrypoint before deciding whether a given deployment exports traces at all.

Parameters:
  • entrypoint (str) – The pgqueuer entrypoint (== queue) name the handler is registered under — names the span and is attached as the messaging.destination.name attribute.

  • handler (Callable[..., Awaitable[None]]) – The async (job) -> None coroutine pgqueuer runs for that entrypoint.

Return type:

Callable[..., Awaitable[None]]

Returns:

Either the span-wrapped handler or handler itself; both keep the (job) -> None shape PgQueuer.entrypoint expects.

open_worker_driver(dsn)[source]

Open a dedicated worker connection and yield an AsyncpgDriver.

Workers need a long-lived connection of their own so pgqueuer can LISTEN for new-job notifications on it. Use this as the outermost async with in the factory you hand to pgq run, and shape the factory itself as an asynccontextmanager() so pgqueuer’s run_factory keeps the connection alive for the lifetime of the worker – a plain async def main() -> PgQueuer exits this async with before pgqueuer can use the driver and the supervisor crashes with InterfaceError: connection is closed:

from contextlib import asynccontextmanager

from fsh_lib.queue import open_worker_driver
from pgqueuer import PgQueuer

@asynccontextmanager
async def main():
    async with open_worker_driver(
        os.environ["DATABASE_URL"],
    ) as driver:
        pgq = PgQueuer(driver)

        @pgq.entrypoint("ping")
        async def ping(job): ...

        yield pgq
Parameters:

dsn (str) – A PostgreSQL DSN. Either plain (postgresql://user:pw@host/db) or SQLAlchemy-shaped (postgresql+asyncpg://...) — the latter is rewritten so the same env var works for both the request path and the worker.

Yields:

An AsyncpgDriver wrapping the freshly-opened connection.

Rate-limiting primitives for kiln-generated FastAPI projects.

This module’s runtime dependency on slowapi and limits is gated behind the rate-limit extra. Install with:

pip install 'kiln-generator[rate-limit]'
# or: uv add 'kiln-generator[rate-limit]'

The pieces:

  • RateLimitBucketMixin – a SQLAlchemy mixin supplying the three columns every counter row needs (key, hits, expires_at). Same idiom as fsh_lib.files.FileMixin: the consumer subclasses it on their own model so they own the table and we own the columns.

  • PostgresStorage – a limits-compatible synchronous storage backend backed by a small dedicated SQLAlchemy engine. slowapi’s enforcement path calls limiter.hit(...) synchronously (not awaited) so an async storage cannot satisfy it; we use a separate sync engine targeting the same Postgres database the rest of the app talks to.

  • build_limiter() – factory that constructs a slowapi slowapi.Limiter and wires our PostgresStorage in as its backing store, swapping out the placeholder memory:// storage slowapi creates internally.

  • default_key_func() – the per-request rate-limit key callable used by default (client IP).

class PostgresStorage(*, model, session_maker)[source]

limits-compatible storage backed by a sync Postgres engine.

slowapi’s enforcement path is synchronous (limiter.hit(...) is not awaited), so an async storage cannot satisfy it. This class uses a dedicated synchronous SQLAlchemy engine pointed at the same Postgres database as the rest of the app – separate connection pool, same data.

The counter row is upserted with Postgres INSERT ... ON CONFLICT DO UPDATE: a hit on a fresh window inserts a row; a hit on an active window increments hits; a hit on an expired window resets hits to amount and shifts expires_at forward.

STORAGE_SCHEME: list[str] | None = ['postgres-rate-limit']

URI scheme this storage registers under. Not used for instantiation – build_limiter() constructs the storage directly and patches it onto the slowapi limiter – but limits requires the attribute on every storage subclass.

Declared as an instance attribute (not ClassVar) to mirror the base Storage class – limits annotates it as such and a ClassVar override would conflict at type-check time.

property base_exceptions: Any

Exception class(es) limits should treat as storage failures.

The limits base class types this as type[Exception] | tuple[type[Exception], ...]; we narrow to a single class and annotate Any here to keep autodoc from cross-referencing type (which collides with three unrelated type: discriminator fields in the be schema). Returns SQLAlchemyError.

check()[source]

Return whether the storage is reachable.

limits calls this opportunistically when a previous call raised; we keep it cheap by issuing SELECT 1 rather than touching the bucket table.

Return type:

bool

clear(key)[source]

Delete the counter row for key (no-op when absent).

Return type:

None

get(key)[source]

Return the current counter value for key (0 when stale).

Return type:

int

get_expiry(key)[source]

Return the window expiry for key as a UNIX timestamp.

limits treats a value in the past as “no active window”.

Return type:

float

incr(key, expiry, amount=1)[source]

Increment key by amount, opening a fresh window when stale.

Parameters:
  • key (str) – Rate-limit key.

  • expiry (int) – Window duration in seconds.

  • amount (int) – Increment step (defaults to 1).

Return type:

int

Returns:

The new counter value after the increment.

reset()[source]

Delete every counter row. Returns the number deleted.

Return type:

int | None

class RateLimitBucketMixin[source]

SQLAlchemy mixin supplying the columns of a rate-limit bucket.

Subclass on a regular SQLAlchemy Base to carry the storage columns:

from fsh_lib.rate_limit import RateLimitBucketMixin

class RateLimitBucket(Base, RateLimitBucketMixin):
    __tablename__ = "rate_limit_buckets"

Unlike fsh_lib.files.FileMixin, the natural primary key here is key itself (the limit identifier produced by the key_func plus the limit string). Declaring it primary_key=True means the consumer doesn’t need to bring their own PK plugin to use this mixin.

The consumer is responsible for migrating the table; be doesn’t generate Alembic migrations.

expires_at: Mapped[datetime] = <sqlalchemy.orm.properties.MappedColumn object>

When the current window ends. Rows with expires_at < now() are stale and reset on the next hit.

hits: Mapped[int] = <sqlalchemy.orm.properties.MappedColumn object>

Counter value for the current window.

key: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Rate-limit key. slowapi builds this from the route, the key_func output, and the limit string.

build_limiter(*, model, sync_url, key_func=None, default_limits=(), headers_enabled=True, engine=None)[source]

Build a slowapi slowapi.Limiter backed by Postgres.

The returned limiter has its _storage and _limiter fields swapped out for our PostgresStorage – slowapi constructs a placeholder memory:// storage internally because its public API only takes a URI, and we replace it rather than going through URI dispatch (the storage needs Python objects – the bucket model and a sessionmaker – that don’t round-trip through a URI).

Parameters:
  • model (type[RateLimitBucketMixin]) – The consumer’s bucket model class (must mix in RateLimitBucketMixin).

  • sync_url (str) – A synchronous Postgres DSN for the rate-limit storage. The app’s main async DSN (postgresql+asyncpg://...) is fine to reuse with the +asyncpg driver tag stripped.

  • key_func (Callable[[Request], str] | None) – Per-request key callable. Defaults to default_key_func() (client IP).

  • default_limits (Iterable[str]) – Iterable of limit strings applied to every route that doesn’t have its own @limiter.limit(...).

  • headers_enabled (bool) – Whether slowapi emits X-RateLimit-* response headers.

  • engine (Engine | None) – Pre-built sync engine. Optional escape hatch for tests / custom pools; production callers leave it None and let the helper build one from sync_url.

Return type:

Limiter

Returns:

A configured slowapi slowapi.Limiter.

default_key_func(request)[source]

Default rate-limit key: client IP, falling back to unknown.

Used when key_func is not configured. Behind a trusted proxy you almost certainly want to point key_func at a function that reads X-Forwarded-For instead – this default deliberately refuses to trust any header.

Return type:

str

Project-wide resource registry: value-provider engine.

Codegen emits one ResourceRegistry per project, populated declaratively with one ResourceEntry per resource. Subscripting it by slug – registry[slug] – yields a per-resource handle; the generated _values route handler delegates to it via registry[slug].values(...).

The filter catalog is no longer surfaced at runtime – it lives in the openapi spec (x-fsh-list) at build time, and the codegen FE bakes it into per-resource hooks. ResourceRegistry keeps the value-provider plumbing (trigram autocomplete over enum choices, ref labels, and free-text search columns).

The class is generic over the slug type (Slug: str = str) so a codegen consumer can declare ResourceRegistry[ResourceType] and get type-narrowed slug arguments on every method. The default of str keeps the class usable from hand-written code that doesn’t go through codegen.

Value endpoints are single-page – autocomplete UX narrows by typing more characters, not by paginating.

class Bool(name, operators=(FilterOperator.EQ,), kind='bool')[source]

Boolean toggle.

class Enum(name, enum_class, operators=(FilterOperator.EQ, FilterOperator.IN), kind='enum')[source]

Enum-typed filter field.

Discovery emits {value, label} choices; the values endpoint serves the same list q-filterable through a Postgres VALUES clause.

FilterField = fsh_lib.resource_registry.Enum | fsh_lib.resource_registry.Ref | fsh_lib.resource_registry.LiteralField | fsh_lib.resource_registry.Bool

Sum of every supported filter-field shape.

class FilterOperator(*values)[source]

Closed set of operators a filter field may declare.

class LiteralField(name, type, operators=(FilterOperator.EQ, FilterOperator.GT, FilterOperator.GTE, FilterOperator.LT, FilterOperator.LTE), kind='literal')[source]

Numeric / date / datetime input rendered natively on the FE.

class Ref(name, target, operators=(FilterOperator.EQ, FilterOperator.IN), kind='ref')[source]

Filter pointing at another resource (or this one).

The trigram subquery scores against the target’s first search_columns entry on its ResourceEntry; targets without any search columns fall back to the stringified pk.

class ResourceEntry(model, pk, fields=(), search_columns=(), default_rep_class=None, default_rep_serializer=None, object_actions=(), collection_actions=())[source]

One resource’s registry-side declaration.

search_columns are the model attributes used as the default field list when the values endpoint is called with empty fields — they’re trigram-matched the same way any other field is, so the empty-fields path is just a multi-column search over these defaults.

default_rep_class and default_rep_serializer describe the resource’s cross-resource link shape — the Pydantic class of its default_representation and the async (row, session) -> default_rep_class callable that produces it. Both are None when the resource doesn’t declare a default representation; ResourceRegistry.hydrate_refs() then returns an empty list for that slug.

collection_actions: tuple[ActionSpec, ...] = ()

Collection-scope action specs. Drives the collection-scope registry[slug].actions(...) path.

object_actions: tuple[ActionSpec, ...] = ()

Object-scope action specs. Drives the per-row registry[slug].actions(...) path.

class ResourceRegistry(entries)[source]

Project-wide discovery + value-provider dispatcher.

Construct with a {slug: ResourceEntry} map at module load time. Subscript by slug – registry[slug] – to get a bound handle exposing actions / values for that resource; hydrate_refs() stays on the registry itself since it dispatches on a runtime slug. Stateless after construction – safe to share across requests.

Generic over the slug type so a codegen consumer can declare ResourceRegistry[ResourceType] and get the project’s ResourceType enum on the subscript. Slug defaults to str for non-codegen use.

async hydrate_refs(resource, ids, db, session)[source]

Fetch ids of resource and serialize them via its default rep.

Used by fsh_lib.saved_views.hydrate_view() (and anything else dispatching by slug at runtime) to turn raw ref ids into hydrated link payloads. Lenient on missing slugs and dropped ids: an unknown resource, a resource without a default representation, or an empty ids list all return []; ids that don’t resolve to a row are silently skipped. Order of returned items mirrors ids.

Return type:

list[dict[str, Any]]

class ValuesPage(**data)[source]

Response shape for POST /_values.

Single-page only — autocomplete UX narrows by typing more characters, not by paginating. results is [{"value": ..., "label": ...}] for enum / free-text / single-field paths and the consumer’s link-payload shape (already model_dump-ed) for resource search. Multi-column union results add a "field" key indicating the source column.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Per-user saved views: mixin + payload schemas + hydration.

A saved view is a named filter+sort state stored on behalf of a single user. Mirrors the fsh_lib.files.FileMixin idiom: the consumer subclasses SavedViewMixin on their own DeclarativeBase and defines a normal kiln resource pointing at it.

A single mixed-in model serves every opted-in resource; resource_type discriminates rows so the codegen-generated CRUD scopes reads and writes per resource.

Stored payloads keep raw filter values, including raw ids on ref values. Read paths run those ids through hydrate_view(), which dispatches by slug through the project-wide fsh_lib.resource_registry.ResourceRegistry.hydrate_refs() to produce hydrated items. Stale or invisible refs are silently skipped.

HydrateRefs = 'Callable[[str, list[Any], AsyncSession, Any], Awaitable[list[dict[str, Any]]]]'

Type alias for the slug-keyed ref hydrator.

await fn(resource_slug, ids, db, session) returns hydrated link payloads (model_dump()-ed) for the rows matching ids. Returns [] for unknown slugs or empty ids. fsh_lib.resource_registry.ResourceRegistry.hydrate_refs() satisfies this shape. Kept as a string to avoid forcing SQLAlchemy / typing imports at module load.

class SavedViewCreate(**data)[source]

Request body for POST /views.

payload is the raw filter+sort spec — same shape as SavedViewUpdate’s, but name is required.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class SavedViewFilterEntry(**data)[source]

One filter committed to the saved-view payload.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

SavedViewFilterValue = str | int | float | bool | list[str] | fsh_lib.saved_views.SavedViewRefValue | None

Union of legal filter values after hydration.

Mirrors the FE’s SavedViewFilterValue – primitives / multi-select arrays / hydrated ref values pass through, None means “filter not active”.

class SavedViewMixin[source]

SQLAlchemy mixin supplying the columns of a saved-view row.

Subclass on a DeclarativeBase-derived class:

from fsh_lib.saved_views import SavedViewMixin

class SavedView(Base, SavedViewMixin):
    __tablename__ = "saved_views"

Then point each opted-in resource at the model:

{
  model: "myapp.models.Product",
  saved_views: { model: "myapp.models.SavedView" },
  representations: [
    {
      name: "default",
      fields: [
        { name: "id", type: "uuid" },
        { name: "name", type: "str" },
      ],
    },
  ],
  default_representation: "default",
  // ...
}

The mixin owns no primary key, created_at, or updated_at — pgcraft’s UUIDV4PKPlugin and TimestampPlugin inject those columns at factory run time when the consumer attaches them via __plugins__. Indexes on resource_type and owner_id are recommended; both columns drive every read filter.

name: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Caller-supplied display name.

order_index: Mapped[int] = <sqlalchemy.orm.properties.MappedColumn object>

Per-(owner, resource_type) display order. Lower values sort earlier; ties broken by created_at so newly-created views still land at a deterministic spot when the FE hasn’t yet stamped an explicit index. The generated list route’s order modifier sorts by this column ascending, and the update route exposes it as a writable field so the FE persists drag-reorders by PATCHing each affected row’s new index.

owner_id: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Stringified user id. Saved views are per-user; the generated routes filter by owner_id == str(session.<attr>) where <attr> is user_id_attr.

payload: Mapped[dict[str, Any]] = <sqlalchemy.orm.properties.MappedColumn object>

Raw filter+sort spec, stored as PostgreSQL JSONB. Ref values store ids only; hydration happens at read time via hydrate_view().

resource_type: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>

Slug of the parent resource (lowercase model class name). Drives the WHERE resource_type = ... clause every saved-view read / write inserts so views never bleed across resources.

class SavedViewPayloadResponse(**data)[source]

Hydrated payload returned by hydrate_view().

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class SavedViewRefValue(**data)[source]

Hydrated ref / self filter value.

Stored as {kind, type, ids} on write; hydrate_view() populates items with the labelled rows from the per-resource serializer so the FE can render a chip without a follow-up fetch. items payload-shape is per-resource and stays open (dict[str, Any]); the FE narrows it via the slug-keyed serializer registry on its end.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class SavedViewResponse(**data)[source]

Typed response shape for saved-view read / write routes.

Mirrors the dict hydrate_view() returns. Wired as the operation’s response_model so the generated FastAPI route declares it (and openapi-ts produces a typed FE return) instead of falling back to dict[str, Any].

id is UUID (not str) so the ORM column flows in raw – Pydantic / FastAPI serialise it to the canonical hex form on the wire and openapi-ts surfaces a string typed as a uuid format. created_at / updated_at are datetime for the same reason: ISO-format serialisation happens at the JSON boundary, not in the hydration call site.

order_index carries the row’s current drag-reorder position so the FE knows the persisted tab order without a separate query; the generated list route sorts by it ascending.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class SavedViewSort(**data)[source]

Stable serialised sort descriptor.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class SavedViewUpdate(**data)[source]

Request body for PATCH /views/{id}.

Both fields optional; missing fields leave the stored value untouched.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

async hydrate_view(view, hydrate_refs, db, session)[source]

Return the typed response payload for one saved view.

Walks each entry in view.payload["filters"]; for entries with value.kind in {"ref", "self"}, calls hydrate_refs with the slug + ids and replaces ids with the returned items. Stale or invisible rows are silently skipped so the dump never throws because of dangling refs.

Pydantic validation happens at construction so a stored payload that drifts from the schema (legacy data, manual SQL edits) raises here rather than silently shipping the bad shape to clients.

Always called against a persisted row (the route hands us the row it just inserted / fetched), so view.id is set – typed as Mapped[uuid.UUID] non-optional on the mixin.

Return type:

SavedViewResponse

Runtime helpers for OpenTelemetry-instrumented be apps.

This module is imported by the telemetry/setup.py and telemetry/decorators.py files generated by be.operations.telemetry.TelemetryScaffold. Generated apps without telemetry never import this module.

OpenTelemetry is an optional runtime dependency of be. Install it via pip install kiln-generator[opentelemetry], or directly from the pinned telemetry/requirements.txt that the scaffold emits next to the generated app. Importing this module without OTel installed raises ImportError.

The decorators here create internal spans named {resource}.{op} that survive ASGI middleware reordering and carry low-cardinality be.resource / be.op attributes for filtering. They compose cleanly with the request span emitted by opentelemetry.instrumentation.fastapi.FastAPIInstrumentor.

ATTR_OP = 'be.op'

Span attribute carrying the operation name (e.g. "get").

ATTR_RESOURCE = 'be.resource'

Span attribute carrying the resource name (e.g. "article").

ExporterName

Accepted values of the exporter parameter to build_tracer_provider(). Mirrors be.config.schema.ExporterName.

alias of Literal[‘otlp_http’, ‘otlp_grpc’, ‘console’, ‘none’]

SamplerName

Accepted values of the sampler parameter to build_tracer_provider(). Mirrors be.config.schema.SamplerName – duplicated here rather than imported because fsh_lib is a runtime package and must not depend on be.

alias of Literal[‘always_on’, ‘always_off’, ‘parentbased_always_on’, ‘parentbased_always_off’, ‘parentbased_traceidratio’, ‘traceidratio’]

build_logger_provider(*, service_name, service_version=None, environment_env=None, resource_attributes=None)[source]

Build a LoggerProvider configured via OTLP env vars.

Off by default in TelemetryConfig; only used when the consumer opts in to log export. The provider on its own is not enough – the generated init_telemetry also attaches a opentelemetry.sdk._logs.LoggingHandler to the stdlib root logger so logging.getLogger().info(...) calls flow through OTLP.

Return type:

LoggerProvider

build_meter_provider(*, service_name, service_version=None, environment_env=None, resource_attributes=None)[source]

Build a MeterProvider reading exporter config from env.

The metrics exporter is always selected via the standard OTEL_METRICS_EXPORTER env var (default otlp); we don’t expose a per-config override because metrics deployments almost universally pair with the same OTLP endpoint as traces.

Return type:

MeterProvider

build_tracer_provider(*, service_name, service_version=None, environment_env=None, resource_attributes=None, sampler='parentbased_always_on', sampler_ratio=None, exporter=None)[source]

Build and return a configured TracerProvider.

The caller is responsible for installing it via opentelemetry.trace.set_tracer_provider(). Splitting construction from installation keeps the helper testable without touching the global tracer provider.

environment_env is the name of the environment variable holding the deployment-environment value (e.g. "ENVIRONMENT"), not the value itself – the same generated artifact deploys across dev / staging / prod, so the value is resolved at startup. None skips the lookup; an unset / empty variable does the same.

The OTLP endpoint and headers are read by the SDK from the standard OTEL_EXPORTER_OTLP_* environment variables – there is no kiln-side override for the variable names.

Return type:

TracerProvider

scrub_current_span_attributes(*keys)[source]

Replace named attributes on the current span with a placeholder.

The auth router wraps its login/logout handlers in a span that would otherwise carry the credentials request body and the session response body once capture_request_body / capture_response_body are turned on globally. This helper is invoked from inside those handlers to overwrite specific attribute keys (set by a future body-capture hook) with [scrubbed] – a placeholder rather than removal so a “missing http.request.body” alert doesn’t mask an outage.

Independent of the global capture toggles: even if the consumer never turns on body capture, the auth router still calls this so any third-party span processor that chooses to attach bodies sees the scrubbed value.

Return type:

None

traced_entrypoint(entrypoint)[source]

Wrap a pgqueuer entrypoint coroutine in a consumer span.

pgqueuer entrypoints are async (job) -> None coroutines the pgq worker runs outside any HTTP request – there is no surrounding FastAPIInstrumentor server span to root them under or to record their failures. This decorator gives every job its own root span so worker activity lands in the same trace backend as the request path.

In pgqueuer the entrypoint is the queue – jobs are routed to handlers purely by entrypoint name – so entrypoint is emitted as the OTel messaging.destination.name, the queue/topic key a backend groups consumer traffic by.

The span is a standalone root (SpanKind.CONSUMER); it is deliberately not linked to the trace of the request that enqueued the job, since pgqueuer payloads are opaque bytes with no envelope to carry a traceparent – producer and worker traces stay separate.

Every attribute on the span is low-cardinality – the entrypoint name and the job priority, each from a small fixed set – so the span is safe to group by in dashboards and span-derived metrics. The unbounded per-job id is intentionally left off the span; it is carried on the worker’s log records instead, which share this span’s trace id when log correlation is enabled.

Unlike traced_handler(), exceptions raised by the wrapped coroutine are recorded and the span status set to ERROR (the start_as_current_span defaults, left in place here). A raising entrypoint is pgqueuer’s failure signal – the job is retried or moved to the dead-letter log – and with no server span downstream this span is the only place that failure surfaces in a trace.

Parameters:

entrypoint (str) – The pgqueuer entrypoint (== queue) name the handler is registered under, e.g. "fsh_lib_reports_dispatch". Low-cardinality: it names the span and is attached as messaging.destination.name.

Return type:

Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]

Returns:

A decorator wrapping an async (job, ...) -> Any coroutine and preserving its signature, so PgQueuer.entrypoint still sees the original callable shape.

traced_handler(span_name, *, resource, op)[source]

Wrap an async route handler in an internal span.

Used for both CRUD ops and user-defined actions – action names (publish, archive, …) flow through the same op parameter as CRUD names (get, list, …) and end up on the same ATTR_OP attribute. Distinguishing CRUD from actions in dashboards is left to the value: be’s CRUD names are a fixed small set, anything else is user-defined.

Exceptions raised from the handler are deliberately not recorded on this span and the span status is left OK. The final HTTP status – including FastAPI’s conversion of HTTPException into 4xx / 5xx responses – is captured by the surrounding FastAPIInstrumentor server span, which is the authoritative signal for “did the request succeed.” Recording here would double-count routine flow-control exceptions (HTTPException(404) from get_object_from_query_or_404, HTTPException(401) from the auth dep) as backend errors.

Parameters:
  • span_name (str) – Span name, conventionally f"{resource}.{op}".

  • resource (str) – Low-cardinality resource label (e.g. "article") attached as ATTR_RESOURCE.

  • op (str) – Low-cardinality op label (e.g. "get", "publish") attached as ATTR_OP.

Return type:

Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]

The returned decorator preserves the wrapped function’s signature so FastAPI’s dependency-injection introspection still works.

Body→kwargs transformer helpers shared by generated builders.

Generated update builders one-line through build_patch_kwargs() to read body.model_fields_set and emit a column-kwargs dict the SQL update(...).values(**...) consumes. The same one-shot extraction would otherwise be repeated per resource as a chain of if "<field>" in body.model_fields_set: out["<field>"] = ... lines in every transformer module.

build_patch_kwargs(body, fields)[source]

Project the explicitly-set subset of fields off body into a dict.

PATCH semantics: a field that the client did not send must not appear in the returned dict, so the SQL UPDATE only touches columns the client meant to change. body.model_fields_set is pydantic’s record of which attributes were populated from input (vs. left at their default), so iterating it is the correct gate – a None value the client explicitly sent still passes through.

Parameters:
  • body (BaseModel) – The parsed pydantic request model.

  • fields (Iterable[str]) – Field names the caller wants to consider. Names absent from body’s declared fields are ignored.

Return type:

dict[str, Any]

Returns:

A dict mapping each set field to its value on body. The caller typically annotates the receiving variable with the per-resource {Resource}UpdateKwargs TypedDict (total=False) so type-checkers see the result as the expected partial shape.

General-purpose runtime utilities used by generated apps.

Three unrelated concerns share this module by convention – get_object_from_query_or_404() used by every read-or-mutate CRUD handler, the run_once() decorator used by the generated telemetry init, and compile_query() for tests that assert against rendered SQL. Bundling them here keeps the public fsh_lib surface flat enough that consumers learn one import path (from fsh_lib.utils import ...) for everything that doesn’t fit under a more specific submodule.

compile_query(stmt, *, dialect=None, literal_binds=True)[source]

Render a SQLAlchemy statement to a single SQL string.

Test-oriented helper: tests that assert against generated SQL (locking modifiers, where-clause shape, computed expressions) repeatedly spell str(stmt.compile(compile_kwargs={"literal_binds": True})) and frequently need a Postgres dialect to surface pg-specific syntax (SKIP LOCKED, ON CONFLICT, …). Centralising the boilerplate keeps assertions readable and avoids per-test imports of the dialect submodule.

Parameters:
  • stmt (ClauseElement) – Any SQLAlchemy clause – select(), insert(), update(), raw text(), etc.

  • dialect (Literal['postgres', 'postgresql', 'sqlite'] | None) – Optional dialect name. None (the default) uses SQLAlchemy’s generic compiler, which strips dialect-specific clauses (FOR UPDATE survives; SKIP LOCKED does not). Pass "postgres" to render Postgres SQL or "sqlite" for sqlite.

  • literal_binds (bool) – When True (the default), bound parameters render inline – WHERE id = 'abc' rather than WHERE id = :id_1. Set False to inspect the parameter map separately (via stmt.compile().params).

Return type:

str

Returns:

Compiled SQL as a string.

async get_object_from_query_or_404(db, stmt, *, detail='Not found')[source]

Execute stmt and return the first row, or raise HTTP 404.

Parameters:
  • db (AsyncSession) – The async database session.

  • stmt (Any) – A SQLAlchemy selectable statement.

  • detail (str) – The error message for the 404 response.

Return type:

Any

Returns:

The first row from the result set.

Raises:

HTTPException – With status 404 when no row is found.

run_once(fn)[source]

Idempotency decorator: run fn once, return its result thereafter.

Unlike functools.cache(), the gate is argument-blind – a second call with a different argument set is still a no-op, not a fresh execution keyed on the new args. This is the correct shape for one-shot setup functions (init_telemetry, where a second init_telemetry(app2) must not install a second tracer provider) and for factory singletons (an OPA client built once at first use and reused thereafter).

The first call’s return value is cached and returned on every later call. Setup-only callers that ignore the return value are unaffected; factory callers @run_once their constructor and read the cached instance.

Return type:

Callable[..., TypeVar(T)]