Source code for fsh_lib.queue
"""pgqueuer integration for kiln-generated FastAPI projects.
Three helpers — that's the whole surface be contributes to the
queue story. Everything else (worker run loop, ``@entrypoint``,
CLI) is pgqueuer's own; use it directly per the upstream docs.
* :func:`get_queue` (producer) — wraps the asyncpg connection
underlying a SQLAlchemy ``AsyncSession`` so jobs enqueue inside
the *same* transaction as the request's other writes. The job
becomes durable when the session commits; if the session rolls
back, the job is gone. This is the transactional-outbox
pattern, and it's the one piece pgqueuer doesn't ship.
* :func:`open_worker_driver` (worker bootstrap) — opens a
dedicated asyncpg connection from a DSN, coercing SQLAlchemy's
``postgresql+asyncpg://`` URL form to plain ``postgresql://``
so the same env var works for both sides. Use as the outermost
``async with`` in your pgqueuer factory.
* :func:`instrument_entrypoint` (worker observability) — wraps an
entrypoint coroutine in an OpenTelemetry consumer span when the
``fsh-lib[opentelemetry]`` extra is installed, and returns it
untouched otherwise. Safe to call unconditionally from a worker
factory; the fsh_lib-supplied dispatch entrypoints apply it
themselves.
The two connection helpers assume the database is reached through
the asyncpg driver. Other drivers (psycopg, etc.) would need a
parallel shim and are not supported today.
"""
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING
import asyncpg
from pgqueuer import Queries
from pgqueuer.db import AsyncpgDriver
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Awaitable, Callable
from sqlalchemy.ext.asyncio import AsyncSession
_SQLALCHEMY_ASYNCPG_PREFIX = "postgresql+asyncpg://"
_PLAIN_POSTGRES_PREFIX = "postgresql://"
[docs]
async def get_queue(session: AsyncSession) -> Queries:
"""Return a ``pgqueuer.Queries`` bound to *session*'s connection.
Calls to ``await queue.enqueue(...)`` issue SQL on the same
asyncpg connection the SQLAlchemy session is using, so they
join the session's transaction. Commit the session and the
job is durable; roll back and it never existed.
Args:
session: A SQLAlchemy async session backed by the asyncpg
driver. The session is checked out from a connection
so the underlying ``asyncpg.Connection`` can be
unwrapped.
Returns:
A ``pgqueuer.Queries`` whose driver wraps the
session's asyncpg connection.
"""
raw_connection = await session.connection()
asyncpg_wrapper = await raw_connection.get_raw_connection()
driver_connection = asyncpg_wrapper.driver_connection
if driver_connection is None:
msg = (
"Session is not backed by a live driver connection — "
"ensure the engine uses postgresql+asyncpg:// and the "
"session is checked out before calling get_queue()."
)
raise RuntimeError(msg)
return Queries(AsyncpgDriver(driver_connection))
def _coerce_to_asyncpg_dsn(dsn: str) -> str:
"""Strip SQLAlchemy's ``+asyncpg`` prefix if present.
SQLAlchemy URLs use ``postgresql+asyncpg://...``; raw asyncpg
wants ``postgresql://...``. Other prefixes pass through
unchanged, so a caller who already supplies a plain DSN
needs no special-casing.
"""
if dsn.startswith(_SQLALCHEMY_ASYNCPG_PREFIX):
return _PLAIN_POSTGRES_PREFIX + dsn[len(_SQLALCHEMY_ASYNCPG_PREFIX) :]
return dsn
[docs]
def instrument_entrypoint(
entrypoint: str,
handler: Callable[..., Awaitable[None]],
) -> Callable[..., Awaitable[None]]:
"""Wrap a pgqueuer entrypoint handler in an OpenTelemetry span.
Returns *handler* decorated with
:func:`fsh_lib.telemetry.traced_entrypoint` when OpenTelemetry is
importable, and *handler* unchanged when it is not — so a worker
factory can call this unconditionally without caring whether the
consumer installed the ``fsh-lib[opentelemetry]`` extra.
Importable is not the same as configured: when OTel is installed
but the app never builds a ``TracerProvider``, the wrapped
handler emits no-op spans at negligible cost. Real spans start
appearing once the consumer's ``init_telemetry`` runs — so it is
safe to instrument an entrypoint before deciding whether a given
deployment exports traces at all.
Args:
entrypoint: The pgqueuer entrypoint (== queue) name the
handler is registered under — names the span and is
attached as the ``messaging.destination.name`` attribute.
handler: The async ``(job) -> None`` coroutine pgqueuer runs
for that entrypoint.
Returns:
Either the span-wrapped handler or *handler* itself; both
keep the ``(job) -> None`` shape ``PgQueuer.entrypoint``
expects.
"""
try:
from fsh_lib.telemetry import traced_entrypoint # noqa: PLC0415
except ImportError:
return handler
return traced_entrypoint(entrypoint)(handler)
[docs]
@asynccontextmanager
async def open_worker_driver(dsn: str) -> AsyncIterator[AsyncpgDriver]:
"""Open a dedicated worker connection and yield an :class:`AsyncpgDriver`.
Workers need a long-lived connection of their own so pgqueuer
can ``LISTEN`` for new-job notifications on it. Use this as
the outermost ``async with`` in the factory you hand to
``pgq run``, and shape the factory itself as an
:func:`~contextlib.asynccontextmanager` so pgqueuer's
``run_factory`` keeps the connection alive for the lifetime
of the worker -- a plain ``async def main() -> PgQueuer``
exits this ``async with`` before pgqueuer can use the driver
and the supervisor crashes with ``InterfaceError: connection
is closed``::
from contextlib import asynccontextmanager
from fsh_lib.queue import open_worker_driver
from pgqueuer import PgQueuer
@asynccontextmanager
async def main():
async with open_worker_driver(
os.environ["DATABASE_URL"],
) as driver:
pgq = PgQueuer(driver)
@pgq.entrypoint("ping")
async def ping(job): ...
yield pgq
Args:
dsn: A PostgreSQL DSN. Either plain
(``postgresql://user:pw@host/db``) or SQLAlchemy-shaped
(``postgresql+asyncpg://...``) — the latter is rewritten
so the same env var works for both the request path and
the worker.
Yields:
An :class:`AsyncpgDriver` wrapping the freshly-opened
connection.
"""
connection = await asyncpg.connect(_coerce_to_asyncpg_dsn(dsn))
try:
yield AsyncpgDriver(connection)
finally:
await connection.close()