"""File storage primitives for kiln-generated FastAPI projects.
This module's runtime dependency on ``boto3`` is gated behind the
``files`` extra. Install with::
pip install 'kiln-generator[files]'
# or: uv add 'kiln-generator[files]'
Importing this module without the extra raises ``ModuleNotFoundError``
on ``import boto3`` -- so the gate is honest rather than lazy:
either the dep is there and everything works, or it isn't and the
import surface fails fast.
A *file* is a binary blob (image, PDF, attachment) tracked by a
metadata row in the consumer's database and a corresponding object
in S3-compatible storage. This module ships three pieces:
* :class:`FileMixin` -- a pgcraft-compatible mixin supplying the
six storage columns every file row needs (``s3_key``,
``content_type``, ``size_bytes``, ``original_filename``,
``created_at``, ``uploaded_at``). Consumers subclass it on a
pgcraft model and add a PK plugin (typically
``UUIDV4PKPlugin``) for the ``id`` column.
* :class:`S3Storage` -- a small wrapper around ``boto3`` that
exposes the three operations a presigned-upload flow actually
needs: mint a presigned PUT URL, mint a presigned GET URL, delete
an object. The constructor takes explicit config so it's
testable; :func:`default_storage` builds one from ``FSH_S3_*``
env vars for the common case.
* Action functions -- :func:`request_upload`,
:func:`complete_upload`, :func:`download`, and :func:`delete_file`.
These plug into be's
:class:`~be.operations.action.Action` operation: the consumer
points ``resource.action`` entries at them directly (no
per-resource wrapper module). The :class:`FileMixin`-typed
parameters (instance for object actions, class for collection
actions) match any concrete subclass via the introspector's
supertype check, so the same four functions serve every file
resource.
"""
import datetime
import os
import uuid
from dataclasses import dataclass, field
from functools import cached_property
from typing import TYPE_CHECKING, Any
import boto3
from fastapi import HTTPException, status
from pydantic import BaseModel
from sqlalchemy import (
BigInteger,
DateTime,
String,
delete,
insert,
update,
)
from sqlalchemy.orm import Mapped, mapped_column
if TYPE_CHECKING:
from collections.abc import Callable
from sqlalchemy.ext.asyncio import AsyncSession
DEFAULT_PRESIGN_TTL = 900
"""Presigned URL lifetime in seconds (15 min).
Long enough for a browser to PUT a multi-megabyte file over a slow
connection; short enough that a leaked URL stops working before it
shows up in logs anyone reads.
"""
[docs]
class FileMixin:
"""pgcraft mixin supplying the storage columns of a file record.
Subclass on a pgcraft-mapped model alongside a PK plugin (the
plugin owns ``id``):
.. code-block:: python
from fsh_lib.files import FileMixin
from pgcraft.factory import PGCraftSimple
from pgcraft.plugins.pk import UUIDV4PKPlugin
class Attachment(Base, FileMixin):
__tablename__ = "attachments"
__table_args__ = {"schema": "public"}
__factory__ = PGCraftSimple
__plugins__ = [UUIDV4PKPlugin()]
The mixin deliberately doesn't declare ``id`` -- pgcraft's idiom
is that primary keys are plugin-owned, and declaring it on the
mixin would collide with the plugin's column at table-build time.
The ``TYPE_CHECKING`` annotation below keeps ``file.id`` typed
for the action helpers without committing to a column.
A row with ``uploaded_at is None`` represents a file the
server has reserved a key for (and handed the client a presigned
PUT URL) but whose upload hasn't yet been confirmed. Consumers
typically clear or expire these rows on a schedule.
"""
if TYPE_CHECKING:
# Type-only -- the actual column comes from the consumer's
# pgcraft PK plugin. Declaring it as a mapped column here
# at runtime would collide with the plugin's column at
# table-build time, so the column lives only in the type-
# checker's view. ``Mapped[uuid.UUID]`` (rather than bare
# ``uuid.UUID``) keeps ``cls.id == file.id`` typed as a SQL
# expression in the action helpers, not a bool.
id: Mapped[uuid.UUID]
s3_key: Mapped[str] = mapped_column(String(1024), unique=True)
"""Object key in the storage bucket. Unique so a row maps to
exactly one blob; collision is a programming error, not a race."""
content_type: Mapped[str | None] = mapped_column(
String(255),
nullable=True,
)
"""MIME type the client declared at upload time, when known."""
size_bytes: Mapped[int | None] = mapped_column(
BigInteger,
nullable=True,
)
"""Object size in bytes after the upload completes. ``BigInteger``
because ``Integer`` tops out around 2 GiB and large media uploads
blow past that."""
original_filename: Mapped[str | None] = mapped_column(
String(512),
nullable=True,
)
"""Filename the client supplied; useful for ``Content-Disposition``
on download. Not used for storage -- the canonical name is
:attr:`s3_key`."""
# ``created_at`` (PUT URL issued) is pgcraft-managed -- the
# consumer's factory injects it via :class:`TimestampPlugin`.
uploaded_at: Mapped[datetime.datetime | None] = mapped_column(
DateTime(timezone=True),
nullable=True,
)
"""When the upload was confirmed. ``None`` means pending --
metadata exists but the blob may or may not be in S3."""
[docs]
@dataclass
class S3Storage:
"""``boto3``-backed S3 client wrapper.
The constructor takes explicit config so tests can build an
instance pointed at a stub or a localstack endpoint without
setting env vars. :func:`default_storage` is the env-driven
factory for production use.
``client_factory`` is plumbed through so tests can inject a
``MagicMock`` instead of a real ``boto3.client``.
"""
bucket: str
region: str | None = None
endpoint_url: str | None = None
client_factory: Callable[..., Any] = field(default=boto3.client)
[docs]
@cached_property
def client(self) -> Any:
"""Lazily-built ``boto3`` S3 client.
Cached so a single :class:`S3Storage` instance reuses one
connection pool across calls.
"""
kwargs: dict[str, Any] = {"service_name": "s3"}
if self.region is not None:
kwargs["region_name"] = self.region
if self.endpoint_url is not None:
kwargs["endpoint_url"] = self.endpoint_url
return self.client_factory(**kwargs)
[docs]
def presigned_put_url(
self,
key: str,
*,
expires_in: int = DEFAULT_PRESIGN_TTL,
content_type: str | None = None,
) -> str:
"""Mint a presigned PUT URL for *key*.
When *content_type* is supplied, the client must send a
matching ``Content-Type`` header on the PUT or S3 rejects
the request -- this binds the upload to the type the row
was created for.
"""
params: dict[str, Any] = {"Bucket": self.bucket, "Key": key}
if content_type is not None:
params["ContentType"] = content_type
url = self.client.generate_presigned_url(
"put_object",
Params=params,
ExpiresIn=expires_in,
)
return str(url)
[docs]
def presigned_get_url(
self,
key: str,
*,
expires_in: int = DEFAULT_PRESIGN_TTL,
) -> str:
"""Mint a presigned GET URL for *key*."""
url = self.client.generate_presigned_url(
"get_object",
Params={"Bucket": self.bucket, "Key": key},
ExpiresIn=expires_in,
)
return str(url)
[docs]
def delete(self, key: str) -> None:
"""Delete the object at *key*.
S3's ``DeleteObject`` is idempotent -- deleting a missing
key returns 204 the same as deleting an existing one -- so
callers don't need to guard against double-delete races.
"""
self.client.delete_object(Bucket=self.bucket, Key=key)
[docs]
def default_storage() -> S3Storage:
"""Build an :class:`S3Storage` from ``FSH_S3_*`` env vars.
Reads:
* ``FSH_S3_BUCKET`` -- bucket name (required).
* ``FSH_S3_REGION`` -- AWS region; optional, falls back to the
boto3 default chain.
* ``FSH_S3_ENDPOINT_URL`` -- override for MinIO / localstack /
non-AWS S3-compatible endpoints; optional.
Raises:
RuntimeError: When ``FSH_S3_BUCKET`` is not set.
"""
bucket = os.environ.get("FSH_S3_BUCKET")
if not bucket:
msg = "FSH_S3_BUCKET environment variable is required"
raise RuntimeError(msg)
return S3Storage(
bucket=bucket,
region=os.environ.get("FSH_S3_REGION"),
endpoint_url=os.environ.get("FSH_S3_ENDPOINT_URL"),
)
# --- Action request/response schemas --------------------------------------
[docs]
class UploadRequest(BaseModel):
"""Body for the request-upload action.
Carries everything :func:`~fsh_lib.files.request_upload` needs to reserve
a key and bind the presigned PUT URL to the right content type.
"""
filename: str
content_type: str
size_bytes: int
[docs]
class UploadResponse(BaseModel):
"""Response for the request-upload action.
The client PUTs the file bytes to ``upload_url`` (it must send
a matching ``Content-Type`` header), then calls the
complete-upload action with ``id`` to flip the row out of
pending state.
"""
id: uuid.UUID
upload_url: str
[docs]
class DownloadResponse(BaseModel):
"""Response for the download action -- a short-lived GET URL."""
download_url: str
# --- Action functions -----------------------------------------------------
[docs]
async def request_upload(
*,
model_cls: type[FileMixin],
db: AsyncSession,
body: UploadRequest,
) -> UploadResponse:
"""Reserve a key and return a presigned PUT URL.
The row is created with ``uploaded_at=NULL``; the client
confirms the actual byte upload via :func:`complete_upload`.
*model_cls* is supplied by the action handler, which detects
the ``type[FileMixin]`` annotation and passes the resource's
mapped class. No per-resource factory binding needed --
consumers point a resource's ``action`` config at this function
directly.
"""
file_id = uuid.uuid4()
# Prefix the key with the file id so collisions on the
# consumer-supplied filename can't reach across rows.
key = f"{file_id.hex}/{body.filename}"
await db.execute(
insert(model_cls).values(
id=file_id,
s3_key=key,
content_type=body.content_type,
size_bytes=body.size_bytes,
original_filename=body.filename,
)
)
storage = default_storage()
upload_url = storage.presigned_put_url(
key,
content_type=body.content_type,
)
return UploadResponse(
id=file_id,
upload_url=upload_url,
)
[docs]
async def complete_upload(
file: FileMixin,
*,
db: AsyncSession,
) -> None:
"""Mark *file* as uploaded.
Returns ``None`` so the action op emits 204 No Content -- a
completed upload has no useful body to return; the client
already knows the id.
Issues a Core ``UPDATE`` rather than mutating the loaded ORM
instance, so the persistence path is identical regardless of
whether the caller's session has autoflush quirks. Idempotent
-- calling twice just refreshes the timestamp.
"""
cls = type(file)
await db.execute(
update(cls)
.where(cls.id == file.id)
.values(uploaded_at=datetime.datetime.now(tz=datetime.UTC)),
)
[docs]
async def download(
file: FileMixin,
*,
db: AsyncSession, # noqa: ARG001 -- action handler passes this
) -> DownloadResponse:
"""Return a presigned GET URL for *file*.
Refuses with 404 when ``uploaded_at is None`` -- the row exists
but the client never confirmed the PUT, so the object may not
be in S3 and a presigned URL would just 404 noisily.
"""
if file.uploaded_at is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="File upload not complete",
)
storage = default_storage()
return DownloadResponse(
download_url=storage.presigned_get_url(file.s3_key),
)
[docs]
async def delete_file(
file: FileMixin,
*,
db: AsyncSession,
) -> None:
"""Cascade-delete *file*: remove the S3 object then the row.
Returns ``None`` so the action op emits 204 No Content -- the
client doesn't need a body to know the row is gone.
S3 first because :meth:`S3Storage.delete` is idempotent -- a
crash between the two steps leaves an orphan row, which the
next delete attempt cleans up. Reversing the order would
instead leak S3 objects, which are harder to find later.
"""
storage = default_storage()
storage.delete(file.s3_key)
cls = type(file)
await db.execute(delete(cls).where(cls.id == file.id))