Skip to content

API Reference

Docstrings on this page are pulled directly from the source under src/ by mkdocstrings. Edit the docstring, rebuild, and the rendered page updates.


lex_align_client

The thin client used by developers and AI agents. Provides the lex-align-client CLI as well as the Claude Code and git pre-commit hooks.

lex_align_client

api

HTTP client for the lex-align server.

Synchronous; the CLI is one-shot, so the simplicity of httpx.Client is worth more than the throughput of an async client. Failure semantics:

  • connection error and fail_open=true → return a synthetic ALLOWED verdict with transport_error=True so the caller can warn.
  • connection error and fail_open=false → raise ServerUnreachable.
  • server 4xx/5xx → raise ServerError(detail).

claude_hooks

Claude Code session hooks (Advisor surface).

SessionStart and PreToolUse proxy through to the server's /evaluate so the agent sees blocked / provisionally-allowed verdicts during planning, before the pre-commit gate fires.

handle_pre_tool_use

handle_pre_tool_use(event: dict, project_root: Path, config: ClientConfig) -> Optional[tuple[str, str | None]]

Return (decision, message) for pyproject.toml edits, else None.

Source code in src/lex_align_client/claude_hooks.py
def handle_pre_tool_use(
    event: dict, project_root: Path, config: ClientConfig
) -> Optional[tuple[str, str | None]]:
    """Return (decision, message) for pyproject.toml edits, else None."""
    tool_name = event.get("tool_name", "")
    tool_input = event.get("tool_input", {}) or {}
    path_str = tool_input.get("path", "") or tool_input.get("file_path", "")
    if "pyproject.toml" not in path_str:
        return None
    if tool_name not in ("Edit", "Write", "MultiEdit"):
        return None

    pyproject_path = (
        project_root / path_str if not Path(path_str).is_absolute() else Path(path_str)
    )
    if not pyproject_path.exists():
        return None

    current = pyproject_path.read_text()
    proposed = apply_edit(current, tool_name, tool_input)
    added, removed = diff_deps(current, proposed)
    if not added and not removed:
        return None

    header = ["You are modifying runtime dependencies:"]
    for name, spec in sorted(added.items()):
        header.append(f"  + {spec}")
    for name in sorted(removed):
        header.append(f"  - {name}")

    model, version = _detect_agent(event)
    blocks: list[str] = []
    notes: list[str] = []
    try:
        with LexAlignClient(
            config, agent_model=model, agent_version=version
        ) as client:
            for name, spec in sorted(added.items()):
                version = extract_pinned_version(spec)
                v = client.check(name, version)
                if v.denied:
                    blocks.append(_format_verdict(v, spec))
                elif v.verdict == "PROVISIONALLY_ALLOWED":
                    suffix = " (run `lex-align-client request-approval` after this lands)" if v.is_requestable else ""
                    notes.append(f"  ◎ {name} — provisional: {v.reason}{suffix}")
                elif v.needs_rationale:
                    notes.append(
                        f"  • {name} — allowed, but registry-approved (neutral); "
                        "document the architectural need in your commit message or PR."
                    )
                else:
                    notes.append(f"  ✓ {name}{v.reason}")
    except (ServerUnreachable, ServerError) as exc:
        if config.fail_open:
            return ("allow", "\n".join(header + ["", f"[lex-align] {exc} — fail_open=true; allowing edit."]))
        return ("block", f"[lex-align] cannot reach server: {exc}")

    if blocks:
        msg = "\n".join(header + ["", "ENFORCEMENT — blocked by registry:"] + blocks +
                        ["", "No dependencies were modified. Adjust the edit and retry."])
        return ("block", msg)
    if notes:
        return ("allow", "\n".join(header + [""] + notes))
    return ("allow", "\n".join(header))

claudemd

CLAUDE.md integration: write lex-align usage instructions into the project's CLAUDE.md.

install_claude_md

install_claude_md(project_root: Path) -> tuple[Path, bool]

Create CLAUDE.md or append the lex-align section if not already present.

Returns (path, created_or_updated). Returns False for the second element when the section was already present and no write was performed.

Source code in src/lex_align_client/claudemd.py
def install_claude_md(project_root: Path) -> tuple[Path, bool]:
    """Create CLAUDE.md or append the lex-align section if not already present.

    Returns (path, created_or_updated). Returns False for the second element
    when the section was already present and no write was performed.
    """
    path = project_root / "CLAUDE.md"
    if path.exists():
        existing = path.read_text(encoding="utf-8")
        if _SECTION_HEADER in existing:
            return path, False
        path.write_text(existing.rstrip("\n") + "\n\n" + _SECTION + "\n", encoding="utf-8")
    else:
        path.write_text(_SECTION + "\n", encoding="utf-8")
    return path, True

cli

lex-align-client CLI: init, check, request-approval, hook, precommit.

main

main() -> None

lex-align client — talks to the lex-align server.

Source code in src/lex_align_client/cli.py
@click.group()
def main() -> None:
    """lex-align client — talks to the lex-align server."""

init

init(yes: bool, server_url: str | None, project_name: str | None, mode: str | None, no_claude_hooks: bool, no_precommit: bool, no_claude_md: bool) -> None

One-time setup: write .lexalign.toml and install hooks.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--yes", "-y", is_flag=True, help="Accept defaults without prompting.")
@click.option("--server-url", default=None, help="Override server URL.")
@click.option("--project", "project_name", default=None, help="Override project name.")
@click.option("--mode", type=click.Choice(["single-user", "org"]), default=None)
@click.option("--no-claude-hooks", is_flag=True, help="Skip Claude Code hook install.")
@click.option("--no-precommit", is_flag=True, help="Skip git pre-commit install.")
@click.option("--no-claude-md", is_flag=True, help="Skip CLAUDE.md creation/update.")
def init(
    yes: bool,
    server_url: str | None,
    project_name: str | None,
    mode: str | None,
    no_claude_hooks: bool,
    no_precommit: bool,
    no_claude_md: bool,
) -> None:
    """One-time setup: write .lexalign.toml and install hooks."""
    project_root = Path.cwd()

    if config_path(project_root).exists() and not yes:
        if not click.confirm(
            f"{CONFIG_FILENAME} already exists. Overwrite?", default=False
        ):
            raise click.Abort()

    autodetected = detect_project_name(project_root / "pyproject.toml", project_root.name)
    if project_name is None:
        project_name = autodetected if yes else click.prompt(
            "Project name", default=autodetected
        )
    if server_url is None:
        server_url = "http://127.0.0.1:8765" if yes else click.prompt(
            "Server URL", default="http://127.0.0.1:8765"
        )
    if mode is None:
        mode = "single-user" if yes else click.prompt(
            "Mode", type=click.Choice(["single-user", "org"]), default="single-user"
        )

    config = ClientConfig(
        project=project_name,
        server_url=server_url,
        mode=mode,
        fail_open=(mode == "single-user"),
    )
    path = save_config(project_root, config)
    click.echo(f"Wrote {path}")

    if not no_claude_hooks:
        install_claude_hooks(project_root)
        click.echo("Installed Claude Code hooks in .claude/settings.json.")

    if not no_claude_md:
        md_existed = (project_root / "CLAUDE.md").exists()
        md_path, changed = install_claude_md(project_root)
        if changed:
            action = "Updated" if md_existed else "Created"
            click.echo(f"{action} {md_path.name} with lex-align usage instructions.")
        else:
            click.echo(f"{md_path.name} already contains lex-align section; skipped.")

    if not no_precommit:
        hook_path = install_precommit(project_root)
        if hook_path:
            click.echo(f"Installed git pre-commit hook at {hook_path}.")
        else:
            click.echo(
                "Skipped pre-commit install: this directory is not a git repo "
                "(run `git init` then `lex-align-client init` again)."
            )

    if mode == "org":
        click.echo("")
        click.echo(
            f"Organization mode: export your API key as ${config.api_key_env_var} "
            "before running checks."
        )

check

check(package: str, version: str | None, as_json: bool, agent_model: str | None, agent_version: str | None) -> None

Check a package against the server policy.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--package", required=True)
@click.option("--version", default=None)
@click.option("--json", "as_json", is_flag=True, default=True, help="Emit JSON (default).")
@click.option(
    "--agent-model",
    default=None,
    help="Override agent model tag (default: $LEXALIGN_AGENT_MODEL).",
)
@click.option(
    "--agent-version",
    default=None,
    help="Override agent version tag (default: $LEXALIGN_AGENT_VERSION).",
)
def check(
    package: str,
    version: str | None,
    as_json: bool,
    agent_model: str | None,
    agent_version: str | None,
) -> None:
    """Check a package against the server policy."""
    project_root = find_project_root()
    config = _require_config(project_root)
    try:
        with LexAlignClient(
            config, agent_model=agent_model, agent_version=agent_version
        ) as client:
            verdict = client.check(package, version)
    except ServerUnreachable as exc:
        raise click.ClickException(f"Server unreachable: {exc}")
    except ServerError as exc:
        raise click.ClickException(f"Server error: {exc}")
    click.echo(json.dumps(verdict.to_dict(), indent=2))
    if verdict.denied:
        sys.exit(2)

request_approval

request_approval(package: str, rationale: str, agent_model: str | None, agent_version: str | None) -> None

Submit a non-blocking request to add package to the registry.

Source code in src/lex_align_client/cli.py
@main.command("request-approval")
@click.option("--package", required=True)
@click.option("--rationale", required=True)
@click.option(
    "--agent-model",
    default=None,
    help="Override agent model tag (default: $LEXALIGN_AGENT_MODEL).",
)
@click.option(
    "--agent-version",
    default=None,
    help="Override agent version tag (default: $LEXALIGN_AGENT_VERSION).",
)
def request_approval(
    package: str,
    rationale: str,
    agent_model: str | None,
    agent_version: str | None,
) -> None:
    """Submit a non-blocking request to add `package` to the registry."""
    project_root = find_project_root()
    config = _require_config(project_root)
    try:
        with LexAlignClient(
            config, agent_model=agent_model, agent_version=agent_version
        ) as client:
            response = client.request_approval(package, rationale)
    except ServerUnreachable as exc:
        raise click.ClickException(f"Server unreachable: {exc}")
    except ServerError as exc:
        raise click.ClickException(f"Server error: {exc}")
    click.echo(json.dumps(response, indent=2))

precommit

precommit() -> None

Pre-commit hook entry point. Exits non-zero on any DENIED dependency.

Source code in src/lex_align_client/cli.py
@main.command()
def precommit() -> None:
    """Pre-commit hook entry point. Exits non-zero on any DENIED dependency."""
    sys.exit(run_precommit())

uninstall

uninstall(yes: bool) -> None

Remove .claude hooks and the git pre-commit shim. Leaves .lexalign.toml.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--yes", "-y", is_flag=True)
def uninstall(yes: bool) -> None:
    """Remove .claude hooks and the git pre-commit shim. Leaves .lexalign.toml."""
    project_root = find_project_root()
    if not yes and not click.confirm(
        "Remove lex-align hooks (Claude + git pre-commit)?", default=False
    ):
        raise click.Abort()
    remove_claude_hooks(project_root)
    remove_precommit(project_root)
    click.echo("Removed Claude hooks and git pre-commit shim.")
    click.echo(f"{CONFIG_FILENAME} preserved.")

config

.lexalign.toml reader/writer.

Schema:

project = "lex-align" server_url = "http://127.0.0.1:8765" mode = "single-user" # or "org" fail_open = true # ignored unless server is unreachable api_key_env_var = "LEXALIGN_API_KEY" # only used when mode = "org"

find_project_root

find_project_root(start: Optional[Path] = None) -> Path

Walk up looking for .lexalign.toml. Falls back to cwd.

Source code in src/lex_align_client/config.py
def find_project_root(start: Optional[Path] = None) -> Path:
    """Walk up looking for `.lexalign.toml`. Falls back to cwd."""
    cwd = (start or Path.cwd()).resolve()
    for parent in [cwd] + list(cwd.parents):
        if (parent / CONFIG_FILENAME).exists():
            return parent
    return cwd

precommit

Git pre-commit hook entry point.

Behavior
  1. Read the staged contents of pyproject.toml (if it's in the index).
  2. Re-evaluate every runtime dependency against the server. Reading existing deps catches new CVEs published since the last commit.
  3. If any verdict is DENIED, exit non-zero with a structured stderr message that an AI agent can parse to self-correct.

pyproject_utils

Helpers for parsing and diffing pyproject.toml dependencies.

These are the only pieces of the v1.x reconciler that survive the rewrite — the client uses them to drive the pre-commit hook and the Claude Code edit hook.

get_runtime_deps

get_runtime_deps(pyproject_path: Path) -> dict[str, str]

Return {normalized_name: raw_spec} from [project].dependencies.

Source code in src/lex_align_client/pyproject_utils.py
def get_runtime_deps(pyproject_path: Path) -> dict[str, str]:
    """Return {normalized_name: raw_spec} from [project].dependencies."""
    if not pyproject_path.exists():
        return {}
    with open(pyproject_path, "rb") as f:
        data = tomllib.load(f)
    deps = data.get("project", {}).get("dependencies", []) or []
    return {normalize_name(d): d.strip() for d in deps}

diff_deps

diff_deps(old_content: str, new_content: str) -> tuple[dict[str, str], set[str]]

Return ({added_name: raw_spec}, {removed_names}).

Source code in src/lex_align_client/pyproject_utils.py
def diff_deps(old_content: str, new_content: str) -> tuple[dict[str, str], set[str]]:
    """Return ({added_name: raw_spec}, {removed_names})."""
    old = parse_deps_from_content(old_content)
    new = parse_deps_from_content(new_content)
    added = {name: spec for name, spec in new.items() if name not in old}
    removed = set(old) - set(new)
    return added, removed

extract_pinned_version

extract_pinned_version(spec: str) -> Optional[str]

Pull a concrete version out of a spec like 'redis>=5.0' or 'httpx==0.28.1'.

Source code in src/lex_align_client/pyproject_utils.py
def extract_pinned_version(spec: str) -> Optional[str]:
    """Pull a concrete version out of a spec like 'redis>=5.0' or 'httpx==0.28.1'."""
    m = re.search(r"(?:>=|<=|!=|~=|==|>|<)\s*([0-9][0-9a-zA-Z\.\-\+\_]*)", spec)
    return m.group(1) if m else None

apply_edit

apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str

Simulate how a Claude Code Edit/Write/MultiEdit affects file content.

Source code in src/lex_align_client/pyproject_utils.py
def apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str:
    """Simulate how a Claude Code Edit/Write/MultiEdit affects file content."""
    if tool_name == "Write":
        return tool_input.get("content", "")
    if tool_name == "Edit":
        old_str = tool_input.get("old_string", "")
        new_str = tool_input.get("new_string", "")
        return current_content.replace(old_str, new_str, 1)
    if tool_name == "MultiEdit":
        content = current_content
        for edit in tool_input.get("edits", []):
            content = content.replace(edit.get("old_string", ""), edit.get("new_string", ""), 1)
        return content
    return current_content

detect_project_name

detect_project_name(pyproject_path: Path, fallback: str) -> str

Best-effort autodetect for lex-align-client init.

Prefer [project].name from pyproject.toml; fall back to the directory name.

Source code in src/lex_align_client/pyproject_utils.py
def detect_project_name(pyproject_path: Path, fallback: str) -> str:
    """Best-effort autodetect for `lex-align-client init`.

    Prefer [project].name from pyproject.toml; fall back to the directory name.
    """
    if pyproject_path.exists():
        try:
            with open(pyproject_path, "rb") as f:
                data = tomllib.load(f)
            name = data.get("project", {}).get("name")
            if isinstance(name, str) and name.strip():
                return name.strip()
        except Exception:
            pass
    return fallback

settings

Idempotent installers for the Claude Code hooks and the git pre-commit hook.

install_precommit

install_precommit(project_root: Path) -> Path | None

Install or augment .git/hooks/pre-commit. Returns the path if written.

Source code in src/lex_align_client/settings.py
def install_precommit(project_root: Path) -> Path | None:
    """Install or augment `.git/hooks/pre-commit`. Returns the path if written."""
    git_dir = project_root / ".git"
    if not git_dir.exists():
        return None
    hook_path = _precommit_path(project_root)
    hook_path.parent.mkdir(parents=True, exist_ok=True)

    snippet = (
        f"\n{_PRECOMMIT_MARKER}\n"
        "if command -v lex-align-client >/dev/null 2>&1; then\n"
        "  lex-align-client precommit || exit $?\n"
        "elif command -v uv >/dev/null 2>&1; then\n"
        "  uv run lex-align-client precommit || exit $?\n"
        "fi\n"
    )

    if hook_path.exists():
        existing = hook_path.read_text()
        if _PRECOMMIT_MARKER in existing:
            return hook_path
        new = existing.rstrip() + "\n" + snippet
        hook_path.write_text(new)
    else:
        hook_path.write_text("#!/usr/bin/env bash\nset -e\n" + snippet)
    try:
        hook_path.chmod(hook_path.stat().st_mode | 0o111)
    except OSError:
        pass
    return hook_path

CLI

cli

lex-align-client CLI: init, check, request-approval, hook, precommit.

main

main() -> None

lex-align client — talks to the lex-align server.

Source code in src/lex_align_client/cli.py
@click.group()
def main() -> None:
    """lex-align client — talks to the lex-align server."""

init

init(yes: bool, server_url: str | None, project_name: str | None, mode: str | None, no_claude_hooks: bool, no_precommit: bool, no_claude_md: bool) -> None

One-time setup: write .lexalign.toml and install hooks.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--yes", "-y", is_flag=True, help="Accept defaults without prompting.")
@click.option("--server-url", default=None, help="Override server URL.")
@click.option("--project", "project_name", default=None, help="Override project name.")
@click.option("--mode", type=click.Choice(["single-user", "org"]), default=None)
@click.option("--no-claude-hooks", is_flag=True, help="Skip Claude Code hook install.")
@click.option("--no-precommit", is_flag=True, help="Skip git pre-commit install.")
@click.option("--no-claude-md", is_flag=True, help="Skip CLAUDE.md creation/update.")
def init(
    yes: bool,
    server_url: str | None,
    project_name: str | None,
    mode: str | None,
    no_claude_hooks: bool,
    no_precommit: bool,
    no_claude_md: bool,
) -> None:
    """One-time setup: write .lexalign.toml and install hooks."""
    project_root = Path.cwd()

    if config_path(project_root).exists() and not yes:
        if not click.confirm(
            f"{CONFIG_FILENAME} already exists. Overwrite?", default=False
        ):
            raise click.Abort()

    autodetected = detect_project_name(project_root / "pyproject.toml", project_root.name)
    if project_name is None:
        project_name = autodetected if yes else click.prompt(
            "Project name", default=autodetected
        )
    if server_url is None:
        server_url = "http://127.0.0.1:8765" if yes else click.prompt(
            "Server URL", default="http://127.0.0.1:8765"
        )
    if mode is None:
        mode = "single-user" if yes else click.prompt(
            "Mode", type=click.Choice(["single-user", "org"]), default="single-user"
        )

    config = ClientConfig(
        project=project_name,
        server_url=server_url,
        mode=mode,
        fail_open=(mode == "single-user"),
    )
    path = save_config(project_root, config)
    click.echo(f"Wrote {path}")

    if not no_claude_hooks:
        install_claude_hooks(project_root)
        click.echo("Installed Claude Code hooks in .claude/settings.json.")

    if not no_claude_md:
        md_existed = (project_root / "CLAUDE.md").exists()
        md_path, changed = install_claude_md(project_root)
        if changed:
            action = "Updated" if md_existed else "Created"
            click.echo(f"{action} {md_path.name} with lex-align usage instructions.")
        else:
            click.echo(f"{md_path.name} already contains lex-align section; skipped.")

    if not no_precommit:
        hook_path = install_precommit(project_root)
        if hook_path:
            click.echo(f"Installed git pre-commit hook at {hook_path}.")
        else:
            click.echo(
                "Skipped pre-commit install: this directory is not a git repo "
                "(run `git init` then `lex-align-client init` again)."
            )

    if mode == "org":
        click.echo("")
        click.echo(
            f"Organization mode: export your API key as ${config.api_key_env_var} "
            "before running checks."
        )

check

check(package: str, version: str | None, as_json: bool, agent_model: str | None, agent_version: str | None) -> None

Check a package against the server policy.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--package", required=True)
@click.option("--version", default=None)
@click.option("--json", "as_json", is_flag=True, default=True, help="Emit JSON (default).")
@click.option(
    "--agent-model",
    default=None,
    help="Override agent model tag (default: $LEXALIGN_AGENT_MODEL).",
)
@click.option(
    "--agent-version",
    default=None,
    help="Override agent version tag (default: $LEXALIGN_AGENT_VERSION).",
)
def check(
    package: str,
    version: str | None,
    as_json: bool,
    agent_model: str | None,
    agent_version: str | None,
) -> None:
    """Check a package against the server policy."""
    project_root = find_project_root()
    config = _require_config(project_root)
    try:
        with LexAlignClient(
            config, agent_model=agent_model, agent_version=agent_version
        ) as client:
            verdict = client.check(package, version)
    except ServerUnreachable as exc:
        raise click.ClickException(f"Server unreachable: {exc}")
    except ServerError as exc:
        raise click.ClickException(f"Server error: {exc}")
    click.echo(json.dumps(verdict.to_dict(), indent=2))
    if verdict.denied:
        sys.exit(2)

request_approval

request_approval(package: str, rationale: str, agent_model: str | None, agent_version: str | None) -> None

Submit a non-blocking request to add package to the registry.

Source code in src/lex_align_client/cli.py
@main.command("request-approval")
@click.option("--package", required=True)
@click.option("--rationale", required=True)
@click.option(
    "--agent-model",
    default=None,
    help="Override agent model tag (default: $LEXALIGN_AGENT_MODEL).",
)
@click.option(
    "--agent-version",
    default=None,
    help="Override agent version tag (default: $LEXALIGN_AGENT_VERSION).",
)
def request_approval(
    package: str,
    rationale: str,
    agent_model: str | None,
    agent_version: str | None,
) -> None:
    """Submit a non-blocking request to add `package` to the registry."""
    project_root = find_project_root()
    config = _require_config(project_root)
    try:
        with LexAlignClient(
            config, agent_model=agent_model, agent_version=agent_version
        ) as client:
            response = client.request_approval(package, rationale)
    except ServerUnreachable as exc:
        raise click.ClickException(f"Server unreachable: {exc}")
    except ServerError as exc:
        raise click.ClickException(f"Server error: {exc}")
    click.echo(json.dumps(response, indent=2))

precommit

precommit() -> None

Pre-commit hook entry point. Exits non-zero on any DENIED dependency.

Source code in src/lex_align_client/cli.py
@main.command()
def precommit() -> None:
    """Pre-commit hook entry point. Exits non-zero on any DENIED dependency."""
    sys.exit(run_precommit())

uninstall

uninstall(yes: bool) -> None

Remove .claude hooks and the git pre-commit shim. Leaves .lexalign.toml.

Source code in src/lex_align_client/cli.py
@main.command()
@click.option("--yes", "-y", is_flag=True)
def uninstall(yes: bool) -> None:
    """Remove .claude hooks and the git pre-commit shim. Leaves .lexalign.toml."""
    project_root = find_project_root()
    if not yes and not click.confirm(
        "Remove lex-align hooks (Claude + git pre-commit)?", default=False
    ):
        raise click.Abort()
    remove_claude_hooks(project_root)
    remove_precommit(project_root)
    click.echo("Removed Claude hooks and git pre-commit shim.")
    click.echo(f"{CONFIG_FILENAME} preserved.")

Server API client

api

HTTP client for the lex-align server.

Synchronous; the CLI is one-shot, so the simplicity of httpx.Client is worth more than the throughput of an async client. Failure semantics:

  • connection error and fail_open=true → return a synthetic ALLOWED verdict with transport_error=True so the caller can warn.
  • connection error and fail_open=false → raise ServerUnreachable.
  • server 4xx/5xx → raise ServerError(detail).

Configuration

config

.lexalign.toml reader/writer.

Schema:

project = "lex-align" server_url = "http://127.0.0.1:8765" mode = "single-user" # or "org" fail_open = true # ignored unless server is unreachable api_key_env_var = "LEXALIGN_API_KEY" # only used when mode = "org"

find_project_root

find_project_root(start: Optional[Path] = None) -> Path

Walk up looking for .lexalign.toml. Falls back to cwd.

Source code in src/lex_align_client/config.py
def find_project_root(start: Optional[Path] = None) -> Path:
    """Walk up looking for `.lexalign.toml`. Falls back to cwd."""
    cwd = (start or Path.cwd()).resolve()
    for parent in [cwd] + list(cwd.parents):
        if (parent / CONFIG_FILENAME).exists():
            return parent
    return cwd

Settings

settings

Idempotent installers for the Claude Code hooks and the git pre-commit hook.

install_precommit

install_precommit(project_root: Path) -> Path | None

Install or augment .git/hooks/pre-commit. Returns the path if written.

Source code in src/lex_align_client/settings.py
def install_precommit(project_root: Path) -> Path | None:
    """Install or augment `.git/hooks/pre-commit`. Returns the path if written."""
    git_dir = project_root / ".git"
    if not git_dir.exists():
        return None
    hook_path = _precommit_path(project_root)
    hook_path.parent.mkdir(parents=True, exist_ok=True)

    snippet = (
        f"\n{_PRECOMMIT_MARKER}\n"
        "if command -v lex-align-client >/dev/null 2>&1; then\n"
        "  lex-align-client precommit || exit $?\n"
        "elif command -v uv >/dev/null 2>&1; then\n"
        "  uv run lex-align-client precommit || exit $?\n"
        "fi\n"
    )

    if hook_path.exists():
        existing = hook_path.read_text()
        if _PRECOMMIT_MARKER in existing:
            return hook_path
        new = existing.rstrip() + "\n" + snippet
        hook_path.write_text(new)
    else:
        hook_path.write_text("#!/usr/bin/env bash\nset -e\n" + snippet)
    try:
        hook_path.chmod(hook_path.stat().st_mode | 0o111)
    except OSError:
        pass
    return hook_path

pyproject.toml utilities

pyproject_utils

Helpers for parsing and diffing pyproject.toml dependencies.

These are the only pieces of the v1.x reconciler that survive the rewrite — the client uses them to drive the pre-commit hook and the Claude Code edit hook.

get_runtime_deps

get_runtime_deps(pyproject_path: Path) -> dict[str, str]

Return {normalized_name: raw_spec} from [project].dependencies.

Source code in src/lex_align_client/pyproject_utils.py
def get_runtime_deps(pyproject_path: Path) -> dict[str, str]:
    """Return {normalized_name: raw_spec} from [project].dependencies."""
    if not pyproject_path.exists():
        return {}
    with open(pyproject_path, "rb") as f:
        data = tomllib.load(f)
    deps = data.get("project", {}).get("dependencies", []) or []
    return {normalize_name(d): d.strip() for d in deps}

diff_deps

diff_deps(old_content: str, new_content: str) -> tuple[dict[str, str], set[str]]

Return ({added_name: raw_spec}, {removed_names}).

Source code in src/lex_align_client/pyproject_utils.py
def diff_deps(old_content: str, new_content: str) -> tuple[dict[str, str], set[str]]:
    """Return ({added_name: raw_spec}, {removed_names})."""
    old = parse_deps_from_content(old_content)
    new = parse_deps_from_content(new_content)
    added = {name: spec for name, spec in new.items() if name not in old}
    removed = set(old) - set(new)
    return added, removed

extract_pinned_version

extract_pinned_version(spec: str) -> Optional[str]

Pull a concrete version out of a spec like 'redis>=5.0' or 'httpx==0.28.1'.

Source code in src/lex_align_client/pyproject_utils.py
def extract_pinned_version(spec: str) -> Optional[str]:
    """Pull a concrete version out of a spec like 'redis>=5.0' or 'httpx==0.28.1'."""
    m = re.search(r"(?:>=|<=|!=|~=|==|>|<)\s*([0-9][0-9a-zA-Z\.\-\+\_]*)", spec)
    return m.group(1) if m else None

apply_edit

apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str

Simulate how a Claude Code Edit/Write/MultiEdit affects file content.

Source code in src/lex_align_client/pyproject_utils.py
def apply_edit(current_content: str, tool_name: str, tool_input: dict) -> str:
    """Simulate how a Claude Code Edit/Write/MultiEdit affects file content."""
    if tool_name == "Write":
        return tool_input.get("content", "")
    if tool_name == "Edit":
        old_str = tool_input.get("old_string", "")
        new_str = tool_input.get("new_string", "")
        return current_content.replace(old_str, new_str, 1)
    if tool_name == "MultiEdit":
        content = current_content
        for edit in tool_input.get("edits", []):
            content = content.replace(edit.get("old_string", ""), edit.get("new_string", ""), 1)
        return content
    return current_content

detect_project_name

detect_project_name(pyproject_path: Path, fallback: str) -> str

Best-effort autodetect for lex-align-client init.

Prefer [project].name from pyproject.toml; fall back to the directory name.

Source code in src/lex_align_client/pyproject_utils.py
def detect_project_name(pyproject_path: Path, fallback: str) -> str:
    """Best-effort autodetect for `lex-align-client init`.

    Prefer [project].name from pyproject.toml; fall back to the directory name.
    """
    if pyproject_path.exists():
        try:
            with open(pyproject_path, "rb") as f:
                data = tomllib.load(f)
            name = data.get("project", {}).get("name")
            if isinstance(name, str) and name.strip():
                return name.strip()
        except Exception:
            pass
    return fallback

Pre-commit hook

precommit

Git pre-commit hook entry point.

Behavior
  1. Read the staged contents of pyproject.toml (if it's in the index).
  2. Re-evaluate every runtime dependency against the server. Reading existing deps catches new CVEs published since the last commit.
  3. If any verdict is DENIED, exit non-zero with a structured stderr message that an AI agent can parse to self-correct.

Claude Code hooks

claude_hooks

Claude Code session hooks (Advisor surface).

SessionStart and PreToolUse proxy through to the server's /evaluate so the agent sees blocked / provisionally-allowed verdicts during planning, before the pre-commit gate fires.

handle_pre_tool_use

handle_pre_tool_use(event: dict, project_root: Path, config: ClientConfig) -> Optional[tuple[str, str | None]]

Return (decision, message) for pyproject.toml edits, else None.

Source code in src/lex_align_client/claude_hooks.py
def handle_pre_tool_use(
    event: dict, project_root: Path, config: ClientConfig
) -> Optional[tuple[str, str | None]]:
    """Return (decision, message) for pyproject.toml edits, else None."""
    tool_name = event.get("tool_name", "")
    tool_input = event.get("tool_input", {}) or {}
    path_str = tool_input.get("path", "") or tool_input.get("file_path", "")
    if "pyproject.toml" not in path_str:
        return None
    if tool_name not in ("Edit", "Write", "MultiEdit"):
        return None

    pyproject_path = (
        project_root / path_str if not Path(path_str).is_absolute() else Path(path_str)
    )
    if not pyproject_path.exists():
        return None

    current = pyproject_path.read_text()
    proposed = apply_edit(current, tool_name, tool_input)
    added, removed = diff_deps(current, proposed)
    if not added and not removed:
        return None

    header = ["You are modifying runtime dependencies:"]
    for name, spec in sorted(added.items()):
        header.append(f"  + {spec}")
    for name in sorted(removed):
        header.append(f"  - {name}")

    model, version = _detect_agent(event)
    blocks: list[str] = []
    notes: list[str] = []
    try:
        with LexAlignClient(
            config, agent_model=model, agent_version=version
        ) as client:
            for name, spec in sorted(added.items()):
                version = extract_pinned_version(spec)
                v = client.check(name, version)
                if v.denied:
                    blocks.append(_format_verdict(v, spec))
                elif v.verdict == "PROVISIONALLY_ALLOWED":
                    suffix = " (run `lex-align-client request-approval` after this lands)" if v.is_requestable else ""
                    notes.append(f"  ◎ {name} — provisional: {v.reason}{suffix}")
                elif v.needs_rationale:
                    notes.append(
                        f"  • {name} — allowed, but registry-approved (neutral); "
                        "document the architectural need in your commit message or PR."
                    )
                else:
                    notes.append(f"  ✓ {name}{v.reason}")
    except (ServerUnreachable, ServerError) as exc:
        if config.fail_open:
            return ("allow", "\n".join(header + ["", f"[lex-align] {exc} — fail_open=true; allowing edit."]))
        return ("block", f"[lex-align] cannot reach server: {exc}")

    if blocks:
        msg = "\n".join(header + ["", "ENFORCEMENT — blocked by registry:"] + blocks +
                        ["", "No dependencies were modified. Adjust the edit and retry."])
        return ("block", msg)
    if notes:
        return ("allow", "\n".join(header + [""] + notes))
    return ("allow", "\n".join(header))

CLAUDE.md rendering

claudemd

CLAUDE.md integration: write lex-align usage instructions into the project's CLAUDE.md.

install_claude_md

install_claude_md(project_root: Path) -> tuple[Path, bool]

Create CLAUDE.md or append the lex-align section if not already present.

Returns (path, created_or_updated). Returns False for the second element when the section was already present and no write was performed.

Source code in src/lex_align_client/claudemd.py
def install_claude_md(project_root: Path) -> tuple[Path, bool]:
    """Create CLAUDE.md or append the lex-align section if not already present.

    Returns (path, created_or_updated). Returns False for the second element
    when the section was already present and no write was performed.
    """
    path = project_root / "CLAUDE.md"
    if path.exists():
        existing = path.read_text(encoding="utf-8")
        if _SECTION_HEADER in existing:
            return path, False
        path.write_text(existing.rstrip("\n") + "\n\n" + _SECTION + "\n", encoding="utf-8")
    else:
        path.write_text(_SECTION + "\n", encoding="utf-8")
    return path, True

lex_align_server

The FastAPI service that owns the registry, runs license + CVE checks, persists the audit log, and surfaces report endpoints.

lex_align_server

api

v1

approval_requests

POST /api/v1/approval-requests — the Good Citizen.

Persists a request and returns 202 immediately. Phase 4 will swap in real PR creation against the registry's git repo.

evaluate

GET /api/v1/evaluate — the Advisor.

health

GET /api/v1/health — basic liveness/readiness.

registry

Registry endpoints used by the dashboard workshop.

  • GET /registry returns the live registry as JSON.
  • GET /registry/pending returns PENDING_REVIEW approval requests for packages not in the loaded registry.
  • POST /registry/packages classifies a package and adds it to the in-memory registry. Pending approval requests for that package flip to APPROVED. The change is not persisted to the registry file — the operator must export the updated YAML and redeploy the server to make it stick.
  • POST /registry/parse-yaml validates user-supplied YAML against the schema the CLI compiler uses and returns it as JSON.

The classify endpoint is the only one that mutates server state.

pending_requests async
pending_requests(request: Request) -> dict

Pending approval requests for packages not yet in the live registry.

Once a package is classified via POST /registry/packages, its pending approval requests are flipped to APPROVED in the audit store and stop appearing here. Without a loaded registry, every PENDING_REVIEW row is surfaced.

Source code in src/lex_align_server/api/v1/registry.py
@router.get("/registry/pending")
async def pending_requests(request: Request) -> dict:
    """Pending approval requests for packages not yet in the live registry.

    Once a package is classified via POST /registry/packages, its pending
    approval requests are flipped to APPROVED in the audit store and stop
    appearing here. Without a loaded registry, every PENDING_REVIEW row is
    surfaced.
    """
    state = request.app.state.lex
    grouped = await state.audit.list_pending_by_package()
    registered: set[str] = set()
    if state.registry is not None:
        registered = set(state.registry.packages.keys())
    grouped = [g for g in grouped if g["normalized_name"] not in registered]
    return {"items": grouped}
classify_package async
classify_package(body: PackageClassifyBody, request: Request) -> dict

Classify a package and upsert it into the in-memory registry.

This is what the dashboard calls when an operator approves a pending request: the package + its assigned status is written into the live Registry so subsequent /evaluate calls see it immediately, and every PENDING_REVIEW approval request for the same normalized name flips to APPROVED.

The change lives only in memory. To persist it across restarts, the operator must export the YAML from the dashboard and rebuild the server image (or re-run lex-align-server registry compile against the updated YAML and restart the process).

Source code in src/lex_align_server/api/v1/registry.py
@router.post("/registry/packages", status_code=200)
async def classify_package(body: PackageClassifyBody, request: Request) -> dict:
    """Classify a package and upsert it into the in-memory registry.

    This is what the dashboard calls when an operator approves a pending
    request: the package + its assigned status is written into the live
    `Registry` so subsequent `/evaluate` calls see it immediately, and
    every PENDING_REVIEW approval request for the same normalized name
    flips to APPROVED.

    The change lives only in memory. To persist it across restarts, the
    operator must export the YAML from the dashboard and rebuild the
    server image (or re-run `lex-align-server registry compile` against
    the updated YAML and restart the process).
    """
    state = request.app.state.lex
    if state.registry is None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=(
                "No registry is loaded; classification requires REGISTRY_PATH "
                "to point at a compiled registry."
            ),
        )

    rule_dict: dict = {"status": body.status}
    if body.reason:      rule_dict["reason"] = body.reason
    if body.replacement: rule_dict["replacement"] = body.replacement
    if body.min_version: rule_dict["min_version"] = body.min_version
    if body.max_version: rule_dict["max_version"] = body.max_version

    try:
        validate_package_rule(body.name, rule_dict)
    except ValidationError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        )

    normalized = normalize_name(body.name)
    state.registry.packages[normalized] = PackageRule(
        status=PackageStatus(body.status),
        reason=body.reason,
        replacement=body.replacement,
        min_version=body.min_version,
        max_version=body.max_version,
    )
    approved = await state.audit.mark_approved_by_package(normalized)

    logger.info(
        "registry mutated in-memory: package=%s status=%s approved=%d",
        normalized, body.status, approved,
    )
    return {
        "package": body.name,
        "normalized_name": normalized,
        "status": body.status,
        "approved_requests": approved,
        "persisted": False,
        "note": (
            "Change is in-memory only; export the YAML and rebuild the "
            "server image to persist it."
        ),
    }
reports

Read-only report endpoints.

Phase 4 dashboards consume these; Phase 3 also exposes them directly so operators can query them with curl.

agents_report async
agents_report(request: Request, project: Optional[str] = Query(None)) -> dict

Aggregate audit rows by (agent_model, agent_version).

Operators use this to answer "which Claude version is doing what" — the headers X-LexAlign-Agent-Model and X-LexAlign-Agent-Version reported by the client propagate into every audit row.

Source code in src/lex_align_server/api/v1/reports.py
@router.get("/reports/agents")
async def agents_report(
    request: Request, project: Optional[str] = Query(None)
) -> dict:
    """Aggregate audit rows by (agent_model, agent_version).

    Operators use this to answer "which Claude version is doing what" — the
    headers `X-LexAlign-Agent-Model` and `X-LexAlign-Agent-Version` reported
    by the client propagate into every audit row.
    """
    return await request.app.state.lex.audit.agents_report(project)

audit

SQLite audit log + approval-request store.

Two tables:

  • audit_log — one row per /evaluate call. Backs the legal report (license-driven denials) and security report (CVE-driven denials).
  • approval_requests — one row per /approval-requests POST. Phase 3 will attach a PR-creation workflow; for now we just persist them.

We use plain aiosqlite rather than an ORM. The schema is small and the report queries are easier to read as straight SQL.

AuditStore

AuditStore(db_path: Path)
Source code in src/lex_align_server/audit.py
def __init__(self, db_path: Path):
    self._db_path = db_path
mark_approved_by_package async
mark_approved_by_package(normalized_name: str) -> int

Move every PENDING_REVIEW request for normalized_name to APPROVED.

Called when an operator classifies a pending package via the dashboard and adds it to the in-memory registry. Returns the number of rows flipped, which the dashboard can display in its toast.

Source code in src/lex_align_server/audit.py
async def mark_approved_by_package(self, normalized_name: str) -> int:
    """Move every PENDING_REVIEW request for `normalized_name` to APPROVED.

    Called when an operator classifies a pending package via the dashboard
    and adds it to the in-memory registry. Returns the number of rows
    flipped, which the dashboard can display in its toast.
    """
    from .registry import normalize_name as _norm
    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            "SELECT id, package FROM approval_requests WHERE status = ?",
            (APPROVAL_PENDING,),
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()
        ids = [r["id"] for r in rows if _norm(r["package"]) == normalized_name]
        if not ids:
            return 0
        placeholders = ",".join("?" * len(ids))
        await db.execute(
            f"UPDATE approval_requests SET status = ? WHERE id IN ({placeholders})",
            [APPROVAL_APPROVED, *ids],
        )
        await db.commit()
    return len(ids)
list_pending_by_package async
list_pending_by_package() -> list[dict]

All PENDING_REVIEW approval requests grouped by package.

The dashboard uses this to surface "things developers asked for" as a triage queue. Multiple requests for the same package (across projects/requesters) collapse into a single row carrying the count, the most recent rationale, and the most recent timestamp. Callers are expected to further filter against the live registry.

Source code in src/lex_align_server/audit.py
async def list_pending_by_package(self) -> list[dict]:
    """All PENDING_REVIEW approval requests grouped by package.

    The dashboard uses this to surface "things developers asked for" as a
    triage queue. Multiple requests for the same package (across
    projects/requesters) collapse into a single row carrying the count,
    the most recent rationale, and the most recent timestamp. Callers
    are expected to further filter against the live registry.
    """
    # Imported lazily to avoid a circular import at module load.
    from .registry import normalize_name

    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            """SELECT package, rationale, project, requester, ts
               FROM approval_requests
               WHERE status = ?
               ORDER BY ts DESC""",
            (APPROVAL_PENDING,),
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()

    grouped: dict[str, dict] = {}
    for r in rows:
        key = normalize_name(r["package"])
        entry = grouped.get(key)
        if entry is None:
            grouped[key] = {
                "package": r["package"],
                "normalized_name": key,
                "latest_rationale": r["rationale"],
                "latest_ts": r["ts"],
                "latest_project": r["project"],
                "latest_requester": r["requester"],
                "request_count": 1,
            }
        else:
            entry["request_count"] += 1
            # Rows are ordered DESC by ts so the first one wins for "latest".
    return list(grouped.values())
agents_report async
agents_report(project: Optional[str] = None) -> dict[str, Any]

Aggregate evaluations by (agent_model, agent_version).

Powers the "agents" dashboard, so operators can see exactly which Claude (or other agent) version is making which kinds of requests. Rows where the agent is unknown collapse into one bucket.

Source code in src/lex_align_server/audit.py
async def agents_report(self, project: Optional[str] = None) -> dict[str, Any]:
    """Aggregate evaluations by (agent_model, agent_version).

    Powers the "agents" dashboard, so operators can see exactly which
    Claude (or other agent) version is making which kinds of requests.
    Rows where the agent is unknown collapse into one bucket.
    """
    where = ""
    params: list[Any] = []
    if project:
        where = " WHERE project = ?"
        params.append(project)

    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        agg_cur = await db.execute(
            f"""SELECT agent_model, agent_version,
                      COUNT(*) AS evaluations,
                      SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS denials,
                      SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS provisional,
                      MAX(ts) AS last_seen
               FROM audit_log{where}
               GROUP BY agent_model, agent_version
               ORDER BY evaluations DESC""",
            [VERDICT_DENIED, VERDICT_PROVISIONALLY_ALLOWED, *params],
        )
        agents = [dict(r) for r in await agg_cur.fetchall()]
        await agg_cur.close()

        recent_cur = await db.execute(
            f"""SELECT id, ts, project, requester, package, version, verdict,
                      denial_category, reason, agent_model, agent_version
               FROM audit_log{where}
               ORDER BY ts DESC LIMIT 50""",
            params,
        )
        recent = [dict(r) for r in await recent_cur.fetchall()]
        await recent_cur.close()
    return {
        "project": project,
        "agents": agents,
        "recent": recent,
    }

auth

Auth dependencies.

When AUTH_ENABLED=false (default) every request is treated as anonymous. When AUTH_ENABLED=true the bearer token is required but the validation/key storage is intentionally a Phase-3 stub. Wire-up exists so callers don't need to change when org-mode lands.

AgentInfo dataclass

AgentInfo(model: Optional[str] = None, version: Optional[str] = None)

Agent identity reported by the client.

Both fields are optional; when the client doesn't send the headers, both are None and reports group those rows under an "(unknown agent)" bucket.

cache

Redis-backed JSON cache.

Used by the license and CVE adapters to avoid hammering PyPI/OSV. We intentionally swallow connection errors and degrade to "no cache" so the server keeps serving when Redis is unreachable — the audit log still records every decision.

cli

lex-align-server CLI.

Exposes
  • serve — run uvicorn against the FastAPI app.
  • init — materialize the operator bundle (compose stack, Dockerfile, registry, .env) into a target dir.
  • registry compile — compile a YAML registry to the JSON form the server consumes.
  • selftest — hit /api/v1/health to confirm a stack is up.
  • admin keys ... — placeholders for the org-mode admin tooling.

main

main() -> None

lex-align server CLI.

Source code in src/lex_align_server/cli.py
@click.group()
def main() -> None:
    """lex-align server CLI."""

serve

serve(host: str | None, port: int | None, reload: bool) -> None

Start the lex-align FastAPI server via uvicorn.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option("--host", default=None, help="Override BIND_HOST.")
@click.option("--port", default=None, type=int, help="Override BIND_PORT.")
@click.option("--reload", is_flag=True, help="Enable hot reload (development only).")
def serve(host: str | None, port: int | None, reload: bool) -> None:
    """Start the lex-align FastAPI server via uvicorn."""
    import uvicorn
    settings = get_settings()
    uvicorn.run(
        "lex_align_server.main:app",
        host=host or settings.bind_host,
        port=port or settings.bind_port,
        reload=reload,
    )

init

init(target: Path, force: bool) -> None

Materialize the docker-compose stack, Dockerfile, registry, and .env template into TARGET so the server can be built and run with a single docker compose up -d.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--target",
    default="./lexalign",
    type=click.Path(file_okay=False, path_type=Path),
    help="Directory to write the operator bundle into. Created if missing.",
)
@click.option(
    "--force",
    is_flag=True,
    help=f"Overwrite existing files (and the {MARKER_FILENAME} marker).",
)
def init(target: Path, force: bool) -> None:
    """Materialize the docker-compose stack, Dockerfile, registry, and .env
    template into TARGET so the server can be built and run with a single
    `docker compose up -d`.
    """
    try:
        result = init_target(target, force=force)
    except FileExistsError as exc:
        raise click.ClickException(str(exc))
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    for path in result.written:
        click.echo(f"  + {path.relative_to(Path.cwd()) if path.is_relative_to(Path.cwd()) else path}")
    for path in result.skipped:
        click.echo(f"  · skipped (exists) {path}")

    click.echo("")
    click.echo(f"Operator bundle written to {result.target}.")
    click.echo("")
    click.echo("Next steps:")
    click.echo(f"  cd {result.target.relative_to(Path.cwd()) if result.target.is_relative_to(Path.cwd()) else result.target}")
    click.echo("  cp .env.example .env       # edit if you need to change defaults")
    click.echo("  docker compose up -d")
    click.echo("  lex-align-server selftest  # confirm the stack is alive")

registry

registry() -> None

Manage the enterprise package registry.

Source code in src/lex_align_server/cli.py
@main.group()
def registry() -> None:
    """Manage the enterprise package registry."""

registry_compile

registry_compile(source: Path, destination: Path) -> None

Compile a YAML registry SOURCE to the JSON form at DESTINATION.

Source code in src/lex_align_server/cli.py
@registry.command("compile")
@click.argument("source", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("destination", type=click.Path(dir_okay=False, path_type=Path))
def registry_compile(source: Path, destination: Path) -> None:
    """Compile a YAML registry SOURCE to the JSON form at DESTINATION."""
    import yaml

    try:
        doc = yaml.safe_load(source.read_text())
    except yaml.YAMLError as exc:
        raise click.ClickException(f"YAML parse error: {exc}")
    try:
        compiled = validate_registry(doc)
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(compiled, indent=2, sort_keys=True) + "\n")
    click.echo(
        f"Compiled {len(compiled['packages'])} package rules from {source}{destination}"
    )

selftest

selftest(url: str, timeout: float) -> None

Probe /api/v1/health and report whether the stack is alive.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--url",
    default="http://127.0.0.1:8765",
    help="Base URL of the server to probe.",
)
@click.option("--timeout", default=5.0, type=float, help="Per-request timeout (seconds).")
def selftest(url: str, timeout: float) -> None:
    """Probe `/api/v1/health` and report whether the stack is alive."""
    import httpx

    health = url.rstrip("/") + "/api/v1/health"
    try:
        response = httpx.get(health, timeout=timeout)
    except httpx.HTTPError as exc:
        raise click.ClickException(f"Server unreachable at {health}: {exc}")

    if response.status_code != 200:
        raise click.ClickException(
            f"Health check failed: HTTP {response.status_code} from {health}"
        )

    click.echo(f"OK  {health}")
    try:
        click.echo(json.dumps(response.json(), indent=2))
    except ValueError:
        click.echo(response.text)

admin

admin() -> None

Administrative commands (organization mode).

Source code in src/lex_align_server/cli.py
@main.group()
def admin() -> None:
    """Administrative commands (organization mode)."""

keys

keys() -> None

Manage API keys (deferred — Phase 3+).

Source code in src/lex_align_server/cli.py
@admin.group()
def keys() -> None:
    """Manage API keys (deferred — Phase 3+)."""

config

Server configuration.

All settings are sourced from environment variables. Defaults bias towards the single-user/local-evaluation experience: no auth, bind to localhost, point at a co-located Redis.

get_settings

get_settings() -> Settings

Build a fresh Settings object. Avoids module-level caching so tests can override via env vars on a per-test basis.

Source code in src/lex_align_server/config.py
def get_settings() -> Settings:
    """Build a fresh Settings object. Avoids module-level caching so tests can
    override via env vars on a per-test basis.
    """
    return Settings()

cve

CVE lookup via OSV (osv.dev).

OSV is the Google-sponsored aggregator that ingests GHSA, NVD, PyPA Advisory DB, and others. The free POST /v1/query endpoint takes a package coordinate and returns the list of vulnerabilities with severity scores. We parse out the highest CVSS base score and let GlobalPolicies.cve_blocks decide.

query_osv async

query_osv(package: str, version: Optional[str], osv_url: str, http_client: AsyncClient) -> Optional[list[dict]]

Return raw vulns list from OSV, or None on error/timeout.

Source code in src/lex_align_server/cve.py
async def query_osv(
    package: str,
    version: Optional[str],
    osv_url: str,
    http_client: httpx.AsyncClient,
) -> Optional[list[dict]]:
    """Return raw `vulns` list from OSV, or None on error/timeout."""
    payload: dict = {
        "package": {"name": package, "ecosystem": "PyPI"},
    }
    if version:
        payload["version"] = version
    try:
        resp = await http_client.post(osv_url, json=payload)
    except (httpx.HTTPError, httpx.TimeoutException) as exc:
        logger.warning("osv: query for %s@%s failed: %s", package, version, exc)
        return None
    if resp.status_code != 200:
        logger.warning("osv: %s returned %s", package, resp.status_code)
        return None
    try:
        data = resp.json()
    except ValueError:
        return None
    vulns = data.get("vulns") or []
    return [v for v in vulns if isinstance(v, dict)]

resolve_cves async

resolve_cves(package: str, version: Optional[str], cache: JsonCache, cache_ttl: int, osv_url: str, http_client: AsyncClient) -> CveInfo

Cache-or-fetch CVE data for a package@version.

A None version is cached separately from versioned queries — OSV's behaviour with no version is documented as "all versions" so we treat it as a distinct cache entry.

Source code in src/lex_align_server/cve.py
async def resolve_cves(
    package: str,
    version: Optional[str],
    cache: JsonCache,
    cache_ttl: int,
    osv_url: str,
    http_client: httpx.AsyncClient,
) -> CveInfo:
    """Cache-or-fetch CVE data for a package@version.

    A None version is cached separately from versioned queries — OSV's
    behaviour with no version is documented as "all versions" so we treat it
    as a distinct cache entry.
    """
    cache_key = f"cve:{package.lower()}@{version or 'latest'}"
    cached = await cache.get(cache_key)
    if cached is not None:
        try:
            return CveInfo.from_dict(cached)
        except Exception:
            pass

    vulns = await query_osv(package, version, osv_url, http_client)
    if vulns is None:
        # Treat OSV outage as "no known CVEs" but do NOT cache the negative
        # result — we want a fresh attempt next time.
        return CveInfo(ids=[], max_score=None, raw_count=0)

    info = _summarize_vulns(vulns)
    await cache.set(cache_key, info.to_dict(), cache_ttl)
    return info

dashboards

router

Dashboard pages.

Four pages render server-side and fetch their data from the JSON API: - /dashboard/security, /dashboard/legal and /dashboard/agents show read-only reports. - /dashboard/registry is an interactive workshop: it loads the live registry, lets the operator triage pending approval requests, and exports the result as YAML. Classifying a pending request also updates the in-memory registry so live /evaluate calls see the rule immediately, but persistence still requires exporting the YAML and rebuilding the server image.

evaluate

Evaluation orchestrator.

Combines registry lookup, CVE check, and license check into a single verdict. Emits one audit-log row per call. The order matters:

  1. registry hard-blocks (banned, deprecated, version-violated)
  2. CVE check — applies even to registry-allowed packages so a newly published critical CVE on a preferred package still blocks
  3. license check — only when the package is unknown to the registry

evaluate async

evaluate(*, package: str, version: Optional[str], project: str, requester: str, registry: Optional[Registry], cache: JsonCache, audit: AuditStore, settings: Settings, http_client: AsyncClient, agent: Optional[AgentInfo] = None) -> EvaluationResult

Single evaluation. Always writes one audit row before returning.

Source code in src/lex_align_server/evaluate.py
async def evaluate(
    *,
    package: str,
    version: Optional[str],
    project: str,
    requester: str,
    registry: Optional[Registry],
    cache: JsonCache,
    audit: AuditStore,
    settings: Settings,
    http_client: httpx.AsyncClient,
    agent: Optional[AgentInfo] = None,
) -> EvaluationResult:
    """Single evaluation. Always writes one audit row before returning."""
    agent = agent or AgentInfo()
    # Step 1 — registry verdict (only if a registry is configured).
    pkg_verdict: Optional[PackageVerdict] = None
    if registry is not None:
        pkg_verdict = registry.lookup(package, version)
        if pkg_verdict.action is Action.BLOCK:
            result = _denied(
                package=package,
                version=version,
                resolved_version=None,
                reason=pkg_verdict.reason or "Blocked by enterprise registry.",
                registry_status=pkg_verdict.status.value if pkg_verdict.status else None,
                replacement=pkg_verdict.replacement,
                version_constraint=pkg_verdict.version_constraint,
            )
            await _audit(audit, result, project, requester, agent, DENIAL_REGISTRY)
            return result

    # Step 2 — license + latest version (PyPI). We need PyPI even for known
    # packages so we can resolve "latest" when no version is provided.
    license_info: Optional[LicenseInfo] = None
    latest_version: Optional[str] = None
    if registry is None or pkg_verdict is None or pkg_verdict.action is Action.UNKNOWN:
        license_info, latest_version = await resolve_license(
            package, version, cache, settings.license_cache_ttl,
            settings.pypi_api_url, http_client,
        )
    else:
        # Known to registry — still need latest_version for the CVE call when
        # no version is pinned, so we hit PyPI but ignore the license.
        if version is None:
            license_info, latest_version = await resolve_license(
                package, None, cache, settings.license_cache_ttl,
                settings.pypi_api_url, http_client,
            )

    cve_query_version = version or latest_version

    # Step 3 — CVE check (applies regardless of registry status).
    cves = await resolve_cves(
        package,
        cve_query_version,
        cache,
        settings.cve_cache_ttl,
        settings.osv_api_url,
        http_client,
    )

    if registry is not None and registry.global_policies.cve_blocks(cves.max_score):
        result = _denied(
            package=package,
            version=version,
            resolved_version=cve_query_version,
            reason=(
                f"Critical CVE detected (max CVSS {cves.max_score}); "
                f"threshold is {registry.global_policies.cve_threshold * 10:.1f}."
            ),
            registry_status=(
                pkg_verdict.status.value if pkg_verdict and pkg_verdict.status else None
            ),
            license=license_info.license_normalized if license_info else None,
            cves=cves,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_CVE)
        return result

    # Step 4 — license verdict for unknown-to-registry packages.
    if registry is not None and pkg_verdict is not None and pkg_verdict.action is Action.UNKNOWN:
        assert license_info is not None  # set in step 2
        lic_verdict = evaluate_license(
            license_info.license_normalized, registry.global_policies
        )
        if lic_verdict.action is Action.BLOCK:
            result = _denied(
                package=package,
                version=version,
                resolved_version=cve_query_version,
                reason=lic_verdict.reason,
                registry_status=None,
                license=license_info.license_normalized,
                cves=cves,
            )
            await _audit(audit, result, project, requester, agent, DENIAL_LICENSE)
            return result
        # Unknown but license-passing → provisionally allowed.
        result = EvaluationResult(
            verdict=VERDICT_PROVISIONALLY_ALLOWED,
            reason=(
                "Not yet in the enterprise registry. License "
                f"{license_info.license_normalized} is on the auto-approve list "
                "and no critical CVEs are reported. Run `lex-align-client "
                "request-approval` to formalize."
            ),
            package=package,
            version=version,
            resolved_version=cve_query_version,
            registry_status=None,
            license=license_info.license_normalized,
            cve_ids=list(cves.ids),
            max_cvss=cves.max_score,
            is_requestable=True,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_NONE)
        return result

    # Step 5 — known-to-registry ALLOW or REQUIRE_PROPOSE.
    if pkg_verdict is None:
        # No registry configured at all. Treat as provisionally allowed
        # (license/CVE already passed if applicable).
        result = EvaluationResult(
            verdict=VERDICT_PROVISIONALLY_ALLOWED,
            reason="No enterprise registry configured; permitted by default.",
            package=package,
            version=version,
            resolved_version=cve_query_version,
            registry_status=None,
            license=license_info.license_normalized if license_info else None,
            cve_ids=list(cves.ids),
            max_cvss=cves.max_score,
            is_requestable=False,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_NONE)
        return result

    needs_rationale = pkg_verdict.action is Action.REQUIRE_PROPOSE
    result = EvaluationResult(
        verdict=VERDICT_ALLOWED,
        reason=pkg_verdict.reason or (
            f"Allowed by enterprise registry ({pkg_verdict.status.value if pkg_verdict.status else 'allow'})."
        ),
        package=package,
        version=version,
        resolved_version=cve_query_version,
        registry_status=pkg_verdict.status.value if pkg_verdict.status else None,
        version_constraint=pkg_verdict.version_constraint,
        license=license_info.license_normalized if license_info else None,
        cve_ids=list(cves.ids),
        max_cvss=cves.max_score,
        is_requestable=False,
        needs_rationale=needs_rationale,
    )
    await _audit(audit, result, project, requester, agent, DENIAL_NONE)
    return result

init

lex-align-server init — materialize the operator bundle into a directory.

Mirrors the design of lex-align-client init: copies the docker-compose stack, Dockerfile, example registry, and .env.example from the wheel's bundled assets into a target directory, then compiles the registry to JSON so docker compose up works on the first try.

asset_names

asset_names() -> Iterable[str]

All asset basenames as they appear in the wheel. Useful for tests.

Source code in src/lex_align_server/init.py
def asset_names() -> Iterable[str]:
    """All asset basenames as they appear in the wheel. Useful for tests."""
    return (
        *_VERBATIM_ASSETS,
        *_TEMPLATED_ASSETS,
        *(src for src, _ in _RENAMED_ASSETS),
    )

init_target

init_target(target: Path, *, force: bool = False) -> InitResult

Materialize the operator bundle into target.

Idempotent by default: if the marker file exists, raises FileExistsError unless force is set.

Source code in src/lex_align_server/init.py
def init_target(target: Path, *, force: bool = False) -> InitResult:
    """Materialize the operator bundle into ``target``.

    Idempotent by default: if the marker file exists, raises FileExistsError
    unless ``force`` is set.
    """
    target = target.resolve()
    marker = target / MARKER_FILENAME
    if marker.exists() and not force:
        raise FileExistsError(
            f"{marker} exists; this directory was already initialized. "
            "Use --force to overwrite."
        )

    target.mkdir(parents=True, exist_ok=True)
    version = _installed_version()
    written: list[Path] = []
    skipped: list[Path] = []

    for name in _VERBATIM_ASSETS:
        _write(target / name, _read_asset(name), force=force, written=written, skipped=skipped)

    for name in _TEMPLATED_ASSETS:
        body = _read_asset(name).decode("utf-8").replace("{LEX_ALIGN_VERSION}", version)
        _write(target / name, body.encode("utf-8"), force=force, written=written, skipped=skipped)

    for src, dest in _RENAMED_ASSETS:
        _write(target / dest, _read_asset(src), force=force, written=written, skipped=skipped)

    yaml_path = target / "registry.yml"
    json_path = target / "registry.json"
    compiled: Path | None = None
    if yaml_path.exists() and (force or not json_path.exists()):
        _compile_registry(yaml_path, json_path)
        compiled = json_path
        if json_path not in written:
            written.append(json_path)

    marker.write_text(f'version = "{version}"\n')
    if marker not in written:
        written.append(marker)

    return InitResult(
        target=target,
        written=written,
        skipped=skipped,
        compiled_registry=compiled,
    )

licenses

License lookup, normalization, and policy evaluation.

For packages not listed in the registry, the server fetches the license from PyPI, normalizes it to an SPDX-ish token, and checks it against global_policies. Results are cached in Redis keyed by package name.

fetch_license_from_pypi async

fetch_license_from_pypi(package: str, version: Optional[str], pypi_base: str, client: AsyncClient) -> tuple[Optional[str], Optional[str]]

Return (raw_license_string, latest_version) from PyPI, or (None, None) on error.

Source code in src/lex_align_server/licenses.py
async def fetch_license_from_pypi(
    package: str,
    version: Optional[str],
    pypi_base: str,
    client: httpx.AsyncClient,
) -> tuple[Optional[str], Optional[str]]:
    """Return (raw_license_string, latest_version) from PyPI, or (None, None) on error."""
    if version:
        url = PYPI_VERSIONED_URL_TEMPLATE.format(base=pypi_base, package=package, version=version)
    else:
        url = PYPI_URL_TEMPLATE.format(base=pypi_base, package=package)
    try:
        resp = await client.get(url)
    except (httpx.HTTPError, httpx.TimeoutException):
        return None, None
    if resp.status_code != 200:
        return None, None
    try:
        data = resp.json()
    except ValueError:
        return None, None
    raw = _extract_license_from_pypi_json(data)
    info = data.get("info") or {}
    latest = info.get("version")
    return raw, latest if isinstance(latest, str) else None

resolve_license async

resolve_license(package: str, version: Optional[str], cache: JsonCache, cache_ttl: int, pypi_base: str, http_client: AsyncClient) -> tuple[LicenseInfo, Optional[str]]

Resolve (cache-or-fetch) a package's license. Returns (info, latest_version).

Source code in src/lex_align_server/licenses.py
async def resolve_license(
    package: str,
    version: Optional[str],
    cache: JsonCache,
    cache_ttl: int,
    pypi_base: str,
    http_client: httpx.AsyncClient,
) -> tuple[LicenseInfo, Optional[str]]:
    """Resolve (cache-or-fetch) a package's license. Returns (info, latest_version)."""
    cache_key = f"license:{package.lower()}"
    cached = await cache.get(cache_key)
    if cached is not None:
        try:
            info = LicenseInfo.from_dict(cached["info"])
            return info, cached.get("latest_version")
        except (KeyError, TypeError):
            pass

    raw, latest = await fetch_license_from_pypi(package, version, pypi_base, http_client)
    info = LicenseInfo(
        license_raw=raw,
        license_normalized=normalize_license(raw),
    )
    await cache.set(
        cache_key,
        {"info": info.to_dict(), "latest_version": latest},
        cache_ttl,
    )
    return info, latest

main

FastAPI application factory.

Wires up the routers, builds the per-app state (cache, audit store, HTTP client, registry), and exposes a lifespan so resources are torn down cleanly.

registry

Enterprise registry: package policies, license rules, CVE threshold.

The registry is loaded from a local JSON file (compiled from the human-authored YAML by lex-align-server registry compile). The server consults it on every /evaluate call.

registry_schema

Registry-YAML schema validation, shared by the CLI compiler and the dashboard's import endpoint.

This module is intentionally pure: no I/O, no CLI argparse, no FastAPI dependencies. The two entry points that consume it (the lex-align-server registry compile CLI and lex_align_server.api.v1.registry) both depend on the same validator so a registry YAML accepted by the dashboard is guaranteed to compile in CI, and vice versa.

validate_package_rule

validate_package_rule(name: str, rule: Any) -> None

Public single-package validator. Raises ValidationError on failure.

Used by the dashboard's classify endpoint so the in-memory mutation has the same guarantees as a YAML round-trip.

Source code in src/lex_align_server/registry_schema.py
def validate_package_rule(name: str, rule: Any) -> None:
    """Public single-package validator. Raises ValidationError on failure.

    Used by the dashboard's classify endpoint so the in-memory mutation
    has the same guarantees as a YAML round-trip.
    """
    _validate_package(name, rule)

state

Application state container.

Carried on app.state so the per-request dependencies don't have to know how the cache, audit log, or registry got built.

Application entrypoint

main

FastAPI application factory.

Wires up the routers, builds the per-app state (cache, audit store, HTTP client, registry), and exposes a lifespan so resources are torn down cleanly.

CLI

cli

lex-align-server CLI.

Exposes
  • serve — run uvicorn against the FastAPI app.
  • init — materialize the operator bundle (compose stack, Dockerfile, registry, .env) into a target dir.
  • registry compile — compile a YAML registry to the JSON form the server consumes.
  • selftest — hit /api/v1/health to confirm a stack is up.
  • admin keys ... — placeholders for the org-mode admin tooling.

main

main() -> None

lex-align server CLI.

Source code in src/lex_align_server/cli.py
@click.group()
def main() -> None:
    """lex-align server CLI."""

serve

serve(host: str | None, port: int | None, reload: bool) -> None

Start the lex-align FastAPI server via uvicorn.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option("--host", default=None, help="Override BIND_HOST.")
@click.option("--port", default=None, type=int, help="Override BIND_PORT.")
@click.option("--reload", is_flag=True, help="Enable hot reload (development only).")
def serve(host: str | None, port: int | None, reload: bool) -> None:
    """Start the lex-align FastAPI server via uvicorn."""
    import uvicorn
    settings = get_settings()
    uvicorn.run(
        "lex_align_server.main:app",
        host=host or settings.bind_host,
        port=port or settings.bind_port,
        reload=reload,
    )

init

init(target: Path, force: bool) -> None

Materialize the docker-compose stack, Dockerfile, registry, and .env template into TARGET so the server can be built and run with a single docker compose up -d.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--target",
    default="./lexalign",
    type=click.Path(file_okay=False, path_type=Path),
    help="Directory to write the operator bundle into. Created if missing.",
)
@click.option(
    "--force",
    is_flag=True,
    help=f"Overwrite existing files (and the {MARKER_FILENAME} marker).",
)
def init(target: Path, force: bool) -> None:
    """Materialize the docker-compose stack, Dockerfile, registry, and .env
    template into TARGET so the server can be built and run with a single
    `docker compose up -d`.
    """
    try:
        result = init_target(target, force=force)
    except FileExistsError as exc:
        raise click.ClickException(str(exc))
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    for path in result.written:
        click.echo(f"  + {path.relative_to(Path.cwd()) if path.is_relative_to(Path.cwd()) else path}")
    for path in result.skipped:
        click.echo(f"  · skipped (exists) {path}")

    click.echo("")
    click.echo(f"Operator bundle written to {result.target}.")
    click.echo("")
    click.echo("Next steps:")
    click.echo(f"  cd {result.target.relative_to(Path.cwd()) if result.target.is_relative_to(Path.cwd()) else result.target}")
    click.echo("  cp .env.example .env       # edit if you need to change defaults")
    click.echo("  docker compose up -d")
    click.echo("  lex-align-server selftest  # confirm the stack is alive")

registry

registry() -> None

Manage the enterprise package registry.

Source code in src/lex_align_server/cli.py
@main.group()
def registry() -> None:
    """Manage the enterprise package registry."""

registry_compile

registry_compile(source: Path, destination: Path) -> None

Compile a YAML registry SOURCE to the JSON form at DESTINATION.

Source code in src/lex_align_server/cli.py
@registry.command("compile")
@click.argument("source", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("destination", type=click.Path(dir_okay=False, path_type=Path))
def registry_compile(source: Path, destination: Path) -> None:
    """Compile a YAML registry SOURCE to the JSON form at DESTINATION."""
    import yaml

    try:
        doc = yaml.safe_load(source.read_text())
    except yaml.YAMLError as exc:
        raise click.ClickException(f"YAML parse error: {exc}")
    try:
        compiled = validate_registry(doc)
    except ValidationError as exc:
        raise click.ClickException(f"Registry validation failed: {exc}")

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(compiled, indent=2, sort_keys=True) + "\n")
    click.echo(
        f"Compiled {len(compiled['packages'])} package rules from {source}{destination}"
    )

selftest

selftest(url: str, timeout: float) -> None

Probe /api/v1/health and report whether the stack is alive.

Source code in src/lex_align_server/cli.py
@main.command()
@click.option(
    "--url",
    default="http://127.0.0.1:8765",
    help="Base URL of the server to probe.",
)
@click.option("--timeout", default=5.0, type=float, help="Per-request timeout (seconds).")
def selftest(url: str, timeout: float) -> None:
    """Probe `/api/v1/health` and report whether the stack is alive."""
    import httpx

    health = url.rstrip("/") + "/api/v1/health"
    try:
        response = httpx.get(health, timeout=timeout)
    except httpx.HTTPError as exc:
        raise click.ClickException(f"Server unreachable at {health}: {exc}")

    if response.status_code != 200:
        raise click.ClickException(
            f"Health check failed: HTTP {response.status_code} from {health}"
        )

    click.echo(f"OK  {health}")
    try:
        click.echo(json.dumps(response.json(), indent=2))
    except ValueError:
        click.echo(response.text)

admin

admin() -> None

Administrative commands (organization mode).

Source code in src/lex_align_server/cli.py
@main.group()
def admin() -> None:
    """Administrative commands (organization mode)."""

keys

keys() -> None

Manage API keys (deferred — Phase 3+).

Source code in src/lex_align_server/cli.py
@admin.group()
def keys() -> None:
    """Manage API keys (deferred — Phase 3+)."""

Configuration

config

Server configuration.

All settings are sourced from environment variables. Defaults bias towards the single-user/local-evaluation experience: no auth, bind to localhost, point at a co-located Redis.

get_settings

get_settings() -> Settings

Build a fresh Settings object. Avoids module-level caching so tests can override via env vars on a per-test basis.

Source code in src/lex_align_server/config.py
def get_settings() -> Settings:
    """Build a fresh Settings object. Avoids module-level caching so tests can
    override via env vars on a per-test basis.
    """
    return Settings()

State management

state

Application state container.

Carried on app.state so the per-request dependencies don't have to know how the cache, audit log, or registry got built.

Evaluation pipeline

evaluate

Evaluation orchestrator.

Combines registry lookup, CVE check, and license check into a single verdict. Emits one audit-log row per call. The order matters:

  1. registry hard-blocks (banned, deprecated, version-violated)
  2. CVE check — applies even to registry-allowed packages so a newly published critical CVE on a preferred package still blocks
  3. license check — only when the package is unknown to the registry

evaluate async

evaluate(*, package: str, version: Optional[str], project: str, requester: str, registry: Optional[Registry], cache: JsonCache, audit: AuditStore, settings: Settings, http_client: AsyncClient, agent: Optional[AgentInfo] = None) -> EvaluationResult

Single evaluation. Always writes one audit row before returning.

Source code in src/lex_align_server/evaluate.py
async def evaluate(
    *,
    package: str,
    version: Optional[str],
    project: str,
    requester: str,
    registry: Optional[Registry],
    cache: JsonCache,
    audit: AuditStore,
    settings: Settings,
    http_client: httpx.AsyncClient,
    agent: Optional[AgentInfo] = None,
) -> EvaluationResult:
    """Single evaluation. Always writes one audit row before returning."""
    agent = agent or AgentInfo()
    # Step 1 — registry verdict (only if a registry is configured).
    pkg_verdict: Optional[PackageVerdict] = None
    if registry is not None:
        pkg_verdict = registry.lookup(package, version)
        if pkg_verdict.action is Action.BLOCK:
            result = _denied(
                package=package,
                version=version,
                resolved_version=None,
                reason=pkg_verdict.reason or "Blocked by enterprise registry.",
                registry_status=pkg_verdict.status.value if pkg_verdict.status else None,
                replacement=pkg_verdict.replacement,
                version_constraint=pkg_verdict.version_constraint,
            )
            await _audit(audit, result, project, requester, agent, DENIAL_REGISTRY)
            return result

    # Step 2 — license + latest version (PyPI). We need PyPI even for known
    # packages so we can resolve "latest" when no version is provided.
    license_info: Optional[LicenseInfo] = None
    latest_version: Optional[str] = None
    if registry is None or pkg_verdict is None or pkg_verdict.action is Action.UNKNOWN:
        license_info, latest_version = await resolve_license(
            package, version, cache, settings.license_cache_ttl,
            settings.pypi_api_url, http_client,
        )
    else:
        # Known to registry — still need latest_version for the CVE call when
        # no version is pinned, so we hit PyPI but ignore the license.
        if version is None:
            license_info, latest_version = await resolve_license(
                package, None, cache, settings.license_cache_ttl,
                settings.pypi_api_url, http_client,
            )

    cve_query_version = version or latest_version

    # Step 3 — CVE check (applies regardless of registry status).
    cves = await resolve_cves(
        package,
        cve_query_version,
        cache,
        settings.cve_cache_ttl,
        settings.osv_api_url,
        http_client,
    )

    if registry is not None and registry.global_policies.cve_blocks(cves.max_score):
        result = _denied(
            package=package,
            version=version,
            resolved_version=cve_query_version,
            reason=(
                f"Critical CVE detected (max CVSS {cves.max_score}); "
                f"threshold is {registry.global_policies.cve_threshold * 10:.1f}."
            ),
            registry_status=(
                pkg_verdict.status.value if pkg_verdict and pkg_verdict.status else None
            ),
            license=license_info.license_normalized if license_info else None,
            cves=cves,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_CVE)
        return result

    # Step 4 — license verdict for unknown-to-registry packages.
    if registry is not None and pkg_verdict is not None and pkg_verdict.action is Action.UNKNOWN:
        assert license_info is not None  # set in step 2
        lic_verdict = evaluate_license(
            license_info.license_normalized, registry.global_policies
        )
        if lic_verdict.action is Action.BLOCK:
            result = _denied(
                package=package,
                version=version,
                resolved_version=cve_query_version,
                reason=lic_verdict.reason,
                registry_status=None,
                license=license_info.license_normalized,
                cves=cves,
            )
            await _audit(audit, result, project, requester, agent, DENIAL_LICENSE)
            return result
        # Unknown but license-passing → provisionally allowed.
        result = EvaluationResult(
            verdict=VERDICT_PROVISIONALLY_ALLOWED,
            reason=(
                "Not yet in the enterprise registry. License "
                f"{license_info.license_normalized} is on the auto-approve list "
                "and no critical CVEs are reported. Run `lex-align-client "
                "request-approval` to formalize."
            ),
            package=package,
            version=version,
            resolved_version=cve_query_version,
            registry_status=None,
            license=license_info.license_normalized,
            cve_ids=list(cves.ids),
            max_cvss=cves.max_score,
            is_requestable=True,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_NONE)
        return result

    # Step 5 — known-to-registry ALLOW or REQUIRE_PROPOSE.
    if pkg_verdict is None:
        # No registry configured at all. Treat as provisionally allowed
        # (license/CVE already passed if applicable).
        result = EvaluationResult(
            verdict=VERDICT_PROVISIONALLY_ALLOWED,
            reason="No enterprise registry configured; permitted by default.",
            package=package,
            version=version,
            resolved_version=cve_query_version,
            registry_status=None,
            license=license_info.license_normalized if license_info else None,
            cve_ids=list(cves.ids),
            max_cvss=cves.max_score,
            is_requestable=False,
        )
        await _audit(audit, result, project, requester, agent, DENIAL_NONE)
        return result

    needs_rationale = pkg_verdict.action is Action.REQUIRE_PROPOSE
    result = EvaluationResult(
        verdict=VERDICT_ALLOWED,
        reason=pkg_verdict.reason or (
            f"Allowed by enterprise registry ({pkg_verdict.status.value if pkg_verdict.status else 'allow'})."
        ),
        package=package,
        version=version,
        resolved_version=cve_query_version,
        registry_status=pkg_verdict.status.value if pkg_verdict.status else None,
        version_constraint=pkg_verdict.version_constraint,
        license=license_info.license_normalized if license_info else None,
        cve_ids=list(cves.ids),
        max_cvss=cves.max_score,
        is_requestable=False,
        needs_rationale=needs_rationale,
    )
    await _audit(audit, result, project, requester, agent, DENIAL_NONE)
    return result

Registry

registry

Enterprise registry: package policies, license rules, CVE threshold.

The registry is loaded from a local JSON file (compiled from the human-authored YAML by lex-align-server registry compile). The server consults it on every /evaluate call.

registry_schema

Registry-YAML schema validation, shared by the CLI compiler and the dashboard's import endpoint.

This module is intentionally pure: no I/O, no CLI argparse, no FastAPI dependencies. The two entry points that consume it (the lex-align-server registry compile CLI and lex_align_server.api.v1.registry) both depend on the same validator so a registry YAML accepted by the dashboard is guaranteed to compile in CI, and vice versa.

validate_package_rule

validate_package_rule(name: str, rule: Any) -> None

Public single-package validator. Raises ValidationError on failure.

Used by the dashboard's classify endpoint so the in-memory mutation has the same guarantees as a YAML round-trip.

Source code in src/lex_align_server/registry_schema.py
def validate_package_rule(name: str, rule: Any) -> None:
    """Public single-package validator. Raises ValidationError on failure.

    Used by the dashboard's classify endpoint so the in-memory mutation
    has the same guarantees as a YAML round-trip.
    """
    _validate_package(name, rule)

License checks

licenses

License lookup, normalization, and policy evaluation.

For packages not listed in the registry, the server fetches the license from PyPI, normalizes it to an SPDX-ish token, and checks it against global_policies. Results are cached in Redis keyed by package name.

fetch_license_from_pypi async

fetch_license_from_pypi(package: str, version: Optional[str], pypi_base: str, client: AsyncClient) -> tuple[Optional[str], Optional[str]]

Return (raw_license_string, latest_version) from PyPI, or (None, None) on error.

Source code in src/lex_align_server/licenses.py
async def fetch_license_from_pypi(
    package: str,
    version: Optional[str],
    pypi_base: str,
    client: httpx.AsyncClient,
) -> tuple[Optional[str], Optional[str]]:
    """Return (raw_license_string, latest_version) from PyPI, or (None, None) on error."""
    if version:
        url = PYPI_VERSIONED_URL_TEMPLATE.format(base=pypi_base, package=package, version=version)
    else:
        url = PYPI_URL_TEMPLATE.format(base=pypi_base, package=package)
    try:
        resp = await client.get(url)
    except (httpx.HTTPError, httpx.TimeoutException):
        return None, None
    if resp.status_code != 200:
        return None, None
    try:
        data = resp.json()
    except ValueError:
        return None, None
    raw = _extract_license_from_pypi_json(data)
    info = data.get("info") or {}
    latest = info.get("version")
    return raw, latest if isinstance(latest, str) else None

resolve_license async

resolve_license(package: str, version: Optional[str], cache: JsonCache, cache_ttl: int, pypi_base: str, http_client: AsyncClient) -> tuple[LicenseInfo, Optional[str]]

Resolve (cache-or-fetch) a package's license. Returns (info, latest_version).

Source code in src/lex_align_server/licenses.py
async def resolve_license(
    package: str,
    version: Optional[str],
    cache: JsonCache,
    cache_ttl: int,
    pypi_base: str,
    http_client: httpx.AsyncClient,
) -> tuple[LicenseInfo, Optional[str]]:
    """Resolve (cache-or-fetch) a package's license. Returns (info, latest_version)."""
    cache_key = f"license:{package.lower()}"
    cached = await cache.get(cache_key)
    if cached is not None:
        try:
            info = LicenseInfo.from_dict(cached["info"])
            return info, cached.get("latest_version")
        except (KeyError, TypeError):
            pass

    raw, latest = await fetch_license_from_pypi(package, version, pypi_base, http_client)
    info = LicenseInfo(
        license_raw=raw,
        license_normalized=normalize_license(raw),
    )
    await cache.set(
        cache_key,
        {"info": info.to_dict(), "latest_version": latest},
        cache_ttl,
    )
    return info, latest

CVE checks (OSV)

cve

CVE lookup via OSV (osv.dev).

OSV is the Google-sponsored aggregator that ingests GHSA, NVD, PyPA Advisory DB, and others. The free POST /v1/query endpoint takes a package coordinate and returns the list of vulnerabilities with severity scores. We parse out the highest CVSS base score and let GlobalPolicies.cve_blocks decide.

query_osv async

query_osv(package: str, version: Optional[str], osv_url: str, http_client: AsyncClient) -> Optional[list[dict]]

Return raw vulns list from OSV, or None on error/timeout.

Source code in src/lex_align_server/cve.py
async def query_osv(
    package: str,
    version: Optional[str],
    osv_url: str,
    http_client: httpx.AsyncClient,
) -> Optional[list[dict]]:
    """Return raw `vulns` list from OSV, or None on error/timeout."""
    payload: dict = {
        "package": {"name": package, "ecosystem": "PyPI"},
    }
    if version:
        payload["version"] = version
    try:
        resp = await http_client.post(osv_url, json=payload)
    except (httpx.HTTPError, httpx.TimeoutException) as exc:
        logger.warning("osv: query for %s@%s failed: %s", package, version, exc)
        return None
    if resp.status_code != 200:
        logger.warning("osv: %s returned %s", package, resp.status_code)
        return None
    try:
        data = resp.json()
    except ValueError:
        return None
    vulns = data.get("vulns") or []
    return [v for v in vulns if isinstance(v, dict)]

resolve_cves async

resolve_cves(package: str, version: Optional[str], cache: JsonCache, cache_ttl: int, osv_url: str, http_client: AsyncClient) -> CveInfo

Cache-or-fetch CVE data for a package@version.

A None version is cached separately from versioned queries — OSV's behaviour with no version is documented as "all versions" so we treat it as a distinct cache entry.

Source code in src/lex_align_server/cve.py
async def resolve_cves(
    package: str,
    version: Optional[str],
    cache: JsonCache,
    cache_ttl: int,
    osv_url: str,
    http_client: httpx.AsyncClient,
) -> CveInfo:
    """Cache-or-fetch CVE data for a package@version.

    A None version is cached separately from versioned queries — OSV's
    behaviour with no version is documented as "all versions" so we treat it
    as a distinct cache entry.
    """
    cache_key = f"cve:{package.lower()}@{version or 'latest'}"
    cached = await cache.get(cache_key)
    if cached is not None:
        try:
            return CveInfo.from_dict(cached)
        except Exception:
            pass

    vulns = await query_osv(package, version, osv_url, http_client)
    if vulns is None:
        # Treat OSV outage as "no known CVEs" but do NOT cache the negative
        # result — we want a fresh attempt next time.
        return CveInfo(ids=[], max_score=None, raw_count=0)

    info = _summarize_vulns(vulns)
    await cache.set(cache_key, info.to_dict(), cache_ttl)
    return info

Audit log

audit

SQLite audit log + approval-request store.

Two tables:

  • audit_log — one row per /evaluate call. Backs the legal report (license-driven denials) and security report (CVE-driven denials).
  • approval_requests — one row per /approval-requests POST. Phase 3 will attach a PR-creation workflow; for now we just persist them.

We use plain aiosqlite rather than an ORM. The schema is small and the report queries are easier to read as straight SQL.

AuditStore

AuditStore(db_path: Path)
Source code in src/lex_align_server/audit.py
def __init__(self, db_path: Path):
    self._db_path = db_path

mark_approved_by_package async

mark_approved_by_package(normalized_name: str) -> int

Move every PENDING_REVIEW request for normalized_name to APPROVED.

Called when an operator classifies a pending package via the dashboard and adds it to the in-memory registry. Returns the number of rows flipped, which the dashboard can display in its toast.

Source code in src/lex_align_server/audit.py
async def mark_approved_by_package(self, normalized_name: str) -> int:
    """Move every PENDING_REVIEW request for `normalized_name` to APPROVED.

    Called when an operator classifies a pending package via the dashboard
    and adds it to the in-memory registry. Returns the number of rows
    flipped, which the dashboard can display in its toast.
    """
    from .registry import normalize_name as _norm
    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            "SELECT id, package FROM approval_requests WHERE status = ?",
            (APPROVAL_PENDING,),
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()
        ids = [r["id"] for r in rows if _norm(r["package"]) == normalized_name]
        if not ids:
            return 0
        placeholders = ",".join("?" * len(ids))
        await db.execute(
            f"UPDATE approval_requests SET status = ? WHERE id IN ({placeholders})",
            [APPROVAL_APPROVED, *ids],
        )
        await db.commit()
    return len(ids)

list_pending_by_package async

list_pending_by_package() -> list[dict]

All PENDING_REVIEW approval requests grouped by package.

The dashboard uses this to surface "things developers asked for" as a triage queue. Multiple requests for the same package (across projects/requesters) collapse into a single row carrying the count, the most recent rationale, and the most recent timestamp. Callers are expected to further filter against the live registry.

Source code in src/lex_align_server/audit.py
async def list_pending_by_package(self) -> list[dict]:
    """All PENDING_REVIEW approval requests grouped by package.

    The dashboard uses this to surface "things developers asked for" as a
    triage queue. Multiple requests for the same package (across
    projects/requesters) collapse into a single row carrying the count,
    the most recent rationale, and the most recent timestamp. Callers
    are expected to further filter against the live registry.
    """
    # Imported lazily to avoid a circular import at module load.
    from .registry import normalize_name

    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            """SELECT package, rationale, project, requester, ts
               FROM approval_requests
               WHERE status = ?
               ORDER BY ts DESC""",
            (APPROVAL_PENDING,),
        )
        rows = [dict(r) for r in await cur.fetchall()]
        await cur.close()

    grouped: dict[str, dict] = {}
    for r in rows:
        key = normalize_name(r["package"])
        entry = grouped.get(key)
        if entry is None:
            grouped[key] = {
                "package": r["package"],
                "normalized_name": key,
                "latest_rationale": r["rationale"],
                "latest_ts": r["ts"],
                "latest_project": r["project"],
                "latest_requester": r["requester"],
                "request_count": 1,
            }
        else:
            entry["request_count"] += 1
            # Rows are ordered DESC by ts so the first one wins for "latest".
    return list(grouped.values())

agents_report async

agents_report(project: Optional[str] = None) -> dict[str, Any]

Aggregate evaluations by (agent_model, agent_version).

Powers the "agents" dashboard, so operators can see exactly which Claude (or other agent) version is making which kinds of requests. Rows where the agent is unknown collapse into one bucket.

Source code in src/lex_align_server/audit.py
async def agents_report(self, project: Optional[str] = None) -> dict[str, Any]:
    """Aggregate evaluations by (agent_model, agent_version).

    Powers the "agents" dashboard, so operators can see exactly which
    Claude (or other agent) version is making which kinds of requests.
    Rows where the agent is unknown collapse into one bucket.
    """
    where = ""
    params: list[Any] = []
    if project:
        where = " WHERE project = ?"
        params.append(project)

    async with aiosqlite.connect(self._db_path) as db:
        db.row_factory = aiosqlite.Row
        agg_cur = await db.execute(
            f"""SELECT agent_model, agent_version,
                      COUNT(*) AS evaluations,
                      SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS denials,
                      SUM(CASE WHEN verdict = ? THEN 1 ELSE 0 END) AS provisional,
                      MAX(ts) AS last_seen
               FROM audit_log{where}
               GROUP BY agent_model, agent_version
               ORDER BY evaluations DESC""",
            [VERDICT_DENIED, VERDICT_PROVISIONALLY_ALLOWED, *params],
        )
        agents = [dict(r) for r in await agg_cur.fetchall()]
        await agg_cur.close()

        recent_cur = await db.execute(
            f"""SELECT id, ts, project, requester, package, version, verdict,
                      denial_category, reason, agent_model, agent_version
               FROM audit_log{where}
               ORDER BY ts DESC LIMIT 50""",
            params,
        )
        recent = [dict(r) for r in await recent_cur.fetchall()]
        await recent_cur.close()
    return {
        "project": project,
        "agents": agents,
        "recent": recent,
    }

Caching

cache

Redis-backed JSON cache.

Used by the license and CVE adapters to avoid hammering PyPI/OSV. We intentionally swallow connection errors and degrade to "no cache" so the server keeps serving when Redis is unreachable — the audit log still records every decision.

Authentication

auth

Auth dependencies.

When AUTH_ENABLED=false (default) every request is treated as anonymous. When AUTH_ENABLED=true the bearer token is required but the validation/key storage is intentionally a Phase-3 stub. Wire-up exists so callers don't need to change when org-mode lands.

AgentInfo dataclass

AgentInfo(model: Optional[str] = None, version: Optional[str] = None)

Agent identity reported by the client.

Both fields are optional; when the client doesn't send the headers, both are None and reports group those rows under an "(unknown agent)" bucket.