# In-Process Host API

This is the API an out-of-tree Python program (an orchestrator, a custom
CLI, an IDE plug-in) uses to drive Attractor in the same process.

The bundled `attractor.cli` is one consumer of this surface. The
contract is defined in [SPEC §12.1](../SPEC.md#121-in-process-host-api):
names re-exported from the top-level `attractor` package are stable;
anything reachable only via a submodule path is internal.

## What to import for what

| Need | Symbols |
| ---- | ------- |
| Launch a workflow artifact | `Engine.launch`, `RunSession`, `InputCopy` |
| Construct + drive a graph directly | `Engine`, `RunStatus`, `InputCopy` |
| Drive a validated graph with a live handle | `Engine.start`, `RunSession` |
| Parse + validate a workflow | `parse`, `validate`, `ValidGraph`, `ParseError`, `ValidationFailed` |
| Observe progress | `EventCallback`, `EngineEvent`, `RunStarted`, `NodeStarted`, `NodeOutcome`, `HumanGate`, `Checkpointed`, `RunCompleted`, `Aborted` |
| Inspect runs after the fact | `RunHandle`, `RunSummary`, `NodeRecord` |
| Handle errors | `EngineError` (base), `UnknownRun`, `NotPausedAtGate`, `UnknownChoice`, `WorkflowNotValid`, `AbortNotPaused` |

All of these are reachable as `from attractor import X`. If you find
yourself reaching into `attractor.engine.journal` or
`attractor.workflow.stylesheet`, you've crossed into internals — usable,
but expect breakage between minor versions.
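
One way to keep a host on the stable side of this boundary is to lint its source for submodule imports. The sketch below uses only the standard library `ast` module. `find_internal_imports` and the sample source are illustrative and not part of Attractor, and the imported name `something` is a placeholder.

```python
import ast


def find_internal_imports(source: str, package: str = "attractor") -> list[str]:
    """Return the dotted module paths imported from below the top-level package."""
    prefix = package + "."
    found: list[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom):
            # `from attractor.engine.journal import X`: node.module is the dotted path.
            # Relative imports (level > 0) are skipped; they are not the installed package.
            if node.level == 0 and node.module and node.module.startswith(prefix):
                found.append(node.module)
        elif isinstance(node, ast.Import):
            # `import attractor.workflow.stylesheet`: one alias per imported name.
            for alias in node.names:
                if alias.name.startswith(prefix):
                    found.append(alias.name)
    return sorted(set(found))


# Hypothetical host source; `something` is a placeholder name.
SAMPLE = """
from attractor import Engine, RunStatus
from attractor.engine.journal import something
import attractor.workflow.stylesheet
"""

print(find_internal_imports(SAMPLE))
```

Note that this check would also flag `attractor.testing`, which is appropriate for production code but means test files should be excluded from the scan.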

## Minimal driver

The engine writes refs under `refs/heads/attractor/` in the repository
at `repo_root`, and creates a per-run worktree under
`<repo_root>/.attractor/worktrees/<run-id>/`. Run it inside a git
repository.

<!-- [doc->REQ-LAUNCH-PATH-PREFLIGHT] -->
For hosts that launch a workflow file, prefer `Engine.launch`. It
reads, parses, and validates the DOT artifact, checks the launch inputs
and the base ref, creates an addressable run, and returns a
`RunSession` whose `run_id` is the launch receipt.

```python
import asyncio
from pathlib import Path

from attractor import (
    Engine,
    EngineEvent,
    HumanGate,
    NodeOutcome,
    RunStarted,
    RunStatus,
)


async def main() -> None:
    engine = Engine(Path.cwd())

    run_id: str | None = None

    def on_event(ev: EngineEvent) -> None:
        nonlocal run_id
        match ev:
            case RunStarted(run_id=rid):
                run_id = rid
                print(f"run started: {rid}")
            case NodeOutcome(node_id=node, success=ok):
                print(f"  {node}: {'ok' if ok else 'fail'}")
            case HumanGate(node_id=node, prompt=prompt, choices=choices):
                print(f"  paused at {node}: {prompt!r} (choices: {choices})")
            case _:
                pass

    session = await engine.launch(Path("workflow.dot"), events=on_event)
    print(f"run id: {session.run_id}")
    status = await session.wait()
    print(f"final status: {status}")

    # If the workflow had a human gate, status is PAUSED. Respond and
    # resume — the engine returns again when the run terminates or pauses
    # again.
    while status is RunStatus.PAUSED:
        assert run_id is not None
        await engine.respond(run_id, choice="continue")
        status = await engine.resume(run_id, events=on_event)
        print(f"after resume: {status}")


asyncio.run(main())
```

## Live handle: `Engine.start` + `RunSession`

`Engine.run` blocks until the run terminates or pauses, and the only
way to learn the `run_id` is to capture it from the first `RunStarted`
event. For an orchestrator that wants to address the run *while it's
in flight* (to correlate logs, query progress via `engine.show()`, or
display a cancel UI), use `Engine.start` instead. It returns a
`RunSession` whose `run_id` is already set when `await engine.start(...)`
returns:

```python
import asyncio
from pathlib import Path

from attractor import Engine, RunStatus, parse, validate

DOT = """
digraph hello {
    start [shape=Mdiamond];
    greet [shape=parallelogram, script="echo hello"];
    finish [shape=Msquare];
    start -> greet -> finish;
}
"""


async def main() -> None:
    graph = validate(parse(DOT))
    engine = Engine(Path.cwd())

    session = await engine.start(graph)
    print(f"run started: {session.run_id}")

    # The run_id is queryable RIGHT NOW — the worktree exists, the
    # RunInitialized journal entry is written, and traversal is
    # making progress as a background task.
    summary = engine.show(session.run_id)
    print(f"current node: {summary.current_node}")

    # Block on the terminal status when ready.
    status: RunStatus = await session.wait()
    print(f"final status: {status}")


asyncio.run(main())
```

`await engine.run(graph)` is exactly equivalent to
`await (await engine.start(graph)).wait()`; pick whichever frames
the call site better.

### Cancelling an in-flight run

`await session.wait()` propagates `CancelledError` if the awaiter is
cancelled, but does NOT stop the background task — the run continues
to completion. To actually stop an in-flight run, call
`session.cancel()`:

```python
import asyncio

# Inside a coroutine, with `engine` and `graph` set up as in the example above.
session = await engine.start(graph)
# ... later, from another coroutine / signal handler / UI button ...
session.cancel()
try:
    await session.wait()
except asyncio.CancelledError:
    pass  # run stopped cleanly

`cancel()` is synchronous and returns `True` if cancellation was
scheduled (`False` if the run already terminated). The cancellation is
delivered to the traversal at its next `await` point — typically
inside the agent SDK, the subprocess wait, or a journal write.

Recovery after a cancel works the same as after a hard kill. Call
`engine.resume(session.run_id)` to pick up from the last checkpoint per
SPEC §6.4: a between-node cancel resumes at the next node (case a), and
a mid-node cancel re-enters the same node with the worktree as-is
(case b). Paused runs (`RunStatus.PAUSED`) are a different case: if you
don't intend to respond, use `engine.abort_run(run_id)` to transition
them cleanly to `INCOMPLETE`.
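
Because a timed-out `wait()` only cancels the awaiter, a deadline has to be followed by an explicit `cancel()`. The sketch below shows one way to combine the two. It is written against a small `SessionLike` protocol that assumes only the `cancel()` and `wait()` behaviour described above. `wait_or_cancel` and `FakeSession` are hypothetical names for illustration, and `FakeSession` is a stand-in so the example runs without a repository.

```python
import asyncio
from typing import Any, Protocol


class SessionLike(Protocol):
    """The two RunSession methods this sketch relies on."""

    def cancel(self) -> bool: ...
    async def wait(self) -> Any: ...


async def wait_or_cancel(session: SessionLike, timeout: float) -> Any:
    """Return the final status, or None if the run was cancelled at the deadline."""
    try:
        return await asyncio.wait_for(session.wait(), timeout)
    except asyncio.TimeoutError:
        pass  # only our awaiter was cancelled; the run is still going
    if not session.cancel():
        # The run terminated between the timeout and the cancel call.
        return await session.wait()
    try:
        await session.wait()
    except asyncio.CancelledError:
        # Caveat: this also swallows a cancellation aimed at the caller;
        # a production helper would need to tell the two apart.
        pass
    return None


class FakeSession:
    """Stand-in for RunSession (demo only): a shielded background task."""

    def __init__(self, duration: float) -> None:
        self._task = asyncio.create_task(self._run(duration))

    async def _run(self, duration: float) -> str:
        await asyncio.sleep(duration)
        return "COMPLETED"

    def cancel(self) -> bool:
        return self._task.cancel()

    async def wait(self) -> str:
        # shield: cancelling the awaiter does not cancel the background task.
        return await asyncio.shield(self._task)


async def demo() -> tuple[Any, Any]:
    fast = await wait_or_cancel(FakeSession(0.01), timeout=1.0)
    slow = await wait_or_cancel(FakeSession(10.0), timeout=0.05)
    return fast, slow


print(asyncio.run(demo()))
```

After a `None` result, the run can be picked up again with `engine.resume(...)` as described above.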

## Events are a synchronous callback

`EventCallback` is `Callable[[EngineEvent], None]` — no `async`. The
engine invokes it inline during traversal; keep the callback cheap
(append to a list, push onto a queue, log a line).

If your orchestrator wants async iteration, have the callback push each
event onto your own `asyncio.Queue` and consume that queue from a
coroutine. The engine deliberately provides no buffering or
backpressure. SPEC §6 documents the choice; a queue/iterator surface
may be layered on later without breaking this callback shape.
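
A minimal sketch of such a bridge follows. It assumes the callback is invoked on the event loop's own thread, as the inline invocation described above implies; a callback fired from another thread would need `loop.call_soon_threadsafe` instead. `EventBridge`, `FakeEvent`, and `fake_run` are hypothetical stand-ins so the example runs without Attractor installed.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable, Optional


@dataclass(frozen=True)
class FakeEvent:
    """Stand-in for EngineEvent (demo only)."""

    name: str


class EventBridge:
    """Turn a synchronous event callback into an async iterator."""

    def __init__(self) -> None:
        # Unbounded queue: put_nowait never raises QueueFull, so the
        # callback stays cheap, at the cost of having no backpressure.
        self._queue: asyncio.Queue[Optional[FakeEvent]] = asyncio.Queue()

    def callback(self, ev: FakeEvent) -> None:
        self._queue.put_nowait(ev)

    def close(self) -> None:
        # None is the end-of-stream sentinel.
        self._queue.put_nowait(None)

    async def events(self) -> AsyncIterator[FakeEvent]:
        while True:
            item = await self._queue.get()
            if item is None:
                return
            yield item


async def fake_run(on_event: Callable[[FakeEvent], None]) -> None:
    """Stand-in for a run that fires the callback inline."""
    for name in ("RunStarted", "NodeStarted", "RunCompleted"):
        on_event(FakeEvent(name))
        await asyncio.sleep(0)


async def demo() -> list[str]:
    bridge = EventBridge()

    async def producer() -> None:
        try:
            await fake_run(bridge.callback)
        finally:
            bridge.close()  # always end the stream, even on failure

    task = asyncio.create_task(producer())
    seen = [ev.name async for ev in bridge.events()]
    await task
    return seen


print(asyncio.run(demo()))
```

With a real engine, `bridge.callback` would be passed as the `events=` argument, and `bridge.close()` would be called once the run returns.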

## Concurrent runs in one process

The engine is intentionally stateless across `run()` / `resume()`
calls. One `Engine(repo_root)` instance can drive many concurrent runs
in the same repo — each one keys off its own state branch, worktree
branch, and worktree directory under `.attractor/worktrees/<run-id>/`.

Don't share a single `Engine` instance across threads, though: the
underlying `pygit2.Repository` is not thread-safe. Stick to one event
loop, or one engine per process.
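
On a single event loop, fanning out over several graphs can be sketched with `asyncio.gather`. The concurrency cap is an addition of this example, not something the engine requires. `run_all`, `EngineLike`, and `FakeEngine` are hypothetical names; the sketch assumes only that `engine.run(graph)` is awaitable and returns a status.

```python
import asyncio
from typing import Any, Protocol, Sequence


class EngineLike(Protocol):
    """The single Engine method this sketch relies on."""

    async def run(self, graph: Any) -> Any: ...


async def run_all(engine: EngineLike, graphs: Sequence[Any], limit: int = 4) -> list[Any]:
    """Drive every graph through one engine, at most `limit` at a time."""
    gate = asyncio.Semaphore(limit)

    async def one(graph: Any) -> Any:
        async with gate:
            return await engine.run(graph)

    # gather preserves input order in its result list.
    return list(await asyncio.gather(*(one(g) for g in graphs)))


class FakeEngine:
    """Stand-in engine (demo only) that records peak concurrency."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0

    async def run(self, graph: Any) -> str:
        self.active += 1
        self.peak = max(self.peak, self.active)
        try:
            await asyncio.sleep(0.01)
            return f"{graph}: COMPLETED"
        finally:
            self.active -= 1


async def demo() -> tuple[list[Any], int]:
    engine = FakeEngine()
    results = await run_all(engine, ["a", "b", "c", "d", "e"], limit=2)
    return results, engine.peak


print(asyncio.run(demo()))
```

Everything here stays on one event loop and one thread, which matches the thread-safety constraint above.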

## Inspection

```python
for handle in engine.list():
    print(handle.run_id, handle.status, handle.worktree_path, handle.started_at)

summary = engine.show("<run-id>")
print(summary.status, summary.current_node, summary.last_updated_at)
for record in summary.history:
    print(record.node_id, record.visit, record.success, record.duration_ms)
```

`list` enumerates live worktree-backed runs (COMPLETED runs whose
worktree dir was removed don't appear); `show` reads the persisted
journal and works for any run with a state branch, alive or finished.

## Testing your orchestrator

For your orchestrator's own test suite, `attractor.testing` exports a
helper that does the git-repo setup work without making you replicate
the `pygit2` init dance:

```python
from pathlib import Path

from attractor import Engine
from attractor.testing import create_test_repo


def test_my_orchestrator(tmp_path: Path) -> None:
    repo = create_test_repo(tmp_path / "repo")
    engine = Engine(repo)
    # ... drive engine, assert outcomes ...
```

`create_test_repo(path, *, initial_branch="main")` returns the path
after:

1. Creating a non-bare git repo at `path` (errors if `path` exists).
2. Writing a generic test identity to local config.
3. Forcing `commit.gpgsign` / `tag.gpgsign` to `false` locally so
   signing failures in CI runners or sandboxes without a key don't
   break tests.
4. Committing a one-file root commit on `initial_branch` so
   `Engine.run` has a `base_ref` to seed worktrees from.

**`attractor.testing` is explicitly NOT part of the stable host API
contract.** Symbols there aren't in `attractor.__all__` and may
change in any release. Production code (your orchestrator's actual
runtime, not its tests) must never import it — the host is
responsible for the user's real git repo and shouldn't be forcing
their gitconfig.

## Error handling

All engine-raised exceptions inherit from `EngineError`. Catch the
specific subclass when the cause matters, and list it before the base
class so the more specific handler wins:

```python
from attractor import EngineError, UnknownChoice, UnknownRun

try:
    await engine.respond(run_id, choice="proceed")
except UnknownChoice as exc:
    print(f"choice not in gate's edges: {exc}")
except UnknownRun:
    print("run not found in this repo")
except EngineError as exc:
    print(f"engine refused the request: {exc}")
```

`WorkflowNotValid` is raised if you pass `Engine.run` something that
isn't a `ValidGraph`. Always pipe `parse() → validate()` first; for
statically checked callers, the type annotations already enforce this
at the boundary.
