"""Client for a Rego (OPA) permissions service.
A kiln permissions service is a stateless Open Policy Agent
deployment: it carries the policy (the generic RBAC engine plus
any custom rules) but no data. The roles and role bindings live
in the backend's own database; this module ships them -- together
with the subject, the action, and the resource -- to OPA on every
call.
Three query modes:
* **Point check** -- a decision about one resource (get / create /
update / delete). :meth:`OpaClient.check` POSTs the decision
``input`` to ``/v1/data/<opa_package>/decision`` and returns a
:class:`Decision`.
* **Bulk check** -- many ``(action, resource)`` decisions for one
subject in a single round-trip (the actions-envelope case:
every row of a list page x every action).
:meth:`OpaClient.check_many` POSTs to
``/v1/data/<opa_package>/decisions`` and returns one
:class:`Decision` per item.
* **List filter** -- a collection request where every candidate
row must be checked. :meth:`OpaClient.compile_filter` asks
OPA to *partially evaluate* the ``allow`` rule with the resource
left unknown (the ``/v1/compile`` API). OPA returns residual
conditions on the resource; :meth:`FilterResult.to_sqlalchemy`
turns them into a SQLAlchemy ``WHERE`` clause the backend folds
into the list query. One round-trip filters the whole
collection -- no per-row check.
Requires the ``opa`` extra (``pip install 'kiln-generator[opa]'``)
for :mod:`httpx`.
Example -- point check::
client = OpaClient("http://opa:8181", opa_package="authz")
decision = await client.check(
subject=Subject("user", "alice"),
action="task:update",
resource=ResourceRef("Task", "t-1", {"created_by": "alice"}),
roles={"editor": {"permissions": ["task:read", "task:update"]}},
bindings=[RoleBinding(Subject("user", "alice"), "editor")],
)
if not decision.permit:
raise HTTPException(status_code=403)
Example -- list filter::
result = await client.compile_filter(
subject=Subject("user", "alice"),
action="task:list",
resource_type="Task",
roles=roles,
bindings=bindings,
)
stmt = select(Task).where(result.to_sqlalchemy({"id": Task.id}))
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Self
import httpx
from sqlalchemy import and_, false, not_, or_, true
if TYPE_CHECKING:
from collections.abc import Mapping, Sequence
from sqlalchemy.sql.elements import ColumnElement
#: Default per-request timeout, in seconds. A permissions check
#: sits on the request path, so the timeout is short -- a slow
#: service should fail fast and let the caller apply its
#: fail-open / fail-closed policy.
_DEFAULT_TIMEOUT = 5.0
[docs]
class OpaError(RuntimeError):
"""A permissions-service call failed.
Raised when the service is unreachable, times out, answers
with a non-2xx status, or returns a residual the translator
cannot turn into a SQL filter. The caller decides whether an
error means "deny" (fail-closed, the safe default for authz)
or "allow" (fail-open) -- this module never silently picks
one.
"""
[docs]
@dataclass(frozen=True)
class Subject:
"""The principal a decision is made for.
Attributes:
type: ``"user"`` or ``"token"``. A binding for one kind
never satisfies a request for the other -- the RBAC
engine matches on the pair.
id: Stable identifier of the principal (a user id, a token
id / ``jti``).
"""
type: str
id: str
[docs]
@dataclass(frozen=True)
class ResourceRef:
"""The resource a decision concerns.
Attributes:
type: Resource type, e.g. ``"Task"``. Matched against the
permission catalogue and against object-scoped
bindings.
id: Resource instance id, or ``None`` for a
collection-level decision (a ``list`` / ``create``
where no single instance exists yet).
attributes: Extra fields folded into ``input.resource`` so
custom Rego rules can read them -- e.g.
``{"created_by": "alice"}`` for an ownership rule.
``type`` and ``id`` always take precedence over a
same-named attribute key.
"""
type: str
id: str | None = None
attributes: Mapping[str, Any] = field(default_factory=dict)
[docs]
@dataclass(frozen=True)
class RoleBinding:
"""A role bound to a subject, optionally scoped to a resource.
Attributes:
subject: The principal the role is granted to.
role: Name of the granted role. Must be a key of the
``roles`` catalogue passed to the query method.
scope: ``None`` for a global binding (applies to every
resource); a :class:`ResourceRef` with no ``id`` for a
type-scoped binding (every instance of a type); a
:class:`ResourceRef` with an ``id`` for an
instance-scoped binding.
"""
subject: Subject
role: str
scope: ResourceRef | None = None
[docs]
@dataclass(frozen=True)
class Decision:
"""The answer a point permissions check returns.
Attributes:
permit: The bottom line. ``True`` iff some grant applies
and no veto overrides it -- the value the backend
gates on.
allow: ``True`` iff a grant applied (generic RBAC or a
custom allow rule), before vetoes.
deny: ``True`` iff a custom deny rule vetoed the request.
raw: The full ``result`` object the service returned, for
logging or richer custom decisions.
"""
permit: bool
allow: bool
deny: bool
raw: Mapping[str, Any]
[docs]
@classmethod
def from_result(cls, result: Mapping[str, Any]) -> Decision:
"""Build a :class:`Decision` from a decision ``result`` object.
Missing keys default to the safe value (``False``): a
service that answers with a partial document is treated as
a denial rather than a silent allow.
"""
return cls(
permit=bool(result.get("permit", False)),
allow=bool(result.get("allow", False)),
deny=bool(result.get("deny", False)),
raw=result,
)
[docs]
@dataclass(frozen=True)
class Condition:
"""One residual constraint on a resource field.
A :class:`Condition` is the translated form of a single
comparison OPA left unresolved when it partially evaluated the
policy -- e.g. ``input.resource.id == "t-1"`` becomes
``Condition("id", "eq", "t-1")``.
Attributes:
field: The resource field the constraint is on -- the path
after ``input.resource.``, dotted for a nested field.
op: The comparison: ``"eq"``, ``"ne"``, ``"lt"``,
``"le"``, ``"gt"``, ``"ge"``, or ``"in"``.
value: The literal the field is compared against (a list
for ``"in"``).
negated: ``True`` when OPA emitted the expression negated.
"""
field: str
op: str
value: Any
negated: bool = False
#: Builds a SQLAlchemy clause from a :class:`Condition`'s op.
_CLAUSE_BUILDERS = {
"eq": lambda col, value: col == value,
"ne": lambda col, value: col != value,
"lt": lambda col, value: col < value,
"le": lambda col, value: col <= value,
"gt": lambda col, value: col > value,
"ge": lambda col, value: col >= value,
"in": lambda col, value: col.in_(value),
}
[docs]
@dataclass(frozen=True)
class FilterResult:
"""A compiled list policy: which rows a subject may see.
The outcome of partially evaluating the ``allow`` rule with
the resource unknown. It is one of three shapes:
* **always allow** -- no constraint; every row passes.
* **always deny** -- the policy can never hold; no row passes.
* **conditional** -- :attr:`conjunctions` is an OR of ANDs of
:class:`Condition` objects (disjunctive normal form, the
shape OPA's ``/v1/compile`` returns).
Attributes:
always_allow: Every row passes; :meth:`to_sqlalchemy`
returns a tautology.
always_deny: No row passes; :meth:`to_sqlalchemy` returns
a contradiction.
conjunctions: OR-of-ANDs residual. Each inner tuple is a
conjunction of :class:`Condition` objects; a row
passes when it satisfies *any* conjunction.
"""
always_allow: bool
always_deny: bool
conjunctions: tuple[tuple[Condition, ...], ...]
[docs]
def to_sqlalchemy(
self,
columns: Mapping[str, ColumnElement[Any]],
) -> ColumnElement[bool]:
"""Render the residual as a SQLAlchemy boolean clause.
Args:
columns: Maps a resource field name (as it appears in
:attr:`Condition.field`) to the SQLAlchemy column
it constrains, e.g. ``{"id": Task.id}``.
Returns:
A clause for a ``WHERE`` / ``.where()``: a tautology
when :attr:`always_allow`, a contradiction when
:attr:`always_deny`, otherwise the OR-of-ANDs.
Raises:
OpaError: A residual constrains a field absent from
*columns*.
"""
if self.always_deny:
return false()
if self.always_allow:
return true()
disjuncts = [
and_(*(self._clause(cond, columns) for cond in conjunction))
for conjunction in self.conjunctions
]
return or_(*disjuncts)
@staticmethod
def _clause(
cond: Condition,
columns: Mapping[str, ColumnElement[Any]],
) -> ColumnElement[bool]:
"""Render one :class:`Condition` as a SQLAlchemy clause."""
column = columns.get(cond.field)
if column is None:
msg = (
f"residual constrains resource field {cond.field!r}, "
f"which is not in the supplied column map "
f"{sorted(columns)}"
)
raise OpaError(msg)
clause = _CLAUSE_BUILDERS[cond.op](column, cond.value)
return not_(clause) if cond.negated else clause
[docs]
class OpaClient:
"""Async client for a kiln Rego permissions service.
Wraps an :class:`httpx.AsyncClient` pointed at an OPA decision
server. Construct one per application (it is cheap to keep
open) and close it on shutdown via :meth:`aclose`, or use it
as an async context manager.
Args:
base_url: Base URL of the OPA server, e.g.
``"http://opa:8181"``.
opa_package: Rego package of the decision entrypoint. The
point-check path is ``/v1/data/<opa_package>/decision``;
the list filter compiles ``data.<opa_package>.allow``.
Defaults to ``"authz"``.
client: An existing :class:`httpx.AsyncClient` to use.
When ``None`` (the default) the :class:`OpaClient`
creates and owns one, and :meth:`aclose` closes it.
timeout: Per-request timeout in seconds. Ignored when an
explicit *client* is supplied.
"""
def __init__(
self,
base_url: str,
*,
opa_package: str = "authz",
client: httpx.AsyncClient | None = None,
timeout: float = _DEFAULT_TIMEOUT,
) -> None:
"""Store the decision paths and the (owned or borrowed) client."""
self._opa_package = opa_package
self._decision_path = f"/v1/data/{opa_package}/decision"
self._bulk_path = f"/v1/data/{opa_package}/decisions"
self._owns_client = client is None
self._client = client or httpx.AsyncClient(
base_url=base_url,
timeout=timeout,
)
async def __aenter__(self) -> Self:
"""Enter the async context, returning ``self``."""
return self
async def __aexit__(self, *_exc: object) -> None:
"""Close an owned client on context exit."""
await self.aclose()
[docs]
async def aclose(self) -> None:
"""Close the underlying client iff this instance owns it."""
if self._owns_client:
await self._client.aclose()
[docs]
async def check(
self,
*,
subject: Subject,
action: str,
resource: ResourceRef,
roles: Mapping[str, Any],
bindings: Sequence[RoleBinding],
) -> Decision:
"""Ask the service whether *subject* may take *action*.
A point check -- one decision about one resource.
Args:
subject: The principal making the request.
action: The permission string being checked, e.g.
``"task:update"``.
resource: The resource the action concerns.
roles: The role catalogue -- ``{role_name: {"permissions":
[...]}}``. The backend loads this from its own
store; only the roles relevant to *bindings* need
be present.
bindings: The subject's role bindings, loaded by the
backend from its own store.
Returns:
The service's :class:`Decision`.
Raises:
OpaError: The service was unreachable, timed out, or
answered with a non-2xx status.
"""
document = {
"subject": subject.as_input(),
"action": action,
"resource": resource.as_input(),
"roles": dict(roles),
"bindings": [binding.as_input() for binding in bindings],
}
response = await self._post(self._decision_path, {"input": document})
return Decision.from_result(response.get("result", {}))
[docs]
async def check_many(
self,
*,
subject: Subject,
roles: Mapping[str, Any],
bindings: Sequence[RoleBinding],
items: Sequence[tuple[str, ResourceRef]],
) -> list[bool]:
"""Decide a batch of ``(action, resource)`` checks in one call.
Every item shares one *subject*, *roles*, and *bindings* --
the actions-envelope case: one user, many ``row x action``
checks for a whole list page. The service evaluates the
same per-one policy for each item, so a page costs a
single OPA round-trip rather than one call per item.
Args:
subject: The principal making the request.
roles: The role catalogue (see :meth:`check`).
bindings: The subject's role bindings.
items: The ``(action, resource)`` pairs to decide.
Returns:
One ``permit`` verdict per item, in the same order as
*items*. The bulk entrypoint returns the bottom-line
``permit`` only -- use :meth:`check` for the full
allow / deny breakdown.
Raises:
OpaError: The service was unreachable, timed out, or
answered with a non-2xx status.
"""
subject_input = subject.as_input()
roles_input = dict(roles)
bindings_input = [binding.as_input() for binding in bindings]
queries = [
{
"subject": subject_input,
"action": action,
"resource": resource.as_input(),
"roles": roles_input,
"bindings": bindings_input,
}
for action, resource in items
]
response = await self._post(
self._bulk_path,
{"input": {"queries": queries}},
)
return _parse_bulk_result(response.get("result", []), len(items))
[docs]
async def compile_filter(
self,
*,
subject: Subject,
action: str,
resource_type: str,
roles: Mapping[str, Any],
bindings: Sequence[RoleBinding],
unknowns: Sequence[str] = ("input.resource.id",),
) -> FilterResult:
"""Compile the list policy into a row filter for *resource_type*.
Asks OPA to partially evaluate ``data.<opa_package>.allow``
with the resource left unknown, then translates the
residual into a :class:`FilterResult`. The resource
*type* is supplied as a known value so type-scoped and
global bindings resolve fully and only instance-level
constraints survive into the residual.
Args:
subject: The principal making the request.
action: The collection action, e.g. ``"task:list"``.
resource_type: The resource type being listed, e.g.
``"Task"``. Passed as a known value so the
residual is purely about instance fields.
roles: The role catalogue (see :meth:`check`).
bindings: The subject's role bindings.
unknowns: The ``input`` paths OPA treats as symbolic.
Defaults to ``("input.resource.id",)`` -- enough
for generic RBAC. Extend it (e.g. with
``"input.resource.created_by"``) when a custom
rule constrains other resource fields.
Returns:
The compiled :class:`FilterResult`.
Raises:
OpaError: The service failed, or returned a residual
the translator cannot represent as a SQL filter.
"""
body = {
"query": f"data.{self._opa_package}.allow == true",
"input": {
"subject": subject.as_input(),
"action": action,
"resource": {"type": resource_type},
"roles": dict(roles),
"bindings": [binding.as_input() for binding in bindings],
},
"unknowns": list(unknowns),
}
response = await self._post("/v1/compile", body)
return _parse_compile_result(response.get("result", {}))
async def _post(
self,
path: str,
body: Mapping[str, Any],
) -> dict[str, Any]:
"""POST *body* as JSON to *path*, returning the parsed response.
Raises:
OpaError: The service was unreachable, timed out, or
answered with a non-2xx status.
"""
try:
response = await self._client.post(path, json=dict(body))
response.raise_for_status()
except httpx.HTTPError as exc:
msg = f"permissions service call failed: {exc}"
raise OpaError(msg) from exc
return response.json()
def _parse_bulk_result(result: Any, count: int) -> list[bool]:
"""Order a bulk ``decisions`` result back into a per-item list.
The bulk entrypoint returns a *set* of ``{index, permit}``
objects, so the response array has no guaranteed order;
``index`` ties each verdict to its query. An item with no
matching verdict defaults to ``False`` -- a partial answer is
never read as a silent allow.
"""
by_index: dict[int, bool] = {}
if isinstance(result, list):
for entry in result:
index = entry.get("index")
if isinstance(index, int):
by_index[index] = bool(entry.get("permit", False))
return [by_index.get(position, False) for position in range(count)]
# -- Compile-result translation -------------------------------------
#
# OPA's /v1/compile returns the residual as a disjunction of
# conjunctions of expressions (an AST). The helpers below walk
# that AST into a :class:`FilterResult`. Only the shapes a
# resource-scoped RBAC residual produces are supported -- a
# comparison between an ``input.resource.<field>`` reference and a
# literal; anything else raises :class:`OpaError` rather than
# silently dropping a constraint.
#: OPA builtin name -> :class:`Condition` op. ``flip`` is the op
#: to use when the literal is on the left and the field reference
#: on the right (comparisons are not symmetric).
_OPERATORS: dict[str, tuple[str, str]] = {
"equal": ("eq", "eq"),
"eq": ("eq", "eq"),
"neq": ("ne", "ne"),
"lt": ("lt", "gt"),
"lte": ("le", "ge"),
"gt": ("gt", "lt"),
"gte": ("ge", "le"),
"internal.member_2": ("in", "in"),
}
#: Prefix every residual reference this translator understands
#: starts with. Generic RBAC list filtering leaves the resource
#: unknown, so every surviving constraint is on a resource field.
_RESOURCE_PREFIX = ("input", "resource")
#: A FilterResult that passes every row.
_ALWAYS_ALLOW = FilterResult(
always_allow=True,
always_deny=False,
conjunctions=(),
)
def _parse_compile_result(result: Mapping[str, Any]) -> FilterResult:
"""Translate an OPA ``/v1/compile`` ``result`` into a FilterResult.
Handles both shapes OPA's partial evaluation produces:
* the flat form -- ``queries`` is an OR of conjunctions of
comparison expressions;
* the support form -- ``queries`` references a generated
support rule (OPA factors a ``default``-bearing rule out),
and ``support`` carries that rule's definitions.
Raises:
OpaError: A residual expression the translator cannot
represent as a SQL filter.
"""
queries = result.get("queries")
# No `queries` key at all -- the query is unsatisfiable, so no
# row can ever pass.
if queries is None:
return _deny()
support: list[Mapping[str, Any]] = result.get("support") or []
if support:
return _expand_support(queries, support)
return _from_conjunctions(
[_conjunction(query) for query in queries],
)
def _deny() -> FilterResult:
"""Return a fresh always-deny :class:`FilterResult`."""
return FilterResult(
always_allow=False,
always_deny=True,
conjunctions=(),
)
def _from_conjunctions(
conjunctions: list[tuple[Condition, ...] | None],
) -> FilterResult:
"""Fold parsed conjunctions into a :class:`FilterResult`.
A ``None`` entry is an unconditionally-true branch (an empty
conjunction): if any OR branch is free, the whole filter
passes every row.
"""
if any(conjunction is None for conjunction in conjunctions):
return _ALWAYS_ALLOW
kept = [c for c in conjunctions if c]
if not kept:
return _deny()
return FilterResult(
always_allow=False,
always_deny=False,
conjunctions=tuple(kept),
)
def _conjunction(
exprs: list[Mapping[str, Any]],
) -> tuple[Condition, ...] | None:
"""Parse a query body into a conjunction of conditions.
Returns ``None`` for an unconditionally-true body (empty, or
every expression a ``true`` literal). A ``false`` literal
expression makes the whole conjunction dead -- represented as
an empty tuple, which :func:`_from_conjunctions` drops.
"""
conditions: list[Condition] = []
for expr in exprs:
literal = _boolean_literal(expr)
if literal is True:
continue
if literal is False:
return ()
conditions.append(_parse_expr(expr))
return None if not conditions else tuple(conditions)
def _boolean_literal(expr: Mapping[str, Any]) -> bool | None:
"""Return the bool an expr is, when it is a bare boolean term.
OPA renders an always-true / always-false body expression as a
single boolean ``terms`` object (not a comparison call).
"""
terms = expr.get("terms")
if isinstance(terms, dict) and terms.get("type") == "boolean":
return bool(terms.get("value"))
return None
def _expand_support(
queries: list[list[Mapping[str, Any]]],
support: list[Mapping[str, Any]],
) -> FilterResult:
"""Resolve a support-rule compile result into a FilterResult.
OPA factors a ``default``-bearing rule into ``support``: the
single ``queries`` entry references a generated support rule
(``data.partial.<pkg>.<name>``), and ``support`` carries that
rule's definitions -- a ``default`` plus zero or more
conditional clauses. The residual is the OR of the
conditional clauses' bodies.
Raises:
OpaError: The ``queries``/``support`` shape is not the
single-support-rule form this translator handles.
"""
if len(queries) != 1 or len(queries[0]) != 1:
msg = (
"unsupported compile result: expected a single query "
"referencing one support rule"
)
raise OpaError(msg)
ref_path = _ref_path(queries[0][0].get("terms", {}))
if ref_path is None:
msg = "unsupported compile result: query is not a rule reference"
raise OpaError(msg)
rule_name = ref_path[-1]
package_path = tuple(ref_path[:-1])
clauses: list[tuple[Condition, ...] | None] = []
for entry in support:
if _support_package_path(entry) != package_path:
continue
for rule in entry.get("rules", []):
if rule.get("default"):
continue
if _support_rule_name(rule) != rule_name:
continue
clauses.append(_conjunction(rule.get("body", [])))
return _from_conjunctions(clauses)
def _support_package_path(entry: Mapping[str, Any]) -> tuple[str, ...]:
"""Return the dotted path of a support package as a tuple."""
parts = entry.get("package", {}).get("path", [])
return tuple(part.get("value") for part in parts)
def _support_rule_name(rule: Mapping[str, Any]) -> str | None:
"""Return the name of a support rule, or ``None``."""
return rule.get("head", {}).get("name")
def _ref_path(term: Mapping[str, Any]) -> list[str] | None:
"""Return the dotted path of a ref term, or ``None``."""
if term.get("type") != "ref":
return None
return [part.get("value") for part in term.get("value", [])]
def _parse_expr(expr: Mapping[str, Any]) -> Condition:
"""Translate one residual expression into a :class:`Condition`.
Raises:
OpaError: The expression is not a binary comparison
between an ``input.resource`` field and a literal.
"""
terms = expr.get("terms")
negated = bool(expr.get("negated", False))
if not isinstance(terms, list) or len(terms) != _BINARY_TERMS:
msg = (
f"cannot translate residual expression {expr!r}: only "
f"binary comparisons of a resource field against a "
f"literal are supported"
)
raise OpaError(msg)
operator_term, left, right = terms
builtin = _operator_name(operator_term)
ops = _OPERATORS.get(builtin)
if ops is None:
msg = f"unsupported residual operator {builtin!r}"
raise OpaError(msg)
field, value, flipped = _split_operands(left, right)
if flipped and builtin == "internal.member_2":
msg = "unsupported residual: `in` with the field on the right"
raise OpaError(msg)
op = ops[1] if flipped else ops[0]
return Condition(field=field, op=op, value=value, negated=negated)
#: A binary-comparison expression has exactly three terms: the
#: operator reference and its two operands.
_BINARY_TERMS = 3
def _operator_name(term: Mapping[str, Any]) -> str:
"""Return the builtin name of an operator term (e.g. ``"equal"``).
Raises:
OpaError: The term is not an operator reference.
"""
if term.get("type") != "ref":
msg = f"expected an operator reference, got {term!r}"
raise OpaError(msg)
return ".".join(part["value"] for part in term["value"])
def _split_operands(
left: Mapping[str, Any],
right: Mapping[str, Any],
) -> tuple[str, Any, bool]:
"""Split a comparison's operands into ``(field, literal, flipped)``.
Exactly one operand must be an ``input.resource.<field>``
reference; the other must be a literal. ``flipped`` is
``True`` when the reference was the right operand.
Raises:
OpaError: Neither or both operands are resource
references, or the literal is not a scalar / array.
"""
left_field = _resource_field(left)
right_field = _resource_field(right)
if left_field is not None and right_field is None:
return left_field, _literal(right), False
if right_field is not None and left_field is None:
return right_field, _literal(left), True
msg = (
"cannot translate residual: a comparison must be between "
"exactly one input.resource field and one literal"
)
raise OpaError(msg)
def _resource_field(term: Mapping[str, Any]) -> str | None:
"""Return the field path of an ``input.resource`` ref, else ``None``.
``input.resource.id`` yields ``"id"``; a nested
``input.resource.owner.id`` yields ``"owner.id"``. A
reference to anything other than an ``input.resource`` field
yields ``None`` (it is treated as a literal-bearing operand by
the caller, which then rejects it).
"""
if term.get("type") != "ref":
return None
parts = term["value"]
head = tuple(part.get("value") for part in parts[: len(_RESOURCE_PREFIX)])
if head != _RESOURCE_PREFIX or len(parts) <= len(_RESOURCE_PREFIX):
return None
return ".".join(part["value"] for part in parts[len(_RESOURCE_PREFIX) :])
def _literal(term: Mapping[str, Any]) -> Any:
"""Return the Python value of a scalar / array / set term.
Raises:
OpaError: The term is not a literal the translator can
represent (e.g. another reference, or a call).
"""
kind = term.get("type")
if kind in {"string", "number", "boolean", "null"}:
return term.get("value")
if kind in {"array", "set"}:
return [_literal(element) for element in term["value"]]
msg = f"cannot translate residual operand of type {kind!r}"
raise OpaError(msg)