Source code for fsh_lib.queue

"""pgqueuer integration for kiln-generated FastAPI projects.

Three helpers — that's the whole surface be contributes to the
queue story.  Everything else (worker run loop, ``@entrypoint``,
CLI) is pgqueuer's own; use it directly per the upstream docs.

* :func:`get_queue` (producer) — wraps the asyncpg connection
  underlying a SQLAlchemy ``AsyncSession`` so jobs enqueue inside
  the *same* transaction as the request's other writes.  The job
  becomes durable when the session commits; if the session rolls
  back, the job is gone.  This is the transactional-outbox
  pattern, and it's the one piece pgqueuer doesn't ship.

* :func:`open_worker_driver` (worker bootstrap) — opens a
  dedicated asyncpg connection from a DSN, coercing SQLAlchemy's
  ``postgresql+asyncpg://`` URL form to plain ``postgresql://``
  so the same env var works for both sides.  Use as the outermost
  ``async with`` in your pgqueuer factory.

* :func:`instrument_entrypoint` (worker observability) — wraps an
  entrypoint coroutine in an OpenTelemetry consumer span when the
  ``fsh-lib[opentelemetry]`` extra is installed, and returns it
  untouched otherwise.  Safe to call unconditionally from a worker
  factory; the fsh_lib-supplied dispatch entrypoints apply it
  themselves.

The two connection helpers assume the database is reached through
the asyncpg driver.  Other drivers (psycopg, etc.) would need a
parallel shim and are not supported today.
"""

from contextlib import asynccontextmanager
from typing import TYPE_CHECKING

import asyncpg
from pgqueuer import Queries
from pgqueuer.db import AsyncpgDriver

if TYPE_CHECKING:
    from collections.abc import AsyncIterator, Awaitable, Callable

    from sqlalchemy.ext.asyncio import AsyncSession

_SQLALCHEMY_ASYNCPG_PREFIX = "postgresql+asyncpg://"
_PLAIN_POSTGRES_PREFIX = "postgresql://"


[docs] async def get_queue(session: AsyncSession) -> Queries: """Return a ``pgqueuer.Queries`` bound to *session*'s connection. Calls to ``await queue.enqueue(...)`` issue SQL on the same asyncpg connection the SQLAlchemy session is using, so they join the session's transaction. Commit the session and the job is durable; roll back and it never existed. Args: session: A SQLAlchemy async session backed by the asyncpg driver. The session is checked out from a connection so the underlying ``asyncpg.Connection`` can be unwrapped. Returns: A ``pgqueuer.Queries`` whose driver wraps the session's asyncpg connection. """ raw_connection = await session.connection() asyncpg_wrapper = await raw_connection.get_raw_connection() driver_connection = asyncpg_wrapper.driver_connection if driver_connection is None: msg = ( "Session is not backed by a live driver connection — " "ensure the engine uses postgresql+asyncpg:// and the " "session is checked out before calling get_queue()." ) raise RuntimeError(msg) return Queries(AsyncpgDriver(driver_connection))
def _coerce_to_asyncpg_dsn(dsn: str) -> str: """Strip SQLAlchemy's ``+asyncpg`` prefix if present. SQLAlchemy URLs use ``postgresql+asyncpg://...``; raw asyncpg wants ``postgresql://...``. Other prefixes pass through unchanged, so a caller who already supplies a plain DSN needs no special-casing. """ if dsn.startswith(_SQLALCHEMY_ASYNCPG_PREFIX): return _PLAIN_POSTGRES_PREFIX + dsn[len(_SQLALCHEMY_ASYNCPG_PREFIX) :] return dsn
[docs] def instrument_entrypoint( entrypoint: str, handler: Callable[..., Awaitable[None]], ) -> Callable[..., Awaitable[None]]: """Wrap a pgqueuer entrypoint handler in an OpenTelemetry span. Returns *handler* decorated with :func:`fsh_lib.telemetry.traced_entrypoint` when OpenTelemetry is importable, and *handler* unchanged when it is not — so a worker factory can call this unconditionally without caring whether the consumer installed the ``fsh-lib[opentelemetry]`` extra. Importable is not the same as configured: when OTel is installed but the app never builds a ``TracerProvider``, the wrapped handler emits no-op spans at negligible cost. Real spans start appearing once the consumer's ``init_telemetry`` runs — so it is safe to instrument an entrypoint before deciding whether a given deployment exports traces at all. Args: entrypoint: The pgqueuer entrypoint (== queue) name the handler is registered under — names the span and is attached as the ``messaging.destination.name`` attribute. handler: The async ``(job) -> None`` coroutine pgqueuer runs for that entrypoint. Returns: Either the span-wrapped handler or *handler* itself; both keep the ``(job) -> None`` shape ``PgQueuer.entrypoint`` expects. """ try: from fsh_lib.telemetry import traced_entrypoint # noqa: PLC0415 except ImportError: return handler return traced_entrypoint(entrypoint)(handler)
[docs] @asynccontextmanager async def open_worker_driver(dsn: str) -> AsyncIterator[AsyncpgDriver]: """Open a dedicated worker connection and yield an :class:`AsyncpgDriver`. Workers need a long-lived connection of their own so pgqueuer can ``LISTEN`` for new-job notifications on it. Use this as the outermost ``async with`` in the factory you hand to ``pgq run``, and shape the factory itself as an :func:`~contextlib.asynccontextmanager` so pgqueuer's ``run_factory`` keeps the connection alive for the lifetime of the worker -- a plain ``async def main() -> PgQueuer`` exits this ``async with`` before pgqueuer can use the driver and the supervisor crashes with ``InterfaceError: connection is closed``:: from contextlib import asynccontextmanager from fsh_lib.queue import open_worker_driver from pgqueuer import PgQueuer @asynccontextmanager async def main(): async with open_worker_driver( os.environ["DATABASE_URL"], ) as driver: pgq = PgQueuer(driver) @pgq.entrypoint("ping") async def ping(job): ... yield pgq Args: dsn: A PostgreSQL DSN. Either plain (``postgresql://user:pw@host/db``) or SQLAlchemy-shaped (``postgresql+asyncpg://...``) — the latter is rewritten so the same env var works for both the request path and the worker. Yields: An :class:`AsyncpgDriver` wrapping the freshly-opened connection. """ connection = await asyncpg.connect(_coerce_to_asyncpg_dsn(dsn)) try: yield AsyncpgDriver(connection) finally: await connection.close()