API Reference

Docstrings on this page are pulled directly from the source under src/ by mkdocstrings. Edit the docstring, rebuild, and the rendered page updates.


lex_align_client

The thin client used by developers and AI agents. Provides the lex-align-client CLI as well as the Claude Code and git pre-commit hooks.

lex_align_client

api

HTTP client for the lex-align server.

Synchronous; the CLI is one-shot, so the simplicity of httpx.Client is worth more than the throughput of an async client. Failure semantics:

  • connection error and fail_open=true → return a synthetic ALLOWED verdict with transport_error=True so the caller can warn.
  • connection error and fail_open=false → raise ServerUnreachable.
  • server 4xx/5xx → raise ServerError(detail).

LexAlignClient

LexAlignClient(config: ClientConfig, http_client: Client | None = None, *, agent_model: Optional[str] = None, agent_version: Optional[str] = None)
Source code in src/lex_align_client/api.py
def __init__(
    self,
    config: ClientConfig,
    http_client: httpx.Client | None = None,
    *,
    agent_model: Optional[str] = None,
    agent_version: Optional[str] = None,
):
    self.config = config
    self._http = http_client or httpx.Client(timeout=5.0)
    self._owns_client = http_client is None
    # Explicit kwargs win, then env vars, then None. The agent identity
    # is purely informational (it tags audit rows for the dashboards),
    # so we never reject a request because it's missing.
    self.agent_model = (
        agent_model
        if agent_model is not None
        else (os.environ.get(AGENT_MODEL_ENV) or None)
    )
    self.agent_version = (
        agent_version
        if agent_version is not None
        else (os.environ.get(AGENT_VERSION_ENV) or None)
    )
pending_approvals
pending_approvals(project: Optional[str] = None) -> list[dict]

List PENDING_REVIEW approval requests for project.

Defaults to the project bound to this client. Used by lex-align-client status and the SessionStart brief to surface the queue without an extra trip to the dashboard.

Source code in src/lex_align_client/api.py
def pending_approvals(self, project: Optional[str] = None) -> list[dict]:
    """List PENDING_REVIEW approval requests for ``project``.

    Defaults to the project bound to this client. Used by
    ``lex-align-client status`` and the SessionStart brief to surface
    the queue without an extra trip to the dashboard.
    """
    params = {
        "project": project or self.config.project,
        "status": "PENDING_REVIEW",
    }
    resp = self._http.get(
        f"{self.config.server_url}/api/v1/reports/approval-requests",
        params=params,
        headers=self._headers(),
    )
    resp.raise_for_status()
    body = resp.json() or {}
    return list(body.get("items") or [])
security_report
security_report(project: Optional[str] = None) -> dict

Project-scoped CVE-driven denial rollup.

Used by status and the SessionStart brief to surface critical CVE pressure on packages this project has been touching, so a freshly-published advisory is visible the next time a session starts — not at commit time.

Source code in src/lex_align_client/api.py
def security_report(self, project: Optional[str] = None) -> dict:
    """Project-scoped CVE-driven denial rollup.

    Used by ``status`` and the SessionStart brief to surface critical
    CVE pressure on packages this project has been touching, so a
    freshly-published advisory is visible the next time a session
    starts — not at commit time.
    """
    params = {"project": project or self.config.project}
    resp = self._http.get(
        f"{self.config.server_url}/api/v1/reports/security",
        params=params,
        headers=self._headers(),
    )
    resp.raise_for_status()
    return resp.json() or {}

audit

Bulk-audit a project's runtime dependencies without committing.

lex-align-client audit is the read-only sibling of the pre-commit hook: it walks [project].dependencies, evaluates each one against the server, and prints a summary. It is useful when adopting lex-align on an existing project or before sending a PR for review, because you don't have to git commit just to find out which packages are out of policy.

evaluate

evaluate(project_root: Path, config: ClientConfig, *, agent_model: str | None = None, agent_version: str | None = None) -> AuditReport

Evaluate every runtime dep in project_root/pyproject.toml.

Source code in src/lex_align_client/audit.py
def evaluate(
    project_root: Path,
    config: ClientConfig,
    *,
    agent_model: str | None = None,
    agent_version: str | None = None,
) -> AuditReport:
    """Evaluate every runtime dep in ``project_root/pyproject.toml``."""
    deps = get_runtime_deps(project_root / "pyproject.toml")
    verdicts: list[Verdict] = []
    if not deps:
        return AuditReport(project=config.project, deps_total=0, verdicts=[])
    with LexAlignClient(
        config, agent_model=agent_model, agent_version=agent_version
    ) as client:
        for name, spec in sorted(deps.items()):
            version = extract_pinned_version(spec)
            verdicts.append(client.check(name, version))
    return AuditReport(
        project=config.project,
        deps_total=len(deps),
        verdicts=verdicts,
    )

format_report

format_report(report: AuditReport) -> str

Render an AuditReport as a human-readable summary.

Source code in src/lex_align_client/audit.py
def format_report(report: AuditReport) -> str:
    """Render an :class:`AuditReport` as a human-readable summary."""
    lines: list[str] = [
        f"Audited {report.deps_total} runtime dep"
        f"{'s' if report.deps_total != 1 else ''} for project '{report.project}'.",
        "",
    ]
    if report.deps_total == 0:
        lines.append("No `[project].dependencies` found.")
        return "\n".join(lines)

    lines.append(
        f"  ALLOWED              : {len(report.allowed) - len(report.transport_errors)}"
    )
    lines.append(f"  PROVISIONALLY_ALLOWED: {len(report.provisional)}")
    lines.append(f"  DENIED               : {len(report.denied)}")
    if report.transport_errors:
        lines.append(f"  (server unreachable for {len(report.transport_errors)})")
    lines.append("")

    if report.denied:
        lines.append("DENIED:")
        for v in report.denied:
            lines.extend(_render_verdict(v, marker="✗"))
        lines.append("")
    if report.provisional:
        lines.append("PROVISIONALLY_ALLOWED:")
        for v in report.provisional:
            lines.extend(_render_verdict(v, marker="◎"))
        lines.append(
            "  → run `lex-align-client request-approval --package <name> "
            "--rationale \"<why>\"` to enqueue formal review."
        )
        lines.append("")
    return "\n".join(lines).rstrip() + "\n"

run

run(project_root: Path, config: ClientConfig, *, as_json: bool, agent_model: str | None = None, agent_version: str | None = None) -> int

CLI entry point. Returns the desired exit code (0/1/2).

Source code in src/lex_align_client/audit.py
def run(
    project_root: Path,
    config: ClientConfig,
    *,
    as_json: bool,
    agent_model: str | None = None,
    agent_version: str | None = None,
) -> int:
    """CLI entry point. Returns the desired exit code (0/1/2)."""
    try:
        report = evaluate(
            project_root, config,
            agent_model=agent_model, agent_version=agent_version,
        )
    except ServerUnreachable as exc:
        print(f"[lex-align] server unreachable: {exc}", file=sys.stderr)
        return 1
    except ServerError as exc:
        print(f"[lex-align] server error: {exc}", file=sys.stderr)
        return 1
    if as_json:
        print(json.dumps(report.to_dict(), indent=2))
    else:
        print(format_report(report), end="")
    return 2 if report.denied else 0
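
The exit codes returned by `run` can be read off the listing above: 1 for an unreachable server or a server error, 2 when at least one dependency is DENIED, and 0 otherwise. A small helper such as the following (the name `describe_audit_exit` is hypothetical, not part of the package) might be used by a wrapper script to interpret them.

```python
def describe_audit_exit(code: int) -> str:
    """Map an audit exit code to a human-readable outcome."""
    outcomes = {
        0: "no dependency was DENIED",
        1: "server unreachable or server error",
        2: "at least one dependency was DENIED",
    }
    return outcomes.get(code, f"unexpected exit code {code}")


for code in (0, 1, 2):
    print(code, describe_audit_exit(code))
```

Note that 0 only means nothing was denied; provisionally allowed packages do not change the exit code.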

claude_hooks

Claude Code session hooks (Advisor surface).

SessionStart and PreToolUse proxy through to the server's /evaluate so the agent sees blocked / provisionally-allowed verdicts during planning, before the pre-commit gate fires.

handle_pre_tool_use

handle_pre_tool_use(event: dict, project_root: Path, config: ClientConfig) -> Optional[tuple[str, str | None]]

Return (decision, message) for pyproject.toml edits, else None.

Source code in src/lex_align_client/claude_hooks.py
def handle_pre_tool_use(
    event: dict, project_root: Path, config: ClientConfig
) -> Optional[tuple[str, str | None]]:
    """Return (decision, message) for pyproject.toml edits, else None."""
    tool_name = event.get("tool_name", "")
    tool_input = event.get("tool_input", {}) or {}
    path_str = tool_input.get("path", "") or tool_input.get("file_path", "")
    if "pyproject.toml" not in path_str:
        return None
    if tool_name not in ("Edit", "Write", "MultiEdit"):
        return None

    pyproject_path = (
        project_root / path_str if not Path(path_str).is_absolute() else Path(path_str)
    )
    if not pyproject_path.exists():
        return None

    current = pyproject_path.read_text()
    proposed = apply_edit(current, tool_name, tool_input)
    added, removed = diff_deps(current, proposed)
    if not added and not removed:
        return None

    header = ["You are modifying runtime dependencies:"]
    for name, spec in sorted(added.items()):
        header.append(f"  + {spec}")
    for name in sorted(removed):
        header.append(f"  - {name}")

    model, version = _detect_agent(event)
    blocks: list[str] = []
    notes: list[str] = []
    try:
        with LexAlignClient(
            config, agent_model=model, agent_version=version
        ) as client:
            for name, spec in sorted(added.items()):
                pinned = extract_pinned_version(spec)
                v = client.check(name, pinned)
                if v.denied:
                    blocks.append(_format_verdict(v, spec))
                elif v.verdict == "PROVISIONALLY_ALLOWED":
                    auto_note = ""
                    if config.auto_request_approval and v.is_requestable:
                        rationale = _auto_rationale(spec, v)
                        try:
                            client.request_approval(name, rationale)
                            auto_note = " (auto-enqueued for review)"
                        except (ServerUnreachable, ServerError) as exc:
                            auto_note = f" (auto-enqueue failed: {exc.__class__.__name__})"
                    elif v.is_requestable:
                        auto_note = " (run `lex-align-client request-approval` after this lands)"
                    notes.append(f"  ◎ {name} — provisional: {v.reason}{auto_note}")
                elif v.needs_rationale:
                    notes.append(
                        f"  • {name} — allowed, but registry-approved (neutral); "
                        "document the architectural need in your commit message or PR."
                    )
                else:
                    notes.append(f"  ✓ {name}{v.reason}")
    except (ServerUnreachable, ServerError) as exc:
        if config.fail_open:
            return ("allow", "\n".join(header + ["", f"[lex-align] {exc} — fail_open=true; allowing edit."]))
        return ("block", f"[lex-align] cannot reach server: {exc}")

    if blocks:
        msg = "\n".join(header + ["", "ENFORCEMENT — blocked by registry:"] + blocks +
                        ["", "No dependencies were modified. Adjust the edit and retry."])
        return ("block", msg)
    if notes:
        return ("allow", "\n".join(header + [""] + notes))
    return ("allow", "\n".join(header))
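
To make the early-exit conditions easier to see, here is a minimal sketch of just the event filter at the top of `handle_pre_tool_use`. The helper name `is_pyproject_edit` and the sample payloads are illustrative assumptions; the real hook additionally requires that the file exists and that the edit changes the dependency list.

```python
from typing import Any

EDIT_TOOLS = ("Edit", "Write", "MultiEdit")


def is_pyproject_edit(event: dict[str, Any]) -> bool:
    """True when the event is an edit-type tool call on a pyproject.toml path."""
    tool_input = event.get("tool_input", {}) or {}
    # Either key may carry the target path, mirroring the listing above.
    path_str = tool_input.get("path", "") or tool_input.get("file_path", "")
    return "pyproject.toml" in path_str and event.get("tool_name", "") in EDIT_TOOLS


# Hypothetical payloads for illustration only.
edit_event = {"tool_name": "Edit", "tool_input": {"file_path": "pyproject.toml"}}
read_event = {"tool_name": "Read", "tool_input": {"file_path": "pyproject.toml"}}
other_file = {"tool_name": "Write", "tool_input": {"file_path": "README.md"}}

print(is_pyproject_edit(edit_event), is_pyproject_edit(read_event), is_pyproject_edit(other_file))
```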

claudemd

CLAUDE.md integration: write lex-align usage instructions into the project's CLAUDE.md.

install_claude_md

install_claude_md(project_root: Path) -> tuple[Path, bool]

Create CLAUDE.md or append the lex-align section if not already present.

Returns (path, created_or_updated). Returns False for the second element when the section was already present and no write was performed.

Source code in src/lex_align_client/claudemd.py
def install_claude_md(project_root: Path) -> tuple[Path, bool]:
    """Create CLAUDE.md or append the lex-align section if not already present.

    Returns (path, created_or_updated). Returns False for the second element
    when the section was already present and no write was performed.
    """
    path = project_root / "CLAUDE.md"
    if path.exists():
        existing = path.read_text(encoding="utf-8")
        if _SECTION_HEADER in existing:
            return path, False
        path.write_text(existing.rstrip("\n") + "\n\n" + _SECTION + "\n", encoding="utf-8")
    else:
        path.write_text(_SECTION + "\n", encoding="utf-8")
    return path, True
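
The idempotency described above can be demonstrated with a standalone copy of the same logic. The header and section text below are placeholders (the real `_SECTION_HEADER` and `_SECTION` values are not shown on this page), and `install_section` is a hypothetical name.

```python
import tempfile
from pathlib import Path

SECTION_HEADER = "## lex-align"  # placeholder, not the real marker
SECTION = SECTION_HEADER + "\n\nPlaceholder usage instructions."


def install_section(project_root: Path) -> tuple[Path, bool]:
    """Create CLAUDE.md or append the section once; report whether a write happened."""
    path = project_root / "CLAUDE.md"
    if path.exists():
        existing = path.read_text(encoding="utf-8")
        if SECTION_HEADER in existing:
            return path, False  # already present, nothing written
        path.write_text(existing.rstrip("\n") + "\n\n" + SECTION + "\n", encoding="utf-8")
    else:
        path.write_text(SECTION + "\n", encoding="utf-8")
    return path, True


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    first_changed = install_section(root)[1]
    second_changed = install_section(root)[1]
    header_count = (root / "CLAUDE.md").read_text(encoding="utf-8").count(SECTION_HEADER)

print(first_changed, second_changed, header_count)
```

Running the installer twice writes the section once; the second call reports that nothing changed.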

cli

lex-align-client CLI: init, check, request-approval, hook, precommit.

main

main() -> None

lex-align client — talks to the lex-align server.

Source code in src/lex_align_client/cli.py
@click.group()
def main() -> None:
    """lex-align client — talks to the lex-align server."""

init

init(yes: bool, server_url: str | None, project_name: str | None, mode: str | None, no_claude_hooks: bool, no_precommit: bool, no_claude_md: bool) -> None

One-time setup: write .lexalign.toml and install hooks.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--yes", "-y", is_flag=True, help="Accept defaults without prompting.")
@click.option("--server-url", default=None, help="Override server URL.")
@click.option("--project", "project_name", default=None, help="Override project name.")
@click.option("--mode", type=click.Choice(["single-user", "org"]), default=None)
@click.option("--no-claude-hooks", is_flag=True, help="Skip Claude Code hook install.")
@click.option("--no-precommit", is_flag=True, help="Skip git pre-commit install.")
@click.option("--no-claude-md", is_flag=True, help="Skip CLAUDE.md creation/update.")
def init(
    yes: bool,
    server_url: str | None,
    project_name: str | None,
    mode: str | None,
    no_claude_hooks: bool,
    no_precommit: bool,
    no_claude_md: bool,
) -> None:
    """One-time setup: write .lexalign.toml and install hooks."""
    project_root = Path.cwd()

    if config_path(project_root).exists() and not yes:
        if not click.confirm(
            f"{CONFIG_FILENAME} already exists. Overwrite?", default=False
        ):
            raise click.Abort()

    autodetected = detect_project_name(project_root / "pyproject.toml", project_root.name)
    if project_name is None:
        project_name = autodetected if yes else click.prompt(
            "Project name", default=autodetected
        )
    if server_url is None:
        server_url = "http://127.0.0.1:8765" if yes else click.prompt(
            "Server URL", default="http://127.0.0.1:8765"
        )
    if mode is None:
        mode = "single-user" if yes else click.prompt(
            "Mode", type=click.Choice(["single-user", "org"]), default="single-user"
        )

    config = ClientConfig(
        project=project_name,
        server_url=server_url,
        mode=mode,
        fail_open=(mode == "single-user"),
        auto_request_approval=(mode == "single-user"),
    )
    path = save_config(project_root, config)
    click.echo(f"Wrote {path}")

    if not no_claude_hooks:
        install_claude_hooks(project_root)
        click.echo("Installed Claude Code hooks in .claude/settings.json.")

    if not no_claude_md:
        md_existed = (project_root / "CLAUDE.md").exists()
        md_path, changed = install_claude_md(project_root)
        if changed:
            action = "Updated" if md_existed else "Created"
            click.echo(f"{action} {md_path.name} with lex-align usage instructions.")
        else:
            click.echo(f"{md_path.name} already contains lex-align section; skipped.")

    if not no_precommit:
        hook_path = install_precommit(project_root)
        if hook_path:
            click.echo(f"Installed git pre-commit hook at {hook_path}.")
        else:
            click.echo(
                "Skipped pre-commit install: this directory is not a git repo "
                "(run `git init` then `lex-align-client init` again)."
            )

    if mode == "org":
        click.echo("")
        click.echo(
            f"Organization mode: export your API key as ${config.api_key_env_var} "
            "before running checks."
        )

check

check(package: str, version: str | None, as_json: bool, agent_model: str | None, agent_version: str | None) -> None

Check a package against the server policy.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--package", required=True)
@click.option("--version", default=None)
@click.option("--json", "as_json", is_flag=True, default=True, help="Emit JSON (default).")
@click.option(
    "--agent-model",
    default=None,
    help="Override agent model tag (default: $LEXALIGN_AGENT_MODEL).",
)
@click.option(
    "--agent-version",
    default=None,
    help="Override agent version tag (default: $LEXALIGN_AGENT_VERSION).",
)
def check(
    package: str,
    version: str | None,
    as_json: bool,
    agent_model: str | None,
    agent_version: str | None,
) -> None:
    """Check a package against the server policy."""
    project_root = find_project_root()
    config = _require_config(project_root)
    try:
        with LexAlignClient(
            config, agent_model=agent_model, agent_version=agent_version
        ) as client:
            verdict = client.check(package, version)
    except ServerUnreachable as exc:
        raise click.ClickException(f"Server unreachable: {exc}")
    except ServerError as exc:
        raise click.ClickException(f"Server error: {exc}")
    click.echo(json.dumps(verdict.to_dict(), indent=2))
    if verdict.denied:
        sys.exit(2)

request_approval

request_approval(package: str, rationale: str, agent_model: str | None, agent_version: str | None) -> None

Submit a non-blocking request to add package to the registry.

Source code in src/lex_align_client/cli.py
@main.command("request-approval")
@click.option("--package", required=True)
@click.option("--rationale", required=True)
@click.option(
    "--agent-model",
    default=None,
    help="Override agent model tag (default: $LEXALIGN_AGENT_MODEL).",
)
@click.option(
    "--agent-version",
    default=None,
    help="Override agent version tag (default: $LEXALIGN_AGENT_VERSION).",
)
def request_approval(
    package: str,
    rationale: str,
    agent_model: str | None,
    agent_version: str | None,
) -> None:
    """Submit a non-blocking request to add `package` to the registry."""
    project_root = find_project_root()
    config = _require_config(project_root)
    try:
        with LexAlignClient(
            config, agent_model=agent_model, agent_version=agent_version
        ) as client:
            response = client.request_approval(package, rationale)
    except ServerUnreachable as exc:
        raise click.ClickException(f"Server unreachable: {exc}")
    except ServerError as exc:
        raise click.ClickException(f"Server error: {exc}")
    click.echo(json.dumps(response, indent=2))

audit

audit(as_json: bool, agent_model: str | None, agent_version: str | None) -> None

Re-evaluate every runtime dep in pyproject.toml against the server.

Read-only sibling of the pre-commit hook: useful when adopting lex-align on an existing project, or to check policy without needing to stage and commit. Exits 2 if any dep is DENIED.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--json", "as_json", is_flag=True, help="Emit machine-readable JSON.")
@click.option(
    "--agent-model",
    default=None,
    help="Override agent model tag (default: $LEXALIGN_AGENT_MODEL).",
)
@click.option(
    "--agent-version",
    default=None,
    help="Override agent version tag (default: $LEXALIGN_AGENT_VERSION).",
)
def audit(
    as_json: bool, agent_model: str | None, agent_version: str | None
) -> None:
    """Re-evaluate every runtime dep in `pyproject.toml` against the server.

    Read-only sibling of the pre-commit hook: useful when adopting
    `lex-align` on an existing project, or to check policy without
    needing to stage and commit. Exits 2 if any dep is DENIED.
    """
    project_root = find_project_root()
    config = _require_config(project_root)
    sys.exit(
        audit_module.run(
            project_root, config, as_json=as_json,
            agent_model=agent_model, agent_version=agent_version,
        )
    )

status

status(as_json: bool) -> None

One-screen status: server reachability, hooks, pending approvals, denials.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--json", "as_json", is_flag=True, help="Emit machine-readable JSON.")
def status(as_json: bool) -> None:
    """One-screen status: server reachability, hooks, pending approvals, denials."""
    project_root = find_project_root()
    config = _require_config(project_root)
    report = status_module.collect(project_root, config)
    if as_json:
        click.echo(json.dumps(report.to_dict(), indent=2))
    else:
        click.echo(status_module.format_report(report))

precommit

precommit() -> None

Pre-commit hook entry point. Exits non-zero on any DENIED dependency.

Source code in src/lex_align_client/cli.py
@main.command()
def precommit() -> None:
    """Pre-commit hook entry point. Exits non-zero on any DENIED dependency."""
    sys.exit(run_precommit())

uninstall

uninstall(yes: bool) -> None

Remove .claude hooks and the git pre-commit shim. Leaves .lexalign.toml.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--yes", "-y", is_flag=True)
def uninstall(yes: bool) -> None:
    """Remove .claude hooks and the git pre-commit shim. Leaves .lexalign.toml."""
    project_root = find_project_root()
    if not yes and not click.confirm(
        "Remove lex-align hooks (Claude + git pre-commit)?", default=False
    ):
        raise click.Abort()
    remove_claude_hooks(project_root)
    remove_precommit(project_root)
    click.echo("Removed Claude hooks and git pre-commit shim.")
    click.echo(f"{CONFIG_FILENAME} preserved.")

config

.lexalign.toml reader/writer.

Schema:

project = "lex-align"
server_url = "http://127.0.0.1:8765"
mode = "single-user"                  # or "org"
fail_open = true                      # ignored unless server is unreachable
api_key_env_var = "LEXALIGN_API_KEY"  # only used when mode = "org"
auto_request_approval = true          # PROVISIONALLY_ALLOWED → enqueue review
                                      # automatically. Defaults to true in
                                      # single-user mode, false in org mode.

find_project_root

find_project_root(start: Optional[Path] = None) -> Path

Walk up looking for .lexalign.toml. Falls back to cwd.

Source code in src/lex_align_client/config.py
def find_project_root(start: Optional[Path] = None) -> Path:
    """Walk up looking for `.lexalign.toml`. Falls back to cwd."""
    cwd = (start or Path.cwd()).resolve()
    for parent in [cwd] + list(cwd.parents):
        if (parent / CONFIG_FILENAME).exists():
            return parent
    return cwd
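
A short demonstration of the walk-up behavior, using a standalone copy of the function and a temporary directory tree. `CONFIG_FILENAME` is assumed to be `.lexalign.toml`, as the docstring suggests.

```python
import tempfile
from pathlib import Path
from typing import Optional

CONFIG_FILENAME = ".lexalign.toml"  # assumed value, per the docstring


def find_project_root(start: Optional[Path] = None) -> Path:
    """Walk up from `start` looking for the config file; fall back to `start`."""
    cwd = (start or Path.cwd()).resolve()
    for parent in [cwd] + list(cwd.parents):
        if (parent / CONFIG_FILENAME).exists():
            return parent
    return cwd


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp).resolve()
    (root / CONFIG_FILENAME).write_text('project = "demo"\n', encoding="utf-8")
    nested = root / "src" / "pkg"
    nested.mkdir(parents=True)
    # Starting two levels down still resolves to the directory holding the config.
    found_from_nested = find_project_root(nested) == root

print(found_from_nested)
```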

precommit

Git pre-commit hook entry point.

Behavior
  1. Read the staged contents of pyproject.toml (if it's in the index).
  2. Re-evaluate every runtime dependency against the server. Re-checking existing deps, not only newly added ones, catches CVEs published since the last commit.
  3. If any verdict is DENIED, exit non-zero with a structured stderr message that an AI agent can parse to self-correct.

pyproject_utils

Helpers for parsing and diffing pyproject.toml dependencies.

These are the only pieces of the v1.x reconciler that survive the rewrite — the client uses them to drive the pre-commit hook and the Claude Code edit hook.

get_runtime_deps

get_runtime_deps(pyproject_path: Path) -> dict[str, str]

Return {normalized_name: raw_spec} from [project].dependencies.

Source code in src/lex_align_client/pyproject_utils.py
def get_runtime_deps(pyproject_path: Path) -> dict[str, str]:
    """Return {normalized_name: raw_spec} from [project].dependencies."""
    if not pyproject_path.exists():
        return {}
    with open(pyproject_path, "rb") as f:
        data = tomllib.load(f)
    deps = data.get("project", {}).get("dependencies", []) or []
    return {normalize_name(d): d.strip() for d in deps}

diff_deps

diff_deps(old_content: str, new_content: str) -> tuple[dict[str, str], set[str]]

Return ({added_name: raw_spec}, {removed_names}).

Source code in src/lex_align_client/pyproject_utils.py
def diff_deps(old_content: str, new_content: str) -> tuple[dict[str, str], set[str]]:
    """Return ({added_name: raw_spec}, {removed_names})."""
    old = parse_deps_from_content(old_content)
    new = parse_deps_from_content(new_content)
    added = {name: spec for name, spec in new.items() if name not in old}
    removed = set(old) - set(new)
    return added, removed

extract_pinned_version

extract_pinned_version(spec: str) -> Optional[str]

Pull a concrete version out of a spec like 'redis>=5.0' or 'httpx==0.28.1'.

Source code in src/lex_align_client/pyproject_utils.py
def extract_pinned_version(spec: str) -> Optional[str]:
    """Pull a concrete version out of a spec like 'redis>=5.0' or 'httpx==0.28.1'."""
    m = re.search(r"(?:>=|<=|!=|~=|==|>|<)\s*([0-9][0-9a-zA-Z\.\-\+\_]*)", spec)
    return m.group(1) if m else None
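
A few sample specs show what the regular expression actually returns. The function body is copied from the listing above; the sample specs are illustrative.

```python
import re
from typing import Optional


def extract_pinned_version(spec: str) -> Optional[str]:
    m = re.search(r"(?:>=|<=|!=|~=|==|>|<)\s*([0-9][0-9a-zA-Z\.\-\+\_]*)", spec)
    return m.group(1) if m else None


for spec in ("httpx==0.28.1", "redis>=5.0", "requests", "django>=4.2,<5.0"):
    print(f"{spec!r} -> {extract_pinned_version(spec)!r}")
# 'httpx==0.28.1' -> '0.28.1'
# 'redis>=5.0' -> '5.0'
# 'requests' -> None
# 'django>=4.2,<5.0' -> '4.2'
```

The result is the version after the first comparison operator, so for a range it is the first bound listed rather than an exact pin, and an unconstrained spec yields `None`.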

apply_edit

apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str

Simulate how a Claude Code Edit/Write/MultiEdit affects file content.

Source code in src/lex_align_client/pyproject_utils.py
def apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str:
    """Simulate how a Claude Code Edit/Write/MultiEdit affects file content."""
    if tool_name == "Write":
        return tool_input.get("content", "")
    if tool_name == "Edit":
        old_str = tool_input.get("old_string", "")
        new_str = tool_input.get("new_string", "")
        return current_content.replace(old_str, new_str, 1)
    if tool_name == "MultiEdit":
        content = current_content
        for edit in tool_input.get("edits", []):
            content = content.replace(edit.get("old_string", ""), edit.get("new_string", ""), 1)
        return content
    return current_content
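
The simulation is easiest to follow with a concrete edit. The function below is copied from the listing above; the `tool_input` payloads are illustrative examples using the keys the function reads.

```python
def apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str:
    if tool_name == "Write":
        return tool_input.get("content", "")
    if tool_name == "Edit":
        old_str = tool_input.get("old_string", "")
        new_str = tool_input.get("new_string", "")
        return current_content.replace(old_str, new_str, 1)
    if tool_name == "MultiEdit":
        content = current_content
        for edit in tool_input.get("edits", []):
            content = content.replace(edit.get("old_string", ""), edit.get("new_string", ""), 1)
        return content
    return current_content


current = 'dependencies = ["httpx==0.28.1"]\n'

# An Edit replaces only the first occurrence of old_string.
proposed = apply_edit(
    current,
    "Edit",
    {"old_string": '"httpx==0.28.1"', "new_string": '"httpx==0.28.1", "redis>=5.0"'},
)
# An Edit whose old_string is not found leaves the content unchanged.
unchanged = apply_edit(current, "Edit", {"old_string": "missing", "new_string": "x"})
# A Write replaces the whole file.
rewritten = apply_edit(current, "Write", {"content": "dependencies = []\n"})

print(proposed, end="")
```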

detect_project_name

detect_project_name(pyproject_path: Path, fallback: str) -> str

Best-effort autodetect for lex-align-client init.

Prefer [project].name from pyproject.toml; fall back to the directory name.

Source code in src/lex_align_client/pyproject_utils.py
def detect_project_name(pyproject_path: Path, fallback: str) -> str:
    """Best-effort autodetect for `lex-align-client init`.

    Prefer [project].name from pyproject.toml; fall back to the directory name.
    """
    if pyproject_path.exists():
        try:
            with open(pyproject_path, "rb") as f:
                data = tomllib.load(f)
            name = data.get("project", {}).get("name")
            if isinstance(name, str) and name.strip():
                return name.strip()
        except Exception:
            pass
    return fallback

settings

Idempotent installers for the Claude Code hooks and the git pre-commit hook.

precommit_installed

precommit_installed(project_root: Path) -> bool

True iff the git pre-commit hook contains the lex-align marker.

Source code in src/lex_align_client/settings.py
def precommit_installed(project_root: Path) -> bool:
    """True iff the git pre-commit hook contains the lex-align marker."""
    hook_path = _precommit_path(project_root)
    if not hook_path.exists():
        return False
    try:
        return _PRECOMMIT_MARKER in hook_path.read_text()
    except OSError:
        return False

install_precommit

install_precommit(project_root: Path) -> Path | None

Install or augment .git/hooks/pre-commit. Returns the path if written.

Source code in src/lex_align_client/settings.py
def install_precommit(project_root: Path) -> Path | None:
    """Install or augment `.git/hooks/pre-commit`. Returns the path if written."""
    git_dir = project_root / ".git"
    if not git_dir.exists():
        return None
    hook_path = _precommit_path(project_root)
    hook_path.parent.mkdir(parents=True, exist_ok=True)

    snippet = (
        f"\n{_PRECOMMIT_MARKER}\n"
        "if command -v lex-align-client >/dev/null 2>&1; then\n"
        "  lex-align-client precommit || exit $?\n"
        "elif command -v uv >/dev/null 2>&1; then\n"
        "  uv run lex-align-client precommit || exit $?\n"
        "fi\n"
    )

    if hook_path.exists():
        existing = hook_path.read_text()
        if _PRECOMMIT_MARKER in existing:
            return hook_path
        new = existing.rstrip() + "\n" + snippet
        hook_path.write_text(new)
    else:
        hook_path.write_text("#!/usr/bin/env bash\nset -e\n" + snippet)
    try:
        hook_path.chmod(hook_path.stat().st_mode | 0o111)
    except OSError:
        pass
    return hook_path

status

lex-align-client status — one-screen project + server snapshot.

Pulls together the things a developer normally checks across multiple URLs (server health, pending approval queue, recent CVE-driven denials) plus what's only visible on the client (hook install state, local dep count) so you can see the whole picture without leaving the terminal.

The collector tolerates a missing or unreachable server: each section gets a status flag and the rest of the report still renders. This is intentional — if the server's down you still want to know which hooks are wired and how many runtime deps the project carries.
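
One way to get this "each section carries its own status flag" behavior is to wrap every data source independently. The sketch below is an assumption about the general pattern, not the module's actual structure; `Section`, `collect_section`, and the two fetchers are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Section:
    ok: bool
    data: Any = None
    error: Optional[str] = None


def collect_section(fetch: Callable[[], Any]) -> Section:
    """Run one fetcher; capture its failure instead of aborting the report."""
    try:
        return Section(ok=True, data=fetch())
    except Exception as exc:
        return Section(ok=False, error=f"{exc.__class__.__name__}: {exc}")


def fetch_pending_approvals() -> list:
    # Hypothetical server call that fails because the server is down.
    raise ConnectionError("connection refused")


def count_local_deps() -> int:
    # Hypothetical client-side check that needs no server.
    return 3


report = {
    "pending_approvals": collect_section(fetch_pending_approvals),
    "local_deps": collect_section(count_local_deps),
}
for name, section in report.items():
    print(name, section.ok, section.data if section.ok else section.error)
```

The server-backed section is marked as failed while the local section still renders.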

    project_root = find_project_root()
    config = _require_config(project_root)
    try:
        with LexAlignClient(
            config, agent_model=agent_model, agent_version=agent_version
        ) as client:
            verdict = client.check(package, version)
    except ServerUnreachable as exc:
        raise click.ClickException(f"Server unreachable: {exc}")
    except ServerError as exc:
        raise click.ClickException(f"Server error: {exc}")
    click.echo(json.dumps(verdict.to_dict(), indent=2))
    if verdict.denied:
        sys.exit(2)

request_approval

request_approval(package: str, rationale: str, agent_model: str | None, agent_version: str | None) -> None

Submit a non-blocking request to add package to the registry.

Source code in src/lex_align_client/cli.py
@main.command("request-approval")
@click.option("--package", required=True)
@click.option("--rationale", required=True)
@click.option(
    "--agent-model",
    default=None,
    help="Override agent model tag (default: $LEXALIGN_AGENT_MODEL).",
)
@click.option(
    "--agent-version",
    default=None,
    help="Override agent version tag (default: $LEXALIGN_AGENT_VERSION).",
)
def request_approval(
    package: str,
    rationale: str,
    agent_model: str | None,
    agent_version: str | None,
) -> None:
    """Submit a non-blocking request to add `package` to the registry."""
    project_root = find_project_root()
    config = _require_config(project_root)
    try:
        with LexAlignClient(
            config, agent_model=agent_model, agent_version=agent_version
        ) as client:
            response = client.request_approval(package, rationale)
    except ServerUnreachable as exc:
        raise click.ClickException(f"Server unreachable: {exc}")
    except ServerError as exc:
        raise click.ClickException(f"Server error: {exc}")
    click.echo(json.dumps(response, indent=2))

audit

audit(as_json: bool, agent_model: str | None, agent_version: str | None) -> None

Re-evaluate every runtime dep in pyproject.toml against the server.

Read-only sibling of the pre-commit hook: useful when adopting lex-align on an existing project, or to check policy without needing to stage and commit. Exits 2 if any dep is DENIED.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--json", "as_json", is_flag=True, help="Emit machine-readable JSON.")
@click.option(
    "--agent-model",
    default=None,
    help="Override agent model tag (default: $LEXALIGN_AGENT_MODEL).",
)
@click.option(
    "--agent-version",
    default=None,
    help="Override agent version tag (default: $LEXALIGN_AGENT_VERSION).",
)
def audit(
    as_json: bool, agent_model: str | None, agent_version: str | None
) -> None:
    """Re-evaluate every runtime dep in `pyproject.toml` against the server.

    Read-only sibling of the pre-commit hook: useful when adopting
    `lex-align` on an existing project, or to check policy without
    needing to stage and commit. Exits 2 if any dep is DENIED.
    """
    project_root = find_project_root()
    config = _require_config(project_root)
    sys.exit(
        audit_module.run(
            project_root, config, as_json=as_json,
            agent_model=agent_model, agent_version=agent_version,
        )
    )

status

status(as_json: bool) -> None

One-screen status: server reachability, hooks, pending approvals, denials.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--json", "as_json", is_flag=True, help="Emit machine-readable JSON.")
def status(as_json: bool) -> None:
    """One-screen status: server reachability, hooks, pending approvals, denials."""
    project_root = find_project_root()
    config = _require_config(project_root)
    report = status_module.collect(project_root, config)
    if as_json:
        click.echo(json.dumps(report.to_dict(), indent=2))
    else:
        click.echo(status_module.format_report(report))

precommit

precommit() -> None

Pre-commit hook entry point. Exits non-zero on any DENIED dependency.

Source code in src/lex_align_client/cli.py
@main.command()
def precommit() -> None:
    """Pre-commit hook entry point. Exits non-zero on any DENIED dependency."""
    sys.exit(run_precommit())

uninstall

uninstall(yes: bool) -> None

Remove .claude hooks and the git pre-commit shim. Leaves .lexalign.toml.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--yes", "-y", is_flag=True)
def uninstall(yes: bool) -> None:
    """Remove .claude hooks and the git pre-commit shim. Leaves .lexalign.toml."""
    project_root = find_project_root()
    if not yes and not click.confirm(
        "Remove lex-align hooks (Claude + git pre-commit)?", default=False
    ):
        raise click.Abort()
    remove_claude_hooks(project_root)
    remove_precommit(project_root)
    click.echo("Removed Claude hooks and git pre-commit shim.")
    click.echo(f"{CONFIG_FILENAME} preserved.")

Server API client

api

HTTP client for the lex-align server.

Synchronous; the CLI is one-shot, so the simplicity of httpx.Client is worth more than the throughput of an async client. Failure semantics:

  • connection error and fail_open=true → return a synthetic ALLOWED verdict with transport_error=True so the caller can warn.
  • connection error and fail_open=false → raise ServerUnreachable.
  • server 4xx/5xx → raise ServerError(detail).
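The three failure cases above can be sketched as follows. This is a minimal illustration rather than the client's actual code: `Verdict`, `check_with_failure_semantics`, and the `send` callable are hypothetical stand-ins, and the built-in `ConnectionError` stands in for whatever transport exception the HTTP layer raises.

```python
from dataclasses import dataclass
from typing import Callable


class ServerUnreachable(Exception):
    """Raised when the server cannot be reached and fail_open is false."""


class ServerError(Exception):
    """Raised when the server answers with a 4xx/5xx status."""


@dataclass
class Verdict:
    # Hypothetical, simplified verdict shape for illustration only.
    verdict: str
    transport_error: bool = False


def check_with_failure_semantics(
    send: Callable[[], tuple[int, dict]], fail_open: bool
) -> Verdict:
    """Apply the documented failure semantics around one request.

    `send` is a hypothetical callable returning (status_code, json_body)
    and raising ConnectionError when the server is unreachable.
    """
    try:
        status_code, body = send()
    except ConnectionError as exc:
        if fail_open:
            # Synthetic ALLOWED verdict; the flag lets the caller warn.
            return Verdict(verdict="ALLOWED", transport_error=True)
        raise ServerUnreachable(str(exc)) from exc
    if status_code >= 400:
        raise ServerError(body.get("detail", f"HTTP {status_code}"))
    return Verdict(verdict=body["verdict"])
```

Note that `fail_open` only affects the unreachable-server case; an HTTP error response still raises `ServerError` regardless of the flag.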

LexAlignClient

LexAlignClient(config: ClientConfig, http_client: Client | None = None, *, agent_model: Optional[str] = None, agent_version: Optional[str] = None)

Source code in src/lex_align_client/api.py
def __init__(
    self,
    config: ClientConfig,
    http_client: httpx.Client | None = None,
    *,
    agent_model: Optional[str] = None,
    agent_version: Optional[str] = None,
):
    self.config = config
    self._http = http_client or httpx.Client(timeout=5.0)
    self._owns_client = http_client is None
    # Explicit kwargs win, then env vars, then None. The agent identity
    # is purely informational (it tags audit rows for the dashboards),
    # so we never reject a request because it's missing.
    self.agent_model = (
        agent_model
        if agent_model is not None
        else (os.environ.get(AGENT_MODEL_ENV) or None)
    )
    self.agent_version = (
        agent_version
        if agent_version is not None
        else (os.environ.get(AGENT_VERSION_ENV) or None)
    )
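
The precedence rule in the constructor (explicit keyword argument, then environment variable, then `None`) can be isolated in a small helper. This is a sketch: `resolve_agent_model` is a hypothetical name, and the value of `AGENT_MODEL_ENV` is inferred from the CLI help text (`$LEXALIGN_AGENT_MODEL`) rather than read from the source.

```python
import os
from typing import Optional

# Assumed value, inferred from the CLI help text on this page.
AGENT_MODEL_ENV = "LEXALIGN_AGENT_MODEL"


def resolve_agent_model(explicit: Optional[str] = None) -> Optional[str]:
    """Explicit value wins, then the environment variable, then None."""
    if explicit is not None:
        return explicit
    # `or None` turns an empty-string environment variable into None.
    return os.environ.get(AGENT_MODEL_ENV) or None
```

One consequence of the `is not None` test: an explicit empty string is kept as-is, while an empty environment variable collapses to `None`.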

pending_approvals

pending_approvals(project: Optional[str] = None) -> list[dict]

List PENDING_REVIEW approval requests for project.

Defaults to the project bound to this client. Used by lex-align-client status and the SessionStart brief to surface the queue without an extra trip to the dashboard.

Source code in src/lex_align_client/api.py
def pending_approvals(self, project: Optional[str] = None) -> list[dict]:
    """List PENDING_REVIEW approval requests for ``project``.

    Defaults to the project bound to this client. Used by
    ``lex-align-client status`` and the SessionStart brief to surface
    the queue without an extra trip to the dashboard.
    """
    params = {
        "project": project or self.config.project,
        "status": "PENDING_REVIEW",
    }
    resp = self._http.get(
        f"{self.config.server_url}/api/v1/reports/approval-requests",
        params=params,
        headers=self._headers(),
    )
    resp.raise_for_status()
    body = resp.json() or {}
    return list(body.get("items") or [])

security_report

security_report(project: Optional[str] = None) -> dict

Project-scoped CVE-driven denial rollup.

Used by status and the SessionStart brief to surface critical CVE pressure on packages this project has been touching, so a freshly-published advisory is visible the next time a session starts — not at commit time.

Source code in src/lex_align_client/api.py
def security_report(self, project: Optional[str] = None) -> dict:
    """Project-scoped CVE-driven denial rollup.

    Used by ``status`` and the SessionStart brief to surface critical
    CVE pressure on packages this project has been touching, so a
    freshly-published advisory is visible the next time a session
    starts — not at commit time.
    """
    params = {"project": project or self.config.project}
    resp = self._http.get(
        f"{self.config.server_url}/api/v1/reports/security",
        params=params,
        headers=self._headers(),
    )
    resp.raise_for_status()
    return resp.json() or {}

Configuration

config

.lexalign.toml reader/writer.

Schema:

project = "lex-align"
server_url = "http://127.0.0.1:8765"
mode = "single-user"                    # or "org"
fail_open = true                        # ignored unless server is unreachable
api_key_env_var = "LEXALIGN_API_KEY"    # only used when mode = "org"
auto_request_approval = true            # PROVISIONALLY_ALLOWED → enqueue review
                                        # automatically. Defaults to true in
                                        # single-user mode, false in org mode.

find_project_root

find_project_root(start: Optional[Path] = None) -> Path

Walk up looking for .lexalign.toml. Falls back to cwd.

Source code in src/lex_align_client/config.py
def find_project_root(start: Optional[Path] = None) -> Path:
    """Walk up looking for `.lexalign.toml`. Falls back to cwd."""
    cwd = (start or Path.cwd()).resolve()
    for parent in [cwd] + list(cwd.parents):
        if (parent / CONFIG_FILENAME).exists():
            return parent
    return cwd
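
A short usage sketch: the function is repeated so the example runs on its own, and the temporary directory layout (`src/pkg`) is made up for illustration.

```python
import tempfile
from pathlib import Path
from typing import Optional

CONFIG_FILENAME = ".lexalign.toml"


def find_project_root(start: Optional[Path] = None) -> Path:
    """Same walk-up logic as the source above."""
    cwd = (start or Path.cwd()).resolve()
    for parent in [cwd, *cwd.parents]:
        if (parent / CONFIG_FILENAME).exists():
            return parent
    return cwd


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp).resolve()
    (root / CONFIG_FILENAME).write_text('project = "demo"\n')
    nested = root / "src" / "pkg"
    nested.mkdir(parents=True)
    # Starting two levels down still resolves to the directory with the config.
    found_from_nested = find_project_root(nested)
    found_root = root
```

Because the nearest ancestor wins, a nested project with its own `.lexalign.toml` would shadow one higher up the tree.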

Settings

settings

Idempotent installers for the Claude Code hooks and the git pre-commit hook.

precommit_installed

precommit_installed(project_root: Path) -> bool

True iff the git pre-commit hook contains the lex-align marker.

Source code in src/lex_align_client/settings.py
def precommit_installed(project_root: Path) -> bool:
    """True iff the git pre-commit hook contains the lex-align marker."""
    hook_path = _precommit_path(project_root)
    if not hook_path.exists():
        return False
    try:
        return _PRECOMMIT_MARKER in hook_path.read_text()
    except OSError:
        return False

install_precommit

install_precommit(project_root: Path) -> Path | None

Install or augment .git/hooks/pre-commit. Returns the path if written.

Source code in src/lex_align_client/settings.py
def install_precommit(project_root: Path) -> Path | None:
    """Install or augment `.git/hooks/pre-commit`. Returns the path if written."""
    git_dir = project_root / ".git"
    if not git_dir.exists():
        return None
    hook_path = _precommit_path(project_root)
    hook_path.parent.mkdir(parents=True, exist_ok=True)

    snippet = (
        f"\n{_PRECOMMIT_MARKER}\n"
        "if command -v lex-align-client >/dev/null 2>&1; then\n"
        "  lex-align-client precommit || exit $?\n"
        "elif command -v uv >/dev/null 2>&1; then\n"
        "  uv run lex-align-client precommit || exit $?\n"
        "fi\n"
    )

    if hook_path.exists():
        existing = hook_path.read_text()
        if _PRECOMMIT_MARKER in existing:
            return hook_path
        new = existing.rstrip() + "\n" + snippet
        hook_path.write_text(new)
    else:
        hook_path.write_text("#!/usr/bin/env bash\nset -e\n" + snippet)
    try:
        hook_path.chmod(hook_path.stat().st_mode | 0o111)
    except OSError:
        pass
    return hook_path

pyproject.toml utilities

pyproject_utils

Helpers for parsing and diffing pyproject.toml dependencies.

These are the only pieces of the v1.x reconciler that survive the rewrite — the client uses them to drive the pre-commit hook and the Claude Code edit hook.

get_runtime_deps

get_runtime_deps(pyproject_path: Path) -> dict[str, str]

Return {normalized_name: raw_spec} from [project].dependencies.

Source code in src/lex_align_client/pyproject_utils.py
def get_runtime_deps(pyproject_path: Path) -> dict[str, str]:
    """Return {normalized_name: raw_spec} from [project].dependencies."""
    if not pyproject_path.exists():
        return {}
    with open(pyproject_path, "rb") as f:
        data = tomllib.load(f)
    deps = data.get("project", {}).get("dependencies", []) or []
    return {normalize_name(d): d.strip() for d in deps}

diff_deps

diff_deps(old_content: str, new_content: str) -> tuple[dict[str, str], set[str]]

Return ({added_name: raw_spec}, {removed_names}).

Source code in src/lex_align_client/pyproject_utils.py
def diff_deps(old_content: str, new_content: str) -> tuple[dict[str, str], set[str]]:
    """Return ({added_name: raw_spec}, {removed_names})."""
    old = parse_deps_from_content(old_content)
    new = parse_deps_from_content(new_content)
    added = {name: spec for name, spec in new.items() if name not in old}
    removed = set(old) - set(new)
    return added, removed

extract_pinned_version

extract_pinned_version(spec: str) -> Optional[str]

Pull a concrete version out of a spec like 'redis>=5.0' or 'httpx==0.28.1'.

Source code in src/lex_align_client/pyproject_utils.py
def extract_pinned_version(spec: str) -> Optional[str]:
    """Pull a concrete version out of a spec like 'redis>=5.0' or 'httpx==0.28.1'."""
    m = re.search(r"(?:>=|<=|!=|~=|==|>|<)\s*([0-9][0-9a-zA-Z\.\-\+\_]*)", spec)
    return m.group(1) if m else None
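
A few sample inputs show what the regular expression actually captures. The function is copied from the source above; the sample specs are illustrative.

```python
import re
from typing import Optional


def extract_pinned_version(spec: str) -> Optional[str]:
    m = re.search(r"(?:>=|<=|!=|~=|==|>|<)\s*([0-9][0-9a-zA-Z\.\-\+\_]*)", spec)
    return m.group(1) if m else None


SPECS = ["httpx==0.28.1", "redis>=5.0", "django>=4.2,<5.0", "requests"]
results = {spec: extract_pinned_version(spec) for spec in SPECS}
```

The results are `"0.28.1"`, `"5.0"`, `"4.2"`, and `None` respectively. For a range such as `django>=4.2,<5.0` the first bound is returned, so the value is the first version mentioned in the spec and not necessarily an exact pin.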

apply_edit

apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str

Simulate how a Claude Code Edit/Write/MultiEdit affects file content.

Source code in src/lex_align_client/pyproject_utils.py
def apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str:
    """Simulate how a Claude Code Edit/Write/MultiEdit affects file content."""
    if tool_name == "Write":
        return tool_input.get("content", "")
    if tool_name == "Edit":
        old_str = tool_input.get("old_string", "")
        new_str = tool_input.get("new_string", "")
        return current_content.replace(old_str, new_str, 1)
    if tool_name == "MultiEdit":
        content = current_content
        for edit in tool_input.get("edits", []):
            content = content.replace(edit.get("old_string", ""), edit.get("new_string", ""), 1)
        return content
    return current_content
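
The following sketch runs `apply_edit` on a one-line file for each tool name. The function body is copied from the source above; the sample content and edits are made up for illustration.

```python
def apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str:
    if tool_name == "Write":
        return tool_input.get("content", "")
    if tool_name == "Edit":
        old_str = tool_input.get("old_string", "")
        new_str = tool_input.get("new_string", "")
        return current_content.replace(old_str, new_str, 1)
    if tool_name == "MultiEdit":
        content = current_content
        for edit in tool_input.get("edits", []):
            content = content.replace(
                edit.get("old_string", ""), edit.get("new_string", ""), 1
            )
        return content
    return current_content


CURRENT = 'dependencies = ["httpx==0.28.1"]\n'

# Edit: only the first occurrence of old_string is replaced.
edited = apply_edit(
    CURRENT,
    "Edit",
    {"old_string": '"httpx==0.28.1"', "new_string": '"httpx==0.28.1", "redis>=5.0"'},
)

# MultiEdit: edits are applied in order, each to the result of the previous one.
multi = apply_edit(
    CURRENT,
    "MultiEdit",
    {"edits": [
        {"old_string": "0.28.1", "new_string": "0.28.2"},
        {"old_string": "httpx", "new_string": "HTTPX"},
    ]},
)

# Any other tool name leaves the content untouched.
unchanged = apply_edit(CURRENT, "Read", {})
```

Since nothing is written to disk, the hook can diff the simulated result against the current file before the tool call is allowed to proceed.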

detect_project_name

detect_project_name(pyproject_path: Path, fallback: str) -> str

Best-effort autodetect for lex-align-client init.

Prefer [project].name from pyproject.toml; fall back to the directory name.

Source code in src/lex_align_client/pyproject_utils.py
def detect_project_name(pyproject_path: Path, fallback: str) -> str:
    """Best-effort autodetect for `lex-align-client init`.

    Prefer [project].name from pyproject.toml; fall back to the directory name.
    """
    if pyproject_path.exists():
        try:
            with open(pyproject_path, "rb") as f:
                data = tomllib.load(f)
            name = data.get("project", {}).get("name")
            if isinstance(name, str) and name.strip():
                return name.strip()
        except Exception:
            pass
    return fallback

Pre-commit hook

precommit

Git pre-commit hook entry point.

Behavior
  1. Read the staged contents of pyproject.toml (if it's in the index).
  2. Re-evaluate every runtime dependency against the server. Reading existing deps catches new CVEs published since the last commit.
  3. If any verdict is DENIED, exit non-zero with a structured stderr message that an AI agent can parse to self-correct.

Claude Code hooks

claude_hooks

Claude Code session hooks (Advisor surface).

SessionStart and PreToolUse proxy through to the server's /evaluate so the agent sees blocked / provisionally-allowed verdicts during planning, before the pre-commit gate fires.

handle_pre_tool_use

handle_pre_tool_use(event: dict, project_root: Path, config: ClientConfig) -> Optional[tuple[str, str | None]]

Return (decision, message) for pyproject.toml edits, else None.

Source code in src/lex_align_client/claude_hooks.py
def handle_pre_tool_use(
    event: dict, project_root: Path, config: ClientConfig
) -> Optional[tuple[str, str | None]]:
    """Return (decision, message) for pyproject.toml edits, else None."""
    tool_name = event.get("tool_name", "")
    tool_input = event.get("tool_input", {}) or {}
    path_str = tool_input.get("path", "") or tool_input.get("file_path", "")
    if "pyproject.toml" not in path_str:
        return None
    if tool_name not in ("Edit", "Write", "MultiEdit"):
        return None

    pyproject_path = (
        project_root / path_str if not Path(path_str).is_absolute() else Path(path_str)
    )
    if not pyproject_path.exists():
        return None

    current = pyproject_path.read_text()
    proposed = apply_edit(current, tool_name, tool_input)
    added, removed = diff_deps(current, proposed)
    if not added and not removed:
        return None

    header = ["You are modifying runtime dependencies:"]
    for name, spec in sorted(added.items()):
        header.append(f"  + {spec}")
    for name in sorted(removed):
        header.append(f"  - {name}")

    model, version = _detect_agent(event)
    blocks: list[str] = []
    notes: list[str] = []
    try:
        with LexAlignClient(
            config, agent_model=model, agent_version=version
        ) as client:
            for name, spec in sorted(added.items()):
                pinned = extract_pinned_version(spec)
                v = client.check(name, pinned)
                if v.denied:
                    blocks.append(_format_verdict(v, spec))
                elif v.verdict == "PROVISIONALLY_ALLOWED":
                    auto_note = ""
                    if config.auto_request_approval and v.is_requestable:
                        rationale = _auto_rationale(spec, v)
                        try:
                            client.request_approval(name, rationale)
                            auto_note = " (auto-enqueued for review)"
                        except (ServerUnreachable, ServerError) as exc:
                            auto_note = f" (auto-enqueue failed: {exc.__class__.__name__})"
                    elif v.is_requestable:
                        auto_note = " (run `lex-align-client request-approval` after this lands)"
                    notes.append(f"  ◎ {name} — provisional: {v.reason}{auto_note}")
                elif v.needs_rationale:
                    notes.append(
                        f"  • {name} — allowed, but registry-approved (neutral); "
                        "document the architectural need in your commit message or PR."
                    )
                else:
                    notes.append(f"  ✓ {name}: {v.reason}")
    except (ServerUnreachable, ServerError) as exc:
        if config.fail_open:
            return ("allow", "\n".join(header + ["", f"[lex-align] {exc} — fail_open=true; allowing edit."]))
        return ("block", f"[lex-align] cannot reach server: {exc}")

    if blocks:
        msg = "\n".join(header + ["", "ENFORCEMENT — blocked by registry:"] + blocks +
                        ["", "No dependencies were modified. Adjust the edit and retry."])
        return ("block", msg)
    if notes:
        return ("allow", "\n".join(header + [""] + notes))
    return ("allow", "\n".join(header))

CLAUDE.md rendering

claudemd

CLAUDE.md integration: write lex-align usage instructions into the project's CLAUDE.md.

install_claude_md

install_claude_md(project_root: Path) -> tuple[Path, bool]

Create CLAUDE.md or append the lex-align section if not already present.

Returns (path, created_or_updated). Returns False for the second element when the section was already present and no write was performed.

Source code in src/lex_align_client/claudemd.py
def install_claude_md(project_root: Path) -> tuple[Path, bool]:
    """Create CLAUDE.md or append the lex-align section if not already present.

    Returns (path, created_or_updated). Returns False for the second element
    when the section was already present and no write was performed.
    """
    path = project_root / "CLAUDE.md"
    if path.exists():
        existing = path.read_text(encoding="utf-8")
        if _SECTION_HEADER in existing:
            return path, False
        path.write_text(existing.rstrip("\n") + "\n\n" + _SECTION + "\n", encoding="utf-8")
    else:
        path.write_text(_SECTION + "\n", encoding="utf-8")
    return path, True

lex_align_server

The FastAPI service that owns the registry, runs license + CVE checks, persists the audit log, and surfaces report endpoints.

lex_align_server

api

v1

approval_requests

POST /api/v1/approval-requests — the Good Citizen.

Persists the request, then fires the configured proposer without blocking so a PR (or local-file write, etc.) gets opened. The 202 returns immediately: the agent isn't penalized if the proposer hits a transient error, because the audit row is still durable, the dashboard will surface it, and the operator can re-trigger from the UI.

The proposer call defaults the YAML status to approved — a safe, explicit "this package is in the registry but isn't blessed as preferred." The PR description tells reviewers they can flip it to preferred (or version-constrained / deprecated) before merge if a stronger classification is appropriate.

evaluate

GET /api/v1/evaluate — the Advisor.

health

GET /api/v1/health — basic liveness/readiness.

registry

Registry endpoints used by the dashboard workshop and the agent request-approval flow.

  • GET /registry: live registry as JSON.
  • GET /registry/pending: explicit pending approval requests plus implicit candidates (packages seen in audit_log without an approval-request follow-up). Each row carries a reason describing why it's surfacing.
  • POST /registry/proposals: operator-initiated proposal that opens a PR / commits / writes YAML depending on the configured proposer backend. Replaces the pre-Phase-4 in-memory classify endpoint.
  • POST /registry/parse-yaml: YAML round-trip validator, used by the dashboard's import button.
  • POST /registry/reload: operator-only manual reload (fallback for when the merge webhook is missed).
  • POST /registry/webhook: git-host webhook callback (verified by REGISTRY_WEBHOOK_SECRET); triggers a pull-and-reload.

The classify endpoint (POST /registry/packages) was removed in Phase 4 — every registry update now flows through a proposer and a reload, so there's exactly one source of truth (registry.yml).

ProposalBody

Bases: BaseModel

Same shape as the legacy classify body. The endpoint emits a proposer call instead of mutating the in-memory registry, so the fields are advisory until the proposal lands (PR merged / file written / commit landed).

pending_requests async

pending_requests(request: Request) -> dict

Triage queue for the dashboard.

Combines two streams:
  • explicit — approval requests an agent or operator filed via POST /approval-requests.
  • implicit — packages seen in audit_log over the last 30 days that never generated an approval request (because the agent only called check, or the call returned DENIED, or only the pre-commit hook ran). Each row carries a reason explaining why it surfaced.

Both streams filter against the live registry — once a package is classified (PR merged → reload → mark_approved_by_package fires), it drops out of both lists automatically.

Source code in src/lex_align_server/api/v1/registry.py
@router.get("/registry/pending")
async def pending_requests(request: Request) -> dict:
    """Triage queue for the dashboard.

    Combines two streams:
      * ``explicit`` — approval requests an agent or operator filed via
        ``POST /approval-requests``.
      * ``implicit`` — packages seen in ``audit_log`` over the last 30
        days that *never* generated an approval request (because the
        agent only called ``check``, or the call returned DENIED, or
        only the pre-commit hook ran). Each row carries a ``reason``
        explaining why it surfaced.

    Both streams filter against the live registry — once a package is
    classified (PR merged → reload → mark_approved_by_package fires), it
    drops out of both lists automatically.
    """
    state = request.app.state.lex
    explicit = await state.audit.list_pending_by_package()
    implicit = await state.audit.list_implicit_candidates(window_days=30)

    registered: set[str] = set()
    if state.registry is not None:
        registered = set(state.registry.packages.keys())
    explicit_keys = {row["normalized_name"] for row in explicit}

    explicit = [r for r in explicit if r["normalized_name"] not in registered]
    implicit = [
        r for r in implicit
        if r["normalized_name"] not in registered
        and r["normalized_name"] not in explicit_keys
    ]
    return {"explicit": explicit, "implicit": implicit}
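
The filtering step at the end of the handler can be tried on plain data. `filter_pending` is a hypothetical extraction of that logic, and the sample rows carry only the `normalized_name` key used by the filter.

```python
def filter_pending(explicit: list[dict], implicit: list[dict], registered: set[str]) -> dict:
    """Drop registered packages, and drop implicit rows that already have an explicit request."""
    # Collected before filtering, as in the handler above.
    explicit_keys = {row["normalized_name"] for row in explicit}
    explicit_out = [r for r in explicit if r["normalized_name"] not in registered]
    implicit_out = [
        r for r in implicit
        if r["normalized_name"] not in registered
        and r["normalized_name"] not in explicit_keys
    ]
    return {"explicit": explicit_out, "implicit": implicit_out}


result = filter_pending(
    explicit=[{"normalized_name": "redis"}, {"normalized_name": "httpx"}],
    implicit=[
        {"normalized_name": "redis"},
        {"normalized_name": "leftpad"},
        {"normalized_name": "click"},
    ],
    registered={"httpx", "click"},
)
```

Here only `redis` remains in the explicit stream and only `leftpad` in the implicit one: `httpx` and `click` are already registered, and the implicit `redis` row is suppressed by its explicit request.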

open_proposal async

open_proposal(body: ProposalBody, request: Request, project: str = Depends(get_project), requester: str = Depends(get_requester), agent: AgentInfo = Depends(get_agent_info)) -> dict

Operator-initiated proposal — fires the configured proposer.

Replaces the legacy POST /registry/packages in-memory classify endpoint. The dashboard's "Approve" button calls this; the result payload tells the UI whether a PR was opened (opened / amended), the change was applied directly (applied, local-file / local-git modes), or it was logged only.

Source code in src/lex_align_server/api/v1/registry.py
@router.post("/registry/proposals", status_code=200)
async def open_proposal(
    body: ProposalBody,
    request: Request,
    project: str = Depends(get_project),
    requester: str = Depends(get_requester),
    agent: AgentInfo = Depends(get_agent_info),
) -> dict:
    """Operator-initiated proposal — fires the configured proposer.

    Replaces the legacy ``POST /registry/packages`` in-memory classify
    endpoint. The dashboard's "Approve" button calls this; the result
    payload tells the UI whether a PR was opened (``opened`` /
    ``amended``), the change was applied directly (``applied``,
    local-file / local-git modes), or it was logged only.
    """
    state = request.app.state.lex

    rule_dict: dict = {"status": body.status}
    if body.reason:      rule_dict["reason"] = body.reason
    if body.replacement: rule_dict["replacement"] = body.replacement
    if body.min_version: rule_dict["min_version"] = body.min_version
    if body.max_version: rule_dict["max_version"] = body.max_version
    try:
        validate_package_rule(body.name, rule_dict)
    except ValidationError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc),
        )

    rule = ProposedRule(
        name=body.name,
        status=body.status,
        reason=body.reason,
        replacement=body.replacement,
        min_version=body.min_version,
        max_version=body.max_version,
    )
    context = ProposalContext(
        source="operator",
        project=project,
        requester=requester,
        rationale=body.rationale or "",
        agent_model=agent.model,
        agent_version=agent.version,
    )
    try:
        result = await state.proposer.propose(rule, context)
    except ProposerError as exc:
        logger.warning("proposer rejected proposal for %s: %s", body.name, exc)
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc),
        )
    return result.to_dict()

reload_endpoint async

reload_endpoint(request: Request) -> dict

Manual fallback for the case where the merge webhook is lost.

Re-reads REGISTRY_PATH from disk, validates, and atomically swaps the in-memory registry. Operator-only by convention — when org-mode auth is enabled the auth backend gates this with everything else.

Source code in src/lex_align_server/api/v1/registry.py
@router.post("/registry/reload", status_code=200)
async def reload_endpoint(request: Request) -> dict:
    """Manual fallback for the case where the merge webhook is lost.

    Re-reads ``REGISTRY_PATH`` from disk, validates, and atomically
    swaps the in-memory registry. Operator-only by convention — when
    org-mode auth is enabled the auth backend gates this with
    everything else.
    """
    result = await reload_registry(request.app.state.lex)
    if not result.ok:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=result.detail,
        )
    return result.to_dict()

webhook_endpoint async

webhook_endpoint(request: Request, x_hub_signature_256: Optional[str] = Header(None), x_github_event: Optional[str] = Header(None)) -> dict

GitHub-style webhook callback.

Verifies the HMAC-SHA256 signature against REGISTRY_WEBHOOK_SECRET. On a merged pull_request event, pulls the merged YAML and triggers a reload. Pings (ping event) return 200 so the operator can use the host's "test webhook" UI.

Source code in src/lex_align_server/api/v1/registry.py
@router.post("/registry/webhook", status_code=200)
async def webhook_endpoint(
    request: Request,
    x_hub_signature_256: Optional[str] = Header(None),
    x_github_event: Optional[str] = Header(None),
) -> dict:
    """GitHub-style webhook callback.

    Verifies the HMAC-SHA256 signature against
    ``REGISTRY_WEBHOOK_SECRET``. On a merged ``pull_request`` event,
    pulls the merged YAML and triggers a reload. Pings (``ping`` event)
    return 200 so the operator can use the host's "test webhook" UI.
    """
    state = request.app.state.lex
    secret = state.settings.registry_webhook_secret
    raw = await request.body()

    if not secret:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=(
                "REGISTRY_WEBHOOK_SECRET is not configured; webhook is "
                "disabled. Configure the secret to enable hot-reload."
            ),
        )
    if not _verify_signature(secret, raw, x_hub_signature_256):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid webhook signature.",
        )

    if x_github_event == "ping":
        return {"ok": True, "event": "ping"}

    if x_github_event != "pull_request":
        return {"ok": True, "event": x_github_event, "ignored": True}

    try:
        payload = await request.json()
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Webhook body is not JSON: {exc}",
        )

    action = payload.get("action")
    pr = payload.get("pull_request") or {}
    if action != "closed" or not pr.get("merged"):
        return {"ok": True, "event": "pull_request", "action": action, "ignored": True}

    # The proposer (if it's a Git-backed one) knows how to fetch the
    # latest YAML to disk; the reloader then re-reads from there.
    try:
        await _refresh_local_yaml(state)
    except Exception:
        logger.exception("webhook: failed to refresh local registry YAML")

    result = await reload_registry(state)
    return {"ok": result.ok, "event": "pull_request.merged", **result.to_dict()}
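The `_verify_signature` helper is not shown on this page. A minimal sketch of GitHub-style HMAC-SHA256 verification, assuming the header value has the form `sha256=<hex digest>`; the function names below are illustrative and not the project's implementation:

```python
import hashlib
import hmac
from typing import Optional

PREFIX = "sha256="


def sign(secret: str, body: bytes) -> str:
    """Build the header value a sender would attach (handy for local tests)."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return PREFIX + digest


def verify_signature(secret: str, body: bytes, header: Optional[str]) -> bool:
    """Return True when `header` is a valid `sha256=<hex>` HMAC of `body`."""
    if not header or not header.startswith(PREFIX):
        return False
    expected = sign(secret, body)
    # compare_digest runs in constant time, so the comparison does not
    # leak how many leading characters matched.
    return hmac.compare_digest(expected.encode("utf-8"), header.encode("utf-8"))
```

The signature must be computed over the raw request bytes, which is why the endpoint reads `request.body()` before parsing any JSON.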
reports

Read-only report endpoints.

Phase 4 dashboards consume these; Phase 3 also exposes them directly so operators can query them with curl.

agents_report async
agents_report(request: Request, project: Optional[str] = Query(None)) -> dict

Aggregate audit rows by (agent_model, agent_version).

Operators use this to answer "which Claude version is doing what" — the headers X-LexAlign-Agent-Model and X-LexAlign-Agent-Version reported by the client propagate into every audit row.

Source code in src/lex_align_server/api/v1/reports.py
@router.get("/reports/agents")
async def agents_report(
    request: Request, project: Optional[str] = Query(None)
) -> dict:
    """Aggregate audit rows by (agent_model, agent_version).

    Operators use this to answer "which Claude version is doing what" — the
    headers `X-LexAlign-Agent-Model` and `X-LexAlign-Agent-Version` reported
    by the client propagate into every audit row.
    """
    return await request.app.state.lex.audit.agents_report(project)

audit

SQLite audit log + approval-request store.

Two tables:

  • audit_log — one row per /evaluate call. Backs the legal report (license-driven denials) and security report (CVE-driven denials).
  • approval_requests — one row per /approval-requests POST. Phase 3 will attach a PR-creation workflow; for now we just persist them.

We use plain aiosqlite rather than an ORM. The schema is small and the report queries are easier to read as straight SQL.

AuditStore

AuditStore(db_path: Path)
Source code in src/lex_align_server/audit.py
def __init__(self, db_path: Path):
    self._db_path = db_path
record_cve_alert async
record_cve_alert(*, package: str, cve_ids: list[str], max_cvss: Optional[float], registry_status: Optional[str] = None, project: str = 'lex-align-server', requester: str = 'cve-scanner') -> str

Persist a single CVE_ALERT row from the background scanner.

Reuses the audit_log table so the dashboard, exports, and the client status command see one consistent event stream — the denial_category=DENIAL_CVE_ALERT discriminator keeps these out of the existing CVE-denial rollups.

Source code in src/lex_align_server/audit.py
async def record_cve_alert(
    self,
    *,
    package: str,
    cve_ids: list[str],
    max_cvss: Optional[float],
    registry_status: Optional[str] = None,
    project: str = "lex-align-server",
    requester: str = "cve-scanner",
) -> str:
    """Persist a single CVE_ALERT row from the background scanner.

    Reuses the audit_log table so the dashboard, exports, and the
    client `status` command see one consistent event stream — the
    ``denial_category=DENIAL_CVE_ALERT`` discriminator keeps these
    out of the existing CVE-denial rollups.
    """
    return await self.record_evaluation(AuditRecord(
        project=project,
        requester=requester,
        package=package,
        version=None,
        resolved_version=None,
        verdict=VERDICT_CVE_ALERT,
        denial_category=DENIAL_CVE_ALERT,
        reason=(
            f"Registered package now has CVSS {max_cvss} "
            f"(>= configured denial threshold)."
            if max_cvss is not None else
            "Registered package now has CVE coverage."
        ),
        cve_ids=cve_ids,
        max_cvss=max_cvss,
        registry_status=registry_status,
    ))
mark_approved_by_package async
mark_approved_by_package(normalized_name: str) -> int

Move every PENDING_REVIEW request for normalized_name to APPROVED.

Called when an operator classifies a pending package via the dashboard and adds it to the in-memory registry. Returns the number of rows flipped, which the dashboard can display in its toast.

Source code in src/lex_align_server/audit.py
async def mark_approved_by_package(self, normalized_name: str) -> int:
    """Move every PENDING_REVIEW request for `normalized_name` to APPROVED.

    Called when an operator classifies a pending package via the dashboard
    and adds it to the in-memory registry. Returns the number of rows
    flipped, which the dashboard can display in its toast.
    """
    from .registry import normalize_name as _norm
    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            "SELECT id, package FROM approval_requests WHERE status = ?",
            (APPROVAL_PENDING,),
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()
        ids = [r["id"] for r in rows if _norm(r["package"]) == normalized_name]
        if not ids:
            return 0
        placeholders = ",".join("?" * len(ids))
        await db.execute(
            f"UPDATE approval_requests SET status = ? WHERE id IN ({placeholders})",
            [APPROVAL_APPROVED, *ids],
        )
        await db.commit()
    return len(ids)
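The filter-then-update pattern above can be tried in isolation. This sketch uses the synchronous stdlib `sqlite3` module instead of `aiosqlite`, a reduced hypothetical table schema, literal status strings, and a PEP 503 style `normalize_name`; all of these are assumptions, and the project's own helper and constants may differ:

```python
import re
import sqlite3


def normalize_name(name: str) -> str:
    # Assumption: PEP 503 normalization (runs of "-", "_", "." become "-").
    return re.sub(r"[-_.]+", "-", name).lower()


def mark_approved(db: sqlite3.Connection, normalized: str) -> int:
    rows = db.execute(
        "SELECT id, package FROM approval_requests WHERE status = ?",
        ("PENDING_REVIEW",),
    ).fetchall()
    # Normalization happens in Python, so filtering cannot be done in SQL.
    ids = [row_id for row_id, pkg in rows if normalize_name(pkg) == normalized]
    if not ids:
        return 0
    # Only the number of "?" marks is interpolated; the ids stay parameterized.
    placeholders = ",".join("?" * len(ids))
    db.execute(
        f"UPDATE approval_requests SET status = ? WHERE id IN ({placeholders})",
        ["APPROVED", *ids],
    )
    db.commit()
    return len(ids)


db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE approval_requests "
    "(id INTEGER PRIMARY KEY, package TEXT, status TEXT)"
)
db.executemany(
    "INSERT INTO approval_requests (package, status) VALUES (?, ?)",
    [("Foo_Bar", "PENDING_REVIEW"), ("foo-bar", "PENDING_REVIEW"),
     ("other", "PENDING_REVIEW")],
)
flipped = mark_approved(db, "foo-bar")
```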
legal_report async
legal_report(project: Optional[str] = None) -> dict[str, Any]

License-compliance posture for the audit log.

Returns the existing total_denials / recent fields plus three rollups the dashboard renders as charts:

  • license_breakdown — every audit row in scope grouped by license and split by verdict, so operators can see how often each license shows up and how the policy classified it.
  • unknown_license — same shape, restricted to rows whose license normalised to UNKNOWN. Surfaces how the unknown_license_policy is performing in production.
  • top_projects — projects ranked by license-driven denials, so operators can tell which repos are pulling the most non-compliant packages.
Source code in src/lex_align_server/audit.py
async def legal_report(self, project: Optional[str] = None) -> dict[str, Any]:
    """License-compliance posture for the audit log.

    Returns the existing ``total_denials`` / ``recent`` fields plus three
    rollups the dashboard renders as charts:

    * ``license_breakdown`` — every audit row in scope grouped by
      ``license`` and split by verdict, so operators can see how often
      each license shows up and how the policy classified it.
    * ``unknown_license`` — same shape, restricted to rows whose license
      normalised to ``UNKNOWN``. Surfaces how the
      ``unknown_license_policy`` is performing in production.
    * ``top_projects`` — projects ranked by license-driven denials, so
      operators can tell which repos are pulling the most non-compliant
      packages.
    """
    base = await self._denial_report(DENIAL_LICENSE, project)
    breakdown, unknown = await self._license_breakdown(project)
    top_projects = await self._top_projects_for_category(DENIAL_LICENSE, project)
    base["license_breakdown"] = breakdown
    base["unknown_license"] = unknown
    base["top_projects"] = top_projects
    return base
security_report async
security_report(project: Optional[str] = None, registry: Optional[Any] = None) -> dict[str, Any]

Vulnerability posture for the audit log.

Adds four rollups on top of the base denial report:

  • severity_distribution — CVE-denied rows bucketed by CVSS severity (critical / high / medium / low / unknown).
  • top_packages — packages with the most CVE-driven denials, carrying their highest-seen CVSS and the CVE ids responsible.
  • top_cves — the CVE identifiers showing up most often.
  • hot_registry_packages — only included when registry is provided. Lists packages currently in the registry as preferred / approved / version-constrained whose recent audit rows include a CVE denial or provisional verdict. This catches the "we said yes, then OSV published a critical" scenario the pre-commit hook is meant to backstop.
Source code in src/lex_align_server/audit.py
async def security_report(
    self,
    project: Optional[str] = None,
    registry: Optional[Any] = None,
) -> dict[str, Any]:
    """Vulnerability posture for the audit log.

    Adds four rollups on top of the base denial report:

    * ``severity_distribution`` — CVE-denied rows bucketed by
      CVSS severity (critical / high / medium / low / unknown).
    * ``top_packages`` — packages with the most CVE-driven denials,
      carrying their highest-seen CVSS and the CVE ids responsible.
    * ``top_cves`` — the CVE identifiers showing up most often.
    * ``hot_registry_packages`` — only included when ``registry`` is
      provided. Lists packages currently in the registry as
      ``preferred`` / ``approved`` / ``version-constrained`` whose
      recent audit rows include a CVE denial or provisional verdict.
      This catches the "we said yes, then OSV published a critical"
      scenario the pre-commit hook is meant to backstop.
    """
    base = await self._denial_report(DENIAL_CVE, project)
    rows = base["recent"]
    base["severity_distribution"] = _bucket_severity(rows)
    base["top_packages"] = _rank_top_packages(rows)
    base["top_cves"] = _rank_top_cves(rows)
    if registry is not None:
        base["hot_registry_packages"] = await self._hot_registry_packages(
            registry, project
        )
    else:
        base["hot_registry_packages"] = []
    base["cve_alerts"] = await self.recent_cve_alerts(project)
    return base
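The `_bucket_severity` helper is not shown here. One plausible sketch, assuming the bands follow the CVSS v3 qualitative ratings and that every score below 4.0 (including 0.0) counts as "low"; the function names and thresholds are assumptions:

```python
from collections import Counter
from typing import Optional

BUCKETS = ("critical", "high", "medium", "low", "unknown")


def severity_bucket(cvss: Optional[float]) -> str:
    """Map a CVSS score to a severity label; None means no score was recorded."""
    if cvss is None:
        return "unknown"
    if cvss >= 9.0:
        return "critical"
    if cvss >= 7.0:
        return "high"
    if cvss >= 4.0:
        return "medium"
    return "low"


def bucket_severity(rows: list[dict]) -> dict[str, int]:
    """Count rows per bucket, always returning every bucket (zero if empty)."""
    counts = Counter(severity_bucket(r.get("max_cvss")) for r in rows)
    return {name: counts.get(name, 0) for name in BUCKETS}
```

Returning every bucket, even when its count is zero, keeps a chart's axes stable between requests.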
recent_cve_alerts async
recent_cve_alerts(project: Optional[str] = None, limit: int = 100) -> list[dict]

Most recent CVE_ALERT rows written by the background scanner.

Surfaced through security_report so the dashboard and the client's status command see scanner output without any endpoint changes.

Source code in src/lex_align_server/audit.py
async def recent_cve_alerts(
    self, project: Optional[str] = None, limit: int = 100
) -> list[dict]:
    """Most recent CVE_ALERT rows written by the background scanner.

    Surfaced through ``security_report`` so the dashboard and the
    client's ``status`` command see scanner output without any
    endpoint changes.
    """
    clauses = ["denial_category = ?"]
    params: list[Any] = [DENIAL_CVE_ALERT]
    if project:
        clauses.append("project = ?")
        params.append(project)
    where = " WHERE " + " AND ".join(clauses)
    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            f"""SELECT id, ts, package, cve_ids, max_cvss, reason,
                      registry_status
               FROM audit_log{where}
               ORDER BY ts DESC
               LIMIT ?""",
            [*params, limit],
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()
    for row in rows:
        if row.get("cve_ids"):
            try:
                row["cve_ids"] = json.loads(row["cve_ids"])
            except json.JSONDecodeError:
                row["cve_ids"] = []
        else:
            row["cve_ids"] = []
    return rows
list_pending_by_package async
list_pending_by_package() -> list[dict]

All PENDING_REVIEW approval requests grouped by package.

The dashboard uses this to surface "things developers asked for" as a triage queue. Multiple requests for the same package (across projects/requesters) collapse into a single row carrying the count, the most recent rationale, and the most recent timestamp. Callers are expected to further filter against the live registry.

Source code in src/lex_align_server/audit.py
async def list_pending_by_package(self) -> list[dict]:
    """All PENDING_REVIEW approval requests grouped by package.

    The dashboard uses this to surface "things developers asked for" as a
    triage queue. Multiple requests for the same package (across
    projects/requesters) collapse into a single row carrying the count,
    the most recent rationale, and the most recent timestamp. Callers
    are expected to further filter against the live registry.
    """
    # Imported lazily to avoid a circular import at module load.
    from .registry import normalize_name

    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            """SELECT package, rationale, project, requester, ts
               FROM approval_requests
               WHERE status = ?
               ORDER BY ts DESC""",
            (APPROVAL_PENDING,),
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()

    grouped: dict[str, dict] = {}
    for r in rows:
        key = normalize_name(r["package"])
        entry = grouped.get(key)
        if entry is None:
            grouped[key] = {
                "package": r["package"],
                "normalized_name": key,
                "latest_rationale": r["rationale"],
                "latest_ts": r["ts"],
                "latest_project": r["project"],
                "latest_requester": r["requester"],
                "request_count": 1,
            }
        else:
            entry["request_count"] += 1
            # Rows are ordered DESC by ts so the first one wins for "latest".
    return list(grouped.values())
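The "first row wins" grouping relies on the rows arriving newest first. A small standalone sketch with invented sample rows, using `str.lower` as a stand-in for the project's `normalize_name`:

```python
from typing import Callable


def group_latest_first(rows: list[dict], key_fn: Callable[[str], str]) -> list[dict]:
    """Collapse rows per normalized package; rows must be sorted newest first."""
    grouped: dict[str, dict] = {}
    for r in rows:
        key = key_fn(r["package"])
        entry = grouped.get(key)
        if entry is None:
            # The first row seen for a key is the newest, so it supplies
            # the "latest" fields.
            grouped[key] = {
                "package": r["package"],
                "normalized_name": key,
                "latest_ts": r["ts"],
                "latest_rationale": r["rationale"],
                "request_count": 1,
            }
        else:
            entry["request_count"] += 1
    return list(grouped.values())


sample = [
    {"package": "Requests", "ts": "2024-03-03", "rationale": "newest"},
    {"package": "requests", "ts": "2024-03-02", "rationale": "older"},
    {"package": "attrs", "ts": "2024-03-01", "rationale": "only"},
]
result = group_latest_first(sample, str.lower)
```

Because Python dicts preserve insertion order, the output stays sorted by each package's most recent request.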
list_implicit_candidates async
list_implicit_candidates(*, window_days: int = 30) -> list[dict]

Packages that show up in audit_log but never produced an explicit request-approval row.

This catches the cases where an agent / hook only ever called check: the verdict's recorded but no one is going to file an approval — yet the operator still wants to triage.

Each row is annotated with a reason field explaining why it's surfacing:

  • provisional-no-rationale: the package got PROVISIONALLY_ALLOWED and the agent didn't follow up with request-approval. Highest signal.
  • repeatedly-denied: the package was DENIED repeatedly, so an explicit registry rule (banned, version-pinned, replaced) would save downstream agents from rediscovering the wall.
  • pre-screened: only check calls, all ALLOWED. Lowest signal, but useful for "things lots of agents poke at."

Callers (the dashboard) further filter against the live registry so packages that just got merged stop appearing immediately.

Source code in src/lex_align_server/audit.py
async def list_implicit_candidates(
    self, *, window_days: int = 30
) -> list[dict]:
    """Packages that show up in `audit_log` but never produced an
    explicit `request-approval` row.

    This catches the cases where an agent / hook only ever called
    `check`: the verdict's recorded but no one is going to file an
    approval — yet the operator still wants to triage.

    Each row is annotated with a ``reason`` field explaining why
    it's surfacing:

      ``provisional-no-rationale`` — the package got
          ``PROVISIONALLY_ALLOWED`` and the agent didn't follow up
          with ``request-approval``. Highest signal.
      ``repeatedly-denied`` — the package was DENIED repeatedly, so
          an explicit registry rule (banned, version-pinned, replaced)
          would save downstream agents from rediscovering the wall.
      ``pre-screened`` — only `check` calls, all ALLOWED. Lowest
          signal, but useful for "things lots of agents poke at."

    Callers (the dashboard) further filter against the live registry
    so packages that just got merged stop appearing immediately.
    """
    from .registry import normalize_name

    # Pull all audit rows in the window, plus all approval_requests rows
    # ever (so we can subtract the explicit ones).
    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            f"""SELECT package, verdict, denial_category, ts, project,
                      agent_model, agent_version
               FROM audit_log
               WHERE ts >= datetime('now', '-{int(window_days)} days')
               ORDER BY ts DESC""",
        )
        audit_rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()

        cur2 = await db.execute(
            "SELECT DISTINCT package FROM approval_requests",
        )
        explicit = {
            normalize_name(dict(r)["package"])
            for r in await cur2.fetchall()
        }
        await cur2.close()

    grouped: dict[str, dict] = {}
    for r in audit_rows:
        key = normalize_name(r["package"])
        if key in explicit:
            # Explicit `request-approval` exists → already in pending panel.
            continue
        entry = grouped.setdefault(key, {
            "package": r["package"],
            "normalized_name": key,
            "evaluations": 0,
            "denials": 0,
            "provisional": 0,
            "latest_ts": r["ts"],
            "latest_project": r["project"],
            "latest_agent_model": r.get("agent_model"),
            "latest_agent_version": r.get("agent_version"),
            "projects": set(),
        })
        entry["evaluations"] += 1
        if r["verdict"] == VERDICT_DENIED:
            entry["denials"] += 1
        elif r["verdict"] == VERDICT_PROVISIONALLY_ALLOWED:
            entry["provisional"] += 1
        entry["projects"].add(r["project"])

    out: list[dict] = []
    for key, entry in grouped.items():
        entry["project_count"] = len(entry.pop("projects"))
        entry["reason"], entry["reason_detail"] = _classify_implicit(entry)
        out.append(entry)

    # Rank by signal strength. provisional > denied > pre-screened, then
    # by frequency × recency proxy (just frequency for now).
    rank_order = {
        "provisional-no-rationale": 0,
        "repeatedly-denied": 1,
        "pre-screened": 2,
    }
    out.sort(key=lambda r: (
        rank_order.get(r["reason"], 99), -r["evaluations"],
    ))
    return out
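The `_classify_implicit` helper is not shown on this page. The sketch below is a hypothetical stand-in that follows the three documented reasons; the `min_denials` threshold and the detail strings are assumptions, and a single denial falls through to `pre-screened` here, which may not match the real helper:

```python
def classify_implicit(entry: dict, min_denials: int = 2) -> tuple[str, str]:
    """Hypothetical classifier returning (reason, reason_detail)."""
    if entry["provisional"] > 0:
        return ("provisional-no-rationale",
                f"{entry['provisional']} provisional verdict(s)")
    if entry["denials"] >= min_denials:
        return "repeatedly-denied", f"denied {entry['denials']} times"
    return "pre-screened", f"{entry['evaluations']} evaluation(s)"


RANK_ORDER = {
    "provisional-no-rationale": 0,
    "repeatedly-denied": 1,
    "pre-screened": 2,
}


def rank(entries: list[dict]) -> list[dict]:
    out = []
    for entry in entries:
        reason, detail = classify_implicit(entry)
        out.append({**entry, "reason": reason, "reason_detail": detail})
    # Strongest signal first; within one reason, most evaluations first.
    out.sort(key=lambda e: (RANK_ORDER.get(e["reason"], 99), -e["evaluations"]))
    return out


ranked = rank([
    {"package": "a", "evaluations": 9, "denials": 0, "provisional": 0},
    {"package": "b", "evaluations": 3, "denials": 3, "provisional": 0},
    {"package": "c", "evaluations": 1, "denials": 0, "provisional": 1},
    {"package": "d", "evaluations": 5, "denials": 4, "provisional": 0},
])
```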
agents_report async
agents_report(project: Optional[str] = None) -> dict[str, Any]

Aggregate evaluations by (agent_model, agent_version).

Powers the "agents" dashboard, so operators can see exactly which Claude (or other agent) version is making which kinds of requests. Rows where the agent is unknown collapse into one bucket.

Source code in src/lex_align_server/audit.py
async def agents_report(self, project: Optional[str] = None) -> dict[str, Any]:
    """Aggregate evaluations by (agent_model, agent_version).

    Powers the "agents" dashboard, so operators can see exactly which
    Claude (or other agent) version is making which kinds of requests.
    Rows where the agent is unknown collapse into one bucket.
    """
    where = ""
    params: list[Any] = []
    if project:
        where = " WHERE project = ?"
        params.append(project)

    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        agg_cur = await db.execute(
            f"""SELECT agent_model, agent_version,
                      COUNT(*) AS evaluations,
                      SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS denials,
                      SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS provisional,
                      MAX(ts) AS last_seen
               FROM audit_log{where}
               GROUP BY agent_model, agent_version
               ORDER BY evaluations DESC""",
            [VERDICT_DENIED, VERDICT_PROVISIONALLY_ALLOWED, *params],
        )
        agents = [dict(r) for r in await agg_cur.fetchall()]
        await agg_cur.close()

        recent_cur = await db.execute(
            f"""SELECT id, ts, project, requester, package, version, verdict,
                      denial_category, reason, agent_model, agent_version
               FROM audit_log{where}
               ORDER BY ts DESC LIMIT 50""",
            params,
        )
        recent = [dict(r) for r in await recent_cur.fetchall()]
        await recent_cur.close()
    return {
        "project": project,
        "agents": agents,
        "recent": recent,
    }
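The aggregate query can be exercised against an in-memory database. This sketch uses the stdlib `sqlite3` module, a reduced hypothetical `audit_log` schema, invented sample rows, and literal verdict strings in place of the project's constants:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.row_factory = sqlite3.Row
db.execute(
    "CREATE TABLE audit_log "
    "(ts TEXT, agent_model TEXT, agent_version TEXT, verdict TEXT)"
)
db.executemany(
    "INSERT INTO audit_log VALUES (?, ?, ?, ?)",
    [
        ("2024-01-01T00:00:00", "opus", "4.7", "ALLOWED"),
        ("2024-01-02T00:00:00", "opus", "4.7", "DENIED"),
        ("2024-01-03T00:00:00", "opus", "4.7", "PROVISIONALLY_ALLOWED"),
        ("2024-01-04T00:00:00", None, None, "DENIED"),
    ],
)
cur = db.execute(
    """SELECT agent_model, agent_version,
              COUNT(*) AS evaluations,
              SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS denials,
              SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS provisional,
              MAX(ts) AS last_seen
       FROM audit_log
       GROUP BY agent_model, agent_version
       ORDER BY evaluations DESC""",
    ["DENIED", "PROVISIONALLY_ALLOWED"],
)
agents = [dict(r) for r in cur.fetchall()]
```

SQLite places all NULL values in the same group under `GROUP BY`, which is what collapses rows without agent headers into a single bucket.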

auth

Auth dependencies.

Identity resolution lives in the pluggable authenticator on app.state.lex.authenticator (see authn/). The functions here are thin FastAPI Depends shims so endpoints can stay declarative:

requester: str = Depends(get_requester)
identity: Identity = Depends(get_identity)
agent: AgentInfo = Depends(get_agent_info)

Single-user mode ships an AnonymousAuthenticator so the same code path works without any auth setup; org mode swaps in whatever the operator configured via AUTH_BACKEND.

The agent-identity headers (X-LexAlign-Agent-Model / -Version) are tag metadata, not authentication — they carry the model that originated the request (e.g. opus 4.7) and feed the agent-activity dashboards. The Authenticator answers "who is calling" (the human/principal); get_agent_info answers "which agent did the calling for them."

AgentInfo dataclass

AgentInfo(model: Optional[str] = None, version: Optional[str] = None)

Agent identity reported by the client.

Both fields are optional; when the client doesn't send the headers, both are None and reports group those rows under an "(unknown agent)" bucket.

get_identity async

get_identity(request: Request) -> Identity

Resolve the requester via the per-app authenticator.

Endpoints depend on this when they need email/groups (e.g. for future per-team authorization). For the common "I just need a string for the audit log" case, depend on get_requester instead.

Source code in src/lex_align_server/auth.py
async def get_identity(request: Request) -> Identity:
    """Resolve the requester via the per-app authenticator.

    Endpoints depend on this when they need email/groups (e.g. for
    future per-team authorization). For the common "I just need a
    string for the audit log" case, depend on :func:`get_requester`
    instead.
    """
    return await request.app.state.lex.authenticator.authenticate(request)

get_requester async

get_requester(identity: Identity = Depends(get_identity)) -> str

The principal id, suitable for audit_log.requester.

Source code in src/lex_align_server/auth.py
async def get_requester(identity: Identity = Depends(get_identity)) -> str:
    """The principal id, suitable for ``audit_log.requester``."""
    return identity.id

authn

Pluggable authentication for org mode.

Single-user mode (AUTH_ENABLED=false) bypasses this entirely and uses AnonymousAuthenticator. Org mode (AUTH_ENABLED=true) selects one of the built-in backends via AUTH_BACKEND:

  • header (recommended) — trust X-Forwarded-User / -Email / -Groups injected by an upstream auth gateway (oauth2-proxy, Pomerium, Cloudflare Access, an ingress' OIDC filter, etc.). Zero Python customization; the org configures their existing gateway.
  • webhook — forward the request's bearer token to a URL the org controls (AUTH_VERIFY_URL); the verifier returns user JSON. The org writes one tiny endpoint in any language.
  • apikey — stub for a future built-in API-key store. Currently raises NotImplementedError so an ill-configured deployment fails loud rather than silently allowing anonymous access.
  • module:path.to:ClassName — escape hatch. Drop a Python file implementing :class:Authenticator into the container and point the env var at it.

The dependency surface (get_identity/get_requester in auth.py) is stable; orgs only need to swap the backend.

AuthError

AuthError(detail: str = 'Authentication required.')

Bases: HTTPException

Authentication failed. Maps to HTTP 401 with a WWW-Authenticate header so curl/SDK clients can react sensibly.

Source code in src/lex_align_server/authn/base.py
def __init__(self, detail: str = "Authentication required."):
    super().__init__(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )

Authenticator

Bases: ABC

Resolve a Request to an Identity or raise.

Implementations must
  • return an Identity on success;
  • raise AuthError (or any HTTPException) on failure;
  • be async — the framework awaits them on every request.

They should not perform authorization (group/role checks): that's a downstream concern. The Authenticator's only job is to answer "who is making this request".

Identity dataclass

Identity(id: str, email: str | None = None, groups: tuple[str, ...] = (), raw: Mapping[str, Any] = dict())

The authenticated principal for a single request.

id is what lex-align records in audit_log.requester and uses to de-dupe approval requests. The other fields are advisory and surface in the dashboards: email for "who asked for this package", and groups for future per-team authorization (e.g. limit registry edits to security-engineers). raw is a free-form dict — backend-specific extras (claims from a JWT, the full user record from a webhook response, etc.) — that custom authenticators can stash for downstream code without inventing new fields.

load_authenticator

load_authenticator(settings: 'Settings', http_client: AsyncClient) -> Authenticator

Return the authenticator selected by settings.

Single-user mode short-circuits to anonymous regardless of AUTH_BACKEND — so flipping AUTH_ENABLED=false is always a safe escape hatch during incident response.

Source code in src/lex_align_server/authn/loader.py
def load_authenticator(
    settings: "Settings", http_client: httpx.AsyncClient
) -> Authenticator:
    """Return the authenticator selected by ``settings``.

    Single-user mode short-circuits to anonymous regardless of
    ``AUTH_BACKEND`` — so flipping ``AUTH_ENABLED=false`` is always a
    safe escape hatch during incident response.
    """
    if not settings.auth_enabled:
        return AnonymousAuthenticator()

    backend = (settings.auth_backend or "header").strip()

    if backend == "anonymous":
        return AnonymousAuthenticator()
    if backend == "header":
        return HeaderAuthenticator(
            user_header=settings.auth_user_header,
            email_header=settings.auth_email_header or None,
            groups_header=settings.auth_groups_header or None,
            groups_separator=settings.auth_groups_separator,
            trusted_proxies=_split_csv(settings.auth_trusted_proxies),
        )
    if backend == "webhook":
        return WebhookAuthenticator(
            verify_url=settings.auth_verify_url or "",
            http_client=http_client,
            timeout=settings.auth_verify_timeout,
        )
    if backend == "apikey":
        return ApiKeyAuthenticator()

    if ":" in backend:
        return _load_module_authenticator(backend, settings, http_client)

    raise ValueError(
        f"Unknown AUTH_BACKEND={backend!r}. Built-ins: "
        f"{sorted(BUILTIN_BACKENDS)}, or a 'module.path:ClassName' string."
    )

anonymous

Anonymous authenticator — used when AUTH_ENABLED=false.

Always returns the same identity. Single-user mode runs this so that the audit log still has a valid requester value without any auth setup.

apikey

Built-in API-key store — currently a stub.

Reserved for a future backend that validates bearer tokens against an api_keys SQLite table managed by lex-align-server admin keys. Useful for evaluation or for orgs that don't want to wire up SSO.

Until that's implemented, selecting AUTH_BACKEND=apikey raises at request time so a misconfigured deployment fails loudly rather than silently letting every request through.

base

Authenticator contract.

Custom backends only need to subclass Authenticator and implement authenticate. Everything else (settings parsing, dependency wiring, audit logging) is handled by the framework.

Identity dataclass
Identity(id: str, email: str | None = None, groups: tuple[str, ...] = (), raw: Mapping[str, Any] = dict())

The authenticated principal for a single request.

id is what lex-align records in audit_log.requester and uses to de-dupe approval requests. The other fields are advisory and surface in the dashboards: email for "who asked for this package", and groups for future per-team authorization (e.g. limit registry edits to security-engineers). raw is a free-form dict — backend-specific extras (claims from a JWT, the full user record from a webhook response, etc.) — that custom authenticators can stash for downstream code without inventing new fields.

AuthError
AuthError(detail: str = 'Authentication required.')

Bases: HTTPException

Authentication failed. Maps to HTTP 401 with a WWW-Authenticate header so curl/SDK clients can react sensibly.

Source code in src/lex_align_server/authn/base.py
def __init__(self, detail: str = "Authentication required."):
    super().__init__(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )
Authenticator

Bases: ABC

Resolve a Request to an Identity or raise.

Implementations must
  • return an Identity on success;
  • raise AuthError (or any HTTPException) on failure;
  • be async — the framework awaits them on every request.

They should not perform authorization (group/role checks): that's a downstream concern. The Authenticator's only job is to answer "who is making this request".

header

Trust headers from an upstream auth gateway.

This is the recommended backend for organizations: oauth2-proxy, Pomerium, Authelia, Cloudflare Access, ingress-nginx with auth_request, Envoy ext_authz — any of them can authenticate against your existing SSO and forward the result as headers. lex-align then consumes them.

Critical: only trust the headers when the request reaches the server through a known proxy. trusted_proxies is a list of CIDR blocks; any request from outside those blocks is rejected with HTTP 401 even if the headers are present, because an attacker can otherwise spoof them by hitting the server directly. The default of 127.0.0.1/32 assumes the proxy is co-located.
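The trusted-proxy check can be sketched with the stdlib `ipaddress` module. The function name `is_trusted` is an assumption; the backend's actual check is not shown on this page:

```python
import ipaddress


def is_trusted(client_ip: str, trusted_proxies: list[str]) -> bool:
    """Return True when client_ip falls inside any of the CIDR blocks."""
    try:
        addr = ipaddress.ip_address(client_ip)
    except ValueError:
        # An unparseable peer address is never trusted.
        return False
    networks = [ipaddress.ip_network(cidr, strict=False) for cidr in trusted_proxies]
    # An IPv4 address tested against an IPv6 network (or vice versa)
    # evaluates to False rather than raising.
    return any(addr in net for net in networks)
```

The address checked must be the TCP peer address of the connection, not a value taken from a forwarded header, since those headers are exactly what an attacker could spoof.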

loader

Resolve the configured AUTH_BACKEND to an Authenticator.

Called once from the FastAPI lifespan. The result is stored on app.state.lex.authenticator and reused for every request via the get_identity dependency.

load_authenticator
load_authenticator(settings: 'Settings', http_client: AsyncClient) -> Authenticator

Return the authenticator selected by settings.

Single-user mode short-circuits to anonymous regardless of AUTH_BACKEND — so flipping AUTH_ENABLED=false is always a safe escape hatch during incident response.

Source code in src/lex_align_server/authn/loader.py
def load_authenticator(
    settings: "Settings", http_client: httpx.AsyncClient
) -> Authenticator:
    """Return the authenticator selected by ``settings``.

    Single-user mode short-circuits to anonymous regardless of
    ``AUTH_BACKEND`` — so flipping ``AUTH_ENABLED=false`` is always a
    safe escape hatch during incident response.
    """
    if not settings.auth_enabled:
        return AnonymousAuthenticator()

    backend = (settings.auth_backend or "header").strip()

    if backend == "anonymous":
        return AnonymousAuthenticator()
    if backend == "header":
        return HeaderAuthenticator(
            user_header=settings.auth_user_header,
            email_header=settings.auth_email_header or None,
            groups_header=settings.auth_groups_header or None,
            groups_separator=settings.auth_groups_separator,
            trusted_proxies=_split_csv(settings.auth_trusted_proxies),
        )
    if backend == "webhook":
        return WebhookAuthenticator(
            verify_url=settings.auth_verify_url or "",
            http_client=http_client,
            timeout=settings.auth_verify_timeout,
        )
    if backend == "apikey":
        return ApiKeyAuthenticator()

    if ":" in backend:
        return _load_module_authenticator(backend, settings, http_client)

    raise ValueError(
        f"Unknown AUTH_BACKEND={backend!r}. Built-ins: "
        f"{sorted(BUILTIN_BACKENDS)}, or a 'module.path:ClassName' string."
    )
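
The 'module.path:ClassName' form accepted by the last branch can be resolved roughly as follows. The body of _load_module_authenticator is not shown on this page, so this is a sketch under assumptions: resolve_backend_class is a hypothetical name, and the real loader presumably also instantiates the class with settings and the HTTP client.

```python
import importlib


def resolve_backend_class(spec: str) -> type:
    """Resolve a 'module.path:ClassName' string to the class it names.

    Hypothetical helper for illustration; not the lex-align loader.
    """
    module_path, sep, class_name = spec.partition(":")
    if not sep or not module_path or not class_name:
        raise ValueError(f"expected 'module.path:ClassName', got {spec!r}")
    module = importlib.import_module(module_path)
    obj = getattr(module, class_name)
    if not isinstance(obj, type):
        raise TypeError(f"{spec!r} does not name a class")
    return obj
```

Because the built-in names are matched before the colon test, a custom backend string only reaches this kind of resolution when it is not one of the built-ins.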

webhook

Forward the bearer token to an org-controlled verifier.

The org writes one tiny endpoint that accepts POST <verify_url> with {"token": "..."} and returns {"id": "...", "email"?: "...", "groups"?: ["..."]} on success or a non-2xx on failure. lex-align treats the returned JSON as the Identity payload.

This is a good fit for orgs that don't have an HTTP-level auth gateway but do have an internal user-info or session-validation service. The verifier can be a Lambda, a sidecar, an internal microservice — any HTTP endpoint that knows how to validate the org's tokens.
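
The verifier contract above is small enough to sketch as a pure function. Everything here is hypothetical: the token table, the handle_verify name, and the choice of 400 and 401 as the non-2xx codes are assumptions for illustration. A real verifier would sit behind an HTTP endpoint and call the org's own session service.

```python
from typing import Optional

# Hypothetical token table standing in for the org's session service.
_KNOWN_TOKENS = {
    "tok-alice": {
        "id": "alice",
        "email": "alice@example.com",
        "groups": ["platform"],
    },
    "tok-bob": {"id": "bob"},  # email and groups are optional
}


def handle_verify(body: dict) -> tuple[int, Optional[dict]]:
    """Map a {"token": "..."} request body to (status_code, identity_json)."""
    token = body.get("token")
    if not isinstance(token, str):
        return 400, None  # malformed request
    identity = _KNOWN_TOKENS.get(token)
    if identity is None:
        return 401, None  # any non-2xx signals failure to lex-align
    return 200, identity
```

Only "id" is required in the success payload; "email" and "groups" may be omitted, as the second table entry shows.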

cache

Redis-backed JSON cache.

Used by the license and CVE adapters to avoid hammering PyPI/OSV. We intentionally swallow connection errors and degrade to "no cache" so the server keeps serving when Redis is unreachable — the audit log still records every decision.
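
The degrade-to-no-cache behaviour can be sketched as below. This is not the JsonCache implementation: DegradingCache and both backend classes are hypothetical stand-ins, and the set of exceptions the real class catches is an assumption.

```python
import asyncio
import json
from typing import Any, Optional


class BrokenBackend:
    """Stand-in for an unreachable Redis: every call raises."""

    async def get(self, key: str) -> Optional[str]:
        raise ConnectionError("redis down")

    async def set(self, key: str, value: str, ttl: int) -> None:
        raise ConnectionError("redis down")


class DictBackend:
    """In-memory stand-in for a healthy backend (ttl is ignored here)."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    async def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    async def set(self, key: str, value: str, ttl: int) -> None:
        self._data[key] = value


class DegradingCache:
    """JSON cache that treats backend errors as cache misses."""

    def __init__(self, backend: Any) -> None:
        self._backend = backend

    async def get(self, key: str) -> Optional[Any]:
        try:
            raw = await self._backend.get(key)
        except OSError:  # ConnectionError is a subclass of OSError
            return None
        return None if raw is None else json.loads(raw)

    async def set(self, key: str, value: Any, ttl: int) -> None:
        try:
            await self._backend.set(key, json.dumps(value), ttl)
        except OSError:
            pass  # swallow: the caller proceeds as if nothing was cached
```

A caller cannot tell a miss from an outage, which is the point: lookups fall through to the upstream service either way.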

check_config

lex-align-server check-config — pre-flight checks for a single-team local-file deployment.

Each check is independent: it returns a CheckResult so the CLI can render it as a coloured row and exit non-zero when any of them fails. The checks deliberately stop at what a one-person / single-team install needs:

  • REGISTRY_PATH is set, exists or can be created, and round-trips through the registry validator.
  • The audit SQLite directory is writable.
  • The cache backend (Redis if configured, in-memory otherwise) is reachable.
  • The proposer auto-detects to local_file (the recommended single-team mode); anything else surfaces as a warning so operators don't silently end up on a heavier backend.
  • Authentication is configured deliberately, not by accident.

The validator never connects to GitHub. The PR-based proposer is the escape hatch for large orgs; if you're running check-config you're almost certainly on the local-file path and the GitHub probe would just add noise.
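
The result shape and the exit rule can be sketched as follows. The field names status, label, and detail match how the CLI reads results, but the dataclass layout and the string values of the status constants are assumptions; the real CheckResult definition is not shown on this page.

```python
from dataclasses import dataclass

# Assumed values; the real module defines its own status constants.
OK, WARN, FAIL = "ok", "warn", "fail"


@dataclass(frozen=True)
class CheckResult:
    status: str
    label: str
    detail: str


def exit_code(results: list[CheckResult]) -> int:
    """Non-zero only when at least one check failed; warnings pass."""
    return 1 if any(r.status == FAIL for r in results) else 0
```

Under this rule a run with only warnings, such as an unreachable Redis, still exits 0.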

check_registry_path

check_registry_path(settings: Settings) -> CheckResult

REGISTRY_PATH must be set to a YAML file path the server can use.

Source code in src/lex_align_server/check_config.py
def check_registry_path(settings: Settings) -> CheckResult:
    """REGISTRY_PATH must be set to a YAML file path the server can use."""
    path = settings.registry_path
    if path is None:
        return _fail(
            "REGISTRY_PATH",
            "unset — the server has nowhere to load or write the registry. "
            "Export REGISTRY_PATH=/path/to/registry.yml.",
        )
    if path.exists():
        if not path.is_file():
            return _fail(
                "REGISTRY_PATH",
                f"{path} exists but is not a regular file.",
            )
        return _ok("REGISTRY_PATH", f"{path} exists ({path.stat().st_size} bytes).")
    parent = path.parent
    if not parent.exists():
        return _fail(
            "REGISTRY_PATH",
            f"{path} does not exist and parent {parent} is missing — "
            "create the directory before starting the server.",
        )
    if not os.access(parent, os.W_OK):
        return _fail(
            "REGISTRY_PATH",
            f"{path} does not exist and parent {parent} is not writable; "
            "the server cannot create the registry on first proposal.",
        )
    return _warn(
        "REGISTRY_PATH",
        f"{path} does not exist yet; will be created on first proposal.",
    )

check_registry_yaml

check_registry_yaml(settings: Settings) -> CheckResult

The registry YAML, if present, must round-trip through the validator.

Source code in src/lex_align_server/check_config.py
def check_registry_yaml(settings: Settings) -> CheckResult:
    """The registry YAML, if present, must round-trip through the validator."""
    path = settings.registry_path
    if path is None or not path.exists():
        # Either unset (caught above) or empty-on-first-boot (caught above).
        return _ok("registry YAML", "no file to validate yet.")
    try:
        doc = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    except yaml.YAMLError as exc:
        return _fail("registry YAML", f"{path} did not parse as YAML: {exc}")
    if not isinstance(doc, dict):
        return _fail(
            "registry YAML",
            f"{path} top level is {type(doc).__name__}; expected a mapping.",
        )
    try:
        compiled = validate_registry(doc)
    except ValidationError as exc:
        return _fail("registry YAML", f"{path} failed validation: {exc}")
    n = len(compiled.get("packages") or {})
    return _ok("registry YAML", f"{path} validates ({n} package rule{'s' if n != 1 else ''}).")

check_audit_path

check_audit_path(settings: Settings) -> CheckResult

The audit SQLite directory must be writable.

Source code in src/lex_align_server/check_config.py
def check_audit_path(settings: Settings) -> CheckResult:
    """The audit SQLite directory must be writable."""
    db_path = settings.database_path
    parent = db_path.parent
    if db_path.exists():
        if not os.access(db_path, os.W_OK):
            return _fail(
                "audit DB",
                f"{db_path} exists but is not writable by uid {os.getuid()}.",
            )
        return _ok("audit DB", f"{db_path} exists and is writable.")
    if not parent.exists():
        return _fail(
            "audit DB",
            f"parent {parent} does not exist; create it before starting "
            "the server (audit log is required, not optional).",
        )
    if not os.access(parent, os.W_OK):
        return _fail(
            "audit DB",
            f"parent {parent} is not writable by uid {os.getuid()}.",
        )
    return _warn(
        "audit DB",
        f"{db_path} does not exist yet; will be created on first request.",
    )

check_cache async

check_cache(settings: Settings) -> CheckResult

Ping the configured cache backend.

Cache failure is not fatal — the server degrades to "no cache" and keeps serving — so this is a warning, never a hard failure.

Source code in src/lex_align_server/check_config.py
async def check_cache(settings: Settings) -> CheckResult:
    """Ping the configured cache backend.

    Cache failure is **not** fatal — the server degrades to "no cache"
    and keeps serving — so this is a warning, never a hard failure.
    """
    cache = JsonCache(settings.redis_url)
    try:
        ok = await cache.ping()
    finally:
        await cache.close()
    if ok:
        return _ok("cache (redis)", f"reachable at {settings.redis_url}.")
    return _warn(
        "cache (redis)",
        f"unreachable at {settings.redis_url}; server will run without cache "
        "(license/CVE lookups hit upstream every time).",
    )

check_proposer

check_proposer(settings: Settings) -> CheckResult

Recommend local_file for the single-team path.

local_file and local_git pass. log_only, github, and custom backends are surfaced as warnings so the operator notices if auto-detection picked something other than what they expected.

Source code in src/lex_align_server/check_config.py
def check_proposer(settings: Settings) -> CheckResult:
    """Recommend ``local_file`` for the single-team path.

    ``local_file`` and ``local_git`` pass. ``log_only``, ``github``, and
    custom backends are surfaced as warnings so the operator notices if
    auto-detection picked something other than what they expected.
    """
    explicit = (settings.registry_proposer or "").strip()
    if explicit:
        backend = explicit
        source = "explicit (REGISTRY_PROPOSER)"
    else:
        # We replicate the loader's own decision tree here without booting
        # the loader itself, so that we don't construct a real proposer
        # (which would, e.g., require git for the local_git path).
        backend = _predict_autodetect(settings)
        source = "auto-detected"

    if backend == "local_file":
        return _ok("proposer", f"{backend} ({source}) — recommended for single-team installs.")
    if backend == "log_only":
        return _warn(
            "proposer",
            "log_only — proposals are logged but not written. "
            "Set REGISTRY_PATH to enable local_file (recommended).",
        )
    if backend == "local_git":
        return _ok(
            "proposer",
            f"{backend} ({source}) — proposals are committed to a local git tree.",
        )
    if backend == "github":
        return _warn(
            "proposer",
            "github — PR-based flow. This is the multi-team / org-wide path; "
            "make sure REGISTRY_REPO_TOKEN is provisioned and the webhook is wired. "
            "Single-team installs usually want local_file instead.",
        )
    return _warn("proposer", f"{backend} ({source}) — custom backend.")

check_auth

check_auth(settings: Settings) -> CheckResult

Surface accidental anonymous-mode deployments.

Anonymous is a fine default (it makes evaluation friction-free), but a server bound to anything other than localhost without auth is almost always a misconfiguration.

Source code in src/lex_align_server/check_config.py
def check_auth(settings: Settings) -> CheckResult:
    """Surface accidental anonymous-mode deployments.

    Anonymous is a fine *default* (it makes evaluation friction-free),
    but a server bound to anything other than localhost without auth is
    almost always a misconfiguration.
    """
    if settings.auth_enabled:
        return _ok("auth", f"enabled (backend={settings.auth_backend}).")
    if settings.bind_host in ("127.0.0.1", "localhost", "::1"):
        return _ok(
            "auth",
            f"disabled (anonymous), bound to {settings.bind_host} — "
            "fine for single-user / local evaluation.",
        )
    return _warn(
        "auth",
        f"AUTH_ENABLED=false but BIND_HOST={settings.bind_host} — "
        "the server accepts anonymous requests on a non-loopback interface. "
        "Set AUTH_ENABLED=true and choose an AUTH_BACKEND.",
    )

run_checks async

run_checks(settings: Settings) -> list[CheckResult]

Run every check in the order the CLI prints them.

Source code in src/lex_align_server/check_config.py
async def run_checks(settings: Settings) -> list[CheckResult]:
    """Run every check in the order the CLI prints them."""
    results: list[CheckResult] = [
        check_registry_path(settings),
        check_registry_yaml(settings),
        check_audit_path(settings),
        await check_cache(settings),
        check_proposer(settings),
        check_auth(settings),
    ]
    return results

run_checks_sync

run_checks_sync(settings: Settings) -> list[CheckResult]

Sync wrapper for the CLI entry point.

Source code in src/lex_align_server/check_config.py
def run_checks_sync(settings: Settings) -> list[CheckResult]:
    """Sync wrapper for the CLI entry point."""
    return asyncio.run(run_checks(settings))

cli

lex-align-server CLI.

Exposes
  • serve — run uvicorn against the FastAPI app.
  • quickstart — Docker-free single-user bring-up: materialize a local registry + sqlite, then run the server in-process on 127.0.0.1:8765.
  • init — materialize the operator bundle (compose stack, Dockerfile, registry, .env) into a target dir.
  • check-config — pre-flight checks for a single-team install: REGISTRY_PATH, audit DB, cache, proposer, auth.
  • registry compile — compile a YAML registry to the JSON form the server consumes.
  • selftest — hit /api/v1/health to confirm a stack is up.
  • admin keys ... — placeholders for the org-mode admin tooling.

main

main() -> None

lex-align server CLI.

Source code in src/lex_align_server/cli.py
@click.group()
def main() -> None:
    """lex-align server CLI."""

serve

serve(host: str | None, port: int | None, reload: bool) -> None

Start the lex-align FastAPI server via uvicorn.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option("--host", default=None, help="Override BIND_HOST.")
@click.option("--port", default=None, type=int, help="Override BIND_PORT.")
@click.option("--reload", is_flag=True, help="Enable hot reload (development only).")
def serve(host: str | None, port: int | None, reload: bool) -> None:
    """Start the lex-align FastAPI server via uvicorn."""
    import uvicorn
    from .config import get_settings
    settings = get_settings()
    uvicorn.run(
        "lex_align_server.main:app",
        host=host or settings.bind_host,
        port=port or settings.bind_port,
        reload=reload,
    )

quickstart

quickstart(target: Path | None, host: str, port: int, no_serve: bool, force: bool) -> None

Docker-free single-user bring-up.

Lays down a local registry + sqlite under --target (default ~/.lexalign), then runs the server in-process on http://127.0.0.1:8765. Redis is skipped (the cache layer silently degrades). Stop with Ctrl-C.

For multi-user deployments use lex-align-server init + Docker Compose instead.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--target",
    default=None,
    type=click.Path(file_okay=False, path_type=Path),
    help="Directory for the registry + audit DB. Defaults to ~/.lexalign.",
)
@click.option("--host", default="127.0.0.1", help="Bind host (default 127.0.0.1).")
@click.option("--port", default=8765, type=int, help="Bind port (default 8765).")
@click.option(
    "--no-serve",
    is_flag=True,
    help="Materialize the directory and exit without starting uvicorn.",
)
@click.option(
    "--force",
    is_flag=True,
    help="Overwrite an existing registry.yml in the target directory.",
)
def quickstart(
    target: Path | None,
    host: str,
    port: int,
    no_serve: bool,
    force: bool,
) -> None:
    """Docker-free single-user bring-up.

    Lays down a local registry + sqlite under ``--target`` (default
    ``~/.lexalign``), then runs the server in-process on
    ``http://127.0.0.1:8765``. Redis is skipped (the cache layer
    silently degrades). Stop with Ctrl-C.

    For multi-user deployments use `lex-align-server init` + Docker
    Compose instead.
    """
    target = target or quickstart_default_target()
    try:
        result = quickstart_materialize(
            target, bind_host=host, bind_port=port, force=force
        )
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    for p in result.written:
        click.echo(f"  + {p}")
    for p in result.skipped:
        click.echo(f"  · skipped (exists) {p}")

    click.echo("")
    click.echo(f"Quickstart bundle ready under {result.target}.")
    click.echo(f"  registry: {result.registry_yml}")
    click.echo(f"  audit DB: {result.database_path}")
    click.echo("")

    if no_serve:
        click.echo("Skipping `serve` (--no-serve). Start later with:")
        click.echo(
            "  REGISTRY_PATH={reg} DATABASE_PATH={db} BIND_HOST={h} BIND_PORT={p} "
            "lex-align-server serve".format(
                reg=result.registry_yml, db=result.database_path,
                h=result.bind_host, p=result.bind_port,
            )
        )
        return

    os.environ.update(quickstart_apply_env(result))

    click.echo(f"Starting server on http://{result.bind_host}:{result.bind_port} (Ctrl-C to stop)")
    click.echo("Run `lex-align-client init` in another terminal to point a project at it.")
    click.echo("")
    import uvicorn
    uvicorn.run(
        "lex_align_server.main:app",
        host=result.bind_host,
        port=result.bind_port,
    )

init

init(target: Path, force: bool) -> None

Materialize the docker-compose stack, Dockerfile, registry, and .env template into TARGET so the server can be built and run with a single docker compose up -d.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--target",
    default="./lexalign",
    type=click.Path(file_okay=False, path_type=Path),
    help="Directory to write the operator bundle into. Created if missing.",
)
@click.option(
    "--force",
    is_flag=True,
    help=f"Overwrite existing files (and the {MARKER_FILENAME} marker).",
)
def init(target: Path, force: bool) -> None:
    """Materialize the docker-compose stack, Dockerfile, registry, and .env
    template into TARGET so the server can be built and run with a single
    `docker compose up -d`.
    """
    try:
        result = init_target(target, force=force)
    except FileExistsError as exc:
        raise click.ClickException(str(exc))
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    for path in result.written:
        click.echo(f"  + {path.relative_to(Path.cwd()) if path.is_relative_to(Path.cwd()) else path}")
    for path in result.skipped:
        click.echo(f"  · skipped (exists) {path}")

    click.echo("")
    click.echo(f"Operator bundle written to {result.target}.")
    click.echo("")
    click.echo("Next steps:")
    click.echo(f"  cd {result.target.relative_to(Path.cwd()) if result.target.is_relative_to(Path.cwd()) else result.target}")
    click.echo("  cp .env.example .env       # edit if you need to change defaults")
    click.echo("  docker compose up -d")
    click.echo("  lex-align-server selftest  # confirm the stack is alive")

registry

registry() -> None

Manage the enterprise package registry.

Source code in src/lex_align_server/cli.py
@main.group()
def registry() -> None:
    """Manage the enterprise package registry."""

registry_compile

registry_compile(source: Path, destination: Path) -> None

Compile a YAML registry SOURCE to the JSON form at DESTINATION.

Source code in src/lex_align_server/cli.py
@registry.command("compile")
@click.argument("source", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("destination", type=click.Path(dir_okay=False, path_type=Path))
def registry_compile(source: Path, destination: Path) -> None:
    """Compile a YAML registry SOURCE to the JSON form at DESTINATION."""
    import yaml

    try:
        doc = yaml.safe_load(source.read_text())
    except yaml.YAMLError as exc:
        raise click.ClickException(f"YAML parse error: {exc}")
    try:
        compiled = validate_registry(doc)
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(compiled, indent=2, sort_keys=True) + "\n")
    click.echo(
        f"Compiled {len(compiled['packages'])} package rules from {source} to {destination}"
    )

check_config

check_config() -> None

Validate the server's configuration for a single-team install.

Verifies REGISTRY_PATH, audit DB writability, cache reachability, the auto-detected proposer, and auth posture. Exits non-zero if any check fails. Warnings (e.g. anonymous auth on a non-loopback bind, Redis unreachable) do not fail the run.

Source code in src/lex_align_server/cli.py
@main.command("check-config")
def check_config() -> None:
    """Validate the server's configuration for a single-team install.

    Verifies REGISTRY_PATH, audit DB writability, cache reachability, the
    auto-detected proposer, and auth posture. Exits non-zero if any
    check fails. Warnings (e.g. anonymous auth on a non-loopback bind,
    Redis unreachable) do not fail the run.
    """
    from .config import get_settings
    settings = get_settings()
    results = run_checks_sync(settings)

    label_width = max((len(r.label) for r in results), default=10)
    for r in results:
        click.echo(f"{_STATUS_GLYPH[r.status]}  {r.label.ljust(label_width)}  {r.detail}")

    fails = sum(1 for r in results if r.status == FAIL)
    warns = sum(1 for r in results if r.status == WARN)
    click.echo("")
    if fails:
        click.echo(f"{fails} failure(s), {warns} warning(s).", err=True)
        sys.exit(1)
    if warns:
        click.echo(f"All checks passed with {warns} warning(s).")
    else:
        click.echo("All checks passed.")

selftest

selftest(url: str, timeout: float) -> None

Probe /api/v1/health and report whether the stack is alive.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--url",
    default="http://127.0.0.1:8765",
    help="Base URL of the server to probe.",
)
@click.option("--timeout", default=5.0, type=float, help="Per-request timeout (seconds).")
def selftest(url: str, timeout: float) -> None:
    """Probe `/api/v1/health` and report whether the stack is alive."""
    import httpx

    health = url.rstrip("/") + "/api/v1/health"
    try:
        response = httpx.get(health, timeout=timeout)
    except httpx.HTTPError as exc:
        raise click.ClickException(f"Server unreachable at {health}: {exc}")

    if response.status_code != 200:
        raise click.ClickException(
            f"Health check failed: HTTP {response.status_code} from {health}"
        )

    click.echo(f"OK  {health}")
    try:
        click.echo(json.dumps(response.json(), indent=2))
    except ValueError:
        click.echo(response.text)

admin

admin() -> None

Administrative commands (organization mode).

Source code in src/lex_align_server/cli.py
@main.group()
def admin() -> None:
    """Administrative commands (organization mode)."""

keys

keys() -> None

Manage API keys (deferred — Phase 3+).

Source code in src/lex_align_server/cli.py
@admin.group()
def keys() -> None:
    """Manage API keys (deferred — Phase 3+)."""

config

Server configuration.

All settings are sourced from environment variables. Defaults bias towards the single-user/local-evaluation experience: no auth, bind to localhost, point at a co-located Redis.

get_settings

get_settings() -> Settings

Build a fresh Settings object. Avoids module-level caching so tests can override via env vars on a per-test basis.

Source code in src/lex_align_server/config.py
def get_settings() -> Settings:
    """Build a fresh Settings object. Avoids module-level caching so tests can
    override via env vars on a per-test basis.
    """
    return Settings()
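
The reason a fresh object per call helps tests can be shown with a stand-in. DemoSettings below is hypothetical and much smaller than the real Settings class; the 127.0.0.1 default is an assumption based on the "bind to localhost" note above.

```python
import os
from dataclasses import dataclass, field


@dataclass
class DemoSettings:
    """Hypothetical stand-in: reads the environment at construction time."""

    bind_host: str = field(
        default_factory=lambda: os.environ.get("BIND_HOST", "127.0.0.1")
    )


def get_demo_settings() -> DemoSettings:
    # No module-level cache: every call re-reads the environment.
    return DemoSettings()
```

Because nothing is cached, a test can set BIND_HOST, call the factory, and see the override without reloading the module.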

cve

CVE lookup via OSV (osv.dev).

OSV is the Google-sponsored aggregator that ingests GHSA, NVD, PyPA Advisory DB, and others. The free POST /v1/query endpoint takes a package coordinate and returns the list of vulnerabilities with severity scores. We parse out the highest CVSS base score and let GlobalPolicies.cve_blocks decide.
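
A rough sketch of the two halves of this flow: building the query payload and picking the highest score. The payload shape follows query_osv on this page. The scoring half is heavily simplified: OSV records typically express severity as CVSS vector strings, so the numeric "base_score" key used here is a hypothetical, pre-computed field, and highest_base_score is not the module's _summarize_vulns.

```python
from typing import Optional


def build_osv_query(package: str, version: Optional[str]) -> dict:
    """Build the JSON body for a PyPI package query."""
    payload: dict = {"package": {"name": package, "ecosystem": "PyPI"}}
    if version:
        payload["version"] = version
    return payload


def highest_base_score(vulns: list[dict]) -> Optional[float]:
    """Return the largest numeric score, or None when no vuln carries one.

    Assumes each record may hold a numeric "base_score" (hypothetical key).
    """
    scores = [
        float(v["base_score"])
        for v in vulns
        if isinstance(v.get("base_score"), (int, float))
    ]
    return max(scores) if scores else None
```

Returning None rather than 0.0 for "no score available" keeps unscored advisories distinguishable from low-severity ones.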

query_osv async

query_osv(package: str, version: Optional[str], osv_url: str, http_client: AsyncClient) -> Optional[list[dict]]

Return raw vulns list from OSV, or None on error/timeout.

Source code in src/lex_align_server/cve.py
async def query_osv(
    package: str,
    version: Optional[str],
    osv_url: str,
    http_client: httpx.AsyncClient,
) -> Optional[list[dict]]:
    """Return raw `vulns` list from OSV, or None on error/timeout."""
    payload: dict = {
        "package": {"name": package, "ecosystem": "PyPI"},
    }
    if version:
        payload["version"] = version
    try:
        resp = await http_client.post(osv_url, json=payload)
    except httpx.HTTPError as exc:  # httpx.TimeoutException is a subclass of HTTPError
        logger.warning("osv: query for %s@%s failed: %s", package, version, exc)
        return None
    if resp.status_code != 200:
        logger.warning("osv: %s returned %s", package, resp.status_code)
        return None
    try:
        data = resp.json()
    except ValueError:
        return None
    vulns = data.get("vulns") or []
    return [v for v in vulns if isinstance(v, dict)]

resolve_cves async

resolve_cves(package: str, version: Optional[str], cache: JsonCache, cache_ttl: int, osv_url: str, http_client: AsyncClient) -> CveInfo

Cache-or-fetch CVE data for a package@version.

A None version is cached separately from versioned queries — OSV's behaviour with no version is documented as "all versions" so we treat it as a distinct cache entry.

Source code in src/lex_align_server/cve.py
async def resolve_cves(
    package: str,
    version: Optional[str],
    cache: JsonCache,
    cache_ttl: int,
    osv_url: str,
    http_client: httpx.AsyncClient,
) -> CveInfo:
    """Cache-or-fetch CVE data for a package@version.

    A None version is cached separately from versioned queries — OSV's
    behaviour with no version is documented as "all versions" so we treat it
    as a distinct cache entry.
    """
    cache_key = f"cve:{package.lower()}@{version or 'latest'}"
    cached = await cache.get(cache_key)
    if cached is not None:
        try:
            return CveInfo.from_dict(cached)
        except Exception:
            pass

    vulns = await query_osv(package, version, osv_url, http_client)
    if vulns is None:
        # Treat OSV outage as "no known CVEs" but do NOT cache the negative
        # result — we want a fresh attempt next time.
        return CveInfo(ids=[], max_score=None, raw_count=0)

    info = _summarize_vulns(vulns)
    await cache.set(cache_key, info.to_dict(), cache_ttl)
    return info
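
The cache-key scheme used above can be isolated into a small function to make the separation between versioned and unversioned lookups concrete. The function name cve_cache_key is hypothetical; the key format is copied from resolve_cves.

```python
from typing import Optional


def cve_cache_key(package: str, version: Optional[str]) -> str:
    """Mirror the key format in resolve_cves: lower-cased name, 'latest' for no version."""
    return f"cve:{package.lower()}@{version or 'latest'}"
```

An unversioned lookup and a pinned lookup for the same package therefore never share an entry, and package names differing only in case do.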

cve_scanner

Background CVE re-scan scheduler.

Walks every package in the live registry on a fixed cadence, re-queries OSV via lex_align_server.cve.resolve_cves, and writes a CVE_ALERT row to the audit log whenever the package's max CVSS now crosses GlobalPolicies.cve_threshold.

The scanner is alert-only. It does not flip registry entries to banned or otherwise change verdict policy — that decision belongs to the operator. The dashboard and the client's status command surface the alerts via the existing security_report endpoint.

Lifecycle mirrors RegistryPoller: start() schedules an asyncio task on the running loop, stop() drains it. Wired into the FastAPI lifespan so the task starts and stops with the server.
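
The start()/stop() lifecycle can be sketched with an asyncio.Event and a single task. PeriodicRunner is a hypothetical class, not CveScanner or RegistryPoller; the way the interval is passed in is an assumption. The loop waits one full interval before the first pass, so start-up does not trigger work immediately.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class PeriodicRunner:
    """Run an async callable every `interval` seconds until stopped."""

    def __init__(self, interval: float, work: Callable[[], Awaitable[None]]):
        self._interval = interval
        self._work = work
        self._task: Optional[asyncio.Task] = None
        self._stop = asyncio.Event()

    def start(self) -> None:
        # Must be called from inside a running event loop.
        self._task = asyncio.get_running_loop().create_task(self._run())

    async def stop(self) -> None:
        self._stop.set()
        if self._task is not None:
            await self._task  # drain: let an in-flight pass finish
            self._task = None

    async def _run(self) -> None:
        while not self._stop.is_set():
            try:
                # Sleep for one interval, waking early if stop() is called.
                await asyncio.wait_for(self._stop.wait(), timeout=self._interval)
            except asyncio.TimeoutError:
                await self._work()
```

Waiting on the stop event instead of calling asyncio.sleep means shutdown does not have to wait out the remainder of the interval.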

CveScanner

CveScanner(state: 'AppState')

Periodic OSV re-scan of every registered package.

The first scan runs after the configured interval, not at startup — server bring-up should not block on a wave of OSV requests, and the /evaluate path already covers fresh additions.

Source code in src/lex_align_server/cve_scanner.py
def __init__(self, state: "AppState"):
    self.state = state
    self._task: Optional[asyncio.Task] = None
    self._stop = asyncio.Event()
    self._last_run_at: Optional[datetime.datetime] = None
    self._last_alert_count: int = 0
    self._last_packages_scanned: int = 0

scan_once async
scan_once() -> int

Run one scan pass over the live registry.

Returns the number of CVE_ALERT rows written. Public so tests and the operator's manual reload paths can drive it.

Source code in src/lex_align_server/cve_scanner.py
async def scan_once(self) -> int:
    """Run one scan pass over the live registry.

    Returns the number of CVE_ALERT rows written. Public so tests
    and the operator's manual reload paths can drive it.
    """
    registry = self.state.registry
    if registry is None:
        logger.debug("cve scanner: no registry loaded; skipping")
        return 0
    packages = list(registry.packages.items())
    if not packages:
        return 0

    threshold_score = registry.global_policies.cve_threshold * 10.0
    alerts = 0
    scanned = 0
    for normalized_name, rule in packages:
        if self._stop.is_set():
            break
        try:
            info = await resolve_cves(
                normalized_name,
                None,  # scan latest known versions
                self.state.cache,
                self.state.settings.cve_cache_ttl,
                self.state.settings.osv_api_url,
                self.state.http,
            )
        except Exception:
            logger.exception(
                "cve scanner: lookup failed for %s", normalized_name,
            )
            continue
        scanned += 1
        if info.max_score is None or not info.ids:
            continue
        if info.max_score < threshold_score:
            continue
        await self.state.audit.record_cve_alert(
            package=normalized_name,
            cve_ids=info.ids,
            max_cvss=info.max_score,
            registry_status=(
                rule.status.value if rule.status is not None else None
            ),
        )
        alerts += 1
        logger.warning(
            "cve scanner: %s now exceeds threshold "
            "(max_cvss=%.1f, ids=%s)",
            normalized_name, info.max_score, info.ids[:3],
        )

    self._last_run_at = datetime.datetime.now(tz=datetime.timezone.utc)
    self._last_alert_count = alerts
    self._last_packages_scanned = scanned
    logger.info(
        "cve scanner: scan complete (packages=%d, alerts=%d)",
        scanned, alerts,
    )
    return alerts
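
The alert condition in scan_once reduces to a small predicate. Judging by the multiplication by 10.0 in the source, cve_threshold appears to be a fraction that is scaled to the 0 to 10 CVSS range; that reading is inferred, and should_alert is a hypothetical name used only for this sketch.

```python
from typing import Optional


def should_alert(
    max_score: Optional[float], ids: list[str], cve_threshold: float
) -> bool:
    """Mirror the skip conditions in scan_once as a single predicate."""
    if max_score is None or not ids:
        return False  # nothing scored, or no advisories at all
    threshold_score = cve_threshold * 10.0  # fraction scaled to CVSS range
    return max_score >= threshold_score
```

For example, with a threshold of 0.9 a package whose worst advisory scores 9.8 raises an alert, while one scoring 5.0 does not.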

dashboards

router

Dashboard pages.

/ is a welcome landing page. A shared side panel (_sidebar.html) links the landing page and every dashboard so operators can switch views without retyping URLs.

Four dashboards render server-side and fetch their data from the JSON API:

  • /dashboard/security is a vulnerability-posture view: severity buckets, packages with the worst CVE history, and the "hot" cell — registry-allowed packages that have started attracting CVE denials.
  • /dashboard/legal is a license-compliance view: license breakdown by verdict, unknown-license policy performance, projects pulling the most non-compliant packages.
  • /dashboard/agents shows a generic agent-activity report.
  • /dashboard/registry is an interactive workshop: it loads the live registry, lets the operator triage pending approval requests, and exports the result as YAML. Classifying a pending request also updates the in-memory registry so live /evaluate calls see the rule immediately, but persistence still requires exporting the YAML and rebuilding the server image.

evaluate

Evaluation orchestrator.

Combines registry lookup, CVE check, and license check into a single verdict. Emits one audit-log row per call. The order matters:

  1. registry hard-blocks (banned, deprecated, version-violated)
  2. CVE check — applies even to registry-allowed packages so a newly published critical CVE on a preferred package still blocks
  3. license check — only when the package is unknown to the registry
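
The ordering above can be sketched as a pure function over pre-computed inputs. The parameter names and the returned labels are hypothetical; the real evaluate() works with Registry, CveInfo, and LicenseInfo objects and writes an audit row on every path.

```python
from typing import Optional


def decide(
    registry_blocks: bool,
    cve_blocks: bool,
    known_to_registry: bool,
    license_ok: Optional[bool],
) -> str:
    """Return a verdict label following the documented check order."""
    # 1. Registry hard-blocks win outright; later checks are skipped.
    if registry_blocks:
        return "DENIED:registry"
    # 2. CVE check applies even to registry-allowed packages.
    if cve_blocks:
        return "DENIED:cve"
    # 3. License check only matters for packages the registry does not know.
    if not known_to_registry and license_ok is False:
        return "DENIED:license"
    return "ALLOWED"
```

The second branch is what makes a newly published critical CVE block a package the registry otherwise prefers.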

evaluate async

evaluate(*, package: str, version: Optional[str], project: str, requester: str, registry: Optional[Registry], cache: JsonCache, audit: AuditStore, settings: Settings, http_client: AsyncClient, agent: Optional[AgentInfo] = None) -> EvaluationResult

Single evaluation. Always writes one audit row before returning.

Source code in src/lex_align_server/evaluate.py
async def evaluate(
    *,
    package: str,
    version: Optional[str],
    project: str,
    requester: str,
    registry: Optional[Registry],
    cache: JsonCache,
    audit: AuditStore,
    settings: Settings,
    http_client: httpx.AsyncClient,
    agent: Optional[AgentInfo] = None,
) -> EvaluationResult:
    """Single evaluation. Always writes one audit row before returning."""
    agent = agent or AgentInfo()
    # Step 1 — registry verdict (only if a registry is configured).
    pkg_verdict: Optional[PackageVerdict] = None
    if registry is not None:
        pkg_verdict = registry.lookup(package, version)
        if pkg_verdict.action is Action.BLOCK:
            result = _denied(
                package=package,
                version=version,
                resolved_version=None,
                reason=pkg_verdict.reason or "Blocked by enterprise registry.",
                registry_status=pkg_verdict.status.value if pkg_verdict.status else None,
                replacement=pkg_verdict.replacement,
                version_constraint=pkg_verdict.version_constraint,
            )
            await _audit(audit, result, project, requester, agent, DENIAL_REGISTRY)
            return result

    # Step 2 — license + latest version (PyPI). We need PyPI even for known
    # packages so we can resolve "latest" when no version is provided.
    license_info: Optional[LicenseInfo] = None
    latest_version: Optional[str] = None
    if registry is None or pkg_verdict is None or pkg_verdict.action is Action.UNKNOWN:
        license_info, latest_version = await resolve_license(
            package, version, cache, settings.license_cache_ttl,
            settings.pypi_api_url, http_client,
        )
    else:
        # Known to registry — still need latest_version for the CVE call when
        # no version is pinned, so we hit PyPI but ignore the license.
        if version is None:
            license_info, latest_version = await resolve_license(
                package, None, cache, settings.license_cache_ttl,
                settings.pypi_api_url, http_client,
            )

    cve_query_version = version or latest_version

    # Step 3 — CVE check (applies regardless of registry status).
    cves = await resolve_cves(
        package,
        cve_query_version,
        cache,
        settings.cve_cache_ttl,
        settings.osv_api_url,
        http_client,
    )

    if registry is not None and registry.global_policies.cve_blocks(cves.max_score):
        result = _denied(
            package=package,
            version=version,
            resolved_version=cve_query_version,
            reason=(
                f"Critical CVE detected (max CVSS {cves.max_score}); "
                f"threshold is {registry.global_policies.cve_threshold * 10:.1f}."
            ),
            registry_status=(
                pkg_verdict.status.value if pkg_verdict and pkg_verdict.status else None
            ),
            license=license_info.license_normalized if license_info else None,
            cves=cves,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_CVE)
        return result

    # Step 4 — license verdict for unknown-to-registry packages.
    if registry is not None and pkg_verdict is not None and pkg_verdict.action is Action.UNKNOWN:
        assert license_info is not None  # set in step 2
        lic_verdict = evaluate_license(
            license_info.license_normalized, registry.global_policies
        )
        if lic_verdict.action is Action.BLOCK:
            result = _denied(
                package=package,
                version=version,
                resolved_version=cve_query_version,
                reason=lic_verdict.reason,
                registry_status=None,
                license=license_info.license_normalized,
                cves=cves,
            )
            await _audit(audit, result, project, requester, agent, DENIAL_LICENSE)
            return result
        # Unknown but license-passing → provisionally allowed.
        if lic_verdict.needs_human_review:
            prov_reason = (
                "Not yet in your approved registry. License could not be "
                "determined; provisionally allowed pending human review. "
                "An approval request will be automatically submitted."
            )
        else:
            prov_reason = (
                "Not yet in your approved registry. License "
                f"{license_info.license_normalized} is on the auto-approve list "
                "and no critical CVEs are reported. Run `lex-align-client "
                "request-approval` to formalize."
            )
        result = EvaluationResult(
            verdict=VERDICT_PROVISIONALLY_ALLOWED,
            reason=prov_reason,
            package=package,
            version=version,
            resolved_version=cve_query_version,
            registry_status=None,
            license=license_info.license_normalized,
            cve_ids=list(cves.ids),
            max_cvss=cves.max_score,
            is_requestable=True,
            auto_request_approval=lic_verdict.needs_human_review,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_NONE)
        return result

    # Step 5 — known-to-registry ALLOW or REQUIRE_PROPOSE.
    if pkg_verdict is None:
        # No registry configured at all. Treat as provisionally allowed
        # (license/CVE already passed if applicable).
        result = EvaluationResult(
            verdict=VERDICT_PROVISIONALLY_ALLOWED,
            reason="No enterprise registry configured; permitted by default.",
            package=package,
            version=version,
            resolved_version=cve_query_version,
            registry_status=None,
            license=license_info.license_normalized if license_info else None,
            cve_ids=list(cves.ids),
            max_cvss=cves.max_score,
            is_requestable=False,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_NONE)
        return result

    needs_rationale = pkg_verdict.action is Action.REQUIRE_PROPOSE
    result = EvaluationResult(
        verdict=VERDICT_ALLOWED,
        reason=pkg_verdict.reason or (
            f"Allowed by local registry ({pkg_verdict.status.value if pkg_verdict.status else 'allow'})."
        ),
        package=package,
        version=version,
        resolved_version=cve_query_version,
        registry_status=pkg_verdict.status.value if pkg_verdict.status else None,
        version_constraint=pkg_verdict.version_constraint,
        license=license_info.license_normalized if license_info else None,
        cve_ids=list(cves.ids),
        max_cvss=cves.max_score,
        is_requestable=False,
        needs_rationale=needs_rationale,
    )
    await _audit(audit, result, project, requester, agent, DENIAL_NONE)
    return result

init

lex-align-server init — materialize the operator bundle into a directory.

Mirrors the design of lex-align-client init: copies the docker-compose stack, Dockerfile, example registry, and .env.example from the wheel's bundled assets into a target directory, then compiles the registry to JSON so docker compose up works on the first try.

asset_names

asset_names() -> Iterable[str]

All asset basenames as they appear in the wheel. Useful for tests.

Source code in src/lex_align_server/init.py
def asset_names() -> Iterable[str]:
    """All asset basenames as they appear in the wheel. Useful for tests."""
    return (
        *_VERBATIM_ASSETS,
        *_TEMPLATED_ASSETS,
        *(src for src, _ in _RENAMED_ASSETS),
    )

init_target

init_target(target: Path, *, force: bool = False) -> InitResult

Materialize the operator bundle into target.

Idempotent by default: if the marker file exists, raises FileExistsError unless force is set.

Source code in src/lex_align_server/init.py
def init_target(target: Path, *, force: bool = False) -> InitResult:
    """Materialize the operator bundle into ``target``.

    Idempotent by default: if the marker file exists, raises FileExistsError
    unless ``force`` is set.
    """
    target = target.resolve()
    marker = target / MARKER_FILENAME
    if marker.exists() and not force:
        raise FileExistsError(
            f"{marker} exists; this directory was already initialized. "
            "Use --force to overwrite."
        )

    target.mkdir(parents=True, exist_ok=True)
    version = _installed_version()
    written: list[Path] = []
    skipped: list[Path] = []

    for name in _VERBATIM_ASSETS:
        _write(target / name, _read_asset(name), force=force, written=written, skipped=skipped)

    for name in _TEMPLATED_ASSETS:
        body = _read_asset(name).decode("utf-8").replace("{LEX_ALIGN_VERSION}", version)
        _write(target / name, body.encode("utf-8"), force=force, written=written, skipped=skipped)

    for src, dest in _RENAMED_ASSETS:
        _write(target / dest, _read_asset(src), force=force, written=written, skipped=skipped)

    yaml_path = target / "registry.yml"
    json_path = target / "registry.json"
    compiled: Path | None = None
    if yaml_path.exists() and (force or not json_path.exists()):
        _compile_registry(yaml_path, json_path)
        compiled = json_path
        if json_path not in written:
            written.append(json_path)

    marker.write_text(f'version = "{version}"\n')
    if marker not in written:
        written.append(marker)

    return InitResult(
        target=target,
        written=written,
        skipped=skipped,
        compiled_registry=compiled,
    )

licenses

License lookup, normalization, and policy evaluation.

For packages not listed in the registry, the server fetches the license from PyPI, normalizes it to an SPDX-ish token, and checks it against global_policies. Results are cached in Redis keyed by package name.

fetch_license_from_pypi async

fetch_license_from_pypi(package: str, version: Optional[str], pypi_base: str, client: AsyncClient) -> tuple[Optional[str], Optional[str]]

Return (raw_license_string, latest_version) from PyPI, or (None, None) on error.

Source code in src/lex_align_server/licenses.py
async def fetch_license_from_pypi(
    package: str,
    version: Optional[str],
    pypi_base: str,
    client: httpx.AsyncClient,
) -> tuple[Optional[str], Optional[str]]:
    """Return (raw_license_string, latest_version) from PyPI, or (None, None) on error."""
    if version:
        url = PYPI_VERSIONED_URL_TEMPLATE.format(base=pypi_base, package=package, version=version)
    else:
        url = PYPI_URL_TEMPLATE.format(base=pypi_base, package=package)
    try:
        resp = await client.get(url)
    except (httpx.HTTPError, httpx.TimeoutException):
        return None, None
    if resp.status_code != 200:
        return None, None
    try:
        data = resp.json()
    except ValueError:
        return None, None
    raw = _extract_license_from_pypi_json(data)
    info = data.get("info") or {}
    latest = info.get("version")
    return raw, latest if isinstance(latest, str) else None

resolve_license async

resolve_license(package: str, version: Optional[str], cache: JsonCache, cache_ttl: int, pypi_base: str, http_client: AsyncClient) -> tuple[LicenseInfo, Optional[str]]

Resolve (cache-or-fetch) a package's license. Returns (info, latest_version).

Source code in src/lex_align_server/licenses.py
async def resolve_license(
    package: str,
    version: Optional[str],
    cache: JsonCache,
    cache_ttl: int,
    pypi_base: str,
    http_client: httpx.AsyncClient,
) -> tuple[LicenseInfo, Optional[str]]:
    """Resolve (cache-or-fetch) a package's license. Returns (info, latest_version)."""
    cache_key = f"license:{package.lower()}"
    cached = await cache.get(cache_key)
    if cached is not None:
        try:
            info = LicenseInfo.from_dict(cached["info"])
            return info, cached.get("latest_version")
        except (KeyError, TypeError):
            pass

    raw, latest = await fetch_license_from_pypi(package, version, pypi_base, http_client)
    info = LicenseInfo(
        license_raw=raw,
        license_normalized=normalize_license(raw),
    )
    await cache.set(
        cache_key,
        {"info": info.to_dict(), "latest_version": latest},
        cache_ttl,
    )
    return info, latest
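
A minimal sketch of the cache-or-fetch pattern used above, runnable without the server installed. `InMemoryCache`, `fake_fetch`, and `resolve` are hypothetical stand-ins for `JsonCache`, `fetch_license_from_pypi`, and `resolve_license`; only the `license:<lowercased name>` key shape is taken from the source.

```python
import asyncio


class InMemoryCache:
    """Hypothetical stand-in for JsonCache: async get/set, TTL ignored."""

    def __init__(self):
        self._data = {}

    async def get(self, key):
        return self._data.get(key)

    async def set(self, key, value, ttl):
        self._data[key] = value


fetch_calls = []


async def fake_fetch(package):
    # Pretend upstream lookup; records each call so cache hits are visible.
    fetch_calls.append(package)
    return "MIT", "1.0.0"


async def resolve(package, cache, ttl=3600):
    key = f"license:{package.lower()}"  # same key shape as resolve_license
    cached = await cache.get(key)
    if cached is not None:
        return cached["license"], cached["latest_version"]
    raw, latest = await fake_fetch(package)
    await cache.set(key, {"license": raw, "latest_version": latest}, ttl)
    return raw, latest


async def demo():
    cache = InMemoryCache()
    first = await resolve("Requests", cache)
    second = await resolve("requests", cache)  # served from the cache
    return first, second


first, second = asyncio.run(demo())
```

Because the key contains only the lower-cased package name, a lookup for a different version of the same package is served from the same cache entry until the TTL expires.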

main

FastAPI application factory.

Wires up the routers, builds the per-app state (cache, audit store, HTTP client, registry), and exposes a lifespan so resources are torn down cleanly.

proposer

Pluggable registry-update proposers.

When an agent calls request-approval or an operator clicks "Approve" on the dashboard, the server emits a proposal to update registry.yml with the new (or amended) package rule. How that proposal lands depends entirely on the configured proposer:

  • log_only — log it; don't touch anything. Safe default for evaluation / read-only demos. Selected when no registry write target is configured.
  • local_file — write directly to REGISTRY_PATH, recompile, reload in-memory. Best fit for single-user mode and local evaluation.
  • local_git — commit to a local git working tree (no remote, no PR). Useful when the operator wants the audit trail of git history without hosting a remote.
  • github — clone, branch, push, open a PR against the configured remote (REGISTRY_REPO_URL). The merge webhook on /api/v1/registry/webhook triggers the in-memory reload.
  • module:path:Class — escape hatch for custom CI/CD portals.

Selection happens via load_proposer(settings, http_client) — the loader auto-detects from existing settings, so REGISTRY_PROPOSER is optional. See proposer/loader.py for the precedence rules.

The dependency surface for endpoints (ProposedRule, ProposalContext, ProposalResult) is stable across backends; the endpoint code never branches on which proposer is active.

ProposalContext dataclass

ProposalContext(source: str, project: str, requester: str, rationale: str, agent_model: Optional[str] = None, agent_version: Optional[str] = None, extra: Mapping[str, Any] = dict())

Who is asking for the change and why.

source is "agent" for the agent's request-approval flow and "operator" when the dashboard is the originator. Both values end up in commit messages / PR bodies so reviewers can tell at a glance whether the change came from a human triaging the queue or from an agent running unsupervised.

ProposalResult dataclass

ProposalResult(backend: str, status: str, url: Optional[str] = None, branch: Optional[str] = None, commit_sha: Optional[str] = None, detail: str = '')

Outcome surfaced back to the API caller.

status values:

  • opened: a fresh PR / commit / file write was created.
  • amended: an existing open proposal for this package was updated (e.g. the agent re-requested with a new rationale).
  • applied: the change is live (local-file / local-git backends, or a webhook-driven reload after merge).
  • logged: log-only backend; no durable change happened.
  • failed: the proposer hit an error. detail describes it.

ProposedRule dataclass

ProposedRule(name: str, status: str, reason: Optional[str] = None, replacement: Optional[str] = None, min_version: Optional[str] = None, max_version: Optional[str] = None)

The package classification an agent or operator wants in the registry.

name is the raw display form (Some-Pkg); the proposer normalizes it before writing to YAML. Optional fields mirror the YAML schema 1:1.

to_yaml_rule
to_yaml_rule() -> dict

Render to the dict shape the YAML schema expects.

Source code in src/lex_align_server/proposer/base.py
def to_yaml_rule(self) -> dict:
    """Render to the dict shape the YAML schema expects."""
    out: dict = {"status": self.status}
    if self.reason:      out["reason"] = self.reason
    if self.replacement: out["replacement"] = self.replacement
    if self.min_version: out["min_version"] = self.min_version
    if self.max_version: out["max_version"] = self.max_version
    return out
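
A short usage sketch of `to_yaml_rule`. The dataclass below is a local copy of the documented one so the example runs standalone; the `"deprecated"` status and the package names are illustrative values, not taken from a real registry.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProposedRule:
    """Local copy of the documented dataclass, for illustration only."""

    name: str
    status: str
    reason: Optional[str] = None
    replacement: Optional[str] = None
    min_version: Optional[str] = None
    max_version: Optional[str] = None

    def to_yaml_rule(self) -> dict:
        out: dict = {"status": self.status}
        if self.reason:      out["reason"] = self.reason
        if self.replacement: out["replacement"] = self.replacement
        if self.min_version: out["min_version"] = self.min_version
        if self.max_version: out["max_version"] = self.max_version
        return out


rule = ProposedRule(name="Some-Pkg", status="deprecated", replacement="other-pkg")
rendered = rule.to_yaml_rule()
print(rendered)
```

Unset optional fields are omitted rather than written as null, and `name` is not part of the rendered dict.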

Proposer

Bases: ABC

Resolve a ProposedRule into a registry change.

Implementations must be re-entrant — multiple requests for the same package collapse onto the same branch/PR via the backend's own idempotency rules, but the framework doesn't serialize calls.

close async
close() -> None

Optional cleanup. Override for proposers that hold long-lived resources (open file handles, working trees, etc.).

Source code in src/lex_align_server/proposer/base.py
async def close(self) -> None:
    """Optional cleanup. Override for proposers that hold long-lived
    resources (open file handles, working trees, etc.)."""
    return None

ProposerError

Bases: Exception

Proposer hit a recoverable error. Endpoint code wraps this as a ProposalResult with status="failed" so a transient git outage doesn't cascade into an HTTP 5xx for the agent.
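
The wrapping described here might look like the sketch below. `ProposerError` and `ProposalResult` are local stand-ins mirroring the documented signatures; `flaky_propose` and `safe_propose` are hypothetical names, and the real endpoint code may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


class ProposerError(Exception):
    """Stand-in for the documented exception."""


@dataclass
class ProposalResult:
    """Stand-in mirroring the documented fields."""

    backend: str
    status: str
    url: Optional[str] = None
    branch: Optional[str] = None
    commit_sha: Optional[str] = None
    detail: str = ""


async def flaky_propose() -> ProposalResult:
    # Hypothetical backend call that fails with a recoverable error.
    raise ProposerError("git push timed out")


async def safe_propose(backend: str) -> ProposalResult:
    try:
        return await flaky_propose()
    except ProposerError as exc:
        # Report the failure as data instead of letting it become an HTTP 5xx.
        return ProposalResult(backend=backend, status="failed", detail=str(exc))


result = asyncio.run(safe_propose("github"))
```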

base

Proposer contract.

The dataclasses are deliberately small and serializable so dashboard responses can carry them verbatim without an additional translation layer. Custom backends only need to subclass Proposer and implement propose; close is a no-op default for proposers that don't keep persistent state.
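
A custom backend could be sketched as follows. Everything here is a stand-in so the block runs on its own: the trimmed dataclasses keep only a few documented fields, and the `propose(rule, context)` signature is an assumption, since this page does not show it. `InMemoryProposer` is a hypothetical example class.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class ProposedRule:          # trimmed stand-in
    name: str
    status: str


@dataclass
class ProposalContext:       # trimmed stand-in
    source: str
    project: str
    requester: str
    rationale: str


@dataclass
class ProposalResult:        # trimmed stand-in
    backend: str
    status: str
    detail: str = ""


class Proposer(ABC):
    @abstractmethod
    async def propose(self, rule, context) -> ProposalResult:  # assumed signature
        ...

    async def close(self) -> None:
        return None


class InMemoryProposer(Proposer):
    """Hypothetical backend that keeps proposals in a dict."""

    def __init__(self):
        self.rules = {}

    async def propose(self, rule, context) -> ProposalResult:
        # A repeat request for the same package amends instead of duplicating.
        status = "amended" if rule.name in self.rules else "opened"
        self.rules[rule.name] = rule.status
        return ProposalResult("in_memory", status, f"{context.source}: {context.rationale}")


async def demo():
    proposer = InMemoryProposer()
    ctx = ProposalContext("agent", "demo-project", "alice", "needed for HTTP calls")
    first = await proposer.propose(ProposedRule("httpx", "preferred"), ctx)
    second = await proposer.propose(ProposedRule("httpx", "preferred"), ctx)
    await proposer.close()
    return first, second


first, second = asyncio.run(demo())
```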

ProposedRule dataclass
ProposedRule(name: str, status: str, reason: Optional[str] = None, replacement: Optional[str] = None, min_version: Optional[str] = None, max_version: Optional[str] = None)

The package classification an agent or operator wants in the registry.

name is the raw display form (Some-Pkg); the proposer normalizes it before writing to YAML. Optional fields mirror the YAML schema 1:1.

to_yaml_rule
to_yaml_rule() -> dict

Render to the dict shape the YAML schema expects.

Source code in src/lex_align_server/proposer/base.py
def to_yaml_rule(self) -> dict:
    """Render to the dict shape the YAML schema expects."""
    out: dict = {"status": self.status}
    if self.reason:      out["reason"] = self.reason
    if self.replacement: out["replacement"] = self.replacement
    if self.min_version: out["min_version"] = self.min_version
    if self.max_version: out["max_version"] = self.max_version
    return out

ProposalContext dataclass
ProposalContext(source: str, project: str, requester: str, rationale: str, agent_model: Optional[str] = None, agent_version: Optional[str] = None, extra: Mapping[str, Any] = dict())

Who is asking for the change and why.

source is "agent" for the agent's request-approval flow and "operator" when the dashboard is the originator. Both values end up in commit messages / PR bodies so reviewers can tell at a glance whether the change came from a human triaging the queue or from an agent running unsupervised.

ProposalResult dataclass
ProposalResult(backend: str, status: str, url: Optional[str] = None, branch: Optional[str] = None, commit_sha: Optional[str] = None, detail: str = '')

Outcome surfaced back to the API caller.

status values:

  • opened: a fresh PR / commit / file write was created.
  • amended: an existing open proposal for this package was updated (e.g. the agent re-requested with a new rationale).
  • applied: the change is live (local-file / local-git backends, or a webhook-driven reload after merge).
  • logged: log-only backend; no durable change happened.
  • failed: the proposer hit an error. detail describes it.

ProposerError

Bases: Exception

Proposer hit a recoverable error. Endpoint code wraps this as a ProposalResult with status="failed" so a transient git outage doesn't cascade into an HTTP 5xx for the agent.

Proposer

Bases: ABC

Resolve a ProposedRule into a registry change.

Implementations must be re-entrant — multiple requests for the same package collapse onto the same branch/PR via the backend's own idempotency rules, but the framework doesn't serialize calls.

close async
close() -> None

Optional cleanup. Override for proposers that hold long-lived resources (open file handles, working trees, etc.).

Source code in src/lex_align_server/proposer/base.py
async def close(self) -> None:
    """Optional cleanup. Override for proposers that hold long-lived
    resources (open file handles, working trees, etc.)."""
    return None

github

GitHub PR proposer.

Production-grade flow for orgs that want PR review on every registry change. Each propose() call:

  1. Ensures a fresh shallow clone of the registry repo in a working directory (REGISTRY_REPO_WORKDIR, default /var/lib/lexalign/registry-work).
  2. Branches off the configured default branch as lex-align/approval/<normalized-package-name>. If the branch already exists on the remote, fetches and re-uses it so the same package never gets two parallel PRs.
  3. Edits the YAML to add / replace the package rule.
  4. Commits with an author identity scoped to the bot (REGISTRY_BOT_AUTHOR_NAME / REGISTRY_BOT_AUTHOR_EMAIL).
  5. Pushes the branch.
  6. Opens a PR via the GitHub REST API, or posts a follow-up comment if one is already open for the same branch.

The merge → reload step is not in this module — the operator wires GitHub's webhook to POST /api/v1/registry/webhook, which pulls the updated YAML and triggers the in-memory reload.

We shell out to git (subprocess) and use httpx for the REST calls. No additional Python dependencies.
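
The per-package branch name from step 2 could be derived as in this sketch. The normalization rule is an assumption: the text only says the name is normalized, so PEP 503 normalization is used here as a plausible choice, and both function names are hypothetical.

```python
import re


def normalize_name(name: str) -> str:
    # PEP 503 normalization (assumed; the source does not name the rule).
    return re.sub(r"[-_.]+", "-", name).lower()


def approval_branch(package: str) -> str:
    # One stable branch per package, so repeat requests reuse the same PR.
    return f"lex-align/approval/{normalize_name(package)}"


print(approval_branch("Some_Pkg"))
```

Deriving the branch purely from the normalized name is what lets differently spelled requests for the same package collapse onto one branch.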

GitHubProposer
GitHubProposer(*, repo_url: str, registry_file_path: str, token: str, http_client: AsyncClient, workdir: Path, api_base: str = 'https://api.github.com', default_branch: str = 'main', author_name: str = 'lex-align bot', author_email: str = 'lex-align-bot@users.noreply.github.com')

Bases: Proposer

Source code in src/lex_align_server/proposer/github.py
def __init__(
    self,
    *,
    repo_url: str,
    registry_file_path: str,
    token: str,
    http_client: httpx.AsyncClient,
    workdir: Path,
    api_base: str = "https://api.github.com",
    default_branch: str = "main",
    author_name: str = "lex-align bot",
    author_email: str = "lex-align-bot@users.noreply.github.com",
):
    if not repo_url:
        raise ValueError("GitHubProposer requires REGISTRY_REPO_URL.")
    if not token:
        raise ValueError("GitHubProposer requires REGISTRY_REPO_TOKEN.")
    if not registry_file_path:
        raise ValueError(
            "GitHubProposer requires REGISTRY_FILE_PATH (the path to "
            "registry.yml inside the repo)."
        )
    if not shutil.which("git"):
        raise ProposerError("git binary not on PATH.")

    m = _GITHUB_REPO_RE.match(repo_url.strip())
    if not m:
        raise ValueError(
            f"REGISTRY_REPO_URL={repo_url!r} doesn't look like a GitHub "
            "repo URL (expected https://github.com/owner/repo or git@...)."
        )
    self.owner = m.group("owner")
    self.repo = m.group("repo")
    # Normalize to an HTTPS URL with the token embedded for the push.
    # We never log this — see _redact() below — and the embedded form is
    # only used for git subprocess calls.
    self._authed_https = (
        f"https://x-access-token:{token}@github.com/{self.owner}/{self.repo}.git"
    )
    self.repo_url_public = f"https://github.com/{self.owner}/{self.repo}"
    self.registry_file_path = registry_file_path
    self.token = token
    self.http = http_client
    self.api_base = api_base.rstrip("/")
    self.default_branch = default_branch
    self.author_name = author_name
    self.author_email = author_email
    self.workdir = workdir
    self.workdir.mkdir(parents=True, exist_ok=True)
    # Serialize subprocess git operations so concurrent proposals
    # against the same shared working tree don't trample each other.
    self._lock = asyncio.Lock()

refresh_local_yaml async
refresh_local_yaml(dest: Optional[Path]) -> None

Pull the merged registry YAML into the local REGISTRY_PATH.

Called by the webhook handler on a pull_request.merged event. Without this, the reloader's read-from-disk would still show the pre-merge state until the next git pull happened. Cheap: a depth-1 fetch plus a file copy.

Source code in src/lex_align_server/proposer/github.py
async def refresh_local_yaml(self, dest: Optional[Path]) -> None:
    """Pull the merged registry YAML into the local ``REGISTRY_PATH``.

    Called by the webhook handler on a ``pull_request.merged`` event.
    Without this, the reloader's read-from-disk would still show the
    pre-merge state until the next ``git pull`` happened. Cheap: a
    depth-1 fetch plus a file copy.
    """
    if dest is None:
        return
    async with self._lock:
        try:
            clone_dir = await asyncio.to_thread(self._clone_or_refresh)
            source = clone_dir / self.registry_file_path
            if not source.exists():
                logger.warning(
                    "github proposer: %s missing in repo after merge fetch",
                    self.registry_file_path,
                )
                return
            await asyncio.to_thread(self._copy_to_dest, source, dest)
        except Exception:
            logger.exception("github proposer: refresh_local_yaml failed")

loader

Resolve the configured proposer for the running server.

Called once from the FastAPI lifespan. The result is stored on app.state.lex.proposer and reused for every approval request and every dashboard "Approve" click.

Auto-detection biases towards the single-team / local-file experience. REGISTRY_PROPOSER only needs to be set when overriding the default:

  1. REGISTRY_REPO_URL set explicitly → github (the PR-based backend; opt-in only, never auto-selected from the presence of a GitHub remote alone).
  2. REGISTRY_PATH is inside a git working tree → local_git.
  3. REGISTRY_PATH set, parent dir writable → local_file.
  4. Nothing configured → log_only, with a startup warning.
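
The precedence rules above can be sketched as a pure function. This is an illustration, not `proposer/loader.py`: `detect_proposer` and `inside_git_tree` are hypothetical names, and detecting a working tree by looking for a `.git` entry in an ancestor directory is an assumption about how the check might be done.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


def inside_git_tree(path: Path) -> bool:
    # Assumption: a ".git" entry in the file's directory or any ancestor
    # marks a git working tree.
    start = path.resolve().parent
    return any((d / ".git").exists() for d in (start, *start.parents))


def detect_proposer(
    explicit: Optional[str],
    repo_url: Optional[str],
    registry_path: Optional[Path],
) -> str:
    if explicit:                      # REGISTRY_PROPOSER overrides detection
        return explicit
    if repo_url:                      # 1. explicit repo URL
        return "github"
    if registry_path is not None:
        if inside_git_tree(registry_path):                       # 2.
            return "local_git"
        if os.access(registry_path.resolve().parent, os.W_OK):   # 3.
            return "local_file"
    return "log_only"                 # 4. nothing usable configured


with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / ".git").mkdir()
    in_git = detect_proposer(None, None, Path(tmp) / "registry.yml")
```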

local_file

Direct-write proposer — edits REGISTRY_PATH in place.

Best fit for single-user mode and small teams without a git host integration. The flow is:

  1. Read the current YAML (creating an empty registry if absent).
  2. Insert / replace the package rule.
  3. Validate the merged document via validate_registry so an invalid proposal can never produce a corrupt file on disk.
  4. Write atomically (write-to-temp + rename).
  5. Recompile and reload the in-memory Registry so the next /evaluate call sees the change immediately.

There is no review step — the dashboard / agent's request is the authorization. Use local_git if you want a git log audit trail, or the GitHub backend if you want PR review.
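
Step 4 (write-to-temp + rename) can be illustrated with the standard library alone. This is a generic sketch of the technique; `atomic_write_text` is a hypothetical helper and the module's actual implementation may differ.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(dest: Path, text: str) -> None:
    # Create the temp file next to the destination so the final rename
    # stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=dest.name + ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(text)
            fh.flush()
            os.fsync(fh.fileno())
        # os.replace overwrites the destination in a single step.
        os.replace(tmp_name, dest)
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "registry.yml"
    atomic_write_text(target, "packages: {}\n")
    atomic_write_text(target, "packages:\n  httpx:\n    status: preferred\n")
    final_text = target.read_text(encoding="utf-8")
    leftovers = sorted(p.name for p in Path(tmp).iterdir())
```

A reader that opens the file at any moment sees either the old document or the new one, never a partially written file.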

local_git

Local-git proposer — commits to a working tree, no remote, no PR.

Same flow as local_file but with a git commit after each write so the operator gets git log as the audit trail. Useful for teams that want history without standing up a GitHub/GitLab integration.

We shell out to the system git rather than pulling in a Python git library — keeps the dependency surface small and the operator's normal git tooling (signed commits, hooks, config) keeps working unchanged.

log_only

Log-only proposer — does nothing durable.

Selected automatically when no registry write target is configured (neither REGISTRY_REPO_URL nor a writable REGISTRY_PATH). Useful for read-only demos and for the "is this thing even talking to my server?" smoke test phase of evaluation. The dashboard's "Approve" button still works — it just won't make the change stick.

quickstart

lex-align-server quickstart — Docker-free single-user bring-up.

The Docker compose path stays the recommended deployment for anything multi-user, but a single developer evaluating the tool on a laptop benefits from a one-command path that:

  • materializes a registry into a writable directory next to a sqlite audit DB,
  • runs the FastAPI app in-process under uvicorn on 127.0.0.1:8765,
  • skips Redis (the cache layer already silently degrades when the configured URL can't be reached, so the server still serves — license and CVE lookups just hit upstream every time).

This is not a substitute for the docker stack in production; it is a shortcut for "I want to try this on my laptop without setting up infrastructure." The marker file (.lexalign-quickstart.toml) keeps the directory layout idempotent and distinct from init's docker bundle.

materialize

materialize(target: Path | None = None, *, bind_host: str = '127.0.0.1', bind_port: int = 8765, force: bool = False) -> QuickstartResult

Lay down target so serve can boot against it.

Idempotent: existing files are preserved unless force is set. The registry YAML is always (re)compiled to JSON because the server reads the JSON form.

Source code in src/lex_align_server/quickstart.py
def materialize(
    target: Path | None = None,
    *,
    bind_host: str = "127.0.0.1",
    bind_port: int = 8765,
    force: bool = False,
) -> QuickstartResult:
    """Lay down ``target`` so ``serve`` can boot against it.

    Idempotent: existing files are preserved unless ``force`` is set.
    The registry YAML is always (re)compiled to JSON because the
    server reads the JSON form.
    """
    import json

    import yaml

    target = (target or default_target()).expanduser().resolve()
    target.mkdir(parents=True, exist_ok=True)

    written: list[Path] = []
    skipped: list[Path] = []

    yml_path = target / _REGISTRY_FILE
    if yml_path.exists() and not force:
        skipped.append(yml_path)
    else:
        yml_path.write_bytes(
            resources.files(_ASSETS_PACKAGE).joinpath("registry.example.yml").read_bytes()
        )
        written.append(yml_path)

    # Always compile JSON so the server has a fresh artifact even when
    # the YAML was preserved (the user may have edited it).
    json_path = target / "registry.json"
    doc = yaml.safe_load(yml_path.read_text(encoding="utf-8")) or {}
    compiled = validate_registry(doc)
    json_path.write_text(json.dumps(compiled, indent=2, sort_keys=True) + "\n")
    if json_path not in written:
        written.append(json_path)

    db_path = target / _DB_FILE
    marker = target / QUICKSTART_MARKER
    marker.write_text(
        f'target = "{target}"\n'
        f'registry_path = "{yml_path}"\n'
        f'database_path = "{db_path}"\n'
        f'bind_host = "{bind_host}"\n'
        f'bind_port = {bind_port}\n'
    )
    if marker not in written:
        written.append(marker)

    return QuickstartResult(
        target=target,
        registry_yml=yml_path,
        registry_json=json_path,
        database_path=db_path,
        bind_host=bind_host,
        bind_port=bind_port,
        written=written,
        skipped=skipped,
    )

apply_env

apply_env(result: QuickstartResult) -> dict[str, str]

Compute the environment overrides the in-process uvicorn run needs.

Returned as a dict so callers can choose to os.environ.update it (the CLI does) or inspect it (tests do). REDIS_URL is set to "none" so the cache layer skips Redis entirely without attempting a connection or emitting warnings; quickstart is explicitly single-user with no Redis available.

Source code in src/lex_align_server/quickstart.py
def apply_env(result: QuickstartResult) -> dict[str, str]:
    """Compute the environment overrides the in-process uvicorn run needs.

    Returned as a dict so callers can choose to ``os.environ.update`` it
    (the CLI does) or inspect it (tests do). ``REDIS_URL`` is set to
    ``"none"`` so the cache layer skips Redis entirely without attempting
    a connection or emitting warnings — quickstart is explicitly single-
    user with no Redis available.
    """
    return {
        "REGISTRY_PATH": str(result.registry_json),
        "DATABASE_PATH": str(result.database_path),
        "BIND_HOST": result.bind_host,
        "BIND_PORT": str(result.bind_port),
        "REDIS_URL": "none",
        # No proposer override: with REGISTRY_PATH set on a writable
        # directory the auto-detector picks ``local_file`` (the
        # recommended single-team backend).
    }

registry

Enterprise registry: package policies, license rules, CVE threshold.

The registry is loaded from a local JSON file (compiled from the human-authored YAML by lex-align-server registry compile). The server consults it on every /evaluate call.

registry_schema

Registry-YAML schema validation, shared by the CLI compiler and the dashboard's import endpoint.

This module is intentionally pure: no I/O, no CLI argparse, no FastAPI dependencies. The two entry points that consume it (the lex-align-server registry compile CLI and lex_align_server.api.v1.registry) both depend on the same validator so a registry YAML accepted by the dashboard is guaranteed to compile in CI, and vice versa.

validate_package_rule

validate_package_rule(name: str, rule: Any) -> None

Public single-package validator. Raises ValidationError on failure.

Used by the dashboard's classify endpoint so the in-memory mutation has the same guarantees as a YAML round-trip.

Source code in src/lex_align_server/registry_schema.py
def validate_package_rule(name: str, rule: Any) -> None:
    """Public single-package validator. Raises ValidationError on failure.

    Used by the dashboard's classify endpoint so the in-memory mutation
    has the same guarantees as a YAML round-trip.
    """
    _validate_package(name, rule)

reloader

Registry hot-reload coordinator.

Three triggers all funnel through reload_registry:

  • the GitHub webhook handler (POST /api/v1/registry/webhook) on PR merge — it pulls the merged YAML to disk first, then calls in;
  • the operator's manual POST /api/v1/registry/reload fallback for when the webhook gets lost;
  • the periodic RegistryPoller that watches REGISTRY_PATH mtime every REGISTRY_RELOAD_INTERVAL seconds.

The reload is atomic: in-flight /evaluate calls finish against the old Registry; the next call sees the new one. We also flip matching PENDING_REVIEW approval rows to APPROVED for every package that the reload newly adds to the registry, so the dashboard's pending queue auto-trims when a PR merges or a YAML edit goes through.
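
The swap semantics described above can be illustrated with a minimal sketch. `AppState` and `Registry` here are hypothetical stand-ins, not the server's real classes, and `swap_registry` is an illustrative helper name. The only point is that a reader which already captured the old object keeps using it, while later readers see the new one.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Registry:
    """Hypothetical stand-in: an immutable snapshot of the registry."""
    version: str
    packages: frozenset


class AppState:
    """Hypothetical stand-in for the shared application state."""

    def __init__(self, registry: Optional[Registry] = None):
        self.registry = registry


def swap_registry(state: AppState, new: Registry) -> set:
    """Replace the registry in one assignment; return the newly added package names."""
    previous = state.registry
    previous_keys = set(previous.packages) if previous else set()
    state.registry = new  # single rebind: readers see the old or the new object
    return set(new.packages) - previous_keys


state = AppState(Registry("1", frozenset({"requests"})))
in_flight = state.registry  # an in-flight call captured the old snapshot
added = swap_registry(state, Registry("2", frozenset({"requests", "httpx"})))
```

In this sketch only `httpx` ends up in `added`, which matches the set difference the reloader uses to decide which pending requests to approve.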

ReloadResult dataclass

ReloadResult(ok: bool, previous_version: Optional[str] = None, new_version: Optional[str] = None, package_count: int = 0, added_packages: int = 0, approved_requests: int = 0, detail: str = '')

Outcome of a reload attempt; surfaced through the manual reload endpoint and recorded in the server log.

RegistryPoller

RegistryPoller(state: 'AppState')

Background task that reloads on REGISTRY_PATH mtime change.

Cheap: a stat() per tick, recompile only when mtime advances. The webhook is the primary reload signal in production; this exists as a backstop for missed webhooks and for local-file / local-git modes where there is no webhook at all.

Source code in src/lex_align_server/reloader.py
def __init__(self, state: "AppState"):
    self.state = state
    self._task: Optional[asyncio.Task] = None
    self._stop = asyncio.Event()
    self._last_mtime: Optional[float] = None
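
The "stat() per tick, reload only when mtime advances" idea could look roughly like the sketch below. `MtimeWatcher` is a hypothetical helper, not the real `RegistryPoller`. Whether the real poller treats the first tick as a change is not stated on this page, so that behaviour is an assumption here.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


class MtimeWatcher:
    """Hypothetical helper: reports whether a file's mtime advanced since the last check."""

    def __init__(self, path: Path):
        self.path = path
        self._last_mtime: Optional[float] = None

    def changed(self) -> bool:
        try:
            mtime = self.path.stat().st_mtime
        except FileNotFoundError:
            return False  # nothing to reload yet
        if self._last_mtime is not None and mtime <= self._last_mtime:
            return False
        self._last_mtime = mtime
        return True


with tempfile.TemporaryDirectory() as tmp:
    registry_file = Path(tmp) / "registry.yml"
    registry_file.write_text("version: '1'\n")
    watcher = MtimeWatcher(registry_file)
    first = watcher.changed()   # assumption: the first sighting counts as a change
    second = watcher.changed()  # same mtime, so no reload is needed
    # Push the mtime forward explicitly so the example does not depend on clock resolution.
    st = registry_file.stat()
    os.utime(registry_file, (st.st_atime, st.st_mtime + 10))
    third = watcher.changed()
```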

reload_registry async

reload_registry(state: 'AppState') -> ReloadResult

Re-read REGISTRY_PATH from disk, validate, and atomically swap the in-memory Registry. Idempotent: a no-op when the file hasn't changed.

Caller is responsible for putting the latest YAML on disk (e.g. the webhook handler runs git pull first).

Source code in src/lex_align_server/reloader.py
async def reload_registry(state: "AppState") -> ReloadResult:
    """Re-read ``REGISTRY_PATH`` from disk, validate, and atomically swap
    the in-memory ``Registry``. Idempotent: a no-op when the file hasn't
    changed.

    Caller is responsible for putting the latest YAML on disk (e.g. the
    webhook handler runs ``git pull`` first).
    """
    path = state.settings.registry_path
    if path is None:
        return ReloadResult(
            ok=False,
            detail="REGISTRY_PATH is not configured; nothing to reload.",
        )
    if not path.exists():
        return ReloadResult(
            ok=False,
            detail=f"REGISTRY_PATH {path} does not exist.",
        )

    try:
        doc = await asyncio.to_thread(_read_yaml, path)
        compiled = validate_registry(doc)
    except ValidationError as exc:
        logger.error("registry reload rejected (validation): %s", exc)
        return ReloadResult(
            ok=False,
            detail=f"Validation failed; keeping previous registry: {exc}",
        )
    except Exception as exc:
        logger.exception("registry reload failed reading %s", path)
        return ReloadResult(
            ok=False,
            detail=f"Failed reading {path}: {exc}",
        )

    new_registry = Registry.from_dict(compiled, source_path=path)
    previous = state.registry
    previous_keys = set(previous.packages.keys()) if previous else set()
    new_keys = set(new_registry.packages.keys())
    added = new_keys - previous_keys

    # Atomic swap. Per the GIL, a single attribute assignment is atomic;
    # any concurrent reader of ``state.registry`` either sees the old or
    # the new object, never a half-built one. We don't bother with a lock.
    state.registry = new_registry

    approved_total = 0
    if added:
        for normalized in added:
            approved_total += await state.audit.mark_approved_by_package(normalized)

    logger.info(
        "registry reloaded: version %s -> %s, packages %d -> %d "
        "(added %d, auto-approved %d pending requests)",
        previous.version if previous else None, new_registry.version,
        len(previous_keys), len(new_keys), len(added), approved_total,
    )
    return ReloadResult(
        ok=True,
        previous_version=previous.version if previous else None,
        new_version=new_registry.version,
        package_count=len(new_keys),
        added_packages=len(added),
        approved_requests=approved_total,
        detail="ok",
    )

state

Application state container.

Carried on app.state so the per-request dependencies don't have to know how the cache, audit log, or registry got built.

Application entrypoint

main

FastAPI application factory.

Wires up the routers, builds the per-app state (cache, audit store, HTTP client, registry), and exposes a lifespan so resources are torn down cleanly.

CLI

cli

lex-align-server CLI.

Exposes:
  • serve — run uvicorn against the FastAPI app.
  • quickstart — Docker-free single-user bring-up: materialize a local registry + sqlite, then run the server in-process on 127.0.0.1:8765.
  • init — materialize the operator bundle (compose stack, Dockerfile, registry, .env) into a target dir.
  • check-config — pre-flight checks for a single-team install: REGISTRY_PATH, audit DB, cache, proposer, auth.
  • registry compile — compile a YAML registry to the JSON form the server consumes.
  • selftest — hit /api/v1/health to confirm a stack is up.
  • admin keys ... — placeholders for the org-mode admin tooling.

main

main() -> None

lex-align server CLI.

Source code in src/lex_align_server/cli.py
@click.group()
def main() -> None:
    """lex-align server CLI."""

serve

serve(host: str | None, port: int | None, reload: bool) -> None

Start the lex-align FastAPI server via uvicorn.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option("--host", default=None, help="Override BIND_HOST.")
@click.option("--port", default=None, type=int, help="Override BIND_PORT.")
@click.option("--reload", is_flag=True, help="Enable hot reload (development only).")
def serve(host: str | None, port: int | None, reload: bool) -> None:
    """Start the lex-align FastAPI server via uvicorn."""
    import uvicorn
    from .config import get_settings
    settings = get_settings()
    uvicorn.run(
        "lex_align_server.main:app",
        host=host or settings.bind_host,
        port=port or settings.bind_port,
        reload=reload,
    )
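
The `host or settings.bind_host` / `port or settings.bind_port` idiom in `serve` falls back on any falsy value, not only on `None`. The sketch below isolates that behaviour. `pick` and `pick_strict` are hypothetical helpers written for illustration, and 8765 is used only as a sample default.

```python
from typing import Optional


def pick(override: Optional[int], default: int) -> int:
    """Mirror the `override or default` idiom used by `serve`."""
    return override or default


def pick_strict(override: Optional[int], default: int) -> int:
    """Alternative that falls back only when no override was given."""
    return default if override is None else override


explicit = pick(9000, 8765)
missing = pick(None, 8765)
zero_or = pick(0, 8765)              # 0 is falsy, so the default wins
zero_strict = pick_strict(0, 8765)   # 0 is kept
```

With the `or` form, an explicit `--port 0` would be treated the same as omitting the flag. Whether that matters for a given deployment is not covered by this page.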

quickstart

quickstart(target: Path | None, host: str, port: int, no_serve: bool, force: bool) -> None

Docker-free single-user bring-up.

Lays down a local registry + sqlite under --target (default ~/.lexalign), then runs the server in-process, by default on http://127.0.0.1:8765 (override with --host / --port). Redis is skipped (the cache layer silently degrades). Stop with Ctrl-C.

For multi-user deployments use lex-align-server init + Docker Compose instead.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--target",
    default=None,
    type=click.Path(file_okay=False, path_type=Path),
    help="Directory for the registry + audit DB. Defaults to ~/.lexalign.",
)
@click.option("--host", default="127.0.0.1", help="Bind host (default 127.0.0.1).")
@click.option("--port", default=8765, type=int, help="Bind port (default 8765).")
@click.option(
    "--no-serve",
    is_flag=True,
    help="Materialize the directory and exit without starting uvicorn.",
)
@click.option(
    "--force",
    is_flag=True,
    help="Overwrite an existing registry.yml in the target directory.",
)
def quickstart(
    target: Path | None,
    host: str,
    port: int,
    no_serve: bool,
    force: bool,
) -> None:
    """Docker-free single-user bring-up.

    Lays down a local registry + sqlite under ``--target`` (default
    ``~/.lexalign``), then runs the server in-process, by default on
    ``http://127.0.0.1:8765`` (override with ``--host`` / ``--port``).
    Redis is skipped (the cache layer silently degrades). Stop with Ctrl-C.

    For multi-user deployments use `lex-align-server init` + Docker
    Compose instead.
    """
    target = target or quickstart_default_target()
    try:
        result = quickstart_materialize(
            target, bind_host=host, bind_port=port, force=force
        )
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    for p in result.written:
        click.echo(f"  + {p}")
    for p in result.skipped:
        click.echo(f"  · skipped (exists) {p}")

    click.echo("")
    click.echo(f"Quickstart bundle ready under {result.target}.")
    click.echo(f"  registry: {result.registry_yml}")
    click.echo(f"  audit DB: {result.database_path}")
    click.echo("")

    if no_serve:
        click.echo("Skipping `serve` (--no-serve). Start later with:")
        click.echo(
            "  REGISTRY_PATH={reg} DATABASE_PATH={db} BIND_HOST={h} BIND_PORT={p} "
            "lex-align-server serve".format(
                reg=result.registry_yml, db=result.database_path,
                h=result.bind_host, p=result.bind_port,
            )
        )
        return

    os.environ.update(quickstart_apply_env(result))

    click.echo(f"Starting server on http://{result.bind_host}:{result.bind_port} (Ctrl-C to stop)")
    click.echo("Run `lex-align-client init` in another terminal to point a project at it.")
    click.echo("")
    import uvicorn
    uvicorn.run(
        "lex_align_server.main:app",
        host=result.bind_host,
        port=result.bind_port,
    )

init

init(target: Path, force: bool) -> None

Materialize the docker-compose stack, Dockerfile, registry, and .env template into TARGET so the server can be built and run with a single docker compose up -d.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--target",
    default="./lexalign",
    type=click.Path(file_okay=False, path_type=Path),
    help="Directory to write the operator bundle into. Created if missing.",
)
@click.option(
    "--force",
    is_flag=True,
    help=f"Overwrite existing files (and the {MARKER_FILENAME} marker).",
)
def init(target: Path, force: bool) -> None:
    """Materialize the docker-compose stack, Dockerfile, registry, and .env
    template into TARGET so the server can be built and run with a single
    `docker compose up -d`.
    """
    try:
        result = init_target(target, force=force)
    except FileExistsError as exc:
        raise click.ClickException(str(exc))
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    for path in result.written:
        click.echo(f"  + {path.relative_to(Path.cwd()) if path.is_relative_to(Path.cwd()) else path}")
    for path in result.skipped:
        click.echo(f"  · skipped (exists) {path}")

    click.echo("")
    click.echo(f"Operator bundle written to {result.target}.")
    click.echo("")
    click.echo("Next steps:")
    click.echo(f"  cd {result.target.relative_to(Path.cwd()) if result.target.is_relative_to(Path.cwd()) else result.target}")
    click.echo("  cp .env.example .env       # edit if you need to change defaults")
    click.echo("  docker compose up -d")
    click.echo("  lex-align-server selftest  # confirm the stack is alive")

registry

registry() -> None

Manage the enterprise package registry.

Source code in src/lex_align_server/cli.py
@main.group()
def registry() -> None:
    """Manage the enterprise package registry."""

registry_compile

registry_compile(source: Path, destination: Path) -> None

Compile a YAML registry SOURCE to the JSON form at DESTINATION.

Source code in src/lex_align_server/cli.py
@registry.command("compile")
@click.argument("source", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("destination", type=click.Path(dir_okay=False, path_type=Path))
def registry_compile(source: Path, destination: Path) -> None:
    """Compile a YAML registry SOURCE to the JSON form at DESTINATION."""
    import yaml

    try:
        doc = yaml.safe_load(source.read_text())
    except yaml.YAMLError as exc:
        raise click.ClickException(f"YAML parse error: {exc}")
    try:
        compiled = validate_registry(doc)
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(compiled, indent=2, sort_keys=True) + "\n")
    click.echo(
        f"Compiled {len(compiled['packages'])} package rules from {source} -> {destination}"
    )
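
A small sketch of why the compiled output is stable across runs: `json.dumps` with `sort_keys=True` yields the same text regardless of key order in the input. `render_compiled` is a hypothetical helper name, and the registry fragments are made up for illustration; the real schema is whatever `validate_registry` returns.

```python
import json


def render_compiled(compiled: dict) -> str:
    """Serialize as `registry compile` does: indented, sorted keys, trailing newline."""
    return json.dumps(compiled, indent=2, sort_keys=True) + "\n"


# Hypothetical registry fragments with the same content in different key order.
a = {"version": "1", "packages": {"requests": {"status": "preferred"}}}
b = {"packages": {"requests": {"status": "preferred"}}, "version": "1"}

out_a = render_compiled(a)
out_b = render_compiled(b)
```

Identical output for identical content keeps diffs of the compiled file quiet when only the YAML key order changes.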

check_config

check_config() -> None

Validate the server's configuration for a single-team install.

Verifies REGISTRY_PATH, audit DB writability, cache reachability, the auto-detected proposer, and auth posture. Exits non-zero if any check fails. Warnings (e.g. anonymous auth on a non-loopback bind, Redis unreachable) do not fail the run.

Source code in src/lex_align_server/cli.py
@main.command("check-config")
def check_config() -> None:
    """Validate the server's configuration for a single-team install.

    Verifies REGISTRY_PATH, audit DB writability, cache reachability, the
    auto-detected proposer, and auth posture. Exits non-zero if any
    check fails. Warnings (e.g. anonymous auth on a non-loopback bind,
    Redis unreachable) do not fail the run.
    """
    from .config import get_settings
    settings = get_settings()
    results = run_checks_sync(settings)

    label_width = max((len(r.label) for r in results), default=10)
    for r in results:
        click.echo(f"{_STATUS_GLYPH[r.status]}  {r.label.ljust(label_width)}  {r.detail}")

    fails = sum(1 for r in results if r.status == FAIL)
    warns = sum(1 for r in results if r.status == WARN)
    click.echo("")
    if fails:
        click.echo(f"{fails} failure(s), {warns} warning(s).", err=True)
        sys.exit(1)
    if warns:
        click.echo(f"All checks passed with {warns} warning(s).")
    else:
        click.echo("All checks passed.")
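
The reporting logic can be sketched without click. `CheckResult`, the status constants, and the glyph table below are hypothetical stand-ins for the objects returned by `run_checks_sync` and for `_STATUS_GLYPH`, whose real values are not shown on this page.

```python
from dataclasses import dataclass

OK, WARN, FAIL = "ok", "warn", "fail"             # hypothetical status constants
GLYPH = {OK: "[ok]", WARN: "[!!]", FAIL: "[xx]"}  # hypothetical glyphs


@dataclass
class CheckResult:
    status: str
    label: str
    detail: str


def render(results: list) -> tuple:
    """Return (lines, exit_code); only FAIL results make the exit code non-zero."""
    width = max((len(r.label) for r in results), default=10)
    lines = [f"{GLYPH[r.status]}  {r.label.ljust(width)}  {r.detail}" for r in results]
    exit_code = 1 if any(r.status == FAIL for r in results) else 0
    return lines, exit_code


lines, code = render([
    CheckResult(OK, "registry", "found"),
    CheckResult(WARN, "cache", "Redis unreachable"),
])
_, failing = render([CheckResult(FAIL, "audit DB", "not writable")])
```

As in the command itself, a warning alone leaves the exit code at zero.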

selftest

selftest(url: str, timeout: float) -> None

Probe /api/v1/health and report whether the stack is alive.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--url",
    default="http://127.0.0.1:8765",
    help="Base URL of the server to probe.",
)
@click.option("--timeout", default=5.0, type=float, help="Per-request timeout (seconds).")
def selftest(url: str, timeout: float) -> None:
    """Probe `/api/v1/health` and report whether the stack is alive."""
    import httpx

    health = url.rstrip("/") + "/api/v1/health"
    try:
        response = httpx.get(health, timeout=timeout)
    except httpx.HTTPError as exc:
        raise click.ClickException(f"Server unreachable at {health}: {exc}")

    if response.status_code != 200:
        raise click.ClickException(
            f"Health check failed: HTTP {response.status_code} from {health}"
        )

    click.echo(f"OK  {health}")
    try:
        click.echo(json.dumps(response.json(), indent=2))
    except ValueError:
        click.echo(response.text)
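
The URL handling in `selftest` is easy to check in isolation. `health_url` is a hypothetical helper that repeats the one-line construction from the source above.

```python
def health_url(base: str) -> str:
    """Append the health path, tolerating a trailing slash on the base URL."""
    return base.rstrip("/") + "/api/v1/health"


with_slash = health_url("http://127.0.0.1:8765/")
without_slash = health_url("http://127.0.0.1:8765")
```

Both forms of the base URL resolve to the same endpoint, so `--url` can be pasted with or without the trailing slash.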

admin

admin() -> None

Administrative commands (organization mode).

Source code in src/lex_align_server/cli.py
@main.group()
def admin() -> None:
    """Administrative commands (organization mode)."""

keys

keys() -> None

Manage API keys (deferred — Phase 3+).

Source code in src/lex_align_server/cli.py
@admin.group()
def keys() -> None:
    """Manage API keys (deferred — Phase 3+)."""

Configuration

config

Server configuration.

All settings are sourced from environment variables. Defaults bias towards the single-user/local-evaluation experience: no auth, bind to localhost, point at a co-located Redis.

get_settings

get_settings() -> Settings

Build a fresh Settings object. Avoids module-level caching so tests can override via env vars on a per-test basis.

Source code in src/lex_align_server/config.py
def get_settings() -> Settings:
    """Build a fresh Settings object. Avoids module-level caching so tests can
    override via env vars on a per-test basis.
    """
    return Settings()
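
Why the absence of caching matters for tests can be shown with a stand-in. The `Settings` class below is hypothetical (the real one is not shown on this page) and reads only `BIND_PORT`; the 8765 fallback is an assumption borrowed from the quickstart default.

```python
import os
from dataclasses import dataclass, field


@dataclass
class Settings:
    """Hypothetical stand-in that reads BIND_PORT from the environment at construction."""
    bind_port: int = field(
        default_factory=lambda: int(os.environ.get("BIND_PORT", "8765"))
    )


def get_settings() -> Settings:
    return Settings()  # no caching: every call re-reads the environment


os.environ["BIND_PORT"] = "9001"
first = get_settings().bind_port
os.environ["BIND_PORT"] = "9002"
second = get_settings().bind_port
del os.environ["BIND_PORT"]
third = get_settings().bind_port
```

Each call reflects the environment at that moment, which is what lets a test override a variable without resetting a module-level singleton.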

State management

state

Application state container.

Carried on app.state so the per-request dependencies don't have to know how the cache, audit log, or registry got built.

Evaluation pipeline

evaluate

Evaluation orchestrator.

Combines registry lookup, CVE check, and license check into a single verdict. Emits one audit-log row per call. The order matters:

  1. registry hard-blocks (banned, deprecated, version-violated)
  2. CVE check — applies even to registry-allowed packages so a newly published critical CVE on a preferred package still blocks
  3. license check — only when the package is unknown to the registry
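
The ordering above can be condensed into a small pure function. This is a simplified sketch, not the server's implementation: the labels are hypothetical, the no-registry case is left out, and the CVE comparison (`>=` against a threshold on the CVSS scale) is an assumption, since the real decision lives in `GlobalPolicies.cve_blocks`.

```python
from typing import Optional

# Hypothetical, simplified labels; the server's real types are Action and EvaluationResult.
BLOCK, UNKNOWN, ALLOW = "block", "unknown", "allow"


def decide(registry_action: str, max_cvss: Optional[float],
           cve_threshold: float, license_blocked: bool) -> str:
    """Apply the three checks in the documented order and return a verdict label."""
    if registry_action == BLOCK:                             # 1. registry hard-block
        return "DENIED (registry)"
    if max_cvss is not None and max_cvss >= cve_threshold:   # 2. CVE check (assumed rule)
        return "DENIED (cve)"
    if registry_action == UNKNOWN:                           # 3. license, unknown packages only
        return "DENIED (license)" if license_blocked else "PROVISIONALLY_ALLOWED"
    return "ALLOWED"


blocked_first = decide(BLOCK, 9.8, 7.0, True)
cve_on_allowed = decide(ALLOW, 9.8, 7.0, False)
license_ignored = decide(ALLOW, 3.0, 7.0, True)
unknown_bad_license = decide(UNKNOWN, None, 7.0, True)
unknown_ok = decide(UNKNOWN, None, 7.0, False)
```

The second and third cases show the two properties the list calls out: a registry-allowed package can still be denied on CVEs, and the license is never evaluated for packages the registry already knows.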

evaluate async

evaluate(*, package: str, version: Optional[str], project: str, requester: str, registry: Optional[Registry], cache: JsonCache, audit: AuditStore, settings: Settings, http_client: AsyncClient, agent: Optional[AgentInfo] = None) -> EvaluationResult

Single evaluation. Always writes one audit row before returning.

Source code in src/lex_align_server/evaluate.py
async def evaluate(
    *,
    package: str,
    version: Optional[str],
    project: str,
    requester: str,
    registry: Optional[Registry],
    cache: JsonCache,
    audit: AuditStore,
    settings: Settings,
    http_client: httpx.AsyncClient,
    agent: Optional[AgentInfo] = None,
) -> EvaluationResult:
    """Single evaluation. Always writes one audit row before returning."""
    agent = agent or AgentInfo()
    # Step 1 — registry verdict (only if a registry is configured).
    pkg_verdict: Optional[PackageVerdict] = None
    if registry is not None:
        pkg_verdict = registry.lookup(package, version)
        if pkg_verdict.action is Action.BLOCK:
            result = _denied(
                package=package,
                version=version,
                resolved_version=None,
                reason=pkg_verdict.reason or "Blocked by enterprise registry.",
                registry_status=pkg_verdict.status.value if pkg_verdict.status else None,
                replacement=pkg_verdict.replacement,
                version_constraint=pkg_verdict.version_constraint,
            )
            await _audit(audit, result, project, requester, agent, DENIAL_REGISTRY)
            return result

    # Step 2 — license + latest version (PyPI). We need PyPI even for known
    # packages so we can resolve "latest" when no version is provided.
    license_info: Optional[LicenseInfo] = None
    latest_version: Optional[str] = None
    if registry is None or pkg_verdict is None or pkg_verdict.action is Action.UNKNOWN:
        license_info, latest_version = await resolve_license(
            package, version, cache, settings.license_cache_ttl,
            settings.pypi_api_url, http_client,
        )
    else:
        # Known to registry: still need latest_version for the CVE call when
        # no version is pinned, so we hit PyPI. The license is reported in the
        # result but not evaluated against policy.
        if version is None:
            license_info, latest_version = await resolve_license(
                package, None, cache, settings.license_cache_ttl,
                settings.pypi_api_url, http_client,
            )

    cve_query_version = version or latest_version

    # Step 3 — CVE check (applies regardless of registry status).
    cves = await resolve_cves(
        package,
        cve_query_version,
        cache,
        settings.cve_cache_ttl,
        settings.osv_api_url,
        http_client,
    )

    if registry is not None and registry.global_policies.cve_blocks(cves.max_score):
        result = _denied(
            package=package,
            version=version,
            resolved_version=cve_query_version,
            reason=(
                f"Critical CVE detected (max CVSS {cves.max_score}); "
                f"threshold is {registry.global_policies.cve_threshold * 10:.1f}."
            ),
            registry_status=(
                pkg_verdict.status.value if pkg_verdict and pkg_verdict.status else None
            ),
            license=license_info.license_normalized if license_info else None,
            cves=cves,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_CVE)
        return result

    # Step 4 — license verdict for unknown-to-registry packages.
    if registry is not None and pkg_verdict is not None and pkg_verdict.action is Action.UNKNOWN:
        assert license_info is not None  # set in step 2
        lic_verdict = evaluate_license(
            license_info.license_normalized, registry.global_policies
        )
        if lic_verdict.action is Action.BLOCK:
            result = _denied(
                package=package,
                version=version,
                resolved_version=cve_query_version,
                reason=lic_verdict.reason,
                registry_status=None,
                license=license_info.license_normalized,
                cves=cves,
            )
            await _audit(audit, result, project, requester, agent, DENIAL_LICENSE)
            return result
        # Unknown but license-passing → provisionally allowed.
        if lic_verdict.needs_human_review:
            prov_reason = (
                "Not yet in your approved registry. License could not be "
                "determined; provisionally allowed pending human review. "
                "An approval request will be automatically submitted."
            )
        else:
            prov_reason = (
                "Not yet in your approved registry. License "
                f"{license_info.license_normalized} is on the auto-approve list "
                "and no critical CVEs are reported. Run `lex-align-client "
                "request-approval` to formalize."
            )
        result = EvaluationResult(
            verdict=VERDICT_PROVISIONALLY_ALLOWED,
            reason=prov_reason,
            package=package,
            version=version,
            resolved_version=cve_query_version,
            registry_status=None,
            license=license_info.license_normalized,
            cve_ids=list(cves.ids),
            max_cvss=cves.max_score,
            is_requestable=True,
            auto_request_approval=lic_verdict.needs_human_review,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_NONE)
        return result

    # Step 5 — known-to-registry ALLOW or REQUIRE_PROPOSE.
    if pkg_verdict is None:
        # No registry configured at all. Treat as provisionally allowed
        # (license/CVE already passed if applicable).
        result = EvaluationResult(
            verdict=VERDICT_PROVISIONALLY_ALLOWED,
            reason="No enterprise registry configured; permitted by default.",
            package=package,
            version=version,
            resolved_version=cve_query_version,
            registry_status=None,
            license=license_info.license_normalized if license_info else None,
            cve_ids=list(cves.ids),
            max_cvss=cves.max_score,
            is_requestable=False,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_NONE)
        return result

    needs_rationale = pkg_verdict.action is Action.REQUIRE_PROPOSE
    result = EvaluationResult(
        verdict=VERDICT_ALLOWED,
        reason=pkg_verdict.reason or (
            f"Allowed by local registry ({pkg_verdict.status.value if pkg_verdict.status else 'allow'})."
        ),
        package=package,
        version=version,
        resolved_version=cve_query_version,
        registry_status=pkg_verdict.status.value if pkg_verdict.status else None,
        version_constraint=pkg_verdict.version_constraint,
        license=license_info.license_normalized if license_info else None,
        cve_ids=list(cves.ids),
        max_cvss=cves.max_score,
        is_requestable=False,
        needs_rationale=needs_rationale,
    )
    await _audit(audit, result, project, requester, agent, DENIAL_NONE)
    return result

Registry

registry

Enterprise registry: package policies, license rules, CVE threshold.

The registry is loaded from a local JSON file (compiled from the human-authored YAML by lex-align-server registry compile). The server consults it on every /evaluate call.

registry_schema

Registry-YAML schema validation, shared by the CLI compiler and the dashboard's import endpoint.

This module is intentionally pure: no I/O, no CLI argparse, no FastAPI dependencies. The two entry points that consume it (the lex-align-server registry compile CLI and lex_align_server.api.v1.registry) both depend on the same validator so a registry YAML accepted by the dashboard is guaranteed to compile in CI, and vice versa.

validate_package_rule

validate_package_rule(name: str, rule: Any) -> None

Public single-package validator. Raises ValidationError on failure.

Used by the dashboard's classify endpoint so the in-memory mutation has the same guarantees as a YAML round-trip.

Source code in src/lex_align_server/registry_schema.py
def validate_package_rule(name: str, rule: Any) -> None:
    """Public single-package validator. Raises ValidationError on failure.

    Used by the dashboard's classify endpoint so the in-memory mutation
    has the same guarantees as a YAML round-trip.
    """
    _validate_package(name, rule)

License checks

licenses

License lookup, normalization, and policy evaluation.

For packages not listed in the registry, the server fetches the license from PyPI, normalizes it to an SPDX-ish token, and checks it against global_policies. Results are cached in Redis keyed by package name.

fetch_license_from_pypi async

fetch_license_from_pypi(package: str, version: Optional[str], pypi_base: str, client: AsyncClient) -> tuple[Optional[str], Optional[str]]

Return (raw_license_string, latest_version) from PyPI, or (None, None) on error.

Source code in src/lex_align_server/licenses.py
async def fetch_license_from_pypi(
    package: str,
    version: Optional[str],
    pypi_base: str,
    client: httpx.AsyncClient,
) -> tuple[Optional[str], Optional[str]]:
    """Return (raw_license_string, latest_version) from PyPI, or (None, None) on error."""
    if version:
        url = PYPI_VERSIONED_URL_TEMPLATE.format(base=pypi_base, package=package, version=version)
    else:
        url = PYPI_URL_TEMPLATE.format(base=pypi_base, package=package)
    try:
        resp = await client.get(url)
    except (httpx.HTTPError, httpx.TimeoutException):
        return None, None
    if resp.status_code != 200:
        return None, None
    try:
        data = resp.json()
    except ValueError:
        return None, None
    raw = _extract_license_from_pypi_json(data)
    info = data.get("info") or {}
    latest = info.get("version")
    return raw, latest if isinstance(latest, str) else None
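
The defensive parsing at the end of this function can be exercised on its own. `latest_version_from` is a hypothetical helper that repeats the last three lines of the source above; the sample payloads are made up.

```python
from typing import Optional


def latest_version_from(data: dict) -> Optional[str]:
    """Read `info.version`, tolerating a missing or null `info` and non-string values."""
    info = data.get("info") or {}
    latest = info.get("version")
    return latest if isinstance(latest, str) else None


ok = latest_version_from({"info": {"version": "2.31.0"}})
null_info = latest_version_from({"info": None})
bad_type = latest_version_from({"info": {"version": 2}})
```

The `or {}` guard covers a response where `info` is present but null, which a plain `data.get("info", {})` would not.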

resolve_license async

resolve_license(package: str, version: Optional[str], cache: JsonCache, cache_ttl: int, pypi_base: str, http_client: AsyncClient) -> tuple[LicenseInfo, Optional[str]]

Resolve (cache-or-fetch) a package's license. Returns (info, latest_version).

Source code in src/lex_align_server/licenses.py
async def resolve_license(
    package: str,
    version: Optional[str],
    cache: JsonCache,
    cache_ttl: int,
    pypi_base: str,
    http_client: httpx.AsyncClient,
) -> tuple[LicenseInfo, Optional[str]]:
    """Resolve (cache-or-fetch) a package's license. Returns (info, latest_version)."""
    cache_key = f"license:{package.lower()}"
    cached = await cache.get(cache_key)
    if cached is not None:
        try:
            info = LicenseInfo.from_dict(cached["info"])
            return info, cached.get("latest_version")
        except (KeyError, TypeError):
            pass

    raw, latest = await fetch_license_from_pypi(package, version, pypi_base, http_client)
    info = LicenseInfo(
        license_raw=raw,
        license_normalized=normalize_license(raw),
    )
    await cache.set(
        cache_key,
        {"info": info.to_dict(), "latest_version": latest},
        cache_ttl,
    )
    return info, latest
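
One consequence of the cache key is worth seeing in isolation: it contains the lowercased package name but not the version. The sketch below uses a hypothetical in-memory `DictCache` in place of `JsonCache` and a fake fetcher in place of the PyPI call; the stored shape is simplified.

```python
import asyncio
from typing import Optional


class DictCache:
    """Hypothetical in-memory stand-in for JsonCache (the TTL is accepted but ignored)."""

    def __init__(self):
        self.data = {}

    async def get(self, key: str):
        return self.data.get(key)

    async def set(self, key: str, value: dict, ttl: int) -> None:
        self.data[key] = value


calls = []


async def fake_fetch(package: str, version: Optional[str]):
    """Hypothetical fetcher that records each upstream call."""
    calls.append((package, version))
    return "MIT", "2.0.0"


async def resolve(package: str, version: Optional[str], cache: DictCache):
    key = f"license:{package.lower()}"  # note: the version is not part of the key
    cached = await cache.get(key)
    if cached is not None:
        return cached["license"], cached["latest_version"]
    raw, latest = await fake_fetch(package, version)
    await cache.set(key, {"license": raw, "latest_version": latest}, 3600)
    return raw, latest


async def demo():
    cache = DictCache()
    first = await resolve("Requests", None, cache)
    second = await resolve("requests", "1.0.0", cache)  # served from the same entry
    return first, second


first, second = asyncio.run(demo())
```

In this sketch the second, pinned lookup never reaches the fetcher, so it reuses whatever the first lookup stored until the entry expires.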

CVE checks (OSV)

cve

CVE lookup via OSV (osv.dev).

OSV is the Google-sponsored aggregator that ingests GHSA, NVD, PyPA Advisory DB, and others. The free POST /v1/query endpoint takes a package coordinate and returns the list of vulnerabilities with severity scores. We parse out the highest CVSS base score and let GlobalPolicies.cve_blocks decide.

query_osv async

query_osv(package: str, version: Optional[str], osv_url: str, http_client: AsyncClient) -> Optional[list[dict]]

Return raw vulns list from OSV, or None on error/timeout.

Source code in src/lex_align_server/cve.py
async def query_osv(
    package: str,
    version: Optional[str],
    osv_url: str,
    http_client: httpx.AsyncClient,
) -> Optional[list[dict]]:
    """Return raw `vulns` list from OSV, or None on error/timeout."""
    payload: dict = {
        "package": {"name": package, "ecosystem": "PyPI"},
    }
    if version:
        payload["version"] = version
    try:
        resp = await http_client.post(osv_url, json=payload)
    except (httpx.HTTPError, httpx.TimeoutException) as exc:
        logger.warning("osv: query for %s@%s failed: %s", package, version, exc)
        return None
    if resp.status_code != 200:
        logger.warning("osv: %s returned %s", package, resp.status_code)
        return None
    try:
        data = resp.json()
    except ValueError:
        return None
    vulns = data.get("vulns") or []
    return [v for v in vulns if isinstance(v, dict)]
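
The request body and the response filtering can be checked without a network call. `build_osv_payload` and `clean_vulns` are hypothetical helper names that factor out logic from the source above; the sample vulnerability ID is made up.

```python
from typing import Optional


def build_osv_payload(package: str, version: Optional[str]) -> dict:
    """Build the JSON body for an OSV query; `version` is included only when known."""
    payload: dict = {"package": {"name": package, "ecosystem": "PyPI"}}
    if version:
        payload["version"] = version
    return payload


def clean_vulns(data: dict) -> list:
    """Keep only dict entries from `vulns`, tolerating a null or missing field."""
    return [v for v in (data.get("vulns") or []) if isinstance(v, dict)]


pinned = build_osv_payload("requests", "2.31.0")
unpinned = build_osv_payload("requests", None)
vulns = clean_vulns({"vulns": [{"id": "EXAMPLE-1"}, "junk", None]})
empty = clean_vulns({})
```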

resolve_cves async

resolve_cves(package: str, version: Optional[str], cache: JsonCache, cache_ttl: int, osv_url: str, http_client: AsyncClient) -> CveInfo

Cache-or-fetch CVE data for a package@version.

A None version is cached separately from versioned queries — OSV's behaviour with no version is documented as "all versions" so we treat it as a distinct cache entry.

Source code in src/lex_align_server/cve.py
async def resolve_cves(
    package: str,
    version: Optional[str],
    cache: JsonCache,
    cache_ttl: int,
    osv_url: str,
    http_client: httpx.AsyncClient,
) -> CveInfo:
    """Cache-or-fetch CVE data for a package@version.

    A None version is cached separately from versioned queries — OSV's
    behaviour with no version is documented as "all versions" so we treat it
    as a distinct cache entry.
    """
    cache_key = f"cve:{package.lower()}@{version or 'latest'}"
    cached = await cache.get(cache_key)
    if cached is not None:
        try:
            return CveInfo.from_dict(cached)
        except Exception:
            pass

    vulns = await query_osv(package, version, osv_url, http_client)
    if vulns is None:
        # Treat OSV outage as "no known CVEs" but do NOT cache the negative
        # result — we want a fresh attempt next time.
        return CveInfo(ids=[], max_score=None, raw_count=0)

    info = _summarize_vulns(vulns)
    await cache.set(cache_key, info.to_dict(), cache_ttl)
    return info
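
The "do not cache the negative result" rule is the part most easily broken in a refactor, so here is a minimal sketch of it. The dict cache, the fake upstream, and the summary shape are all hypothetical simplifications of `JsonCache`, `query_osv`, and `CveInfo`.

```python
import asyncio
from typing import Optional

cache = {}                                 # hypothetical in-memory cache: key -> summary
responses = [None, [{"id": "EXAMPLE-1"}]]  # first upstream call fails, second succeeds


async def fake_query(package: str, version: Optional[str]):
    """Hypothetical upstream: returns None to simulate an outage."""
    return responses.pop(0)


async def resolve(package: str, version: Optional[str]) -> dict:
    key = f"cve:{package.lower()}@{version or 'latest'}"
    if key in cache:
        return cache[key]
    vulns = await fake_query(package, version)
    if vulns is None:
        return {"ids": [], "raw_count": 0}  # outage: answer "none known", cache nothing
    summary = {"ids": [v["id"] for v in vulns], "raw_count": len(vulns)}
    cache[key] = summary
    return summary


async def demo():
    during_outage = await resolve("requests", "2.31.0")
    cached_after_outage = dict(cache)
    after_recovery = await resolve("requests", "2.31.0")
    return during_outage, cached_after_outage, after_recovery


during_outage, cached_after_outage, after_recovery = asyncio.run(demo())
```

Because nothing is stored during the outage, the next call retries upstream and picks up the real result instead of serving an empty answer for the whole TTL.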

Audit log

audit

SQLite audit log + approval-request store.

Two tables:

  • audit_log — one row per /evaluate call. Backs the legal report (license-driven denials) and security report (CVE-driven denials).
  • approval_requests — one row per /approval-requests POST. Phase 3 will attach a PR-creation workflow; for now we just persist them.

We use plain aiosqlite rather than an ORM. The schema is small and the report queries are easier to read as straight SQL.

AuditStore

AuditStore(db_path: Path)
Source code in src/lex_align_server/audit.py
def __init__(self, db_path: Path):
    self._db_path = db_path

record_cve_alert async

record_cve_alert(*, package: str, cve_ids: list[str], max_cvss: Optional[float], registry_status: Optional[str] = None, project: str = 'lex-align-server', requester: str = 'cve-scanner') -> str

Persist a single CVE_ALERT row from the background scanner.

Reuses the audit_log table so the dashboard, exports, and the client status command see one consistent event stream — the denial_category=DENIAL_CVE_ALERT discriminator keeps these out of the existing CVE-denial rollups.

Source code in src/lex_align_server/audit.py
async def record_cve_alert(
    self,
    *,
    package: str,
    cve_ids: list[str],
    max_cvss: Optional[float],
    registry_status: Optional[str] = None,
    project: str = "lex-align-server",
    requester: str = "cve-scanner",
) -> str:
    """Persist a single CVE_ALERT row from the background scanner.

    Reuses the audit_log table so the dashboard, exports, and the
    client `status` command see one consistent event stream — the
    ``denial_category=DENIAL_CVE_ALERT`` discriminator keeps these
    out of the existing CVE-denial rollups.
    """
    return await self.record_evaluation(AuditRecord(
        project=project,
        requester=requester,
        package=package,
        version=None,
        resolved_version=None,
        verdict=VERDICT_CVE_ALERT,
        denial_category=DENIAL_CVE_ALERT,
        reason=(
            f"Registered package now has CVSS {max_cvss} "
            f"(>= configured denial threshold)."
            if max_cvss is not None else
            "Registered package now has CVE coverage."
        ),
        cve_ids=cve_ids,
        max_cvss=max_cvss,
        registry_status=registry_status,
    ))

mark_approved_by_package async

mark_approved_by_package(normalized_name: str) -> int

Move every PENDING_REVIEW request for normalized_name to APPROVED.

Called when an operator classifies a pending package via the dashboard and adds it to the in-memory registry. Returns the number of rows flipped, which the dashboard can display in its toast.

Source code in src/lex_align_server/audit.py
async def mark_approved_by_package(self, normalized_name: str) -> int:
    """Move every PENDING_REVIEW request for `normalized_name` to APPROVED.

    Called when an operator classifies a pending package via the dashboard
    and adds it to the in-memory registry. Returns the number of rows
    flipped, which the dashboard can display in its toast.
    """
    # Imported lazily to avoid a circular import at module load.
    from .registry import normalize_name as _norm
    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            "SELECT id, package FROM approval_requests WHERE status = ?",
            (APPROVAL_PENDING,),
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()
        ids = [r["id"] for r in rows if _norm(r["package"]) == normalized_name]
        if not ids:
            return 0
        placeholders = ",".join("?" * len(ids))
        await db.execute(
            f"UPDATE approval_requests SET status = ? WHERE id IN ({placeholders})",
            [APPROVAL_APPROVED, *ids],
        )
        await db.commit()
    return len(ids)
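
The select-then-update pattern above filters in Python because name normalization happens outside SQL, then builds one `?` placeholder per matching id. A minimal synchronous sketch with `sqlite3` follows. The `normalize` helper is an assumption (PEP 503 style normalization; the real `normalize_name` is not shown here), and the literal status strings are assumed to match the `PENDING_REVIEW` and `APPROVED` names used in the docstring.

```python
import re
import sqlite3


def normalize(name: str) -> str:
    # Assumed PEP 503 style normalization; the real helper may differ.
    return re.sub(r"[-_.]+", "-", name).lower()


def approve(conn: sqlite3.Connection, normalized_name: str) -> int:
    rows = conn.execute(
        "SELECT id, package FROM approval_requests WHERE status = ?",
        ("PENDING_REVIEW",),
    ).fetchall()
    ids = [row_id for row_id, package in rows if normalize(package) == normalized_name]
    if not ids:
        return 0
    # One "?" per id keeps the values out of the SQL text.
    placeholders = ",".join("?" * len(ids))
    conn.execute(
        f"UPDATE approval_requests SET status = ? WHERE id IN ({placeholders})",
        ["APPROVED", *ids],
    )
    conn.commit()
    return len(ids)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE approval_requests (id TEXT, package TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO approval_requests VALUES (?, ?, ?)",
    [
        ("a", "Flask_Login", "PENDING_REVIEW"),
        ("b", "flask-login", "PENDING_REVIEW"),
        ("c", "httpx", "PENDING_REVIEW"),
    ],
)
flipped = approve(conn, "flask-login")
remaining = conn.execute(
    "SELECT id FROM approval_requests WHERE status = 'PENDING_REVIEW'"
).fetchall()
```

Both spellings of the package are flipped in a single statement, and a second call for the same name returns 0 because nothing is pending any more.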

legal_report async

legal_report(project: Optional[str] = None) -> dict[str, Any]

License-compliance posture for the audit log.

Returns the existing total_denials / recent fields plus three rollups the dashboard renders as charts:

  • license_breakdown — every audit row in scope grouped by license and split by verdict, so operators can see how often each license shows up and how the policy classified it.
  • unknown_license — same shape, restricted to rows whose license normalised to UNKNOWN. Surfaces how the unknown_license_policy is performing in production.
  • top_projects — projects ranked by license-driven denials, so operators can tell which repos are pulling the most non-compliant packages.
Source code in src/lex_align_server/audit.py
async def legal_report(self, project: Optional[str] = None) -> dict[str, Any]:
    """License-compliance posture for the audit log.

    Returns the existing ``total_denials`` / ``recent`` fields plus three
    rollups the dashboard renders as charts:

    * ``license_breakdown`` — every audit row in scope grouped by
      ``license`` and split by verdict, so operators can see how often
      each license shows up and how the policy classified it.
    * ``unknown_license`` — same shape, restricted to rows whose license
      normalised to ``UNKNOWN``. Surfaces how the
      ``unknown_license_policy`` is performing in production.
    * ``top_projects`` — projects ranked by license-driven denials, so
      operators can tell which repos are pulling the most non-compliant
      packages.
    """
    base = await self._denial_report(DENIAL_LICENSE, project)
    breakdown, unknown = await self._license_breakdown(project)
    top_projects = await self._top_projects_for_category(DENIAL_LICENSE, project)
    base["license_breakdown"] = breakdown
    base["unknown_license"] = unknown
    base["top_projects"] = top_projects
    return base

security_report async

security_report(project: Optional[str] = None, registry: Optional[Any] = None) -> dict[str, Any]

Vulnerability posture for the audit log.

Adds four rollups on top of the base denial report (the result also includes cve_alerts, the output of recent_cve_alerts):

  • severity_distribution — CVE-denied rows bucketed by CVSS severity (critical / high / medium / low / unknown).
  • top_packages — packages with the most CVE-driven denials, carrying their highest-seen CVSS and the CVE ids responsible.
  • top_cves — the CVE identifiers showing up most often.
  • hot_registry_packages — populated only when registry is provided, otherwise an empty list. Lists packages currently in the registry as preferred / approved / version-constrained whose recent audit rows include a CVE denial or provisional verdict. This catches the "we said yes, then OSV published a critical" scenario the pre-commit hook is meant to backstop.
Source code in src/lex_align_server/audit.py
async def security_report(
    self,
    project: Optional[str] = None,
    registry: Optional[Any] = None,
) -> dict[str, Any]:
    """Vulnerability posture for the audit log.

    Adds four rollups on top of the base denial report (the result also
    includes ``cve_alerts``, the output of ``recent_cve_alerts``):

    * ``severity_distribution`` — CVE-denied rows bucketed by
      CVSS severity (critical / high / medium / low / unknown).
    * ``top_packages`` — packages with the most CVE-driven denials,
      carrying their highest-seen CVSS and the CVE ids responsible.
    * ``top_cves`` — the CVE identifiers showing up most often.
    * ``hot_registry_packages`` — populated only when ``registry`` is
      provided, otherwise an empty list. Lists packages currently in
      the registry as ``preferred`` / ``approved`` /
      ``version-constrained`` whose recent audit rows include a CVE
      denial or provisional verdict. This catches the "we said yes,
      then OSV published a critical" scenario the pre-commit hook is
      meant to backstop.
    """
    base = await self._denial_report(DENIAL_CVE, project)
    rows = base["recent"]
    base["severity_distribution"] = _bucket_severity(rows)
    base["top_packages"] = _rank_top_packages(rows)
    base["top_cves"] = _rank_top_cves(rows)
    if registry is not None:
        base["hot_registry_packages"] = await self._hot_registry_packages(
            registry, project
        )
    else:
        base["hot_registry_packages"] = []
    base["cve_alerts"] = await self.recent_cve_alerts(project)
    return base
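
The `_bucket_severity` helper is not shown on this page. As a sketch of what severity bucketing could look like, the function below maps a score to the CVSS v3 qualitative bands (9.0 and up critical, 7.0 to 8.9 high, 4.0 to 6.9 medium, below that low) and treats a missing score as unknown. The function names and the output shape are assumptions. Note that in the source above the rollups are computed from `base["recent"]`, so they describe whichever rows the base report returns.

```python
from collections import Counter
from typing import Optional

BUCKETS = ("critical", "high", "medium", "low", "unknown")


def severity_bucket(score: Optional[float]) -> str:
    """Map a CVSS score to a severity label (hypothetical helper)."""
    if score is None:
        return "unknown"
    if score >= 9.0:
        return "critical"
    if score >= 7.0:
        return "high"
    if score >= 4.0:
        return "medium"
    return "low"  # this sketch also files a 0.0 score under "low"


def bucket_severity(rows: list[dict]) -> dict[str, int]:
    counts = Counter(severity_bucket(r.get("max_cvss")) for r in rows)
    # Always emit every bucket so a chart has a stable set of keys.
    return {name: counts.get(name, 0) for name in BUCKETS}


sample = [
    {"package": "a", "max_cvss": 9.8},
    {"package": "b", "max_cvss": 7.5},
    {"package": "c", "max_cvss": 5.0},
    {"package": "d", "max_cvss": 2.1},
    {"package": "e", "max_cvss": None},
    {"package": "f", "max_cvss": 9.0},
]
distribution = bucket_severity(sample)
```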

recent_cve_alerts async

recent_cve_alerts(project: Optional[str] = None, limit: int = 100) -> list[dict]

Most recent CVE_ALERT rows written by the background scanner.

Surfaced through security_report so the dashboard and the client's status command see scanner output without any endpoint changes.

Source code in src/lex_align_server/audit.py
async def recent_cve_alerts(
    self, project: Optional[str] = None, limit: int = 100
) -> list[dict]:
    """Most recent CVE_ALERT rows written by the background scanner.

    Surfaced through ``security_report`` so the dashboard and the
    client's ``status`` command see scanner output without any
    endpoint changes.
    """
    clauses = ["denial_category = ?"]
    params: list[Any] = [DENIAL_CVE_ALERT]
    if project:
        clauses.append("project = ?")
        params.append(project)
    where = " WHERE " + " AND ".join(clauses)
    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            f"""SELECT id, ts, package, cve_ids, max_cvss, reason,
                      registry_status
               FROM audit_log{where}
               ORDER BY ts DESC
               LIMIT ?""",
            [*params, limit],
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()
    for row in rows:
        if row.get("cve_ids"):
            try:
                row["cve_ids"] = json.loads(row["cve_ids"])
            except json.JSONDecodeError:
                row["cve_ids"] = []
        else:
            row["cve_ids"] = []
    return rows

list_pending_by_package async

list_pending_by_package() -> list[dict]

All PENDING_REVIEW approval requests grouped by package.

The dashboard uses this to surface "things developers asked for" as a triage queue. Multiple requests for the same package (across projects/requesters) collapse into a single row carrying the count, the most recent rationale, and the most recent timestamp. Callers are expected to further filter against the live registry.

Source code in src/lex_align_server/audit.py
async def list_pending_by_package(self) -> list[dict]:
    """All PENDING_REVIEW approval requests grouped by package.

    The dashboard uses this to surface "things developers asked for" as a
    triage queue. Multiple requests for the same package (across
    projects/requesters) collapse into a single row carrying the count,
    the most recent rationale, and the most recent timestamp. Callers
    are expected to further filter against the live registry.
    """
    # Imported lazily to avoid a circular import at module load.
    from .registry import normalize_name

    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            """SELECT package, rationale, project, requester, ts
               FROM approval_requests
               WHERE status = ?
               ORDER BY ts DESC""",
            (APPROVAL_PENDING,),
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()

    grouped: dict[str, dict] = {}
    for r in rows:
        key = normalize_name(r["package"])
        entry = grouped.get(key)
        if entry is None:
            grouped[key] = {
                "package": r["package"],
                "normalized_name": key,
                "latest_rationale": r["rationale"],
                "latest_ts": r["ts"],
                "latest_project": r["project"],
                "latest_requester": r["requester"],
                "request_count": 1,
            }
        else:
            entry["request_count"] += 1
            # Rows are ordered DESC by ts so the first one wins for "latest".
    return list(grouped.values())
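
The grouping relies on the rows arriving newest first: the first row seen for a normalized name supplies the "latest" fields, and later rows only bump the counter. A self-contained sketch of that idea, with an assumed PEP 503 style `normalize_name` and made-up sample rows:

```python
import re


def normalize_name(name: str) -> str:
    # Assumed PEP 503 style normalization; the real helper may differ.
    return re.sub(r"[-_.]+", "-", name).lower()


def group_pending(rows: list[dict]) -> list[dict]:
    """Collapse newest-first rows into one entry per normalized package."""
    grouped: dict[str, dict] = {}
    for r in rows:
        key = normalize_name(r["package"])
        entry = grouped.get(key)
        if entry is None:
            # First row for this key is the newest, so it sets "latest_*".
            grouped[key] = {
                "package": r["package"],
                "normalized_name": key,
                "latest_rationale": r["rationale"],
                "latest_ts": r["ts"],
                "request_count": 1,
            }
        else:
            entry["request_count"] += 1
    return list(grouped.values())


rows = [  # already sorted by ts DESC, as the query guarantees
    {"package": "Flask_Login", "rationale": "needed for SSO", "ts": "2025-03-02"},
    {"package": "httpx", "rationale": "async client", "ts": "2025-03-01"},
    {"package": "flask-login", "rationale": "auth", "ts": "2025-02-20"},
]
pending = group_pending(rows)
```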

list_implicit_candidates async

list_implicit_candidates(*, window_days: int = 30) -> list[dict]

Packages that show up in audit_log but never produced an explicit request-approval row.

This catches the cases where an agent / hook only ever called check: the verdict's recorded but no one is going to file an approval — yet the operator still wants to triage.

Each row is annotated with a reason field explaining why it's surfacing:

  • provisional-no-rationale — the package got PROVISIONALLY_ALLOWED and the agent didn't follow up with request-approval. Highest signal.
  • repeatedly-denied — the package was DENIED repeatedly, so an explicit registry rule (banned, version-pinned, replaced) would save downstream agents from rediscovering the wall.
  • pre-screened — only check calls, all ALLOWED. Lowest signal, but useful for "things lots of agents poke at."

Callers (the dashboard) further filter against the live registry so packages that just got merged stop appearing immediately.

Source code in src/lex_align_server/audit.py
async def list_implicit_candidates(
    self, *, window_days: int = 30
) -> list[dict]:
    """Packages that show up in `audit_log` but never produced an
    explicit `request-approval` row.

    This catches the cases where an agent / hook only ever called
    `check`: the verdict's recorded but no one is going to file an
    approval — yet the operator still wants to triage.

    Each row is annotated with a ``reason`` field explaining why
    it's surfacing:

      ``provisional-no-rationale`` — the package got
          ``PROVISIONALLY_ALLOWED`` and the agent didn't follow up
          with ``request-approval``. Highest signal.
      ``repeatedly-denied`` — the package was DENIED repeatedly, so
          an explicit registry rule (banned, version-pinned, replaced)
          would save downstream agents from rediscovering the wall.
      ``pre-screened`` — only `check` calls, all ALLOWED. Lowest
          signal, but useful for "things lots of agents poke at."

    Callers (the dashboard) further filter against the live registry
    so packages that just got merged stop appearing immediately.
    """
    from .registry import normalize_name

    # Pull all audit rows in the window, plus all approval_requests rows
    # ever (so we can subtract the explicit ones).
    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            """SELECT package, verdict, denial_category, ts, project,
                      agent_model, agent_version
               FROM audit_log
               WHERE ts >= datetime('now', ?)
               ORDER BY ts DESC""",
            # Bind the window as a SQLite date modifier instead of
            # interpolating it into the SQL text.
            (f"-{int(window_days)} days",),
        )
        audit_rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()

        cur2 = await db.execute(
            "SELECT DISTINCT package FROM approval_requests",
        )
        explicit = {
            normalize_name(dict(r)["package"])
            for r in await cur2.fetchall()
        }
        await cur2.close()

    grouped: dict[str, dict] = {}
    for r in audit_rows:
        key = normalize_name(r["package"])
        if key in explicit:
            # Explicit `request-approval` exists → already in pending panel.
            continue
        entry = grouped.setdefault(key, {
            "package": r["package"],
            "normalized_name": key,
            "evaluations": 0,
            "denials": 0,
            "provisional": 0,
            "latest_ts": r["ts"],
            "latest_project": r["project"],
            "latest_agent_model": r.get("agent_model"),
            "latest_agent_version": r.get("agent_version"),
            "projects": set(),
        })
        entry["evaluations"] += 1
        if r["verdict"] == VERDICT_DENIED:
            entry["denials"] += 1
        elif r["verdict"] == VERDICT_PROVISIONALLY_ALLOWED:
            entry["provisional"] += 1
        entry["projects"].add(r["project"])

    out: list[dict] = []
    for entry in grouped.values():
        entry["project_count"] = len(entry.pop("projects"))
        entry["reason"], entry["reason_detail"] = _classify_implicit(entry)
        out.append(entry)

    # Rank by signal strength. provisional > denied > pre-screened, then
    # by frequency × recency proxy (just frequency for now).
    rank_order = {
        "provisional-no-rationale": 0,
        "repeatedly-denied": 1,
        "pre-screened": 2,
    }
    out.sort(key=lambda r: (
        rank_order.get(r["reason"], 99), -r["evaluations"],
    ))
    return out
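
The ranking step can be tried in isolation. In the sketch below, `classify` is a hypothetical stand-in for `_classify_implicit`, whose real thresholds are not shown on this page: it treats any provisional verdict as `provisional-no-rationale` and any denial as `repeatedly-denied`. The sort key mirrors the one in the source, reason rank first and then evaluation count descending. The package names are made up.

```python
RANK_ORDER = {
    "provisional-no-rationale": 0,
    "repeatedly-denied": 1,
    "pre-screened": 2,
}


def classify(entry: dict) -> str:
    # Hypothetical rules; the real _classify_implicit may use thresholds.
    if entry["provisional"] > 0:
        return "provisional-no-rationale"
    if entry["denials"] > 0:
        return "repeatedly-denied"
    return "pre-screened"


def rank(entries: list[dict]) -> list[dict]:
    out = [{**e, "reason": classify(e)} for e in entries]
    # Strongest signal first, then the most frequently evaluated.
    out.sort(key=lambda r: (RANK_ORDER.get(r["reason"], 99), -r["evaluations"]))
    return out


entries = [
    {"package": "left-pad", "evaluations": 12, "denials": 0, "provisional": 0},
    {"package": "fastjson", "evaluations": 3, "denials": 3, "provisional": 0},
    {"package": "tinylib", "evaluations": 2, "denials": 0, "provisional": 1},
    {"package": "bigpoke", "evaluations": 40, "denials": 0, "provisional": 0},
]
ranked = rank(entries)
```

A rarely evaluated provisional package still outranks a heavily evaluated pre-screened one, because the reason rank is compared before frequency.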

agents_report async

agents_report(project: Optional[str] = None) -> dict[str, Any]

Aggregate evaluations by (agent_model, agent_version).

Powers the "agents" dashboard, so operators can see exactly which Claude (or other agent) version is making which kinds of requests. Rows where the agent is unknown collapse into one bucket.

Source code in src/lex_align_server/audit.py
async def agents_report(self, project: Optional[str] = None) -> dict[str, Any]:
    """Aggregate evaluations by (agent_model, agent_version).

    Powers the "agents" dashboard, so operators can see exactly which
    Claude (or other agent) version is making which kinds of requests.
    Rows where the agent is unknown collapse into one bucket.
    """
    where = ""
    params: list[Any] = []
    if project:
        where = " WHERE project = ?"
        params.append(project)

    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        agg_cur = await db.execute(
            f"""SELECT agent_model, agent_version,
                      COUNT(*) AS evaluations,
                      SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS denials,
                      SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS provisional,
                      MAX(ts) AS last_seen
               FROM audit_log{where}
               GROUP BY agent_model, agent_version
               ORDER BY evaluations DESC""",
            [VERDICT_DENIED, VERDICT_PROVISIONALLY_ALLOWED, *params],
        )
        agents = [dict(r) for r in await agg_cur.fetchall()]
        await agg_cur.close()

        recent_cur = await db.execute(
            f"""SELECT id, ts, project, requester, package, version, verdict,
                      denial_category, reason, agent_model, agent_version
               FROM audit_log{where}
               ORDER BY ts DESC LIMIT 50""",
            params,
        )
        recent = [dict(r) for r in await recent_cur.fetchall()]
        await recent_cur.close()
    return {
        "project": project,
        "agents": agents,
        "recent": recent,
    }
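
Two details of the aggregate query are easy to miss: the `CASE` placeholders appear before the `WHERE` placeholder, so the verdict values must come first in the parameter list, and SQLite's `GROUP BY` puts all `NULL` agent values into one group, which is what collapses unknown agents into a single bucket. The sketch below reproduces the query shape with the standard-library `sqlite3` module against a cut-down, hypothetical table. The verdict strings are assumed to match the constants, and the sample rows are made up.

```python
import sqlite3
from typing import Optional


def agents(conn: sqlite3.Connection, project: Optional[str] = None) -> list[tuple]:
    where, params = "", []
    if project:
        where = " WHERE project = ?"
        params.append(project)
    return conn.execute(
        f"""SELECT agent_model, agent_version,
                   COUNT(*) AS evaluations,
                   SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS denials,
                   SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS provisional,
                   MAX(ts) AS last_seen
            FROM audit_log{where}
            GROUP BY agent_model, agent_version
            ORDER BY evaluations DESC""",
        # Placeholder order follows the SQL text: CASE values, then WHERE.
        ["DENIED", "PROVISIONALLY_ALLOWED", *params],
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE audit_log "
    "(ts TEXT, project TEXT, verdict TEXT, agent_model TEXT, agent_version TEXT)"
)
conn.executemany(
    "INSERT INTO audit_log VALUES (?, ?, ?, ?, ?)",
    [
        ("2025-01-01 10:00:00", "web", "ALLOWED", "model-a", "1"),
        ("2025-01-01 11:00:00", "web", "DENIED", "model-a", "1"),
        ("2025-01-01 12:00:00", "web", "PROVISIONALLY_ALLOWED", "model-a", "1"),
        ("2025-01-02 09:00:00", "api", "DENIED", None, None),
        ("2025-01-02 10:00:00", "api", "ALLOWED", None, None),
    ],
)
all_agents = agents(conn)
api_agents = agents(conn, "api")
```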

Caching

cache

Redis-backed JSON cache.

Used by the license and CVE adapters to avoid hammering PyPI/OSV. We intentionally swallow connection errors and degrade to "no cache" so the server keeps serving when Redis is unreachable — the audit log still records every decision.
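
The "degrade to no cache" behaviour can be sketched as a thin wrapper that turns connection failures into cache misses. Everything here is hypothetical: `SoftCache` and the two backends are illustrative, the real cache is presumably async and talks to Redis, and a Redis client raises its own exception type rather than the built-in `ConnectionError` used below.

```python
import json
from typing import Any, Optional


class SoftCache:
    """JSON cache wrapper that treats backend failures as misses."""

    def __init__(self, backend) -> None:
        self._backend = backend

    def get(self, key: str) -> Optional[Any]:
        try:
            raw = self._backend.get(key)
        except ConnectionError:
            return None  # unreachable backend behaves like an empty cache
        return None if raw is None else json.loads(raw)

    def set(self, key: str, value: Any, ttl: int) -> None:
        try:
            self._backend.set(key, json.dumps(value), ttl)
        except ConnectionError:
            pass  # dropping a write only costs a later re-fetch


class MemoryBackend:
    def __init__(self) -> None:
        self.data: dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self.data.get(key)

    def set(self, key: str, value: str, ttl: int) -> None:
        self.data[key] = value  # TTL ignored in this sketch


class DownBackend:
    def get(self, key: str):
        raise ConnectionError("backend unreachable")

    def set(self, key: str, value: str, ttl: int):
        raise ConnectionError("backend unreachable")


healthy = SoftCache(MemoryBackend())
healthy.set("license:httpx", {"license": "BSD-3-Clause"}, 3600)
hit = healthy.get("license:httpx")

degraded = SoftCache(DownBackend())
degraded.set("license:httpx", {"license": "BSD-3-Clause"}, 3600)
miss = degraded.get("license:httpx")
```

Callers see the same interface in both cases, so an adapter simply falls through to its upstream lookup when the cache is down.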

Authentication

auth

Auth dependencies.

Identity resolution lives in the pluggable authenticator on app.state.lex.authenticator (see authn/). The functions here are thin FastAPI Depends shims so endpoints can stay declarative:

requester: str = Depends(get_requester)
identity: Identity = Depends(get_identity)
agent: AgentInfo = Depends(get_agent_info)

Single-user mode ships an AnonymousAuthenticator so the same code path works without any auth setup; org mode swaps in whatever the operator configured via AUTH_BACKEND.

The agent-identity headers (X-LexAlign-Agent-Model / -Version) are tag metadata, not authentication — they carry the model that originated the request (e.g. opus 4.7) and feed the agent-activity dashboards. The Authenticator answers "who is calling" (the human/principal); get_agent_info answers "which agent did the calling for them."

AgentInfo dataclass

AgentInfo(model: Optional[str] = None, version: Optional[str] = None)

Agent identity reported by the client.

Both fields are optional; when the client doesn't send the headers, both are None and reports group those rows under an "(unknown agent)" bucket.
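
A framework-free sketch of how the two headers might be turned into an `AgentInfo` follows. The header names come from the module description above; the `agent_info_from_headers` and `bucket_label` helpers are hypothetical and take a plain mapping rather than a FastAPI request.

```python
from dataclasses import dataclass
from typing import Mapping, Optional

MODEL_HEADER = "X-LexAlign-Agent-Model"
VERSION_HEADER = "X-LexAlign-Agent-Version"


@dataclass
class AgentInfo:
    model: Optional[str] = None
    version: Optional[str] = None


def agent_info_from_headers(headers: Mapping[str, str]) -> AgentInfo:
    """Build AgentInfo from a header mapping (hypothetical helper)."""
    # HTTP header names are case-insensitive, so compare in lowercase.
    lowered = {k.lower(): v.strip() for k, v in headers.items()}
    return AgentInfo(
        model=lowered.get(MODEL_HEADER.lower()) or None,
        version=lowered.get(VERSION_HEADER.lower()) or None,
    )


def bucket_label(agent: AgentInfo) -> str:
    """Label used when grouping rows by agent (hypothetical helper)."""
    if agent.model is None and agent.version is None:
        return "(unknown agent)"
    return " ".join(part for part in (agent.model, agent.version) if part)


anonymous = agent_info_from_headers({})
tagged = agent_info_from_headers(
    {"x-lexalign-agent-model": "opus", "X-LexAlign-Agent-Version": "4.7"}
)
```

Empty header values are treated the same as missing ones, so such a request would land in the unknown bucket instead of creating an empty-string group.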

get_identity async

get_identity(request: Request) -> Identity

Resolve the requester via the per-app authenticator.

Endpoints depend on this when they need email/groups (e.g. for future per-team authorization). For the common "I just need a string for the audit log" case, depend on get_requester instead.

Source code in src/lex_align_server/auth.py
async def get_identity(request: Request) -> Identity:
    """Resolve the requester via the per-app authenticator.

    Endpoints depend on this when they need email/groups (e.g. for
    future per-team authorization). For the common "I just need a
    string for the audit log" case, depend on :func:`get_requester`
    instead.
    """
    return await request.app.state.lex.authenticator.authenticate(request)

get_requester async

get_requester(identity: Identity = Depends(get_identity)) -> str

The principal id, suitable for audit_log.requester.

Source code in src/lex_align_server/auth.py
async def get_requester(identity: Identity = Depends(get_identity)) -> str:
    """The principal id, suitable for ``audit_log.requester``."""
    return identity.id