API reference¶
Runtime helpers for kiln-generated FastAPI projects.
Each submodule groups related primitives:
fsh_lib.auth– JWT session auth (bearer + cookie transports).fsh_lib.comms– communication-platform primitives:CommTyperegistry, transport adapters, preference resolver, and a transactional-outboxsend_communication()that rides the request session via pgqueuer.fsh_lib.files–FileMixin(pgcraft- flavoured storage-column mixin) + S3 client + four ready-made action functions. Requires thefilesextra (pip install 'kiln-generator[files]') forboto3.fsh_lib.filters– declarative filter expressions.fsh_lib.identifiers–autogenerated_identifier_field(): a create-form field default for server-assigned identifiers (pgcraft’sPGCraftAutogeneratedIdentifierColumn), marking them optional, read-only, and tagged with thex-autogeneratedOpenAPI extension.fsh_lib.invalidation–QueryInvalidationsrequest-scoped collector that writesX-Invalidate-Queriesresponse headers consumed by the FE’s TanStack Query layer.fsh_lib.links– short-link primitives:ShortLinkMixinstorage columns,shorten()(explicit producer entry point with same-URL dedup and a configurable base62 code length), andresolve()for the redirect handler.fsh_lib.openapi–construct_openapi_extra()helper that builds the kiln-emittedopenapi_extradict (x-cache-key,x-resource,x-auth-role) every generated route uses.fsh_lib.loading–apply_eager_loads(): eager-load options that reuse an ordering join viacontains_eagerinstead of fetching the relationship twice.fsh_lib.numeric–DecimalString(and thedecimal_string()precision/scale factory): aDecimalthat stays exact in Python and crosses the wire as a precision-safe JSON string, typed{"type": "string"}so the FE’sopenapi-tscodegen renders it as a TypeScriptstring(parsed with a decimal library when the FE needs to compute) rather than a lossynumber.fsh_lib.geo–CoordinateSchema: the wire form of a geographic coordinate, an object withDecimalStringlatitude/longitudeparts (precision-safe strings,stringin the generated TypeScript) – whatDecimalStringis for a single decimal, for a lat/long pair.fsh_lib.ordering–apply_ordering(): ORDER BY application, returning the relationships it LEFT-joined.fsh_lib.pagination– keyset and offset pagination.fsh_lib.opa–OpaClientfor a stateless Rego (OPA) permissions service: assembles the decisioninput(subject, action, resource, roles, bindings) and POSTs it for aDecision. Requires theopaextra (pip install 'kiln-generator[opa]') forhttpx.fsh_lib.queue– pgqueuer integration:get_queue()for transactional-outbox enqueue from a SQLAlchemy session,open_worker_driver()for the worker-side asyncpg connection, andinstrument_entrypoint()to wrap an entrypoint coroutine in an OpenTelemetry consumer span.fsh_lib.transformers–build_patch_kwargs()for generated PATCH-builder one-liners that projectmodel_fields_setonto a kwargs dict.fsh_lib.utils–get_object_from_query_or_404(the load-or-404 helper used by every read/mutate handler) andrun_once().
Generated code imports from the owning submodule directly –
from fsh_lib.files import FileMixin,
from fsh_lib.auth import session_auth – so the package root is
intentionally empty. This keeps the public surface organized by
concern rather than as one flat namespace.
Everything here is pure Python – the be CLI knows to emit
imports pointing at these submodules instead of scaffolding a
utils.py into the generated app.
Action availability for kiln-generated FastAPI projects.
An action is anything you can do to (or with) a resource: the
built-in CRUD ops (get, list, create, update,
delete) plus any custom action endpoints declared in the
spec. Every action carries a single guard callable –
async def can_<name>(resource, session) -> bool
– that decides two things at once: whether the current session
may execute the action, and whether the action should appear in
serialized responses so the frontend can show the corresponding
button. Object-scope guards see the resource instance; collection-
scope guards see None (there is no per-row resource yet).
Generated code emits one actions.py per app holding tuples of
ActionSpec per resource (object and collection scopes
kept separate). The route-handler templates call
available_actions() against the right tuple to populate the
actions field on response payloads, and call the matching
can callable directly before executing each handler so the
visibility predicate and the authorization gate can never drift.
- class ActionRef(**data)[source]¶
One action exposed in a serialized response.
Carries the bare minimum the frontend needs to render a button: the action name (matches the operation name on the backend) and its scope. Kept Pydantic so the OpenAPI schema surfaces a stable shape; consumers downstream get a typed TS interface for free.
Generic over the
nameandscopetypes so generated code can subclass with aLiteralunion fornameand a single-memberscope–ActionRef[FooAction, Literal["object"]]– and have the OpenAPI schema (and the TypeScript types openapi-ts derives from it) carry the enum / const rather than a barestr. Narrowing through the type parameters keeps the override compatible, so no field has to be redeclared.- name¶
Operation name (e.g.
"publish","update").
- scope¶
"object"for per-row actions,"collection"for actions that target the resource as a whole.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ActionSpec(name, can, is_object_action)[source]¶
Generator-emitted descriptor for one action.
Lives in the per-app
actions.pyregistry. The route handlers and serializers consumeActionSpectuples viaavailable_actions(); nothing outside generated code should construct these by hand.- name¶
Operation name.
- can¶
Async guard returning
Truewhen the action is available. Bound toalways_true()when the spec did not declare acandotted path.
- is_object_action¶
Truefor object-scope actions,Falsefor collection-scope actions. Drives theActionRefscopefield and disambiguates which tuple a spec belongs in.
- property scope: Literal['object', 'collection']¶
Match
ActionRef.scopefor this spec.
- Scope¶
Whether an action targets a single resource or a collection.
Object-scope actions take
(resource, session); collection- scope actions take(None, session). The frontend uses this to decide where to render the button – per row or once on the list page.alias of
Literal[‘object’, ‘collection’]
- async always_true(_resource, _session)[source]¶
Default guard used when an action declares no
canpath.Returning
Trueunconditionally matches the historical behavior of generated handlers (auth handled at the route level, no per-action gating); opting in to action gating is additive.- Return type:
- async available_actions(resource, session, specs, ref_cls=<class 'fsh_lib.actions.ActionRef'>)[source]¶
Return the subset of specs whose guards pass for session.
The guard for each spec is awaited in declaration order; specs whose guard returns
Falseare dropped. Order is preserved so the frontend can rely on a stable button layout driven by the spec.ref_cls lets generated code substitute a per-resource typed
ActionRefsubclass – e.g.AssetObjectPermissionwhosenameis aLiteral["get", "update", "publish"]– so the OpenAPI schema (and the TypeScript types openapi-ts derives from it) surface the enum rather than a barestr. The default keeps the historical untyped shape for callers that don’t go through codegen.- Parameters:
resource (
Any) – The SQLAlchemy instance for object-scope dumps, orNonefor collection-scope dumps.session (
Any) – Whatever the auth dep resolved – passed through to each guard untouched.specs (
Iterable[ActionSpec]) – Iterable ofActionSpec; typically a tuple literal from the generated per-appactions.py.ref_cls (
type[TypeVar(T, bound=BaseModel)]) – Pydantic model used to construct each ref. Defaults toActionRef.
- Return type:
- Returns:
List of ref_cls instances, one per spec whose guard returned
True, in spec order.
- find_can(specs, name)[source]¶
Return the guard for the action name in specs.
Generated handlers resolve their own row-level guard at module-import time – e.g.
_CAN_LIST = find_can(...)– so the per-request path is a single attribute lookup rather than a tuple scan.- Parameters:
specs (
Iterable[ActionSpec]) – AnActionSpectuple from the per-app registry, typically the collection-scoped tuple for row-level filters or the object-scoped tuple for execution-time gates.name (
str) – Operation name to look up (e.g."list","publish").
- Return type:
- Returns:
The bound guard callable (a
CanCallable).- Raises:
KeyError – If name is not present in specs – the generated code should never reach this branch since the registry is built from the same operation list.
JWT auth primitives for kiln-generated FastAPI projects.
A session is a Pydantic model dumped into JWT claims. Tokens travel over one or both of two sources:
"bearer"–Authorizationheader; API clients."cookie"–httpOnlycookie; browser frontends (out of reach of JS so XSS can’t steal it).
The signing secret lives in an env var (caller-named, typically
JWT_SECRET) so generated source never embeds a key.
- DEFAULT_TOKEN_TTL = datetime.timedelta(seconds=1800)¶
Default
expstamped on tokens when the caller doesn’t set one.
- class LoginResponse(**data)[source]¶
OAuth2-shaped login body for the bearer case.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class OkResponse(**data)[source]¶
Minimal ack body for cookie-only login and every logout.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class SessionStore(*args, **kwargs)[source]¶
Hook pair for server-side session state (deny-list, sessions, …).
Turns the stateless-JWT flow stateful. The store receives the full session model so it can key on whatever identity claim the consumer picks (typically
jti);fsh_lib.authstays agnostic.Both methods are async so the store can hit a database.
- clear_session(response, *, sources, cookie_name=None, cookie_secure=True, cookie_samesite='lax')[source]¶
Delete the session cookie if configured; ack for bearer.
cookie_secureandcookie_samesitemust match the valuesissue_session()used – browsers refuse to overwrite an existing cookie when either attribute differs.- Return type:
- decode_jwt(token, *, secret_env, algorithm)[source]¶
Decode token and return its claims, or raise HTTP 401.
- encode_jwt(payload, *, secret_env, algorithm, ttl=datetime.timedelta(seconds=1800))[source]¶
Sign payload as a JWT; stamps
expif absent. Never mutates.- Return type:
- issue_session(response, session, *, sources, secret_env, algorithm, ttl=datetime.timedelta(seconds=1800), cookie_name=None, cookie_secure=True, cookie_samesite='lax')[source]¶
Mint a JWT and emit it to every configured transport.
Upstream validate_login returns a None session in the case of no user, a password that failed validation, etc.
- Return type:
- session_auth(schema, sources, *, secret_env, algorithm, token_url=None, cookie_name=None, store=None)[source]¶
Build a FastAPI dep that yields a validated schema instance.
The returned callable takes one parameter per supported transport; configured sources plug in their real extractors, unconfigured ones get a no-token shim (returns
None). The first non-Nonetoken wins. Claims parse throughmodel_validate()so handlers get the full model, not a raw dict.store, when supplied, turns every authenticated request into a deny-list check – avoids a wrapper dep on the consumer side.
Communication-platform primitives for kiln-generated apps.
Sends typed messages over pluggable delivery methods (email, SMS, push, …) using pgqueuer as the transactional outbox. The pieces:
CommType– a named, schema-validated communication. Carries a Pydanticcontext_schemaand a pair of template strings (subject + body);CommRegistryholds the set the consumer’s app supports.Renderer– a Protocol that turns aCommTypeplus a validated context into a rendered subject + body pair. The defaultJinjaRendereris in-process; the same surface fits an HTTP-call renderer that defers to a separate template service (a node renderer, MJML compiler, anything) without changing the platform’s call site.MessageMixin/RecipientMixin/NotificationPreferenceMixin– SQLAlchemy mixins supplying the storage columns for the three tables every comm-platform install needs. Same pgcraft-friendly idiom as the file / rate-limit mixins: consumer owns the table, we own the columns.Transport– a Protocol implemented per delivery method. One adapter per supported method, e.g.email,sms. Implementations live in consumer code or third-party adapters; this module shipsLoggingTransportfor tests and local development.PreferenceResolver– a Protocol that answers “should this recipient receive this comm-type via this method?” Looked up once per recipient insidesend_communication(); an opted-out recipient yields no row and no job (the message row still records the attempt for audit).send_communication()– the producer entry point. Validates context against theCommTypeschema, renders the templates, inserts one message row and one recipient row per delivery, then enqueues one pgqueuer job per recipient underDISPATCH_ENTRYPOINT. All writes ride the caller’s SQLAlchemy session and the pgqueuerQueriesis bound to the same connection (seefsh_lib.queue.get_queue()), so a singleawait session.commit()makes the message + recipients + jobs durable atomically.make_dispatch_entrypoint()– the worker-side counterpart. Returns an async(job) -> Nonecallable wired to the consumer’s session factory, transports, and mixin classes – register it againstDISPATCH_ENTRYPOINTon a pgqueuerPgQueuerinstance.
The module’s only optional dependency is jinja2 (already pulled
in by kiln-generator for codegen); no extras gate is needed.
- class AttachmentGroup(slot, resolver, label=None, description=None)[source]¶
Project-wide attachment slot definition.
Mirrors
RecipientGroupfor the attachment lane. The slot resolves to a singleAttachmentSuggestionFileat send time.
- class AttachmentSuggestionFile(name, ref=None, label=None, description=None)[source]¶
Concrete file the BE has handy at describe-options time.
- class AttachmentSuggestionResolver(slot, label=None, description=None)[source]¶
Late-bound attachment group (resolver-style).
- class CommRegistry[source]¶
Mutable registry of
CommTypeentries by name.Built once at app startup (or as a module-level global if the consumer prefers) and passed into
send_communication()andmake_dispatch_entrypoint(). Not thread-safe – mutate it only during startup.- register(comm_type)[source]¶
Add comm_type to the registry.
- Raises:
ValueError – If a type with the same name is already registered. Re-registration is almost always a bug (two modules thought they owned the same name); force callers to deregister first if they really want it.
- Return type:
- class CommType(name, context_schema, subject_template, body_template, default_methods=())[source]¶
A named communication: schema + templates + default methods.
A
CommTypeis the unit of design for the comm platform: it binds a Pydantic context schema to a pair of template strings plus the set of methods the consumer wants delivery on by default. The same instance is shared by the producer (which validates and renders) and the worker (which renders too if the rendered body wasn’t persisted).- name¶
Stable identifier (e.g.
"order_shipped"). Used as the registry key and stored on the message row so the audit log survives template churn.
- context_schema¶
Pydantic model describing the fields the templates reference.
send_communicationvalidates the caller-supplied context against this before rendering, so missing or mistyped fields fail fast at the call site instead of half-way through a template.
- subject_template¶
Source string for the subject line. Interpreted by the configured
Renderer; the defaultJinjaRenderertreats it as Jinja2.
- body_template¶
Source string for the body. Same renderer treatment as
subject_template.
- default_methods¶
Methods to deliver on when the caller doesn’t pass an explicit recipient list. Empty tuple means “no default; caller must specify recipients”.
- DISPATCH_ENTRYPOINT = 'fsh_lib_comms_dispatch'¶
pgqueuer entrypoint name jobs are enqueued under.
The producer side (
send_communication()) enqueues under this name; the worker side must register its handler under the same name (seemake_dispatch_entrypoint()). Exposed so consumers don’t hard-code the literal in two places.
- class DeliveryStatus(*values)[source]¶
Lifecycle states of a single recipient’s delivery attempt.
- FAILED = 'failed'¶
Transport raised;
errorcarries the message.
- PENDING = 'pending'¶
Row inserted, job enqueued, transport not yet called.
- SENT = 'sent'¶
Transport returned without raising.
- class GroupAttachmentResolver(*args, **kwargs)[source]¶
Resolve a project-wide attachment group.
Returns a single concrete file’s metadata. Like
GroupRecipientResolver, no instance – the file doesn’t depend on a specific target object.
- class GroupRecipientResolver(*args, **kwargs)[source]¶
Resolve a project-wide recipient group.
Signature differs from
RecipientResolver– no instance, since the slot isn’t anchored to a specific target object. The session is still threaded through so a resolver can query a directory / membership table.
- class GroupRegistry[source]¶
Project-wide recipient / attachment group registry.
One instance per app, populated at startup – typically from generated code that mirrors the project’s
recipient_groups/attachment_groupsjsonnet declarations. Looked up by the user-template route handlers (to surface groups in the editor’s suggestion popover) and byrender_user_template()(to expand resolver tokens whose slot isn’t owned by any per-target target).Not thread-safe – mutate only during startup.
- register_attachment(group)[source]¶
Add an attachment group.
- Raises:
ValueError – When slot is already registered.
- Return type:
- register_recipient(group)[source]¶
Add a recipient group.
- Raises:
ValueError – When slot is already registered.
- Return type:
- class HttpRenderer(base_url, *, theme=None, client=None, timeout=10.0)[source]¶
Renderer that defers to an external render microservice.
Drop-in replacement for
JinjaRendererwhen the project runs a separate render service (the kiln-render scaffold or any HTTP equivalent). Posts{template, context, theme}to the service’s/v1/renderendpoint and maps the response onto aRenderedMessage:output: "email"->{subject, html, text}->RenderedMessage(subject=subject, body=text, html=html).output: "pdf"->{subject, pdf_base64}->RenderedMessage(subject=subject, body="", pdf=bytes).
The service is the source of truth for
output; the renderer inspects the response shape rather than carrying a copy of the build-time contract at runtime. Discrepancies between the project’s declaredCommTypeConfigoutput and the service’s response are surfaced by the generatedrender-contract.json+ the render service’stscstep, not by this client.- Parameters:
base_url (
str) – Base URL of the render service, e.g."http://render:8200". Typically pulled from aRENDER_URLenv var by the consumer.theme (
dict[str,Any] |None) – Default theme payload merged into every request –{logoUrl, colors, typography, ...}. Per-send overrides are not supported on this surface yet; supply the theme once at construction.Nonesends no theme block.client (
Any|None) – An existinghttpx.AsyncClientto use.None(default) creates and owns one. The owned client is closed byaclose().timeout (
float) – Per-request timeout in seconds. Ignored when an explicit client is supplied.
- Raises:
ImportError –
httpxis not installed. Installfsh-lib[comms-http]to use this renderer.
- async render(comm_type, context)[source]¶
POST
/v1/renderand convert the response.The request envelope is:
{ "template": "<comm_type.name>", "context": <context.model_dump(mode="json")>, "theme": <self._theme or omitted> }
The response is one of the two shapes documented on the class. A non-2xx response or a response missing the expected keys raises
RuntimeError– the caller sees the same surface as a Jinja syntax error would produce.- Return type:
- class JinjaRenderer(*, autoescape=False)[source]¶
In-process Jinja2 renderer. The default for
send_communication().Treats
CommType.subject_templateandCommType.body_templateas Jinja2 source strings and renders them against the context dump (model_dump(mode="json")so dates/uuids stringify the same way they would over the wire).Trivial enough to construct inline; instances hold no state beyond the autoescape choice, which only matters for the body of HTML emails (callers building HTML bodies should pass
autoescape=True).
- class LoggingTransport[source]¶
Test/dev transport: records every send into an in-memory list.
Not async-safe across processes, obviously; the point is to give unit tests something to assert against and to give local development a no-credentials fallback. Production transports live in consumer code or third-party packages.
- class MessageMixin[source]¶
SQLAlchemy mixin for the message table.
One row per
send_communicationcall – represents the intent to communicate. Per-method delivery state lives on theRecipientMixinrows that point back here viaRecipientMixin.message_id.Subclass on a regular
Baseto materialise the table:from fsh_lib.comms import MessageMixin class CommMessage(Base, MessageMixin): __tablename__ = "comm_messages" id: Mapped[uuid.UUID] = mapped_column( Uuid, primary_key=True, default=uuid.uuid4, )
Storing the rendered
subject/body(rather than re-rendering fromcontextat send time) means template churn doesn’t invalidate the audit log: the message row reflects what the recipient actually saw.The mixin owns no primary key – declare
idon the consumer class (typically via a pgcraft PK plugin or an explicit UUID column) so the consumer’s PK convention wins. Same idiom asSavedViewMixin.- body: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
Rendered body.
Textrather thanStringbecause bodies (especially HTML email) routinely exceed the 64 KiB Postgresvarcharceiling.
- comm_type: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
Registry key (
CommType.name). Indexed so the audit log can group by type cheaply.
- class NotificationPreferenceMixin[source]¶
SQLAlchemy mixin for the per-recipient preference table.
One row per
(subject_key, comm_type, method)triple records whether that recipient wants that comm type via that method.send_communication()looks the row up via thePreferenceResolverprotocol – this mixin just supplies the columns; the resolver implementation lives in consumer code (or in a generated helper).subject_keyis intentionally a string rather than a typed foreign key: a comm platform routinely targets users, accounts, org-level addresses, or external identifiers, and a typed FK would lock the table to one of those.Like the rest of the comms mixins, the PK is left to the consumer to declare so pgcraft / consumer conventions own the column.
- comm_type: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
Registry key (
CommType.name).
- enabled: Mapped[bool] = <sqlalchemy.orm.properties.MappedColumn object>¶
Truewhen the recipient consents to this(comm_type, method)combination. DefaultTrueso an absent row reads as opt-in by default; flip per consumer policy if your default is opt-out.
- class PreferenceResolver(*args, **kwargs)[source]¶
Hook: gate delivery on the recipient’s per-method opt-in.
- class RecipientGroup(slot, resolver, label=None, description=None)[source]¶
Project-wide recipient slot definition.
Registered against a
GroupRegistryso any template (regardless of scope) can drop@<slot>into to / cc / bcc and the send path resolves it via resolver at send time.- slot¶
Stable identifier stored on the recipient token’s
resolverfield; same shape as a per-target slot.
- resolver¶
Async callable returning concrete
RecipientSpecrows.
- label¶
Optional display string surfaced in the editor’s recipient picker. Defaults to
"@<slot>".
- description¶
Optional secondary line below label.
- RecipientKind¶
How a recipient is addressed in the outbound message.
For email: maps directly to RFC 5322
To:/Cc:/Bcc:headers; transports lay the headers out themselves so cc recipients see the to/cc lists and bcc recipients don’t. For methods without header semantics (SMS, push) this is informational – the dispatch path delivers each row regardless of kind.alias of
Literal[‘to’, ‘cc’, ‘bcc’]
- class RecipientMixin[source]¶
SQLAlchemy mixin for the recipient table.
One row per
(message, method, address)triple. The pgqueuer job carries the recipient id, so the dispatch path is: job -> recipient -> message -> transport lookup.The mixin deliberately doesn’t declare a foreign key to the consumer’s
MessageMixintable – the consumer names that table, so the FK has to come from their own subclass via asqlalchemy.ForeignKeyon themessage_idcolumn. Keeping the mixin FK-free means the same class works regardless of where the consumer mounts the message table.Like
MessageMixin, the PK is left to the consumer to declare so pgcraft / consumer conventions own the column.- address: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
Method-specific destination (email address, phone number, push token, …). Opaque to this module.
- error: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>¶
Truncated exception message from a failed delivery.
Noneuntil the dispatch path catches an error.
- kind: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
One of
RecipientKind("to"/"cc"/"bcc"). Stored as a string so adding a new addressing kind doesn’t need a migration. Email transports read this off the row to lay out the RFC 5322 headers; non-email transports typically ignore it.
- message_id: Mapped[UUID] = <sqlalchemy.orm.properties.MappedColumn object>¶
Points back at the
MessageMixinrow. Consumers typically add an explicitForeignKeyin their subclass.
- method: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
Delivery method (
"email","sms", …). Used to look up the right transport at dispatch time.
- sent_at: Mapped[datetime | None] = <sqlalchemy.orm.properties.MappedColumn object>¶
When the transport returned successfully.
Nonewhile pending or failed.
- status: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
One of
DeliveryStatus’s values. Stored as a string (not a SQL enum) so adding a new state doesn’t require a migration.
- class RecipientResolver(*args, **kwargs)[source]¶
Hook: resolve a named recipient slot against the target object.
One implementation per slot a resource declares (
"assignee","watchers", …). The signature mirrors a route handler: the resolver gets the same async session the template render is running on, plus the loaded model instance and the template’s method, and returns zero or more concreteRecipientSpecrows.kindon the returned rows is preserved so a resolver can opt to bcc itself if it needs to.
- class RecipientSpec(method, address, kind='to', subject_key=None)[source]¶
Single recipient handed to
send_communication().- method¶
Delivery method (must match a transport key).
- address¶
Method-specific destination (email, phone, …).
- kind¶
To / Cc / Bcc – gives transports the information they need to lay the message out correctly. Defaults to
"to"so call sites that don’t care don’t need to set it.
- subject_key¶
Identifier whose preferences gate delivery.
Noneskips the preference check (e.g. transactional sends to non-user addresses like a billing inbox).
- class RecipientSuggestionLiteral(address, label=None, description=None)[source]¶
Concrete recipient the BE has handy at describe-options time.
Lets a resource expose directory entries (account-system users, a curated mailing list, etc.) right next to the resolver groups so the author picks a specific person without retyping the address.
- address¶
Method-specific destination (email, phone, …). Stored on the resulting literal recipient token in the editor’s saved JSON.
- label¶
Optional display string (typically the person’s name); the FE defaults to address when omitted.
- description¶
Optional secondary line (e.g. the address itself when label is the name).
- class RecipientSuggestionResolver(slot, label=None, description=None)[source]¶
Late-bound recipient group surfaced in the editor combobox.
Maps to a registered
RecipientResolver; the FE author drops one of these in to fan out at send time.- slot¶
Resolver key registered on the target (e.g.
"watchers"). Stored on the resulting resolver-style recipient token in the editor’s saved JSON.
- label¶
Optional display string; the FE defaults to
"@<slot>"when omitted.
- description¶
Optional secondary line shown beneath label in the combobox row (e.g.
"everyone watching the task").
- class RenderedMessage(subject, body, html=None, pdf=None)[source]¶
Output of a
Renderer– the strings we persist.- subject¶
Subject line. Populated by every renderer.
- body¶
Plain-text body. The legacy field
JinjaRendererproduces and that transports already consume;HttpRendererpopulates it from the render service’stextalternative for email output, or leaves it""for PDF output.
- html¶
HTML alternative body for email outputs, or
None.JinjaRenderernever sets it;HttpRenderersets it when the render service returnsoutput: "email". Email transports should send HTML + plain-text asmultipart/alternative.
- pdf¶
Raw PDF bytes for PDF outputs, or
None.HttpRenderersets it when the render service returnsoutput: "pdf"; transports treat it as a single attachment.
- class RenderedUserTemplate(subject, body, recipients, cc, bcc, attachments)[source]¶
Output of
render_user_template()– pre-send view.Returned to the FE preview endpoint directly; the send path persists it as a
MessageMixinrow + a fan-out ofRecipientMixinrows.
- class Renderer(*args, **kwargs)[source]¶
Hook: turn a
CommType+ context into rendered strings.The default
JinjaRendererevaluates the template strings in-process. A microservice-based renderer (e.g. a node service that compiles MJML or runs a richer template language) implements the same single method and gets dropped in via therendererargument tosend_communication()– no other code changes.context is the validated Pydantic model; implementations call
model_dump()themselves so they can pick the dump mode (jsonvs Python) that fits their wire format.
- class ResolvedRecipientAddress(address, label=None)[source]¶
Concrete recipient row produced by a resolver, with display label.
- class TargetResolutions(variables, recipient_slots)[source]¶
Concrete values for a target object, ready for FE display.
Returned by
resolve_target()when the editor renders against a known target – the FE drops these into pills and chips so the author sees the values that@watchersand{title}would actually expand to right now.- variables¶
Dump of the target’s representation Pydantic model. Same dict
render_user_template()walks for variable tokens; keys are top-level field names + nested objects.
- recipient_slots¶
Slot name → addresses each registered recipient resolver returns for this target. Empty list for slots that resolved to nothing.
- class Transport(*args, **kwargs)[source]¶
Method-specific delivery adapter.
Implementations are free to do whatever the method requires (SMTP send, Twilio API call, FCM push, …). Raise to mark the delivery failed; return normally to mark it sent. The dispatch path stamps
RecipientMixin.statusandRecipientMixin.sent_atbased on which path you take.
- USER_TEMPLATE_COMM_TYPE = '_user_template'¶
Sentinel comm-type stamped on
MessageMixin.comm_typefor sends originating from a user-defined template. The audit row carries the template id + target id inMessageMixin.contextso the audit log can distinguish developer-defined sends from user-template sends without a separate column.
- class UserCommTemplateMixin[source]¶
SQLAlchemy mixin for the user-authored template table.
One row per saved template. Subclass on a regular
Baseto materialise the table:from fsh_lib.comms import UserCommTemplateMixin class UserCommTemplate(Base, UserCommTemplateMixin): __tablename__ = "user_comm_templates"
The schema mirrors the editing UI: an optional target-resource slug (
None= free-form, no variables and no per-object recipient resolvers), a delivery method, tokenized subject / body / recipient lists, attachments, and bookkeeping columns. Tokens are persisted as JSON so the FE can round-trip its Slate document without a separate serialization layer.Token formats (all stored as JSON arrays):
Subject / body –
[{"kind": "text", "value": str}, ...]withvartokens of shape{"kind": "var", "path": str}mixed in.pathis a dot-walk into the variable representation dump (e.g."owner.email").Recipients / cc / bcc – one of ``{“kind”: “literal”, “method”: str, “address”: str,
“subject_key”: str | null}``
(concrete address hard-coded into the template) or
{"kind": "resolver", "resolver": str}(a named slot resolved at send time against the loaded target object).Attachments – consumer-defined dict shape, passed through to the rendered output unchanged so transports can decide what to do with it.
The mixin owns no primary key – declare
idon the consumer class (typically via a pgcraft PK plugin or an explicitMapped[uuid.UUID]column) so the consumer’s PK convention wins. Same idiom asSavedViewMixin.- attachments: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>¶
Consumer-defined attachment refs. Passed through to the rendered output unchanged; rendering them is the transport’s job.
- bcc: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>¶
Tokenized Bcc list.
- body: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>¶
Tokenized body.
- cc: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>¶
Tokenized Cc list.
- created_by: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>¶
Identifier of the author – typically the session subject id. Nullable so seed templates can be inserted out-of-band.
- description: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>¶
Optional author note. Shown in template-management screens.
- method: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
Delivery method (matches a transport key). Stored once on the row – the FE chooses the method at template-creation time and every recipient inherits it (literal recipient tokens may override per-row).
- name: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
Human-friendly title – shown in the picker UI.
- recipients: Mapped[list[dict[str, Any]]] = <sqlalchemy.orm.properties.MappedColumn object>¶
Tokenized To list.
- class UserTemplateTarget(resource_slug, load, representation_serializer, representation_class, recipient_resolvers, recipient_suggestions=None, attachment_suggestions=None)[source]¶
Per-resource wiring for user-template renders.
Registered once per resource that opts into being a template target.
representation_serializerreturns the variable dump – a Pydantic model whose fields the author can pill into the subject / body.recipient_resolversmaps the named slots the author can drop into the to / cc / bcc lists.- resource_slug¶
User-facing slug the FE uses to scope a template (e.g.
"asset"). Stored onUserCommTemplateMixin.target_resource.
- load¶
Async
(session, target_id) -> instance | Noneloader. ReturningNonefrom a send call surfaces as aLookupError.
- representation_serializer¶
(instance, session) -> BaseModel(sync) or the async equivalent that produces the variable dump for instance. Both shapes are supported so kiln’s fields-driven serializers (sync, no IO) and user-supplied async builders (may load related rows) drop in without adapter glue.
- representation_class¶
The Pydantic class returned by
representation_serializer. Exposed so the FE options endpoint can surface the field schema without a second round-trip.
- recipient_resolvers¶
Slot name ->
RecipientResolverimplementation.
- recipient_suggestions¶
Optional async builder that returns the list of
RecipientSuggestionrows the editor’s combobox surfaces. Empty / unset falls back todefault_recipient_suggestions()– one resolver row perrecipient_resolversentry. Set this to mix in directory entries (concrete addresses pulled from the BE’s user table, etc.) alongside the resolvers.
- attachment_suggestions¶
Optional async builder that returns the list of
AttachmentSuggestionrows for the attachment lane. Unset means the lane only shows the consumer-supplied upload affordance (or nothing).
- class UserTemplateTargetRegistry[source]¶
Registry of
UserTemplateTargetentries by resource slug.Built once at app startup – typically populated from generated code that mirrors the project’s
comms_targetdeclarations. Looked up by user-template route handlers and bysend_user_template()to resolve variables and recipient slots.Not thread-safe; mutate only during startup.
- get(slug)[source]¶
Return the target registered under slug.
- Raises:
KeyError – If slug isn’t registered.
- Return type:
- register(target)[source]¶
Add target to the registry.
- Raises:
ValueError – When the slug is already registered.
- Return type:
- async default_attachment_suggestions(_target, _session, *, groups=None)[source]¶
Build the attachment suggestions list from project groups.
Per-resource attachment groups aren’t a concept on
UserTemplateTarget– the platform only supports project-wide attachment groups via groups. Returns one resolver-style suggestion per registered attachment group.- Return type:
list[AttachmentSuggestionResolver|AttachmentSuggestionFile]
- async default_recipient_suggestions(target, _session, *, groups=None)[source]¶
Build a suggestions list from target’s resolvers + project groups.
Used by route handlers when the consumer didn’t supply a custom
UserTemplateTarget.recipient_suggestionsbuilder. Emits one resolver row per per-target slot first, then one per project-wide group from groups so the editor’s picker surfaces both flavours in a single combobox.- Return type:
list[RecipientSuggestionResolver|RecipientSuggestionLiteral]
- async load_recipients(session, recipient_cls, message_id)[source]¶
Return every recipient row for message_id, in insertion order.
Lightweight read helper for an audit endpoint – generated route handlers can call this without re-deriving the
message_id ==filter.- Return type:
- make_dispatch_entrypoint(*, session_factory, transports, message_cls, recipient_cls)[source]¶
Build the worker-side handler for
DISPATCH_ENTRYPOINT.Returns an
async (job) -> Nonecallable suitable forpgqueuer.PgQueuer.entrypoint. Per job:Decode the recipient id from
job.payload.Open a session from session_factory; load the recipient and the matching message.
Skip if the recipient is missing or already advanced past
DeliveryStatus.PENDING(job retried after a previous success / explicit failure).Look up the transport for the recipient’s method; mark the row failed if no transport is registered.
Call
transport.send– mark the row sent on success or failed (with the error message) on raise.
- Parameters:
session_factory (
async_sessionmaker[AsyncSession]) – Async sessionmaker; each job opens a short-lived session of its own.transports (
dict[str,Transport]) – Method ->Transportlookup.message_cls (
type[MessageMixin]) – Consumer’s concreteMessageMixin.recipient_cls (
type[RecipientMixin]) – Consumer’s concreteRecipientMixin.
- Return type:
- Returns:
Async handler the consumer registers under
DISPATCH_ENTRYPOINT. Wrapped viafsh_lib.queue.instrument_entrypoint(), so it emits an OpenTelemetry consumer span per job when thefsh-lib[opentelemetry]extra is installed and runs unwrapped otherwise.
- render_text_field(tokens, variables)[source]¶
Concatenate tokenized tokens into a plain string.
texttokens emit theirvalueverbatim;vartokens resolve through dotted-path lookup against variables and stringify. Missing orNonevalues render as the empty string – an unresolved variable at send time should produce a sendable message and let the FE surface a “this pill won’t fill in” warning rather than blow up the dispatch path.- Parameters:
- Return type:
- Returns:
Rendered field as a single string.
- Raises:
ValueError – When a token’s
kindis unrecognised – a sign of an old client encoding tokens the platform doesn’t know about, which is a real bug rather than something to silently swallow.
- async render_user_template(template, *, session, targets, target_id=None, groups=None)[source]¶
Render template against the target object identified by target_id.
For an unscoped template (
target_resource is None) the variable dump is empty andtarget_idmust also beNone; recipient tokens may still beliteralor reference a project-wide groups slot.For a scoped template, the target is loaded, the representation serializer runs to produce the variable dump, and resolver recipient tokens fire against the loaded instance (per-target resolvers win) or fall through to the project-wide group registry. Attachment tokens with
kind: "resolver"resolve via the same group registry.- Raises:
ValueError – When the scope / target_id combination is inconsistent or a resolver token references a slot that isn’t registered on the target or in the project group registry.
LookupError – When the target object can’t be loaded.
- Return type:
- async resolve_target(*, session, target, instance, method)[source]¶
Compute the variable dump + per-slot resolved recipients.
instance is the already-loaded target object – callers should use
UserTemplateTarget.loadupstream to fetch it (so a missing target maps toLookupErrorat the route boundary, not here). method drives any resolver that picks a method-specific address; templates carrymethodon the row itself.The resolution maps mirror what the FE editor’s
resolutionsprop expects – one variables dict, one recipient-slot map. Resolvers that raise propagate; the route boundary turns them into a 422.- Return type:
- async send_communication(*, session, queue, registry, comm_type, context, recipients, message_cls, recipient_cls, renderer=None, preferences=None)[source]¶
Validate, render, persist, and enqueue a communication.
The transactional-outbox guarantee:
Validate context against the comm-type’s schema.
Render the templates with renderer (defaults to
JinjaRenderer).Insert one
MessageMixinrow.For each recipient, consult preferences (if supplied); insert a
RecipientMixinrow for each one that passes.Enqueue one pgqueuer job per surviving recipient under
DISPATCH_ENTRYPOINT, payload = recipient id (UTF-8).
Steps 3-5 all ride session’s transaction (see
fsh_lib.queue.get_queue()for how queue is bound to the same connection). Commit the session and the message, recipients, and jobs all become durable atomically; roll back and the communication never happened.- Parameters:
session (
AsyncSession) – The async SQLAlchemy session running the caller’s transaction. All inserts ride it.queue (
Queries) – A pgqueuerQueriesbound to session’s connection. Build it withfsh_lib.queue.get_queue()immediately before this call.registry (
CommRegistry) – TheCommRegistrycontaining the type named by comm_type.comm_type (
str) – Registry key for the comm to send.context (
BaseModel|dict[str,Any]) – Either an instance of the type’sCommType.context_schemaor a dict that will be validated against it.recipients (
Sequence[RecipientSpec]) – Per-delivery specs (method + address + optional subject_key for preference lookup).message_cls (
type[MessageMixin]) – Consumer’s concreteMessageMixinsubclass.recipient_cls (
type[RecipientMixin]) – Consumer’s concreteRecipientMixinsubclass.renderer (
Renderer|None) – Override for the template renderer. Defaults toJinjaRenderer– swap in an HTTP-call renderer to defer rendering to a separate service.preferences (
PreferenceResolver|None) – Optional preference resolver. When omitted, every recipient is delivered to (subject tosubject_keysemantics onRecipientSpec).
- Return type:
- Returns:
The id of the inserted
MessageMixinrow.
- async send_user_template(*, session, queue, template, targets, message_cls, recipient_cls, target_id=None, preferences=None, groups=None)[source]¶
Render template against target_id and dispatch via the outbox.
Mirrors
send_communication()but skips the registry + Jinja steps – the template’s tokenized fields are rendered in-process byrender_user_template()and the result is persisted into the sameMessageMixin/RecipientMixintables. All three of the template’s recipient lists (to / cc / bcc) flow through one persist call so the worker delivers cc and bcc rows like any other recipient – with theirRecipientMixin.kindset so an email transport can lay out the headers correctly.The audit row’s
MessageMixin.contextcarriestemplate_id,template_name,target_resource, andtarget_idso a re-render or replay can re-fetch the same inputs. Attachment refs come along oncontext.attachmentsfor transports that need them.- Parameters:
session (
AsyncSession) – Async SQLAlchemy session.queue (
Queries) – pgqueuerQueriesbound to session’s connection.template (
UserCommTemplateMixin) – TheUserCommTemplateMixinrow to send.targets (
UserTemplateTargetRegistry) – Registry binding resource slugs to target wiring.message_cls (
type[MessageMixin]) – Consumer’s concreteMessageMixin.recipient_cls (
type[RecipientMixin]) – Consumer’s concreteRecipientMixin.target_id (
Any|None) – Target-object PK.Nonefor unscoped templates.preferences (
PreferenceResolver|None) – Optional preference resolver – gates per-method opt-in just likesend_communication().groups (
GroupRegistry|None) – Optional project-wideGroupRegistryfor recipient + attachment slots that aren’t per-target. Forwarded torender_user_template().
- Return type:
- Returns:
Id of the inserted message row.
- RecipientSuggestion¶
Represent a union type
E.g. for int | str
- AttachmentSuggestion¶
Represent a union type
E.g. for int | str
File storage primitives for kiln-generated FastAPI projects.
This module’s runtime dependency on boto3 is gated behind the
files extra. Install with:
pip install 'kiln-generator[files]'
# or: uv add 'kiln-generator[files]'
Importing this module without the extra raises ModuleNotFoundError
on import boto3 – so the gate is honest rather than lazy:
either the dep is there and everything works, or it isn’t and the
import surface fails fast.
A file is a binary blob (image, PDF, attachment) tracked by a metadata row in the consumer’s database and a corresponding object in S3-compatible storage. This module ships three pieces:
FileMixin– a pgcraft-compatible mixin supplying the six storage columns every file row needs (s3_key,content_type,size_bytes,original_filename,created_at,uploaded_at). Consumers subclass it on a pgcraft model and add a PK plugin (typicallyUUIDV4PKPlugin) for theidcolumn.S3Storage– a small wrapper aroundboto3that exposes the three operations a presigned-upload flow actually needs: mint a presigned PUT URL, mint a presigned GET URL, delete an object. The constructor takes explicit config so it’s testable;default_storage()builds one fromFSH_S3_*env vars for the common case.Action functions –
request_upload(),complete_upload(),download(), anddelete_file(). These plug into be’sActionoperation: the consumer pointsresource.actionentries at them directly (no per-resource wrapper module). TheFileMixin-typed parameters (instance for object actions, class for collection actions) match any concrete subclass via the introspector’s supertype check, so the same four functions serve every file resource.
- DEFAULT_PRESIGN_TTL = 900¶
Presigned URL lifetime in seconds (15 min).
Long enough for a browser to PUT a multi-megabyte file over a slow connection; short enough that a leaked URL stops working before it shows up in logs anyone reads.
- class DownloadResponse(**data)[source]¶
Response for the download action – a short-lived GET URL.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class FileMixin[source]¶
pgcraft mixin supplying the storage columns of a file record.
Subclass on a pgcraft-mapped model alongside a PK plugin (the plugin owns
id):from fsh_lib.files import FileMixin from pgcraft.factory import PGCraftSimple from pgcraft.plugins.pk import UUIDV4PKPlugin class Attachment(Base, FileMixin): __tablename__ = "attachments" __table_args__ = {"schema": "public"} __factory__ = PGCraftSimple __plugins__ = [UUIDV4PKPlugin()]
The mixin deliberately doesn’t declare
id– pgcraft’s idiom is that primary keys are plugin-owned, and declaring it on the mixin would collide with the plugin’s column at table-build time. TheTYPE_CHECKINGannotation below keepsfile.idtyped for the action helpers without committing to a column.A row with
uploaded_at is Nonerepresents a file the server has reserved a key for (and handed the client a presigned PUT URL) but whose upload hasn’t yet been confirmed. Consumers typically clear or expire these rows on a schedule.- content_type: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>¶
MIME type the client declared at upload time, when known.
- original_filename: Mapped[str | None] = <sqlalchemy.orm.properties.MappedColumn object>¶
Filename the client supplied; useful for
Content-Dispositionon download. Not used for storage – the canonical name iss3_key.
- s3_key: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
Object key in the storage bucket. Unique so a row maps to exactly one blob; collision is a programming error, not a race.
- class S3Storage(bucket, region=None, endpoint_url=None, client_factory=<function client>)[source]¶
boto3-backed S3 client wrapper.The constructor takes explicit config so tests can build an instance pointed at a stub or a localstack endpoint without setting env vars.
default_storage()is the env-driven factory for production use.client_factoryis plumbed through so tests can inject aMagicMockinstead of a realboto3.client.- property client: Any[source]¶
Lazily-built
boto3S3 client.Cached so a single
S3Storageinstance reuses one connection pool across calls.
- client_factory(**kwargs)¶
Create a low-level service client by name using the default session.
See
boto3.session.Session.client().
- delete(key)[source]¶
Delete the object at key.
S3’s
DeleteObjectis idempotent – deleting a missing key returns 204 the same as deleting an existing one – so callers don’t need to guard against double-delete races.- Return type:
- class UploadRequest(**data)[source]¶
Body for the request-upload action.
Carries everything
request_upload()needs to reserve a key and bind the presigned PUT URL to the right content type.- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class UploadResponse(**data)[source]¶
Response for the request-upload action.
The client PUTs the file bytes to
upload_url(it must send a matchingContent-Typeheader), then calls the complete-upload action withidto flip the row out of pending state.- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- async complete_upload(file, *, db)[source]¶
Mark file as uploaded.
Returns
Noneso the action op emits 204 No Content – a completed upload has no useful body to return; the client already knows the id.Issues a Core
UPDATErather than mutating the loaded ORM instance, so the persistence path is identical regardless of whether the caller’s session has autoflush quirks. Idempotent – calling twice just refreshes the timestamp.- Return type:
- default_storage()[source]¶
Build an
S3StoragefromFSH_S3_*env vars.Reads:
FSH_S3_BUCKET– bucket name (required).FSH_S3_REGION– AWS region; optional, falls back to the boto3 default chain.FSH_S3_ENDPOINT_URL– override for MinIO / localstack / non-AWS S3-compatible endpoints; optional.
- Raises:
RuntimeError – When
FSH_S3_BUCKETis not set.- Return type:
- async delete_file(file, *, db)[source]¶
Cascade-delete file: remove the S3 object then the row.
Returns
Noneso the action op emits 204 No Content – the client doesn’t need a body to know the row is gone.S3 first because
S3Storage.delete()is idempotent – a crash between the two steps leaves an orphan row, which the next delete attempt cleans up. Reversing the order would instead leak S3 objects, which are harder to find later.- Return type:
- async download(file, *, db)[source]¶
Return a presigned GET URL for file.
Refuses with 404 when
uploaded_at is None– the row exists but the client never confirmed the PUT, so the object may not be in S3 and a presigned URL would just 404 noisily.- Return type:
- async request_upload(*, model_cls, db, body)[source]¶
Reserve a key and return a presigned PUT URL.
The row is created with
uploaded_at=NULL; the client confirms the actual byte upload viacomplete_upload().model_cls is supplied by the action handler, which detects the
type[FileMixin]annotation and passes the resource’s mapped class. No per-resource factory binding needed – consumers point a resource’sactionconfig at this function directly.- Return type:
Body schema for the project-wide value-provider endpoint.
Carries the autocomplete query and an optional limit. The endpoint is intentionally single-page (no cursor): autocomplete UX narrows by typing more characters, not by paginating.
- class FilterValuesRequest(**data)[source]¶
Common search params used by every value-provider runner.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- resolved_limit(req_limit)[source]¶
Clamp req_limit to
[1, _MAX_LIMIT]with a sensible default.- Return type:
Filter-clause construction for typed Pydantic filter trees.
- FilterOp¶
Operator keys accepted by a condition node’s
opfield.Callers’ Pydantic condition models should declare
opas aLiteralover this set (or a subset of it).alias of
Literal[‘eq’, ‘neq’, ‘gt’, ‘gte’, ‘lt’, ‘lte’, ‘contains’, ‘starts_with’, ‘in’, ‘is_null’]
- apply_filters(stmt, node, model)[source]¶
Build WHERE clauses from a typed filter expression.
Accepts a typed Pydantic filter model – either a single condition (with
field,op,value) or a combiner (withand_/or_lists of nested conditions). Models that match none of these shapes are treated as a no-op and the statement is returned unchanged.Noneis also a no-op so call sites can invokeapply_filters(stmt, body.filter, Model)unconditionally without an outerif body.filter is not Nonebranch.- Parameters:
- Return type:
- Returns:
The statement with WHERE clauses applied.
- apply_search(stmt, model, columns, q, strategy='trigram')[source]¶
Filter stmt to rows where any of columns matches q.
Powers the list endpoint’s free-text search box – the typed counterpart to
apply_filters(), driven by the resource’s configuredsearchfields rather than an explicit filter tree.strategyis chosen per resource in the BE config (SearchConfig.strategy):"trigram"– each column is matched two ways and OR’d: apg_trgmsimilarity hit (the%operator) for fuzzy, typo-tolerant matches, plus a case-insensitive substring (ILIKE) so queries too short to form a trigram still match. Requires thepg_trgmextension.columnsare the resource’s text columns."tsvector"– each column is a maintainedtsvectorcolumn matched with@@againstwebsearch_to_tsquery. Word/lexeme search with stemming – right for prose columns.columnsaretsvectorcolumn name(s); theenglishconfig here must match the one the column was built with.
qofNoneor blank is a no-op, and so is an emptycolumns, so call sites can invoke this unconditionally without an outer guard.- Parameters:
stmt (
Select) – The SQLAlchemy SELECT statement to filter.model (
type) – The SQLAlchemy model class providing columns.columns (
Sequence[str]) – Model attribute names to matchqagainst – text columns fortrigram,tsvectorcolumns fortsvector.q (
str|None) – The free-text query, orNoneto skip search.strategy (
Literal['trigram','tsvector']) – Matching strategy –"trigram"(default) or"tsvector".
- Return type:
- Returns:
The statement with the search WHERE clause applied.
Create-form field helper for server-assigned identifiers.
A column whose value the database fills in – an autogenerated
identifier like the 100001 in “Order #100001”, produced by
pgcraft’s PGCraftAutogeneratedIdentifierColumn – must not
appear as a required input on a create form. The client never
supplies it; the BEFORE INSERT trigger does.
Generated create models declare such a field through
autogenerated_identifier_field(). It marks the field
optional and read-only and tags it with the x-autogenerated
OpenAPI extension, mirroring the x-* convention in
fsh_lib.openapi: the FE codegen layer reads the extension and
renders the field as a server-assigned, non-editable value instead
of a form input.
- X_AUTOGENERATED = 'x-autogenerated'¶
OpenAPI schema-extension key carrying the pgcraft identifier name (the “enum” key) that produces a field’s value. Present on every field built by
autogenerated_identifier_field().
- autogenerated_identifier_field(*, name, description=None)[source]¶
Build a pydantic field for a server-assigned identifier.
The returned field is optional (defaults to
None, so a create request may omit it) and read-only, and carries theX_AUTOGENERATEDextension so FE codegen can label it with the originating counter and suppress the input control.Use it as the default of a create-model field:
class OrderCreate(BaseModel): customer: str order_no: str | None = autogenerated_identifier_field( name="order", )
- Parameters:
name (
str) – The pgcraft identifier-counter name (the “enum” key) that produces this value. Surfaced underX_AUTOGENERATEDso the FE can label the field.description (
str|None) – Optional human-readable description for the generated JSON schema.
- Return type:
- Returns:
A pydantic
FieldInfo– typedAnyso it can sit directly as a model-field default – declaring the field optional, read-only, and taggedx-autogenerated.
Server-driven cache invalidation for kiln-generated apps.
Kiln-generated mutations don’t tell the FE what to invalidate –
the BE does, via the X-Invalidate-Queries response header.
Handlers grab a QueryInvalidations collector through
FastAPI’s dependency system and call invalidations.add(key)
for each TanStack queryKey the FE should drop after the call
succeeds; the collector serializes the running list onto the
header.
Generated CRUD handlers wire this dependency automatically and register the resource’s own cache_key, so the typical case needs no hand-coding. The helper is exposed for hand-written handlers that need to opt in or extend the auto-generated set:
from typing import Annotated
from fastapi import APIRouter, Depends
from fsh_lib.invalidation import QueryInvalidations
router = APIRouter(prefix="/projects")
@router.delete("/{id}")
async def delete_project(
id: int,
invalidations: Annotated[
QueryInvalidations, Depends(QueryInvalidations)
],
) -> None:
# ...delete the row...
invalidations.add_all("projects") # list + every detail
# or, narrower: invalidations.add_one("projects", id)
- class QueryInvalidations(response)[source]¶
Per-request collector for TanStack queryKeys to invalidate.
Construct via FastAPI dependency injection (
Depends(QueryInvalidations)); the framework hands us the request’s mutableResponseso eachadd_*call updates the response header eagerly.Two scopes:
add_all()– “blow everything for this resource”. The FE side prefix-matches and drops list caches plus every per-id detail in one shot. Right for create / delete / anything that affects multiple list rows.add_one()– “blow this specific id only”. Just the detail cache for that row; lists are left alone. Useful when you’ve changed one row’s representation but no list could be filtering on the changed field.
- QueryKey = str | int | bool | float | None¶
One segment of a TanStack queryKey. TanStack accepts arbitrary JSON, but in practice keys are arrays of these scalars; we restrict the type so the header stays small and predictable.
Link-shortening primitives for kiln-generated FastAPI projects.
A short link maps a compact, URL-safe code to a longer
target_url. Useful for fitting links into SMS bodies (the
160-character single-segment limit is unforgiving) and for emitting
click counts on outbound comms.
Shortening is explicit: the producer calls shorten() to
swap a long URL for a short one before assembling the message body
(e.g. just before send_communication()). There
is intentionally no auto-applied Jinja filter – the row write
happens at a predictable point, and shortened URLs that never get
sent (because PreferenceResolver filters the
recipient out, or the caller’s transaction rolls back) don’t leak
into the table.
The module ships three primitives, following the same pgcraft-
flavoured idiom as fsh_lib.files – consumer owns the table,
we own the columns:
ShortLinkMixin– pgcraft-compatible mixin supplying the storage columns (code,target_url,click_count).created_atis managed by pgcraft’sTimestampPlugin, whichPGCraftSimpleauto-adds;idis plugin-owned (the consumer attaches e.g.UUIDV4PKPlugin), matching theFileMixinpattern.shorten()– producer entry point. A singleINSERT ... ON CONFLICT DO NOTHING RETURNING codedoes both the dedup (“shorten X twice, get the same code back” becausetarget_urlisUNIQUE) and the code-collision check in one round trip – no SELECT-then-INSERT race window. Returns the full short URL{base_url}/{code}.resolve()– redirect-handler helper. SingleUPDATE ... RETURNINGatomically incrementsShortLinkMixin.click_countand returns the row’starget_url(orNonefor unknown codes). Consumers wire it into a 5-line FastAPI route:from fastapi import APIRouter, HTTPException from fastapi.responses import RedirectResponse from fsh_lib.links import resolve router = APIRouter() @router.get("/l/{code}") async def follow(code: str, db: AsyncSession = Depends(get_db)): url = await resolve(model_cls=ShortLink, db=db, code=code) if url is None: raise HTTPException(status_code=404) return RedirectResponse(url, status_code=302)
- DEFAULT_CODE_LENGTH = 7¶
Default short-code length.
Seven base26 (lowercase-letter) characters give 26**7 ≈ 8.0e9 possible codes. At a million rows the per-insert collision rate is ≈ 1.2e-4 – low enough that
shorten()’s retry loop is overwhelmingly a no-op.Tune via the
code_lengthargument when shorter codes are needed (smaller keyspace, higher collision rate – a 5-char code with 100k rows collides ≈ 0.8% of the time, well withinMAX_CODE_RETRIES) or when longer is acceptable.
- MAX_CODE_RETRIES = 5¶
How many times
shorten()retries a colliding code.A retry triggers when
INSERT ... ON CONFLICT DO NOTHINGreturns no row and the follow-up SELECT confirms it was thecodecollision (not thetarget_urldedup). With default-length codes the loop is virtually never entered; the bound exists so a saturated keyspace (very smallcode_length, many rows) raises loudly instead of looping forever.
- class ShortLinkMixin[source]¶
pgcraft mixin supplying the storage columns of a short link.
Subclass on a pgcraft-mapped model alongside a PK plugin (the plugin owns
id):from fsh_lib.links import ShortLinkMixin from pgcraft.factory import PGCraftSimple from pgcraft.plugins.pk import UUIDV4PKPlugin class ShortLink(Base, ShortLinkMixin): __tablename__ = "short_links" __factory__ = PGCraftSimple __plugins__ = [UUIDV4PKPlugin()]
Like
fsh_lib.files.FileMixin, the mixin deliberately doesn’t declareid– the consumer’s PK plugin owns it.created_atis also pgcraft-owned:PGCraftSimpleauto-addsTimestampPlugin, which injectscreated_atwithserver_default=now().Both
codeandtarget_urlareUNIQUE. The code uniqueness letsshorten()retry on the rare random-code collision; the target_url uniqueness collapses same-URL dedup into the same atomic INSERT (no SELECT-then- INSERT race).- click_count: Mapped[int] = <sqlalchemy.orm.properties.MappedColumn object>¶
Times
resolve()has served this row.BigIntegerso popular links don’t wrap in any realistic horizon.
- class ShortenRequest(**data)[source]¶
Request body for
shorten_action().Field shape matches the dotted-path pattern be’s introspector expects for a collection-scoped action body.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ShortenResponse(**data)[source]¶
Response from
shorten_action().- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- default_base_url()[source]¶
Return the public origin + redirect prefix from env.
Reads
LINK_BASE_URL(e.g."https://l.example.com/l") – the valueshorten()joins with the random code to form{base}/{code}. Mirrorsfsh_lib.files.default_storage()in shape: the action’s resource lookup is env-driven so the same generated handler works across environments.- Raises:
RuntimeError – when
LINK_BASE_URLis not set.- Return type:
- generate_code(length=7)[source]¶
Return a random lowercase-letter code of length characters.
Uses
secrets.choice()– code generation is security- adjacent (a guessable code lets an attacker enumerate link targets), so the CSPRNG matters even though the keyspace is large for default lengths.- Raises:
ValueError – When length is less than 1.
- Return type:
- async resolve(*, model_cls, db, code)[source]¶
Return the target URL for code, or
Noneif unknown.Atomically increments
ShortLinkMixin.click_countand returns the row’starget_urlin a singleUPDATE ... RETURNING– the click counter and the lookup can’t drift even under concurrent redirects.- Parameters:
model_cls (
type[ShortLinkMixin]) – The consumer’s short-link model class.db (
AsyncSession) – Async SQLAlchemy session. The caller commits – if the surrounding transaction rolls back, the click increment is rolled back with it, which is the right behaviour (a failed redirect didn’t actually serve the link).code (
str) – The short code from the request path.
- Return type:
- Returns:
The target URL on hit,
Nonewhen code is unknown.
- async shorten(*, model_cls, db, target_url, base_url, code_length=7)[source]¶
Return a short URL for target_url.
Reuses the existing row for target_url when one is found – “shorten X twice, get the same code back”. The dedup rides the
UNIQUE (target_url)constraint so it’s race-free: two concurrent shortens of the same URL can’t both insert, and the loser’s INSERT bounces off the unique violation rather than racing past a stale SELECT.- Parameters:
model_cls (
type[ShortLinkMixin]) – The consumer’s short-link model class (must mix inShortLinkMixin).db (
AsyncSession) – Async SQLAlchemy session for the insert. The caller commits.target_url (
str) – Long URL to shorten.base_url (
str) – Base URL of the redirect endpoint, without a trailing slash. The returned short URL is{base_url}/{code}. Typically a short domain the redirect route is mounted under, e.g."https://l.example.com".code_length (
int) – Number of base26 letters to generate when inserting a new row. Defaults toDEFAULT_CODE_LENGTH. Has no effect when the dedup hits – the existing row’s code is returned regardless.
- Returns:
{base_url}/{code}.- Return type:
- Raises:
RuntimeError – If
MAX_CODE_RETRIESrandom codes in a row collided with existing rows – almost certainly means code_length is too small for the current row count.
- async shorten_action(*, model_cls, db, body)[source]¶
Collection-scoped action wrapping
shorten().Wired into a generated POST route by be’s action codegen – the
type[ShortLinkMixin]annotation matches any concrete subclass via the introspector’s supertype check, so the same function serves every consumer’s short-link model. Mirrorsfsh_lib.files.request_upload()’s shape.base_urlcomes fromdefault_base_url()(i.e. theLINK_BASE_URLenv var) so the same generated handler works across dev / staging / prod without scaffold-time configuration.- Return type:
Eager-load application for list endpoints.
apply_eager_loads() is the companion of
fsh_lib.ordering.apply_ordering(): it turns a representation’s
EagerLoad plan into select(...).options(...) loader
options. The one piece of coordination is joined – the set of
relationships apply_ordering already LEFT-joined for an
ORDER BY. A top-level relationship in that set is loaded with
contains_eager (reuse the existing join) instead of being
fetched a second time by selectinload / joinedload.
Generated list handlers wire the two together explicitly:
stmt = select(Task)
stmt, joined = apply_ordering(stmt=stmt, sort_clauses=body.sort, ...)
stmt = apply_filters(stmt=stmt, node=body.filter, model=Task)
stmt = apply_eager_loads(
stmt, [EagerLoad(Task.project, "joined")], joined=joined
)
- class EagerLoad(attr, strategy=EagerStrategy.SELECTIN, children=())[source]¶
One relationship to eager-load, with its strategy.
- attr¶
The relationship attribute, e.g.
Task.project.
- strategy¶
The
EagerStrategyto load it with. Overridden bycontains_eagerfor a top-level relationship a sort already joined.
- children¶
Nested eager loads one level deeper, e.g. a
project’s ownowner. Their strategy is always honoured –contains_eagerreuse only applies to a top-level relationship a sort actually joined.
- class EagerStrategy(*values)[source]¶
Eager-loading strategy for an
EagerLoad.The three relationship loader strategies that make sense for a representation dump. Each member’s value is the string token SQLAlchemy’s
relationship(lazy=...)accepts – a precise subset of SQLAlchemy’s (private)_LazyLoadArgumentType.contains_eageris deliberately not a member: it isn’t a per-field choice but is applied automatically byapply_eager_loads()when a sort already joined the relationship.- JOINED = 'joined'¶
LEFT JOIN – right for scalar to-one relationships.
- SELECTIN = 'selectin'¶
Separate
WHERE ... INquery – safe for collections.
- SUBQUERY = 'subquery'¶
Separate query correlated by subquery.
- apply_eager_loads(stmt, loads, joined=None)[source]¶
Apply eager-load options to a SELECT, reusing ordering’s joins.
- Parameters:
stmt (
Select) – The SQLAlchemy SELECT to add loader options to.loads (
Iterable[EagerLoad]) – The representation’sEagerLoadplan.joined (
set[RelationshipProperty] |None) – Relationships already LEFT-joined byfsh_lib.ordering.apply_ordering()(its second return value). A top-level load whose relationship is in this set is loaded withcontains_eagerrather than re-fetched.Nonemeans nothing was joined.
- Return type:
- Returns:
The statement with
.options(...)applied.
kiln’s openapi-extension keys + their runtime constructor.
Routes the BE generates carry two custom extensions consumed by the FE codegen layer:
X_CACHE_KEY("x-cache-key") – string identifying the TanStack Query key root the FE should use for this route’s response cache. Only emitted on read routes (any GET, plus the POST search endpoints that are semantically reads). Its presence is the read signal: the FE generatesuseQuery/useSuspenseQuerywrappers for any GET or any route carrying a cache key, and treats everything else as a mutation – so no separate “this is a query” extension is needed.X_RESOURCE("x-resource") – the resource’s singular slug (the model’s snake name, e.g."task"for thetasksresource). Stamped on every CRUD route (reads and writes alike, unlikeX_CACHE_KEY). It’s the canonical resource identity: the FE groups a resource’s routes by it (to discover the CRUD SDK fns), and derives the saved-viewresource_typediscriminator + default singular display label from it. The plural cache namespace is the separateX_CACHE_KEY(e.g."tasks"); a read route carries both, which is how the FE bridges its plural config key to this singular identity.X_AUTH_ROLE("x-auth-role") –"login"/"validate"/"logout"on the three auth-router routes. Lets the FE derive the auth SDK fns (and, off them, the session + credentials types) instead of naming them in fe.jsonnet.
construct_openapi_extra() builds a dict suitable for
FastAPI’s openapi_extra= kwarg. Generated handlers call it
inline so the extension keys live in exactly one place (this
module). Hand-written handlers can use it the same way.
- construct_openapi_extra(*, cache_key=None, resource=None, auth_role=None)[source]¶
Build the
openapi_extrapayload for a kiln-generated route.- Parameters:
cache_key (
str|None) – TanStack Query key root for the route’s response cache.Noneskips theX_CACHE_KEYentry (right for write routes, which don’t seed any cache). Its presence doubles as the read signal – the FE treats any GET or any cache-keyed route as a query, so a read POST (a search endpoint) just sets this.resource (
str|None) – The resource’s singular slug (model snake name, e.g."task").Noneskips theX_RESOURCEentry. Stamped on both read and write routes; the canonical resource identity the FE groups routes by and derives the saved-view discriminator + label from.auth_role (
str|None) –"login"/"validate"/"logout"on an auth-router route.Noneskips theX_AUTH_ROLEentry. Lets the FE derive its auth SDK fns and the session / credentials types.
- Return type:
- Returns:
A dict suitable for
@router.<method>(..., openapi_extra=...). Empty when no argument is set.
ORDER BY application from typed Pydantic sort clauses.
- apply_ordering(stmt, sort_clauses, model, default_field, default_dir='asc')[source]¶
Apply one or more sort clauses to a SELECT statement.
Each clause is a Pydantic model with
field(an enum whose.valueis the sort field) anddir(SortDirection).A sort field naming a column sorts by that column. A field naming a (to-one) relationship –
"vendor"on a model with avendormany-to-one – is intuited as the related row’s representative column (vendor.name, falling back to its primary key), with the related table LEFT-joined in so rows with a null relationship still appear. The sort field stays the bare relationship name end to end – callers never see or send"vendor.name".When sort_clauses is
Noneor empty, the default field and direction are used.- Parameters:
stmt (
Select) – The SQLAlchemy SELECT statement to sort.sort_clauses (
Sequence[BaseModel] |None) – List of sort clause models, orNone.model (
type) – The SQLAlchemy model class providing columns.default_field (
str) – Field to sort by when no clauses are provided.default_dir (
Literal['asc','desc']) – Direction for the default sort.
- Return type:
- Returns:
(stmt, joined)– the statement with ORDER BY applied, and the set of relationships LEFT-joined to resolve a relationship sort field. Hand the joined set tofsh_lib.loading.apply_eager_loads()so an eager load of one of those relationships reuses the join (contains_eager) instead of fetching it again.
Keyset and offset pagination helpers.
- apply_keyset_pagination(stmt, model, cursor, cursor_field, page_size, max_page_size)[source]¶
Apply keyset (cursor-based) pagination to a SELECT.
Adds a
WHERE cursor_field > cursorclause when a cursor is provided, clamps page_size, and addsLIMIT page_size + 1(the extra row detects whether more results exist).- Parameters:
stmt (
Select) – The SQLAlchemy SELECT statement.model (
type) – The SQLAlchemy model class providing columns.cursor (
Any) – The cursor value (already cast to the correct type), orNone.cursor_field (
str) – Name of the cursor column.page_size (
int) – Requested page size.max_page_size (
int) – Maximum allowed page size.
- Return type:
- Returns:
(paginated_stmt, effective_page_size)tuple. The caller is responsible for executing paginated_stmt.
- apply_offset_pagination(stmt, offset, limit, max_page_size)[source]¶
Apply offset pagination to a SELECT.
Clamps limit to max_page_size, applies
OFFSET/LIMIT, and builds a companionCOUNT(*)statement so the caller can fetch the total alongside the page.- Parameters:
- Return type:
- Returns:
(paginated_stmt, count_stmt, effective_limit)tuple. The caller is responsible for executing both statements.
Client for a Rego (OPA) permissions service.
A kiln permissions service is a stateless Open Policy Agent deployment: it carries the policy (the generic RBAC engine plus any custom rules) but no data. The roles and role bindings live in the backend’s own database; this module ships them – together with the subject, the action, and the resource – to OPA on every call.
Three query modes:
Point check – a decision about one resource (get / create / update / delete).
OpaClient.check()POSTs the decisioninputto/v1/data/<opa_package>/decisionand returns aDecision.Bulk check – many
(action, resource)decisions for one subject in a single round-trip (the actions-envelope case: every row of a list page x every action).OpaClient.check_many()POSTs to/v1/data/<opa_package>/decisionsand returns oneDecisionper item.List filter – a collection request where every candidate row must be checked.
OpaClient.compile_filter()asks OPA to partially evaluate theallowrule with the resource left unknown (the/v1/compileAPI). OPA returns residual conditions on the resource;FilterResult.to_sqlalchemy()turns them into a SQLAlchemyWHEREclause the backend folds into the list query. One round-trip filters the whole collection – no per-row check.
Requires the opa extra (pip install 'kiln-generator[opa]')
for httpx.
Example – point check:
client = OpaClient("http://opa:8181", opa_package="authz")
decision = await client.check(
subject=Subject("user", "alice"),
action="task:update",
resource=ResourceRef("Task", "t-1", {"created_by": "alice"}),
roles={"editor": {"permissions": ["task:read", "task:update"]}},
bindings=[RoleBinding(Subject("user", "alice"), "editor")],
)
if not decision.permit:
raise HTTPException(status_code=403)
Example – list filter:
result = await client.compile_filter(
subject=Subject("user", "alice"),
action="task:list",
resource_type="Task",
roles=roles,
bindings=bindings,
)
stmt = select(Task).where(result.to_sqlalchemy({"id": Task.id}))
- class Condition(field, op, value, negated=False)[source]¶
One residual constraint on a resource field.
A
Conditionis the translated form of a single comparison OPA left unresolved when it partially evaluated the policy – e.g.input.resource.id == "t-1"becomesCondition("id", "eq", "t-1").- field¶
The resource field the constraint is on – the path after
input.resource., dotted for a nested field.
- op¶
The comparison:
"eq","ne","lt","le","gt","ge", or"in".
- value¶
The literal the field is compared against (a list for
"in").
- negated¶
Truewhen OPA emitted the expression negated.
- class Decision(permit, allow, deny, raw)[source]¶
The answer a point permissions check returns.
- permit¶
The bottom line.
Trueiff some grant applies and no veto overrides it – the value the backend gates on.
- allow¶
Trueiff a grant applied (generic RBAC or a custom allow rule), before vetoes.
- deny¶
Trueiff a custom deny rule vetoed the request.
- raw¶
The full
resultobject the service returned, for logging or richer custom decisions.
- class FilterResult(always_allow, always_deny, conjunctions)[source]¶
A compiled list policy: which rows a subject may see.
The outcome of partially evaluating the
allowrule with the resource unknown. It is one of three shapes:always allow – no constraint; every row passes.
always deny – the policy can never hold; no row passes.
conditional –
conjunctionsis an OR of ANDs ofConditionobjects (disjunctive normal form, the shape OPA’s/v1/compilereturns).
- always_allow¶
Every row passes;
to_sqlalchemy()returns a tautology.
- always_deny¶
No row passes;
to_sqlalchemy()returns a contradiction.
- conjunctions¶
OR-of-ANDs residual. Each inner tuple is a conjunction of
Conditionobjects; a row passes when it satisfies any conjunction.
- to_sqlalchemy(columns)[source]¶
Render the residual as a SQLAlchemy boolean clause.
- Parameters:
columns (
Mapping[str,ColumnElement[Any]]) – Maps a resource field name (as it appears inCondition.field) to the SQLAlchemy column it constrains, e.g.{"id": Task.id}.- Returns:
a tautology when
always_allow, a contradiction whenalways_deny, otherwise the OR-of-ANDs.- Return type:
- Raises:
OpaError – A residual constrains a field absent from columns.
- class OpaClient(base_url, *, opa_package='authz', client=None, timeout=5.0)[source]¶
Async client for a kiln Rego permissions service.
Wraps an
httpx.AsyncClientpointed at an OPA decision server. Construct one per application (it is cheap to keep open) and close it on shutdown viaaclose(), or use it as an async context manager.- Parameters:
base_url (
str) – Base URL of the OPA server, e.g."http://opa:8181".opa_package (
str) – Rego package of the decision entrypoint. The point-check path is/v1/data/<opa_package>/decision; the list filter compilesdata.<opa_package>.allow. Defaults to"authz".client (
AsyncClient|None) – An existinghttpx.AsyncClientto use. WhenNone(the default) theOpaClientcreates and owns one, andaclose()closes it.timeout (
float) – Per-request timeout in seconds. Ignored when an explicit client is supplied.
- async check(*, subject, action, resource, roles, bindings)[source]¶
Ask the service whether subject may take action.
A point check – one decision about one resource.
- Parameters:
subject (
Subject) – The principal making the request.action (
str) – The permission string being checked, e.g."task:update".resource (
ResourceRef) – The resource the action concerns.roles (
Mapping[str,Any]) – The role catalogue –{role_name: {"permissions": [...]}}. The backend loads this from its own store; only the roles relevant to bindings need be present.bindings (
Sequence[RoleBinding]) – The subject’s role bindings, loaded by the backend from its own store.
- Return type:
- Returns:
The service’s
Decision.- Raises:
OpaError – The service was unreachable, timed out, or answered with a non-2xx status.
- async check_many(*, subject, roles, bindings, items)[source]¶
Decide a batch of
(action, resource)checks in one call.Every item shares one subject, roles, and bindings – the actions-envelope case: one user, many
row x actionchecks for a whole list page. The service evaluates the same per-one policy for each item, so a page costs a single OPA round-trip rather than one call per item.
- async compile_filter(*, subject, action, resource_type, roles, bindings, unknowns=('input.resource.id',))[source]¶
Compile the list policy into a row filter for resource_type.
Asks OPA to partially evaluate
data.<opa_package>.allowwith the resource left unknown, then translates the residual into aFilterResult. The resource type is supplied as a known value so type-scoped and global bindings resolve fully and only instance-level constraints survive into the residual.- Parameters:
subject (
Subject) – The principal making the request.action (
str) – The collection action, e.g."task:list".resource_type (
str) – The resource type being listed, e.g."Task". Passed as a known value so the residual is purely about instance fields.roles (
Mapping[str,Any]) – The role catalogue (seecheck()).bindings (
Sequence[RoleBinding]) – The subject’s role bindings.unknowns (
Sequence[str]) – Theinputpaths OPA treats as symbolic. Defaults to("input.resource.id",)– enough for generic RBAC. Extend it (e.g. with"input.resource.created_by") when a custom rule constrains other resource fields.
- Return type:
- Returns:
The compiled
FilterResult.- Raises:
OpaError – The service failed, or returned a residual the translator cannot represent as a SQL filter.
- exception OpaError[source]¶
A permissions-service call failed.
Raised when the service is unreachable, times out, answers with a non-2xx status, or returns a residual the translator cannot turn into a SQL filter. The caller decides whether an error means “deny” (fail-closed, the safe default for authz) or “allow” (fail-open) – this module never silently picks one.
- class ResourceRef(type, id=None, attributes=<factory>)[source]¶
The resource a decision concerns.
- type¶
Resource type, e.g.
"Task". Matched against the permission catalogue and against object-scoped bindings.
- id¶
Resource instance id, or
Nonefor a collection-level decision (alist/createwhere no single instance exists yet).
- attributes¶
Extra fields folded into
input.resourceso custom Rego rules can read them – e.g.{"created_by": "alice"}for an ownership rule.typeandidalways take precedence over a same-named attribute key.
- class RoleBinding(subject, role, scope=None)[source]¶
A role bound to a subject, optionally scoped to a resource.
- subject¶
The principal the role is granted to.
- role¶
Name of the granted role. Must be a key of the
rolescatalogue passed to the query method.
- scope¶
Nonefor a global binding (applies to every resource); aResourceRefwith noidfor a type-scoped binding (every instance of a type); aResourceRefwith anidfor an instance-scoped binding.
- class Subject(type, id)[source]¶
The principal a decision is made for.
- type¶
"user"or"token". A binding for one kind never satisfies a request for the other – the RBAC engine matches on the pair.
- id¶
Stable identifier of the principal (a user id, a token id /
jti).
pgqueuer integration for kiln-generated FastAPI projects.
Three helpers — that’s the whole surface be contributes to the
queue story. Everything else (worker run loop, @entrypoint,
CLI) is pgqueuer’s own; use it directly per the upstream docs.
get_queue()(producer) — wraps the asyncpg connection underlying a SQLAlchemyAsyncSessionso jobs enqueue inside the same transaction as the request’s other writes. The job becomes durable when the session commits; if the session rolls back, the job is gone. This is the transactional-outbox pattern, and it’s the one piece pgqueuer doesn’t ship.open_worker_driver()(worker bootstrap) — opens a dedicated asyncpg connection from a DSN, coercing SQLAlchemy’spostgresql+asyncpg://URL form to plainpostgresql://so the same env var works for both sides. Use as the outermostasync within your pgqueuer factory.instrument_entrypoint()(worker observability) — wraps an entrypoint coroutine in an OpenTelemetry consumer span when thefsh-lib[opentelemetry]extra is installed, and returns it untouched otherwise. Safe to call unconditionally from a worker factory; the fsh_lib-supplied dispatch entrypoints apply it themselves.
The two connection helpers assume the database is reached through the asyncpg driver. Other drivers (psycopg, etc.) would need a parallel shim and are not supported today.
- async get_queue(session)[source]¶
Return a
pgqueuer.Queriesbound to session’s connection.Calls to
await queue.enqueue(...)issue SQL on the same asyncpg connection the SQLAlchemy session is using, so they join the session’s transaction. Commit the session and the job is durable; roll back and it never existed.- Parameters:
session (
AsyncSession) – A SQLAlchemy async session backed by the asyncpg driver. The session is checked out from a connection so the underlyingasyncpg.Connectioncan be unwrapped.- Return type:
Queries- Returns:
A
pgqueuer.Querieswhose driver wraps the session’s asyncpg connection.
- instrument_entrypoint(entrypoint, handler)[source]¶
Wrap a pgqueuer entrypoint handler in an OpenTelemetry span.
Returns handler decorated with
fsh_lib.telemetry.traced_entrypoint()when OpenTelemetry is importable, and handler unchanged when it is not — so a worker factory can call this unconditionally without caring whether the consumer installed thefsh-lib[opentelemetry]extra.Importable is not the same as configured: when OTel is installed but the app never builds a
TracerProvider, the wrapped handler emits no-op spans at negligible cost. Real spans start appearing once the consumer’sinit_telemetryruns — so it is safe to instrument an entrypoint before deciding whether a given deployment exports traces at all.- Parameters:
- Return type:
- Returns:
Either the span-wrapped handler or handler itself; both keep the
(job) -> NoneshapePgQueuer.entrypointexpects.
- open_worker_driver(dsn)[source]¶
Open a dedicated worker connection and yield an
AsyncpgDriver.Workers need a long-lived connection of their own so pgqueuer can
LISTENfor new-job notifications on it. Use this as the outermostasync within the factory you hand topgq run, and shape the factory itself as anasynccontextmanager()so pgqueuer’srun_factorykeeps the connection alive for the lifetime of the worker – a plainasync def main() -> PgQueuerexits thisasync withbefore pgqueuer can use the driver and the supervisor crashes withInterfaceError: connection is closed:from contextlib import asynccontextmanager from fsh_lib.queue import open_worker_driver from pgqueuer import PgQueuer @asynccontextmanager async def main(): async with open_worker_driver( os.environ["DATABASE_URL"], ) as driver: pgq = PgQueuer(driver) @pgq.entrypoint("ping") async def ping(job): ... yield pgq
- Parameters:
dsn (
str) – A PostgreSQL DSN. Either plain (postgresql://user:pw@host/db) or SQLAlchemy-shaped (postgresql+asyncpg://...) — the latter is rewritten so the same env var works for both the request path and the worker.- Yields:
An
AsyncpgDriverwrapping the freshly-opened connection.
Rate-limiting primitives for kiln-generated FastAPI projects.
This module’s runtime dependency on slowapi and limits is
gated behind the rate-limit extra. Install with:
pip install 'kiln-generator[rate-limit]'
# or: uv add 'kiln-generator[rate-limit]'
The pieces:
RateLimitBucketMixin– a SQLAlchemy mixin supplying the three columns every counter row needs (key,hits,expires_at). Same idiom asfsh_lib.files.FileMixin: the consumer subclasses it on their own model so they own the table and we own the columns.PostgresStorage– alimits-compatible synchronous storage backend backed by a small dedicated SQLAlchemy engine.slowapi’s enforcement path callslimiter.hit(...)synchronously (not awaited) so an async storage cannot satisfy it; we use a separate sync engine targeting the same Postgres database the rest of the app talks to.build_limiter()– factory that constructs a slowapislowapi.Limiterand wires ourPostgresStoragein as its backing store, swapping out the placeholdermemory://storage slowapi creates internally.default_key_func()– the per-request rate-limit key callable used by default (client IP).
- class PostgresStorage(*, model, session_maker)[source]¶
limits-compatible storage backed by a sync Postgres engine.slowapi’s enforcement path is synchronous (
limiter.hit(...)is not awaited), so an async storage cannot satisfy it. This class uses a dedicated synchronous SQLAlchemy engine pointed at the same Postgres database as the rest of the app – separate connection pool, same data.The counter row is upserted with Postgres
INSERT ... ON CONFLICT DO UPDATE: a hit on a fresh window inserts a row; a hit on an active window incrementshits; a hit on an expired window resetshitstoamountand shiftsexpires_atforward.- STORAGE_SCHEME: list[str] | None = ['postgres-rate-limit']¶
URI scheme this storage registers under. Not used for instantiation –
build_limiter()constructs the storage directly and patches it onto the slowapi limiter – butlimitsrequires the attribute on every storage subclass.Declared as an instance attribute (not
ClassVar) to mirror the baseStorageclass –limitsannotates it as such and aClassVaroverride would conflict at type-check time.
- property base_exceptions: Any¶
Exception class(es)
limitsshould treat as storage failures.The
limitsbase class types this astype[Exception] | tuple[type[Exception], ...]; we narrow to a single class and annotateAnyhere to keep autodoc from cross-referencingtype(which collides with three unrelatedtype:discriminator fields in the be schema). ReturnsSQLAlchemyError.
- check()[source]¶
Return whether the storage is reachable.
limitscalls this opportunistically when a previous call raised; we keep it cheap by issuingSELECT 1rather than touching the bucket table.- Return type:
- get_expiry(key)[source]¶
Return the window expiry for key as a UNIX timestamp.
limitstreats a value in the past as “no active window”.- Return type:
- class RateLimitBucketMixin[source]¶
SQLAlchemy mixin supplying the columns of a rate-limit bucket.
Subclass on a regular SQLAlchemy
Baseto carry the storage columns:from fsh_lib.rate_limit import RateLimitBucketMixin class RateLimitBucket(Base, RateLimitBucketMixin): __tablename__ = "rate_limit_buckets"
Unlike
fsh_lib.files.FileMixin, the natural primary key here iskeyitself (the limit identifier produced by thekey_funcplus the limit string). Declaring itprimary_key=Truemeans the consumer doesn’t need to bring their own PK plugin to use this mixin.The consumer is responsible for migrating the table;
bedoesn’t generate Alembic migrations.- expires_at: Mapped[datetime] = <sqlalchemy.orm.properties.MappedColumn object>¶
When the current window ends. Rows with
expires_at < now()are stale and reset on the next hit.
- build_limiter(*, model, sync_url, key_func=None, default_limits=(), headers_enabled=True, engine=None)[source]¶
Build a slowapi
slowapi.Limiterbacked by Postgres.The returned limiter has its
_storageand_limiterfields swapped out for ourPostgresStorage– slowapi constructs a placeholdermemory://storage internally because its public API only takes a URI, and we replace it rather than going through URI dispatch (the storage needs Python objects – the bucket model and a sessionmaker – that don’t round-trip through a URI).- Parameters:
model (
type[RateLimitBucketMixin]) – The consumer’s bucket model class (must mix inRateLimitBucketMixin).sync_url (
str) – A synchronous Postgres DSN for the rate-limit storage. The app’s main async DSN (postgresql+asyncpg://...) is fine to reuse with the+asyncpgdriver tag stripped.key_func (
Callable[[Request],str] |None) – Per-request key callable. Defaults todefault_key_func()(client IP).default_limits (
Iterable[str]) – Iterable of limit strings applied to every route that doesn’t have its own@limiter.limit(...).headers_enabled (
bool) – Whether slowapi emitsX-RateLimit-*response headers.engine (
Engine|None) – Pre-built sync engine. Optional escape hatch for tests / custom pools; production callers leave itNoneand let the helper build one from sync_url.
- Return type:
Limiter- Returns:
A configured slowapi
slowapi.Limiter.
- default_key_func(request)[source]¶
Default rate-limit key: client IP, falling back to
unknown.Used when
key_funcis not configured. Behind a trusted proxy you almost certainly want to pointkey_funcat a function that readsX-Forwarded-Forinstead – this default deliberately refuses to trust any header.- Return type:
Project-wide resource registry: value-provider engine.
Codegen emits one ResourceRegistry per project, populated
declaratively with one ResourceEntry per resource.
Subscripting it by slug – registry[slug] – yields a
per-resource handle; the generated _values route handler
delegates to it via registry[slug].values(...).
The filter catalog is no longer surfaced at runtime – it lives
in the openapi spec (x-fsh-list) at build time, and the
codegen FE bakes it into per-resource hooks. ResourceRegistry
keeps the value-provider plumbing (trigram autocomplete over enum
choices, ref labels, and free-text search columns).
The class is generic over the slug type (Slug: str = str) so a
codegen consumer can declare ResourceRegistry[ResourceType] and
get type-narrowed slug arguments on every method. The default of
str keeps the class usable from hand-written code that doesn’t
go through codegen.
Value endpoints are single-page – autocomplete UX narrows by typing more characters, not by paginating.
- class Enum(name, enum_class, operators=(FilterOperator.EQ, FilterOperator.IN), kind='enum')[source]¶
Enum-typed filter field.
Discovery emits
{value, label}choices; the values endpoint serves the same listq-filterable through a PostgresVALUESclause.
- FilterField = fsh_lib.resource_registry.Enum | fsh_lib.resource_registry.Ref | fsh_lib.resource_registry.LiteralField | fsh_lib.resource_registry.Bool¶
Sum of every supported filter-field shape.
- class LiteralField(name, type, operators=(FilterOperator.EQ, FilterOperator.GT, FilterOperator.GTE, FilterOperator.LT, FilterOperator.LTE), kind='literal')[source]¶
Numeric / date / datetime input rendered natively on the FE.
- class Ref(name, target, operators=(FilterOperator.EQ, FilterOperator.IN), kind='ref')[source]¶
Filter pointing at another resource (or this one).
The trigram subquery scores against the target’s first
search_columnsentry on itsResourceEntry; targets without any search columns fall back to the stringified pk.
- class ResourceEntry(model, pk, fields=(), search_columns=(), default_rep_class=None, default_rep_serializer=None, object_actions=(), collection_actions=())[source]¶
One resource’s registry-side declaration.
search_columnsare the model attributes used as the default field list when the values endpoint is called with emptyfields— they’re trigram-matched the same way any other field is, so the empty-fields path is just a multi-column search over these defaults.default_rep_classanddefault_rep_serializerdescribe the resource’s cross-resource link shape — the Pydantic class of itsdefault_representationand the async(row, session) -> default_rep_classcallable that produces it. Both areNonewhen the resource doesn’t declare a default representation;ResourceRegistry.hydrate_refs()then returns an empty list for that slug.- collection_actions: tuple[ActionSpec, ...] = ()¶
Collection-scope action specs. Drives the collection-scope
registry[slug].actions(...)path.
- object_actions: tuple[ActionSpec, ...] = ()¶
Object-scope action specs. Drives the per-row
registry[slug].actions(...)path.
- class ResourceRegistry(entries)[source]¶
Project-wide discovery + value-provider dispatcher.
Construct with a
{slug: ResourceEntry}map at module load time. Subscript by slug –registry[slug]– to get a bound handle exposingactions/valuesfor that resource;hydrate_refs()stays on the registry itself since it dispatches on a runtime slug. Stateless after construction – safe to share across requests.Generic over the slug type so a codegen consumer can declare
ResourceRegistry[ResourceType]and get the project’sResourceTypeenum on the subscript.Slugdefaults tostrfor non-codegen use.- async hydrate_refs(resource, ids, db, session)[source]¶
Fetch ids of resource and serialize them via its default rep.
Used by
fsh_lib.saved_views.hydrate_view()(and anything else dispatching by slug at runtime) to turn raw ref ids into hydrated link payloads. Lenient on missing slugs and dropped ids: an unknown resource, a resource without a default representation, or an empty ids list all return[]; ids that don’t resolve to a row are silently skipped. Order of returned items mirrors ids.
- class ValuesPage(**data)[source]¶
Response shape for
POST /_values.Single-page only — autocomplete UX narrows by typing more characters, not by paginating.
resultsis[{"value": ..., "label": ...}]for enum / free-text / single-field paths and the consumer’s link-payload shape (alreadymodel_dump-ed) for resource search. Multi-column union results add a"field"key indicating the source column.- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Per-user saved views: mixin + payload schemas + hydration.
A saved view is a named filter+sort state stored on behalf of a
single user. Mirrors the fsh_lib.files.FileMixin idiom:
the consumer subclasses SavedViewMixin on their own
DeclarativeBase and defines a normal kiln resource pointing
at it.
A single mixed-in model serves every opted-in resource;
resource_type discriminates rows so the codegen-generated
CRUD scopes reads and writes per resource.
Stored payloads keep raw filter values, including raw ids on
ref values. Read paths run those ids through
hydrate_view(), which dispatches by slug through the
project-wide fsh_lib.resource_registry.ResourceRegistry.hydrate_refs()
to produce hydrated items. Stale or invisible refs are
silently skipped.
- HydrateRefs = 'Callable[[str, list[Any], AsyncSession, Any], Awaitable[list[dict[str, Any]]]]'¶
Type alias for the slug-keyed ref hydrator.
await fn(resource_slug, ids, db, session)returns hydrated link payloads (model_dump()-ed) for the rows matching ids. Returns[]for unknown slugs or empty ids.fsh_lib.resource_registry.ResourceRegistry.hydrate_refs()satisfies this shape. Kept as a string to avoid forcing SQLAlchemy / typing imports at module load.
- class SavedViewCreate(**data)[source]¶
Request body for
POST /views.payloadis the raw filter+sort spec — same shape asSavedViewUpdate’s, butnameis required.- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class SavedViewFilterEntry(**data)[source]¶
One filter committed to the saved-view payload.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- SavedViewFilterValue = str | int | float | bool | list[str] | fsh_lib.saved_views.SavedViewRefValue | None¶
Union of legal filter values after hydration.
Mirrors the FE’s
SavedViewFilterValue– primitives / multi-select arrays / hydrated ref values pass through,Nonemeans “filter not active”.
- class SavedViewMixin[source]¶
SQLAlchemy mixin supplying the columns of a saved-view row.
Subclass on a
DeclarativeBase-derived class:from fsh_lib.saved_views import SavedViewMixin class SavedView(Base, SavedViewMixin): __tablename__ = "saved_views"
Then point each opted-in resource at the model:
{ model: "myapp.models.Product", saved_views: { model: "myapp.models.SavedView" }, representations: [ { name: "default", fields: [ { name: "id", type: "uuid" }, { name: "name", type: "str" }, ], }, ], default_representation: "default", // ... }
The mixin owns no primary key,
created_at, orupdated_at— pgcraft’sUUIDV4PKPluginandTimestampPlugininject those columns at factory run time when the consumer attaches them via__plugins__. Indexes onresource_typeandowner_idare recommended; both columns drive every read filter.- order_index: Mapped[int] = <sqlalchemy.orm.properties.MappedColumn object>¶
Per-(owner, resource_type) display order. Lower values sort earlier; ties broken by
created_atso newly-created views still land at a deterministic spot when the FE hasn’t yet stamped an explicit index. The generated list route’sordermodifier sorts by this column ascending, and the update route exposes it as a writable field so the FE persists drag-reorders by PATCHing each affected row’s new index.
- owner_id: Mapped[str] = <sqlalchemy.orm.properties.MappedColumn object>¶
Stringified user id. Saved views are per-user; the generated routes filter by
owner_id == str(session.<attr>)where<attr>isuser_id_attr.
- class SavedViewPayloadResponse(**data)[source]¶
Hydrated payload returned by
hydrate_view().- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class SavedViewRefValue(**data)[source]¶
Hydrated
ref/selffilter value.Stored as
{kind, type, ids}on write;hydrate_view()populatesitemswith the labelled rows from the per-resource serializer so the FE can render a chip without a follow-up fetch.itemspayload-shape is per-resource and stays open (dict[str, Any]); the FE narrows it via the slug-keyed serializer registry on its end.- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class SavedViewResponse(**data)[source]¶
Typed response shape for saved-view read / write routes.
Mirrors the dict
hydrate_view()returns. Wired as the operation’sresponse_modelso the generated FastAPI route declares it (and openapi-ts produces a typed FE return) instead of falling back todict[str, Any].idisUUID(notstr) so the ORM column flows in raw – Pydantic / FastAPI serialise it to the canonical hex form on the wire and openapi-ts surfaces astringtyped as a uuid format.created_at/updated_ataredatetimefor the same reason: ISO-format serialisation happens at the JSON boundary, not in the hydration call site.order_indexcarries the row’s current drag-reorder position so the FE knows the persisted tab order without a separate query; the generated list route sorts by it ascending.- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class SavedViewSort(**data)[source]¶
Stable serialised sort descriptor.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class SavedViewUpdate(**data)[source]¶
Request body for
PATCH /views/{id}.Both fields optional; missing fields leave the stored value untouched.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- async hydrate_view(view, hydrate_refs, db, session)[source]¶
Return the typed response payload for one saved view.
Walks each entry in
view.payload["filters"]; for entries withvalue.kind in {"ref", "self"}, calls hydrate_refs with the slug + ids and replacesidswith the returneditems. Stale or invisible rows are silently skipped so the dump never throws because of dangling refs.Pydantic validation happens at construction so a stored payload that drifts from the schema (legacy data, manual SQL edits) raises here rather than silently shipping the bad shape to clients.
Always called against a persisted row (the route hands us the row it just inserted / fetched), so
view.idis set – typed asMapped[uuid.UUID]non-optional on the mixin.- Return type:
Runtime helpers for OpenTelemetry-instrumented be apps.
This module is imported by the telemetry/setup.py and
telemetry/decorators.py files generated by
be.operations.telemetry.TelemetryScaffold. Generated
apps without telemetry never import this module.
OpenTelemetry is an optional runtime dependency of be. Install
it via pip install kiln-generator[opentelemetry], or directly
from the pinned telemetry/requirements.txt that the scaffold
emits next to the generated app. Importing this module without
OTel installed raises ImportError.
The decorators here create internal spans named {resource}.{op}
that survive ASGI middleware reordering and carry low-cardinality
be.resource / be.op attributes for filtering. They
compose cleanly with the request span emitted by
opentelemetry.instrumentation.fastapi.FastAPIInstrumentor.
- ATTR_OP = 'be.op'¶
Span attribute carrying the operation name (e.g.
"get").
- ATTR_RESOURCE = 'be.resource'¶
Span attribute carrying the resource name (e.g.
"article").
- ExporterName¶
Accepted values of the
exporterparameter tobuild_tracer_provider(). Mirrorsbe.config.schema.ExporterName.alias of
Literal[‘otlp_http’, ‘otlp_grpc’, ‘console’, ‘none’]
- SamplerName¶
Accepted values of the
samplerparameter tobuild_tracer_provider(). Mirrorsbe.config.schema.SamplerName– duplicated here rather than imported becausefsh_libis a runtime package and must not depend onbe.alias of
Literal[‘always_on’, ‘always_off’, ‘parentbased_always_on’, ‘parentbased_always_off’, ‘parentbased_traceidratio’, ‘traceidratio’]
- build_logger_provider(*, service_name, service_version=None, environment_env=None, resource_attributes=None)[source]¶
Build a
LoggerProviderconfigured via OTLP env vars.Off by default in
TelemetryConfig; only used when the consumer opts in to log export. The provider on its own is not enough – the generatedinit_telemetryalso attaches aopentelemetry.sdk._logs.LoggingHandlerto the stdlib root logger sologging.getLogger().info(...)calls flow through OTLP.- Return type:
LoggerProvider
- build_meter_provider(*, service_name, service_version=None, environment_env=None, resource_attributes=None)[source]¶
Build a
MeterProviderreading exporter config from env.The metrics exporter is always selected via the standard
OTEL_METRICS_EXPORTERenv var (defaultotlp); we don’t expose a per-config override because metrics deployments almost universally pair with the same OTLP endpoint as traces.- Return type:
MeterProvider
- build_tracer_provider(*, service_name, service_version=None, environment_env=None, resource_attributes=None, sampler='parentbased_always_on', sampler_ratio=None, exporter=None)[source]¶
Build and return a configured
TracerProvider.The caller is responsible for installing it via
opentelemetry.trace.set_tracer_provider(). Splitting construction from installation keeps the helper testable without touching the global tracer provider.environment_envis the name of the environment variable holding the deployment-environment value (e.g."ENVIRONMENT"), not the value itself – the same generated artifact deploys across dev / staging / prod, so the value is resolved at startup.Noneskips the lookup; an unset / empty variable does the same.The OTLP endpoint and headers are read by the SDK from the standard
OTEL_EXPORTER_OTLP_*environment variables – there is no kiln-side override for the variable names.- Return type:
TracerProvider
- scrub_current_span_attributes(*keys)[source]¶
Replace named attributes on the current span with a placeholder.
The auth router wraps its login/logout handlers in a span that would otherwise carry the credentials request body and the session response body once
capture_request_body/capture_response_bodyare turned on globally. This helper is invoked from inside those handlers to overwrite specific attribute keys (set by a future body-capture hook) with[scrubbed]– a placeholder rather than removal so a “missing http.request.body” alert doesn’t mask an outage.Independent of the global capture toggles: even if the consumer never turns on body capture, the auth router still calls this so any third-party span processor that chooses to attach bodies sees the scrubbed value.
- Return type:
- traced_entrypoint(entrypoint)[source]¶
Wrap a pgqueuer entrypoint coroutine in a consumer span.
pgqueuer entrypoints are async
(job) -> Nonecoroutines thepgqworker runs outside any HTTP request – there is no surroundingFastAPIInstrumentorserver span to root them under or to record their failures. This decorator gives every job its own root span so worker activity lands in the same trace backend as the request path.In pgqueuer the entrypoint is the queue – jobs are routed to handlers purely by entrypoint name – so entrypoint is emitted as the OTel
messaging.destination.name, the queue/topic key a backend groups consumer traffic by.The span is a standalone root (
SpanKind.CONSUMER); it is deliberately not linked to the trace of the request that enqueued the job, since pgqueuer payloads are opaquebyteswith no envelope to carry atraceparent– producer and worker traces stay separate.Every attribute on the span is low-cardinality – the entrypoint name and the job priority, each from a small fixed set – so the span is safe to group by in dashboards and span-derived metrics. The unbounded per-job id is intentionally left off the span; it is carried on the worker’s log records instead, which share this span’s trace id when log correlation is enabled.
Unlike
traced_handler(), exceptions raised by the wrapped coroutine are recorded and the span status set toERROR(thestart_as_current_spandefaults, left in place here). A raising entrypoint is pgqueuer’s failure signal – the job is retried or moved to the dead-letter log – and with no server span downstream this span is the only place that failure surfaces in a trace.- Parameters:
entrypoint (
str) – The pgqueuer entrypoint (== queue) name the handler is registered under, e.g."fsh_lib_reports_dispatch". Low-cardinality: it names the span and is attached asmessaging.destination.name.- Return type:
Callable[[Callable[...,Awaitable[Any]]],Callable[...,Awaitable[Any]]]- Returns:
A decorator wrapping an async
(job, ...) -> Anycoroutine and preserving its signature, soPgQueuer.entrypointstill sees the original callable shape.
- traced_handler(span_name, *, resource, op)[source]¶
Wrap an async route handler in an internal span.
Used for both CRUD ops and user-defined actions – action names (
publish,archive, …) flow through the sameopparameter as CRUD names (get,list, …) and end up on the sameATTR_OPattribute. Distinguishing CRUD from actions in dashboards is left to the value: be’s CRUD names are a fixed small set, anything else is user-defined.Exceptions raised from the handler are deliberately not recorded on this span and the span status is left
OK. The final HTTP status – including FastAPI’s conversion ofHTTPExceptioninto 4xx / 5xx responses – is captured by the surroundingFastAPIInstrumentorserver span, which is the authoritative signal for “did the request succeed.” Recording here would double-count routine flow-control exceptions (HTTPException(404)fromget_object_from_query_or_404,HTTPException(401)from the auth dep) as backend errors.- Parameters:
span_name (
str) – Span name, conventionallyf"{resource}.{op}".resource (
str) – Low-cardinality resource label (e.g."article") attached asATTR_RESOURCE.op (
str) – Low-cardinality op label (e.g."get","publish") attached asATTR_OP.
- Return type:
Callable[[Callable[...,Awaitable[Any]]],Callable[...,Awaitable[Any]]]
The returned decorator preserves the wrapped function’s signature so FastAPI’s dependency-injection introspection still works.
Body→kwargs transformer helpers shared by generated builders.
Generated update builders one-line through build_patch_kwargs()
to read body.model_fields_set and emit a column-kwargs dict the
SQL update(...).values(**...) consumes. The same one-shot
extraction would otherwise be repeated per resource as a chain of
if "<field>" in body.model_fields_set: out["<field>"] = ... lines
in every transformer module.
- build_patch_kwargs(body, fields)[source]¶
Project the explicitly-set subset of fields off body into a dict.
PATCH semantics: a field that the client did not send must not appear in the returned dict, so the SQL
UPDATEonly touches columns the client meant to change.body.model_fields_setis pydantic’s record of which attributes were populated from input (vs. left at their default), so iterating it is the correct gate – aNonevalue the client explicitly sent still passes through.- Parameters:
- Return type:
- Returns:
A
dictmapping each set field to its value on body. The caller typically annotates the receiving variable with the per-resource{Resource}UpdateKwargsTypedDict(total=False) so type-checkers see the result as the expected partial shape.
General-purpose runtime utilities used by generated apps.
Three unrelated concerns share this module by convention –
get_object_from_query_or_404() used by every read-or-mutate
CRUD handler, the run_once() decorator used by the
generated telemetry init, and compile_query() for tests
that assert against rendered SQL. Bundling them here keeps the
public fsh_lib surface flat enough that consumers learn one
import path (from fsh_lib.utils import ...) for everything that
doesn’t fit under a more specific submodule.
- compile_query(stmt, *, dialect=None, literal_binds=True)[source]¶
Render a SQLAlchemy statement to a single SQL string.
Test-oriented helper: tests that assert against generated SQL (locking modifiers, where-clause shape, computed expressions) repeatedly spell
str(stmt.compile(compile_kwargs={"literal_binds": True}))and frequently need a Postgres dialect to surface pg-specific syntax (SKIP LOCKED,ON CONFLICT, …). Centralising the boilerplate keeps assertions readable and avoids per-test imports of the dialect submodule.- Parameters:
stmt (
ClauseElement) – Any SQLAlchemy clause –select(),insert(),update(), rawtext(), etc.dialect (
Literal['postgres','postgresql','sqlite'] |None) – Optional dialect name.None(the default) uses SQLAlchemy’s generic compiler, which strips dialect-specific clauses (FOR UPDATEsurvives;SKIP LOCKEDdoes not). Pass"postgres"to render Postgres SQL or"sqlite"for sqlite.literal_binds (
bool) – WhenTrue(the default), bound parameters render inline –WHERE id = 'abc'rather thanWHERE id = :id_1. SetFalseto inspect the parameter map separately (viastmt.compile().params).
- Return type:
- Returns:
Compiled SQL as a string.
- async get_object_from_query_or_404(db, stmt, *, detail='Not found')[source]¶
Execute stmt and return the first row, or raise HTTP 404.
- Parameters:
db (
AsyncSession) – The async database session.stmt (
Any) – A SQLAlchemy selectable statement.detail (
str) – The error message for the 404 response.
- Return type:
- Returns:
The first row from the result set.
- Raises:
HTTPException – With status 404 when no row is found.
- run_once(fn)[source]¶
Idempotency decorator: run
fnonce, return its result thereafter.Unlike
functools.cache(), the gate is argument-blind – a second call with a different argument set is still a no-op, not a fresh execution keyed on the new args. This is the correct shape for one-shot setup functions (init_telemetry, where a secondinit_telemetry(app2)must not install a second tracer provider) and for factory singletons (an OPA client built once at first use and reused thereafter).The first call’s return value is cached and returned on every later call. Setup-only callers that ignore the return value are unaffected; factory callers
@run_oncetheir constructor and read the cached instance.