# Source code for fsh_lib.opa

"""Client for a Rego (OPA) permissions service.

A kiln permissions service is a stateless Open Policy Agent
deployment: it carries the policy (the generic RBAC engine plus
any custom rules) but no data.  The roles and role bindings live
in the backend's own database; this module ships them -- together
with the subject, the action, and the resource -- to OPA on every
call.

Three query modes:

* **Point check** -- a decision about one resource (get / create /
  update / delete).  :meth:`OpaClient.check` POSTs the decision
  ``input`` to ``/v1/data/<opa_package>/decision`` and returns a
  :class:`Decision`.

* **Bulk check** -- many ``(action, resource)`` decisions for one
  subject in a single round-trip (the actions-envelope case:
  every row of a list page x every action).
  :meth:`OpaClient.check_many` POSTs to
  ``/v1/data/<opa_package>/decisions`` and returns one ``permit``
  boolean per item, in the same order as the items.

* **List filter** -- a collection request where every candidate
  row must be checked.  :meth:`OpaClient.compile_filter` asks
  OPA to *partially evaluate* the ``allow`` rule with the resource
  left unknown (the ``/v1/compile`` API).  OPA returns residual
  conditions on the resource; :meth:`FilterResult.to_sqlalchemy`
  turns them into a SQLAlchemy ``WHERE`` clause the backend folds
  into the list query.  One round-trip filters the whole
  collection -- no per-row check.

Requires the ``opa`` extra (``pip install 'kiln-generator[opa]'``)
for :mod:`httpx`.

Example -- point check::

    client = OpaClient("http://opa:8181", opa_package="authz")
    decision = await client.check(
        subject=Subject("user", "alice"),
        action="task:update",
        resource=ResourceRef("Task", "t-1", {"created_by": "alice"}),
        roles={"editor": {"permissions": ["task:read", "task:update"]}},
        bindings=[RoleBinding(Subject("user", "alice"), "editor")],
    )
    if not decision.permit:
        raise HTTPException(status_code=403)

Example -- list filter::

    result = await client.compile_filter(
        subject=Subject("user", "alice"),
        action="task:list",
        resource_type="Task",
        roles=roles,
        bindings=bindings,
    )
    stmt = select(Task).where(result.to_sqlalchemy({"id": Task.id}))
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Self

import httpx
from sqlalchemy import and_, false, not_, or_, true

if TYPE_CHECKING:
    from collections.abc import Mapping, Sequence

    from sqlalchemy.sql.elements import ColumnElement

#: Default per-request timeout, in seconds.  A permissions check
#: sits on the request path, so the timeout is short -- a slow
#: service should fail fast and let the caller apply its
#: fail-open / fail-closed policy.
_DEFAULT_TIMEOUT = 5.0


class OpaError(RuntimeError):
    """Signals that a call to the permissions service went wrong.

    Raised for a service that is unreachable or times out, for a
    non-2xx answer, and for a residual that cannot be translated into
    a SQL filter.

    This module never converts an error into a verdict.  The caller
    chooses what an error means: "deny" (fail-closed, the safe default
    for authorization) or "allow" (fail-open).
    """
@dataclass(frozen=True)
class Subject:
    """Principal on whose behalf a decision is requested.

    Attributes:
        type: ``"user"`` or ``"token"``.  The RBAC engine matches on
            the ``(type, id)`` pair, so a binding for one kind never
            satisfies a request made as the other.
        id: Stable identifier of the principal -- a user id, or a
            token id / ``jti``.
    """

    type: str
    id: str

    def as_input(self) -> dict[str, str]:
        """Build the ``{"type": ..., "id": ...}`` object the Rego engine reads."""
        return dict(type=self.type, id=self.id)
@dataclass(frozen=True)
class ResourceRef:
    """Identifies the resource a decision is about.

    Attributes:
        type: Resource type, e.g. ``"Task"``.  Matched against the
            permission catalogue and against object-scoped bindings.
        id: Instance id, or ``None`` for a collection-level decision
            (a ``list`` / ``create`` where no single instance exists
            yet).
        attributes: Extra fields copied into ``input.resource`` so
            custom Rego rules can read them, e.g.
            ``{"created_by": "alice"}`` for an ownership rule.  ``type``
            always overwrites a same-named attribute key; ``id``
            overwrites one only when it is not ``None`` -- with
            ``id=None`` an ``"id"`` attribute passes through unchanged.
    """

    type: str
    id: str | None = None
    attributes: Mapping[str, Any] = field(default_factory=dict)

    def as_input(self) -> dict[str, Any]:
        """Build a fresh ``input.resource`` object for the Rego engine.

        The attributes are copied, so mutating the returned dict never
        touches :attr:`attributes`.
        """
        merged: dict[str, Any] = dict(self.attributes)
        merged.update(type=self.type)
        if self.id is not None:
            merged.update(id=self.id)
        return merged
@dataclass(frozen=True)
class RoleBinding:
    """Grants a role to a subject, globally or within a scope.

    Attributes:
        subject: The principal receiving the role.
        role: Name of the granted role; expected to be a key of the
            ``roles`` catalogue passed to the query method.
        scope: Where the grant applies.  ``None`` means globally (every
            resource).  A :class:`ResourceRef` without an ``id`` means
            every instance of that type.  A :class:`ResourceRef` with
            an ``id`` means that single instance.
    """

    subject: Subject
    role: str
    scope: ResourceRef | None = None

    def as_input(self) -> dict[str, Any]:
        """Build one ``input.bindings`` entry for the Rego engine."""
        target = self.scope
        if target is None:
            scope_input: dict[str, Any] | None = None
        elif target.id is None:
            # Type-scoped: the ``id`` key is left out entirely rather
            # than sent as ``null``.  NOTE(review): presumably the Rego
            # engine tests for the key's absence to recognise "no
            # instance" -- confirm against the policy.
            scope_input = {"type": target.type}
        else:
            scope_input = {"type": target.type, "id": target.id}
        return {
            "subject": self.subject.as_input(),
            "role": self.role,
            "object": scope_input,
        }
@dataclass(frozen=True)
class Decision:
    """The answer a point permissions check returns.

    Attributes:
        permit: The bottom line -- the value the backend gates on.
            ``True`` only when the service answered with a literal
            JSON ``true`` (some grant applied and no veto overrode it).
        allow: ``True`` iff a grant applied (generic RBAC or a custom
            allow rule), before vetoes.
        deny: ``True`` iff a custom deny rule vetoed the request.
        raw: The full ``result`` object the service returned, for
            logging or richer custom decisions.
    """

    permit: bool
    allow: bool
    deny: bool
    raw: Mapping[str, Any]

    @classmethod
    def from_result(cls, result: Mapping[str, Any]) -> Decision:
        """Build a :class:`Decision` from a decision ``result`` object.

        Missing keys default to the safe value (``False``): a service
        that answers with a partial document is treated as a denial
        rather than a silent allow.

        ``permit`` is read strictly: only a real boolean ``True``
        grants.  A truthy non-boolean (the string ``"false"``, a
        non-empty object from a misbehaving custom rule, ...) is a
        denial, so a malformed answer can never open access.  ``allow``
        and ``deny`` are informational and keep plain truthiness.
        """
        return cls(
            permit=result.get("permit") is True,
            allow=bool(result.get("allow", False)),
            deny=bool(result.get("deny", False)),
            raw=result,
        )
@dataclass(frozen=True)
class Condition:
    """A single residual constraint on one resource field.

    When OPA partially evaluates the policy, some comparisons stay
    unresolved.  Each one is translated into a :class:`Condition`; for
    example ``input.resource.id == "t-1"`` becomes
    ``Condition("id", "eq", "t-1")``.

    Attributes:
        field: Path of the constrained field below
            ``input.resource.``, dotted when nested.
        op: Comparison name -- one of ``"eq"``, ``"ne"``, ``"lt"``,
            ``"le"``, ``"gt"``, ``"ge"`` or ``"in"``.
        value: Literal the field is compared with; a list when ``op``
            is ``"in"``.
        negated: Set when OPA emitted the expression in negated form.
    """

    # Field order and the ``negated`` default are part of the
    # positional constructor signature callers use.
    field: str
    op: str
    value: Any
    negated: bool = False
#: Builds a SQLAlchemy clause from a :class:`Condition`'s op. _CLAUSE_BUILDERS = { "eq": lambda col, value: col == value, "ne": lambda col, value: col != value, "lt": lambda col, value: col < value, "le": lambda col, value: col <= value, "gt": lambda col, value: col > value, "ge": lambda col, value: col >= value, "in": lambda col, value: col.in_(value), }
@dataclass(frozen=True)
class FilterResult:
    """A compiled list policy: which rows a subject may see.

    The outcome of partially evaluating the ``allow`` rule with the
    resource unknown.  It takes one of three shapes:

    * **always allow** -- no constraint; every row passes.
    * **always deny** -- the policy can never hold; no row passes.
    * **conditional** -- :attr:`conjunctions` is an OR of ANDs of
      :class:`Condition` objects (disjunctive normal form, the shape
      OPA's ``/v1/compile`` returns).

    Attributes:
        always_allow: Every row passes; :meth:`to_sqlalchemy` returns a
            tautology.
        always_deny: No row passes; :meth:`to_sqlalchemy` returns a
            contradiction.  Checked before :attr:`always_allow`.
        conjunctions: OR-of-ANDs residual.  Each inner tuple is a
            conjunction of :class:`Condition` objects; a row passes
            when it satisfies *any* conjunction.  An empty outer tuple
            therefore lets no row through, and an empty inner tuple
            is vacuously satisfied by every row.
    """

    always_allow: bool
    always_deny: bool
    conjunctions: tuple[tuple[Condition, ...], ...]

    def to_sqlalchemy(
        self,
        columns: Mapping[str, ColumnElement[Any]],
    ) -> ColumnElement[bool]:
        """Render the residual as a SQLAlchemy boolean clause.

        Args:
            columns: Maps a resource field name (as it appears in
                :attr:`Condition.field`) to the SQLAlchemy column it
                constrains, e.g. ``{"id": Task.id}``.

        Returns:
            A clause for a ``WHERE`` / ``.where()``: a contradiction
            when :attr:`always_deny`, a tautology when
            :attr:`always_allow`, otherwise the OR-of-ANDs.

        Raises:
            OpaError: A residual constrains a field absent from
                *columns*, or uses an unknown comparison op.
        """
        if self.always_deny:
            return false()
        if self.always_allow:
            return true()
        # Seed the OR with false() and every AND with true().  SQLAlchemy
        # 2.x deprecates calling or_() / and_() with no arguments, and the
        # seeds give the right answer for the empty cases (no disjunct ->
        # no row; empty conjunction -> every row).  When other elements
        # are present the seed does not change the result.
        return or_(
            false(),
            *(
                and_(
                    true(),
                    *(self._clause(cond, columns) for cond in conjunction),
                )
                for conjunction in self.conjunctions
            ),
        )

    @staticmethod
    def _clause(
        cond: Condition,
        columns: Mapping[str, ColumnElement[Any]],
    ) -> ColumnElement[bool]:
        """Render one :class:`Condition` as a SQLAlchemy clause.

        Raises:
            OpaError: The condition's field has no column in *columns*,
                or its op has no entry in ``_CLAUSE_BUILDERS``.
        """
        column = columns.get(cond.field)
        if column is None:
            msg = (
                f"residual constrains resource field {cond.field!r}, "
                f"which is not in the supplied column map "
                f"{sorted(columns)}"
            )
            raise OpaError(msg)
        build = _CLAUSE_BUILDERS.get(cond.op)
        if build is None:
            # A hand-built Condition can carry any op; report it as an
            # OpaError (what callers catch) rather than a bare KeyError.
            msg = f"unsupported condition operator {cond.op!r}"
            raise OpaError(msg)
        clause = build(column, cond.value)
        return not_(clause) if cond.negated else clause
class OpaClient:
    """Async client for a kiln Rego permissions service.

    Wraps an :class:`httpx.AsyncClient` pointed at an OPA decision
    server.  Construct one per application (it is cheap to keep open)
    and close it on shutdown via :meth:`aclose`, or use it as an async
    context manager.

    Args:
        base_url: Base URL of the OPA server, e.g.
            ``"http://opa:8181"``.  Only used when this instance creates
            its own client; a borrowed *client* keeps whatever base URL
            it was built with.
        opa_package: Rego package of the decision entrypoint, in Rego's
            dotted form (``"authz"``, ``"kiln.authz"``).  The point
            check POSTs to ``/v1/data/<path>/decision``, where
            ``<path>`` is the package with ``.`` replaced by ``/``.  The
            list filter compiles ``data.<opa_package>.allow``.  Defaults
            to ``"authz"``.
        client: An existing :class:`httpx.AsyncClient` to use.  When
            ``None`` (the default) the :class:`OpaClient` creates and
            owns one, and :meth:`aclose` closes it.
        timeout: Per-request timeout in seconds.  Ignored when an
            explicit *client* is supplied.
    """

    def __init__(
        self,
        base_url: str,
        *,
        opa_package: str = "authz",
        client: httpx.AsyncClient | None = None,
        timeout: float = _DEFAULT_TIMEOUT,
    ) -> None:
        """Store the decision paths and the (owned or borrowed) client."""
        self._opa_package = opa_package
        # Rego names a package with dots, but the Data API addresses a
        # document by slash-separated path (package ``kiln.authz`` lives
        # at ``/v1/data/kiln/authz``).  The compile query keeps the
        # dotted form.
        data_path = opa_package.replace(".", "/")
        self._decision_path = f"/v1/data/{data_path}/decision"
        self._bulk_path = f"/v1/data/{data_path}/decisions"
        self._owns_client = client is None
        # Test identity, not truthiness, so the chosen client always
        # agrees with ``_owns_client``.
        self._client = (
            client
            if client is not None
            else httpx.AsyncClient(base_url=base_url, timeout=timeout)
        )

    async def __aenter__(self) -> Self:
        """Enter the async context, returning ``self``."""
        return self

    async def __aexit__(self, *_exc: object) -> None:
        """Close an owned client on context exit."""
        await self.aclose()

    async def aclose(self) -> None:
        """Close the underlying client iff this instance owns it."""
        if self._owns_client:
            await self._client.aclose()

    async def check(
        self,
        *,
        subject: Subject,
        action: str,
        resource: ResourceRef,
        roles: Mapping[str, Any],
        bindings: Sequence[RoleBinding],
    ) -> Decision:
        """Ask the service whether *subject* may take *action*.

        A point check -- one decision about one resource.

        Args:
            subject: The principal making the request.
            action: The permission string being checked, e.g.
                ``"task:update"``.
            resource: The resource the action concerns.
            roles: The role catalogue --
                ``{role_name: {"permissions": [...]}}``.  The backend
                loads this from its own store; only the roles relevant
                to *bindings* need be present.
            bindings: The subject's role bindings, loaded by the
                backend from its own store.

        Returns:
            The service's :class:`Decision`.  A response without a
            ``result`` (an undefined decision) is a denial.

        Raises:
            OpaError: The service was unreachable, timed out, answered
                with a non-2xx status, or returned a malformed body.
        """
        document = {
            "subject": subject.as_input(),
            "action": action,
            "resource": resource.as_input(),
            "roles": dict(roles),
            "bindings": [binding.as_input() for binding in bindings],
        }
        response = await self._post(self._decision_path, {"input": document})
        result = response.get("result", {})
        if not isinstance(result, dict):
            # Surface as OpaError so the caller's fail policy applies,
            # instead of an AttributeError from Decision.from_result.
            msg = (
                "permissions service returned a non-object decision "
                f"result: {result!r}"
            )
            raise OpaError(msg)
        return Decision.from_result(result)

    async def check_many(
        self,
        *,
        subject: Subject,
        roles: Mapping[str, Any],
        bindings: Sequence[RoleBinding],
        items: Sequence[tuple[str, ResourceRef]],
    ) -> list[bool]:
        """Decide a batch of ``(action, resource)`` checks in one call.

        Every item shares one *subject*, *roles*, and *bindings* -- the
        actions-envelope case: one user, many ``row x action`` checks
        for a whole list page.  The service evaluates the same per-one
        policy for each item, so a page costs a single OPA round-trip
        rather than one call per item.  An empty *items* costs no
        round-trip at all.

        Args:
            subject: The principal making the request.
            roles: The role catalogue (see :meth:`check`).
            bindings: The subject's role bindings.
            items: The ``(action, resource)`` pairs to decide.

        Returns:
            One ``permit`` verdict per item, in the same order as
            *items*.  The bulk entrypoint returns the bottom-line
            ``permit`` only -- use :meth:`check` for the full allow /
            deny breakdown.

        Raises:
            OpaError: The service was unreachable, timed out, answered
                with a non-2xx status, or returned a malformed body.
        """
        if not items:
            return []
        subject_input = subject.as_input()
        roles_input = dict(roles)
        bindings_input = [binding.as_input() for binding in bindings]
        queries = [
            {
                "subject": subject_input,
                "action": action,
                "resource": resource.as_input(),
                "roles": roles_input,
                "bindings": bindings_input,
            }
            for action, resource in items
        ]
        response = await self._post(
            self._bulk_path,
            {"input": {"queries": queries}},
        )
        return _parse_bulk_result(response.get("result", []), len(items))

    async def compile_filter(
        self,
        *,
        subject: Subject,
        action: str,
        resource_type: str,
        roles: Mapping[str, Any],
        bindings: Sequence[RoleBinding],
        unknowns: Sequence[str] = ("input.resource.id",),
    ) -> FilterResult:
        """Compile the list policy into a row filter for *resource_type*.

        Asks OPA to partially evaluate ``data.<opa_package>.allow``
        with the resource left unknown, then translates the residual
        into a :class:`FilterResult`.  The resource *type* is supplied
        as a known value so type-scoped and global bindings resolve
        fully and only instance-level constraints survive into the
        residual.

        Args:
            subject: The principal making the request.
            action: The collection action, e.g. ``"task:list"``.
            resource_type: The resource type being listed, e.g.
                ``"Task"``.  Passed as a known value so the residual is
                purely about instance fields.
            roles: The role catalogue (see :meth:`check`).
            bindings: The subject's role bindings.
            unknowns: The ``input`` paths OPA treats as symbolic.
                Defaults to ``("input.resource.id",)`` -- enough for
                generic RBAC.  Extend it (e.g. with
                ``"input.resource.created_by"``) when a custom rule
                constrains other resource fields.

        Returns:
            The compiled :class:`FilterResult`.

        Raises:
            OpaError: The service failed, returned a malformed body, or
                returned a residual the translator cannot represent as
                a SQL filter.
        """
        body = {
            "query": f"data.{self._opa_package}.allow == true",
            "input": {
                "subject": subject.as_input(),
                "action": action,
                "resource": {"type": resource_type},
                "roles": dict(roles),
                "bindings": [binding.as_input() for binding in bindings],
            },
            "unknowns": list(unknowns),
        }
        response = await self._post("/v1/compile", body)
        return _parse_compile_result(response.get("result", {}))

    async def _post(
        self,
        path: str,
        body: Mapping[str, Any],
    ) -> dict[str, Any]:
        """POST *body* as JSON to *path*, returning the parsed object.

        Raises:
            OpaError: The service was unreachable, timed out, answered
                with a non-2xx status, or answered with a body that is
                not a JSON object.
        """
        try:
            response = await self._client.post(path, json=dict(body))
            response.raise_for_status()
        except httpx.HTTPError as exc:
            msg = f"permissions service call failed: {exc}"
            raise OpaError(msg) from exc
        # A 2xx answer with a broken body (a proxy error page, a
        # truncated payload) must still reach the caller as OpaError.
        # json.JSONDecodeError and UnicodeDecodeError are both
        # ValueErrors.
        try:
            payload = response.json()
        except ValueError as exc:
            msg = f"permissions service returned a non-JSON response: {exc}"
            raise OpaError(msg) from exc
        if not isinstance(payload, dict):
            msg = (
                "permissions service returned a non-object response: "
                f"{type(payload).__name__}"
            )
            raise OpaError(msg)
        return payload
def _parse_bulk_result(result: Any, count: int) -> list[bool]: """Order a bulk ``decisions`` result back into a per-item list. The bulk entrypoint returns a *set* of ``{index, permit}`` objects, so the response array has no guaranteed order; ``index`` ties each verdict to its query. An item with no matching verdict defaults to ``False`` -- a partial answer is never read as a silent allow. """ by_index: dict[int, bool] = {} if isinstance(result, list): for entry in result: index = entry.get("index") if isinstance(index, int): by_index[index] = bool(entry.get("permit", False)) return [by_index.get(position, False) for position in range(count)] # -- Compile-result translation ------------------------------------- # # OPA's /v1/compile returns the residual as a disjunction of # conjunctions of expressions (an AST). The helpers below walk # that AST into a :class:`FilterResult`. Only the shapes a # resource-scoped RBAC residual produces are supported -- a # comparison between an ``input.resource.<field>`` reference and a # literal; anything else raises :class:`OpaError` rather than # silently dropping a constraint. #: OPA builtin name -> :class:`Condition` op. ``flip`` is the op #: to use when the literal is on the left and the field reference #: on the right (comparisons are not symmetric). _OPERATORS: dict[str, tuple[str, str]] = { "equal": ("eq", "eq"), "eq": ("eq", "eq"), "neq": ("ne", "ne"), "lt": ("lt", "gt"), "lte": ("le", "ge"), "gt": ("gt", "lt"), "gte": ("ge", "le"), "internal.member_2": ("in", "in"), } #: Prefix every residual reference this translator understands #: starts with. Generic RBAC list filtering leaves the resource #: unknown, so every surviving constraint is on a resource field. _RESOURCE_PREFIX = ("input", "resource") #: A FilterResult that passes every row. 
_ALWAYS_ALLOW = FilterResult(
    always_allow=True,
    always_deny=False,
    conjunctions=(),
)

#: OPA builtins that express equality in a residual query.
_EQUALITY_BUILTINS = frozenset({"eq", "equal"})

#: A binary-comparison expression has exactly three terms: the
#: operator reference and its two operands.
_BINARY_TERMS = 3


def _parse_compile_result(result: Mapping[str, Any]) -> FilterResult:
    """Translate an OPA ``/v1/compile`` ``result`` into a FilterResult.

    Handles both shapes OPA's partial evaluation produces:

    * the flat form -- ``queries`` is an OR of conjunctions of
      comparison expressions;
    * the support form -- ``queries`` references a generated support
      rule (OPA factors a ``default``-bearing rule out), and
      ``support`` carries that rule's definitions.

    Raises:
        OpaError: The result is not a JSON object, ``queries`` is not a
            list, or a residual expression cannot be represented as a
            SQL filter.
    """
    if not isinstance(result, dict):
        msg = (
            "unsupported compile result: expected an object, got "
            f"{type(result).__name__}"
        )
        raise OpaError(msg)
    queries = result.get("queries")
    # No `queries` key at all -- the query is unsatisfiable, so no
    # row can ever pass.
    if queries is None:
        return _deny()
    if not isinstance(queries, list):
        msg = "unsupported compile result: `queries` is not a list"
        raise OpaError(msg)
    support: list[Mapping[str, Any]] = result.get("support") or []
    if support:
        return _expand_support(queries, support)
    return _from_conjunctions(
        [_conjunction(query) for query in queries],
    )


def _deny() -> FilterResult:
    """Return a fresh always-deny :class:`FilterResult`."""
    return FilterResult(
        always_allow=False,
        always_deny=True,
        conjunctions=(),
    )


def _from_conjunctions(
    conjunctions: list[tuple[Condition, ...] | None],
) -> FilterResult:
    """Fold parsed conjunctions into a :class:`FilterResult`.

    A ``None`` entry is an unconditionally-true branch (an empty
    conjunction): if any OR branch is free, the whole filter passes
    every row.  An empty tuple is a dead branch and is dropped; with no
    live branch left, no row passes.
    """
    if any(conjunction is None for conjunction in conjunctions):
        return _ALWAYS_ALLOW
    kept = [c for c in conjunctions if c]
    if not kept:
        return _deny()
    return FilterResult(
        always_allow=False,
        always_deny=False,
        conjunctions=tuple(kept),
    )


def _conjunction(
    exprs: list[Mapping[str, Any]],
) -> tuple[Condition, ...] | None:
    """Parse a query body into a conjunction of conditions.

    Returns ``None`` for an unconditionally-true body (empty, or every
    expression a ``true`` literal).  A ``false`` literal expression
    makes the whole conjunction dead -- represented as an empty tuple,
    which :func:`_from_conjunctions` drops.
    """
    conditions: list[Condition] = []
    for expr in exprs:
        literal = _boolean_literal(expr)
        if literal is True:
            continue
        if literal is False:
            return ()
        conditions.append(_parse_expr(expr))
    return None if not conditions else tuple(conditions)


def _boolean_literal(expr: Mapping[str, Any]) -> bool | None:
    """Return the bool an expr is, when it is a bare boolean term.

    OPA renders an always-true / always-false body expression as a
    single boolean ``terms`` object (not a comparison call).
    """
    terms = expr.get("terms")
    if isinstance(terms, dict) and terms.get("type") == "boolean":
        return bool(terms.get("value"))
    return None


def _expand_support(
    queries: list[list[Mapping[str, Any]]],
    support: list[Mapping[str, Any]],
) -> FilterResult:
    """Resolve a support-rule compile result into a FilterResult.

    OPA factors a ``default``-bearing rule into ``support``: the single
    ``queries`` entry references a generated support rule
    (``data.partial.<pkg>.<name>``, possibly as ``<ref> == true``), and
    ``support`` carries that rule's definitions -- a ``default`` plus
    zero or more conditional clauses.  The residual is the OR of the
    bodies of the conditional clauses that can make the rule true.

    Raises:
        OpaError: The ``queries``/``support`` shape is not the
            single-support-rule form this translator handles.
    """
    if len(queries) != 1 or len(queries[0]) != 1:
        msg = (
            "unsupported compile result: expected a single query "
            "referencing one support rule"
        )
        raise OpaError(msg)
    ref_path = _support_query_ref(queries[0][0])
    if ref_path is None:
        msg = (
            "unsupported compile result: query is not a positive "
            "reference to a support rule"
        )
        raise OpaError(msg)
    rule_name = ref_path[-1]
    package_path = tuple(ref_path[:-1])
    clauses: list[tuple[Condition, ...] | None] = []
    for entry in support:
        if _support_package_path(entry) != package_path:
            continue
        for rule in entry.get("rules", []):
            if rule.get("default"):
                continue
            if _support_rule_name(rule) != rule_name:
                continue
            # A clause whose head yields a literal ``false`` can never
            # make the rule true, so its body must not widen the filter.
            if _support_head_is_false(rule):
                continue
            clauses.append(_conjunction(rule.get("body", [])))
    return _from_conjunctions(clauses)


def _support_query_ref(expr: Mapping[str, Any]) -> list[str] | None:
    """Return the rule path a support-form query tests, or ``None``.

    Accepts a bare reference ``data.partial.<pkg>.<name>`` and the
    comparison ``<ref> == true`` (either operand order).  A negated
    expression is rejected: reading ``not <rule>`` as ``<rule>`` would
    invert the filter.
    """
    if expr.get("negated"):
        return None
    terms = expr.get("terms")
    if isinstance(terms, dict):
        return _ref_path(terms)
    if not isinstance(terms, list) or len(terms) != _BINARY_TERMS:
        return None
    operator_term, left, right = terms
    if operator_term.get("type") != "ref":
        return None
    if _operator_name(operator_term) not in _EQUALITY_BUILTINS:
        return None
    if _boolean_literal({"terms": right}) is True:
        return _ref_path(left)
    if _boolean_literal({"terms": left}) is True:
        return _ref_path(right)
    return None


def _support_head_is_false(rule: Mapping[str, Any]) -> bool:
    """Return ``True`` iff a support rule's head value is literal ``false``."""
    head_value = rule.get("head", {}).get("value")
    return _boolean_literal({"terms": head_value}) is False


def _support_package_path(entry: Mapping[str, Any]) -> tuple[str, ...]:
    """Return the dotted path of a support package as a tuple."""
    parts = entry.get("package", {}).get("path", [])
    return tuple(part.get("value") for part in parts)


def _support_rule_name(rule: Mapping[str, Any]) -> str | None:
    """Return the name of a support rule, or ``None``."""
    return rule.get("head", {}).get("name")


def _ref_path(term: Mapping[str, Any]) -> list[str] | None:
    """Return the dotted path of a ref term, or ``None``."""
    if term.get("type") != "ref":
        return None
    return [part.get("value") for part in term.get("value", [])]


def _parse_expr(expr: Mapping[str, Any]) -> Condition:
    """Translate one residual expression into a :class:`Condition`.

    Raises:
        OpaError: The expression is not a binary comparison between an
            ``input.resource`` field and a literal.
    """
    terms = expr.get("terms")
    negated = bool(expr.get("negated", False))
    if not isinstance(terms, list) or len(terms) != _BINARY_TERMS:
        msg = (
            f"cannot translate residual expression {expr!r}: only "
            f"binary comparisons of a resource field against a "
            f"literal are supported"
        )
        raise OpaError(msg)
    operator_term, left, right = terms
    builtin = _operator_name(operator_term)
    ops = _OPERATORS.get(builtin)
    if ops is None:
        msg = f"unsupported residual operator {builtin!r}"
        raise OpaError(msg)
    field, value, flipped = _split_operands(left, right)
    if flipped and builtin == "internal.member_2":
        msg = "unsupported residual: `in` with the field on the right"
        raise OpaError(msg)
    op = ops[1] if flipped else ops[0]
    return Condition(field=field, op=op, value=value, negated=negated)


def _operator_name(term: Mapping[str, Any]) -> str:
    """Return the builtin name of an operator term (e.g. ``"equal"``).

    Raises:
        OpaError: The term is not an operator reference.
    """
    if term.get("type") != "ref":
        msg = f"expected an operator reference, got {term!r}"
        raise OpaError(msg)
    return ".".join(part["value"] for part in term["value"])


def _split_operands(
    left: Mapping[str, Any],
    right: Mapping[str, Any],
) -> tuple[str, Any, bool]:
    """Split a comparison's operands into ``(field, literal, flipped)``.

    Exactly one operand must be an ``input.resource.<field>``
    reference; the other must be a literal.  ``flipped`` is ``True``
    when the reference was the right operand.

    Raises:
        OpaError: Neither or both operands are resource references, or
            the literal is not a scalar / array.
    """
    left_field = _resource_field(left)
    right_field = _resource_field(right)
    if left_field is not None and right_field is None:
        return left_field, _literal(right), False
    if right_field is not None and left_field is None:
        return right_field, _literal(left), True
    msg = (
        "cannot translate residual: a comparison must be between "
        "exactly one input.resource field and one literal"
    )
    raise OpaError(msg)


def _resource_field(term: Mapping[str, Any]) -> str | None:
    """Return the field path of an ``input.resource`` ref, else ``None``.

    ``input.resource.id`` yields ``"id"``; a nested
    ``input.resource.owner.id`` yields ``"owner.id"``.  A reference to
    anything other than an ``input.resource`` field yields ``None``
    (the caller treats it as a literal-bearing operand and then rejects
    it).

    Raises:
        OpaError: The reference is into ``input.resource`` but has a
            path segment that is not a fixed string key -- a variable
            (``tags[_]``) or a numeric index -- which no column can
            stand for.
    """
    if term.get("type") != "ref":
        return None
    parts = term.get("value", [])
    prefix_len = len(_RESOURCE_PREFIX)
    head = tuple(part.get("value") for part in parts[:prefix_len])
    if head != _RESOURCE_PREFIX or len(parts) <= prefix_len:
        return None
    tail = parts[prefix_len:]
    if any(part.get("type") != "string" for part in tail):
        msg = (
            "cannot translate residual reference into input.resource "
            f"with a non-field path segment: {term!r}"
        )
        raise OpaError(msg)
    return ".".join(part["value"] for part in tail)


def _literal(term: Mapping[str, Any]) -> Any:
    """Return the Python value of a scalar / array / set term.

    Raises:
        OpaError: The term is not a literal the translator can
            represent (e.g. another reference, or a call).
    """
    kind = term.get("type")
    if kind in {"string", "number", "boolean", "null"}:
        return term.get("value")
    if kind in {"array", "set"}:
        return [_literal(element) for element in term["value"]]
    msg = f"cannot translate residual operand of type {kind!r}"
    raise OpaError(msg)