# admin_api — Worker Processes

The `apps/admin_api` service hosts two spawn patterns for long-running jobs that can't run inline on an HTTP request: **registered worker threads** (driven by a generic task runner) and **standalone worker processes** (single-file workers with their own lifecycle). Both ride on Node's `worker_threads`, both are tracked in a shared `activeWorkers` map, and both notify Discord on completion.

This page is the authoritative list of those workers and the data each one touches. If you're adding a new background job, register it here.

**Parent service:** [services/admin-api.md](./admin-api.md).

**Entry files:**
- [apps/admin_api/admin_api.ts](../../apps/admin_api/admin_api.ts):236 — `startTaskWorker`
- [apps/admin_api/admin_api.ts](../../apps/admin_api/admin_api.ts):255 — `startWorkerProcess`
- [apps/admin_api/worker_tasks.ts](../../apps/admin_api/worker_tasks.ts) — `WORKER_TASKS` registry
- [apps/admin_api/task_runner.ts](../../apps/admin_api/task_runner.ts) — generic task dispatcher

## Worker Architecture

```mermaid
flowchart TB
    subgraph AdminProcess["apps/admin_api process"]
        ENDPOINT[Admin-API route<br/>e.g. POST /admin/shop/worker/checklist]
        subgraph Spawn["Spawn helpers"]
            STW[startTaskWorker<br/>taskName, req]
            SWP[startWorkerProcess<br/>file, req]
        end
        REG[activeWorkers map<br/>threadId → WorkerInfo]
        SETUP[setupWorkerTracking<br/>60s auto-expire after finish]
        STATUS[GET /admin/workers/status]
    end

    subgraph TaskWorker["Worker thread: task_runner.js"]
        TR[task_runner.ts]
        LOOKUP{WORKER_TASKS<br/>taskName}
        METHOD[Invoke<br/>apiModule.method request]
    end

    subgraph CustomWorker["Worker thread: custom file"]
        CW[e.g. kb_sync_worker.js]
    end

    subgraph Discord["Discord"]
        DISC[DiscordLogger.info<br/>on completion]
    end

    ENDPOINT --> STW
    ENDPOINT --> SWP
    STW --> TaskWorker
    SWP --> CustomWorker
    STW --> SETUP
    SWP --> SETUP
    SETUP --> REG
    TaskWorker --> LOOKUP
    LOOKUP --> METHOD
    TaskWorker -->|parentPort.postMessage| SETUP
    CustomWorker -->|parentPort.postMessage| SETUP
    SETUP --> DISC
    REG --> STATUS
```

All workers:

- Run in a separate V8 isolate (Node `worker_threads`), so blocking work doesn't stall the Express event loop.
- Receive a cleaned-up copy of the originating request (`cleanReqForWorker` — strips unserialisable fields but keeps `currentAccount`, `verifiedAccessToken`, etc.).
- Are tracked with `{ name, startedBy, startedAt, status, completedAt, durationMs }`.
- On success or failure, notify Discord (via `DiscordLogger.Source.Fabricator`) and self-terminate; their registry entry lingers 60 s so the UI can still surface "recently finished" jobs.
- Are surfaced to Arda via `GET /admin/workers/status` (requires `Fabricator`, `FabricatorReadOnly`, or `Admin` policy).
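
The tracking lifecycle described in the list above can be sketched as a small registry. This is a minimal illustration, not the code in `admin_api.ts`: the `WorkerRegistry` class, its method names, and the status values are assumptions, and it expires entries lazily against an injected clock so that it can be exercised without timers (the real `setupWorkerTracking` may use a timer instead). Only the `WorkerInfo` field names and the 60 s linger come from this page.

```typescript
// Hypothetical sketch: only the WorkerInfo field names and the 60 s
// linger are taken from this page; everything else is illustrative.
type WorkerStatus = "running" | "completed" | "failed";

interface WorkerInfo {
  name: string;
  startedBy: string;
  startedAt: number; // epoch milliseconds
  status: WorkerStatus;
  completedAt?: number; // set once the worker finishes
  durationMs?: number;
}

const EXPIRE_AFTER_MS = 60_000; // finished entries linger for 60 s

class WorkerRegistry {
  private readonly workers = new Map<number, WorkerInfo>();

  // Called when a worker thread is spawned.
  register(threadId: number, name: string, startedBy: string, now: number): void {
    this.workers.set(threadId, { name, startedBy, startedAt: now, status: "running" });
  }

  // Called when the worker reports success or failure.
  finish(threadId: number, ok: boolean, now: number): void {
    const info = this.workers.get(threadId);
    if (!info) return;
    info.status = ok ? "completed" : "failed";
    info.completedAt = now;
    info.durationMs = now - info.startedAt;
  }

  // Drops entries that finished more than 60 s ago, then returns the rest.
  status(now: number): WorkerInfo[] {
    this.workers.forEach((info, threadId) => {
      if (info.completedAt !== undefined && now - info.completedAt > EXPIRE_AFTER_MS) {
        this.workers.delete(threadId);
      }
    });
    return Array.from(this.workers.values());
  }
}
```

A status endpoint built on this shape would return `registry.status(Date.now())`, which is why a job that finished a few seconds ago still shows up as "recently finished".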

## Registered Worker Tasks (`WORKER_TASKS`)

Every task registered in [worker_tasks.ts](../../apps/admin_api/worker_tasks.ts) shares the `task_runner.ts` dispatcher. Adding a new one means one entry in `WORKER_TASKS` plus one HTTP endpoint that calls `startTaskWorker("<name>", req)`.

| Task key | Display name | Module / Method | Data it changes |
|----------|--------------|-----------------|-----------------|
| `checklist` | Big Order Checklist | `FabricatorAdminApi.runBigOrderChecklist` | Postgres `big_orders.checklistResult` |
| `orders_sync` | Sync Shopify Orders | `FabricatorAdminApi.syncBigOrders_DEPRECATED` | **Deprecated** — kept for compatibility |
| `sync_all_big_orders` | Sync All Big Orders | `FabricatorAdminApi.worker_syncAllBigOrders` | Postgres `big_orders` (line items, shipment groups, line item summaries) |
| `shipper_sync` | Shipper Sync | `ShippingAdminApi.worker_findShippableBigOrders` | Redis `worker_process:readyToShipOrderIds` |
| `shippo_analytics` | Shippo Analytics Cache | `ShippingAdminApi.worker_cacheShippoAnalytics` | Redis `worker_process:shippoTransactions`, `worker_process:shippoAnalytics` |
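
The registry-plus-dispatcher pattern behind the table above might look roughly like the following. The `WorkerTask` shape, the `resolveTask` helper, and the stub modules are assumptions made for illustration; the real `WORKER_TASKS` and `API_MODULES` definitions live in `worker_tasks.ts` and `task_runner.ts` and may be structured differently.

```typescript
// Hypothetical shapes: check worker_tasks.ts and task_runner.ts for the
// real definitions. The stubs below stand in for the actual API modules.
type TaskMethod = (request: unknown) => Promise<string>;
type ApiModule = Record<string, TaskMethod>;

interface WorkerTask {
  displayName: string;
  module: string; // key into API_MODULES
  method: string; // method name on that module
}

const API_MODULES: Record<string, ApiModule> = {
  FabricatorAdminApi: {
    runBigOrderChecklist: async () => "checklist done",
  },
  ShippingAdminApi: {
    worker_findShippableBigOrders: async () => "shipper sync done",
  },
};

const WORKER_TASKS: Record<string, WorkerTask> = {
  checklist: {
    displayName: "Big Order Checklist",
    module: "FabricatorAdminApi",
    method: "runBigOrderChecklist",
  },
  shipper_sync: {
    displayName: "Shipper Sync",
    module: "ShippingAdminApi",
    method: "worker_findShippableBigOrders",
  },
};

// Looks up a task key and returns the method the dispatcher should invoke.
function resolveTask(taskName: string): { task: WorkerTask; run: TaskMethod } {
  const task = WORKER_TASKS[taskName];
  if (!task) throw new Error(`Unknown worker task: ${taskName}`);
  const run = API_MODULES[task.module]?.[task.method];
  if (!run) throw new Error(`No method ${task.module}.${task.method}`);
  return { task, run };
}
```

Failing fast on an unknown key or a missing method keeps a typo in the registry from silently spawning a worker that does nothing.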

### `checklist` — Big Order Checklist

Runs `FabricatorAdminApi.runBigOrderChecklist` ([FabricatorAdminApi.ts](../../api/src/fabricator/FabricatorAdminApi.ts):1651).

Queries every BigOrder in the non-terminal states (`Unknown`, `WaitingForScanRequest`, `WaitingForManufacturing`, `WaitingForInventory`, `ReadyToShip`, `Error`). For each, computes the "next action" and — if it changed — updates the stored `checklistResult`. Processing happens in batches of 10.

**Mutates:**

- Postgres `big_orders.checklistResult` (JSON column) — each row gets a refreshed `{ nextAction, ... }` payload.
- Possibly triggers automatic state transitions when the next action is safe to auto-execute.

**Sends a Discord summary** with the number of orders whose checklist changed.
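
As a rough illustration of the batching and "update only if changed" behaviour described above, here is a simplified, synchronous sketch. The `BigOrderRow` type, the `refreshChecklists` function, and the `computeNextAction` callback are hypothetical; the real method talks to Postgres and is asynchronous. Only the batch size of 10 and the changed-count summary come from this page.

```typescript
// Hypothetical in-memory model of the checklist pass.
interface BigOrderRow {
  id: string;
  checklistResult: { nextAction: string } | null;
}

const BATCH_SIZE = 10;

// Splits a list into consecutive batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Recomputes the next action per order and stores it only when it changed.
// Returns the number of orders whose checklist changed.
function refreshChecklists(
  orders: BigOrderRow[],
  computeNextAction: (order: BigOrderRow) => string,
): number {
  let changed = 0;
  for (const batch of chunk(orders, BATCH_SIZE)) {
    for (const order of batch) {
      const nextAction = computeNextAction(order);
      if (order.checklistResult?.nextAction !== nextAction) {
        order.checklistResult = { nextAction };
        changed += 1;
      }
    }
  }
  return changed;
}
```

The returned count corresponds to the number reported in the Discord summary.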

### `orders_sync` — DEPRECATED

The original Shopify-sync path. Kept in the registry for backward compatibility with looper cron jobs that still reference `orders_sync`. Use `sync_all_big_orders` for new work.

### `sync_all_big_orders` — Sync All Big Orders

Runs `FabricatorAdminApi.worker_syncAllBigOrders` ([FabricatorAdminApi.ts](../../api/src/fabricator/FabricatorAdminApi.ts):2244).

Acquires the Redis lock `syncBigOrders` (prevents multiple instances running simultaneously). Iterates all BigOrders in the non-terminal states and, for each, pulls the matching **cached Shopify order** and runs `syncBigOrderInternal`. Every sync is categorised as one of:

- `LineItemSummaryMissing` → BigOrder needed its line-item summary repopulated
- `LineItemTypeUpdateNeeded` → line-item types got re-derived
- `ShipmentGroupUpdateNeeded` → shipment group membership changed
- `LineItemUpdateNeeded` → individual line items were mutated

**Mutates:**

- Postgres `big_orders` and related Fabricator tables (`line_items`, `line_item_summaries`, `shipment_groups` — names reflect the FabricatorSchemas; actual table names live in [FabricatorDatabase.ts](../../api/src/fabricator/FabricatorDatabase.ts)).
- **Does not** write Shopify — it syncs *from* the cached Shopify order only.

Each run is capped at `MAX_ORDER_COUNT = 1000` orders, and the Redis lock is released on exit.
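
The lock, cap, and categorise flow of this worker can be sketched as follows. This is a simplified, synchronous model under stated assumptions: `InMemoryLock` stands in for the Redis lock (which would be asynchronous and carry a TTL), and `syncOne` is a hypothetical stand-in for `syncBigOrderInternal`. The category names, the lock name, and `MAX_ORDER_COUNT` are taken from this page.

```typescript
// In-memory stand-in for the Redis lock; illustrative only.
interface Lock {
  acquire(name: string): boolean;
  release(name: string): void;
}

class InMemoryLock implements Lock {
  private readonly held = new Set<string>();
  acquire(name: string): boolean {
    if (this.held.has(name)) return false;
    this.held.add(name);
    return true;
  }
  release(name: string): void {
    this.held.delete(name);
  }
}

type SyncCategory =
  | "LineItemSummaryMissing"
  | "LineItemTypeUpdateNeeded"
  | "ShipmentGroupUpdateNeeded"
  | "LineItemUpdateNeeded";

type Tally = Partial<Record<SyncCategory, number>>;

const LOCK_NAME = "syncBigOrders";
const MAX_ORDER_COUNT = 1000;

// Returns a per-category tally, or null when another run holds the lock.
function syncAllBigOrders(
  lock: Lock,
  orderIds: string[],
  syncOne: (orderId: string) => SyncCategory, // hypothetical per-order sync
): Tally | null {
  if (!lock.acquire(LOCK_NAME)) return null;
  const tally: Tally = {};
  try {
    for (const orderId of orderIds.slice(0, MAX_ORDER_COUNT)) {
      const category = syncOne(orderId);
      tally[category] = (tally[category] ?? 0) + 1;
    }
    return tally;
  } finally {
    // Always release, even if a sync throws part-way through.
    lock.release(LOCK_NAME);
  }
}
```

Releasing in `finally` is the important design point: a failed run must not leave the lock held and block the next scheduled sync.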

### `shipper_sync` — Shipper Sync

Runs `ShippingAdminApi.worker_findShippableBigOrders` ([ShippingAdminApi.ts](../../api/src/fabricator/ShippingAdminApi.ts):5087).

Finds Shopify-origin BigOrders in the `WaitingForInventory` or `ReadyToShip` state. For each, it walks the order's shipment groups and decides which one (if any) should ship now, producing a "winning shipment group id" per ready order. Orders are processed in batches of 250.

**Mutates:**

- Redis `worker_process:readyToShipOrderIds` — a hash of `{ bigOrderId → shipmentGroupId }` that Arda's BigShipper UI reads to show the "next to ship" queue. The hash is rewritten (`del` + `hset`) each run.

Does **not** create shipping labels itself — downstream steps (BigShipper UI → operator action → label generation endpoints) do that.
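
The `del` + `hset` rewrite mentioned above could be sketched like this. The `HashStore` interface and `InMemoryHashStore` class are assumptions introduced so the example does not depend on a particular Redis client; only the key name and the delete-then-set order come from this page.

```typescript
// Minimal hash-store interface; a real Redis client would be adapted to it.
interface HashStore {
  del(key: string): Promise<void>;
  hset(key: string, fields: Record<string, string>): Promise<void>;
}

const READY_KEY = "worker_process:readyToShipOrderIds";

// Replaces the whole hash with the latest bigOrderId -> shipmentGroupId map.
// Returns the number of entries written.
async function rewriteReadyToShip(
  store: HashStore,
  winners: Map<string, string>,
): Promise<number> {
  const fields: Record<string, string> = {};
  winners.forEach((shipmentGroupId, bigOrderId) => {
    fields[bigOrderId] = shipmentGroupId;
  });
  await store.del(READY_KEY);
  const count = Object.keys(fields).length;
  // Redis HSET needs at least one field, so skip the write when empty.
  if (count > 0) await store.hset(READY_KEY, fields);
  return count;
}

// In-memory stand-in used to exercise the function without Redis.
class InMemoryHashStore implements HashStore {
  readonly data = new Map<string, Record<string, string>>();
  async del(key: string): Promise<void> {
    this.data.delete(key);
  }
  async hset(key: string, fields: Record<string, string>): Promise<void> {
    this.data.set(key, { ...(this.data.get(key) ?? {}), ...fields });
  }
}
```

Because the delete and the set are two separate calls in this sketch, a reader could briefly observe an empty hash between them. Whether the real worker guards against that (for example with a transaction) is not stated on this page.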

### `shippo_analytics` — Shippo Analytics Cache

Runs `ShippingAdminApi.worker_cacheShippoAnalytics` ([ShippingAdminApi.ts](../../api/src/fabricator/ShippingAdminApi.ts):5236).

Two-phase:

1. Refresh the raw transaction cache (`worker_cacheShippoTransactions`) — pages through Shippo's API and stores every tracked transaction in Redis.
2. Compute analytics — status breakdown, 90-day time-series, and summary totals — from the cached transactions.

**Mutates:**

- Redis `worker_process:shippoTransactions` (hash keyed by tracking number; JSON-serialised transaction payloads).
- Redis `worker_process:shippoAnalytics` (single JSON blob with 1 h TTL; read by Arda's Shippo analytics panel).

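The analytics page in Arda reads straight from Redis, so the data is only as fresh as this worker's last run. Because the analytics blob has a 1 h TTL, it expires if the worker has not run within the past hour.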
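
The second phase (computing analytics from cached transactions) might be approximated as below. The `CachedTransaction` fields, the `computeAnalytics` function, and the choice to count statuses over all cached transactions while limiting the time series to 90 days are assumptions; the actual `ShippoAnalytics` class may aggregate differently.

```typescript
// Hypothetical transaction shape; real Shippo payloads carry more fields.
interface CachedTransaction {
  trackingNumber: string;
  status: string;
  createdAt: string; // ISO 8601 timestamp
}

interface Analytics {
  statusBreakdown: Record<string, number>; // status -> count
  dailyCounts: Record<string, number>; // YYYY-MM-DD (UTC) -> count, last 90 days
  total: number;
}

const WINDOW_DAYS = 90;
const DAY_MS = 24 * 60 * 60 * 1000;

function computeAnalytics(transactions: CachedTransaction[], now: Date): Analytics {
  const statusBreakdown: Record<string, number> = {};
  const dailyCounts: Record<string, number> = {};
  const nowMs = now.getTime();
  const cutoffMs = nowMs - WINDOW_DAYS * DAY_MS;

  for (const tx of transactions) {
    statusBreakdown[tx.status] = (statusBreakdown[tx.status] ?? 0) + 1;

    // Only transactions inside the 90-day window feed the time series.
    const createdMs = new Date(tx.createdAt).getTime();
    if (createdMs >= cutoffMs && createdMs <= nowMs) {
      const day = new Date(createdMs).toISOString().slice(0, 10);
      dailyCounts[day] = (dailyCounts[day] ?? 0) + 1;
    }
  }

  return { statusBreakdown, dailyCounts, total: transactions.length };
}
```

The result is a plain object, so it can be serialised with `JSON.stringify` and stored as the single JSON blob described above.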

## Standalone Worker Processes (`startWorkerProcess`)

Workers that need custom startup (their own dependencies, their own lifecycle) live as standalone `.ts` files beside `admin_api.ts`. They are still `worker_threads`, still tracked, still Discord-notified — but they don't flow through `task_runner.ts`.

| File | Endpoint | Purpose | Data it changes |
|------|----------|---------|-----------------|
| [`kb_sync_worker.ts`](../../apps/admin_api/kb_sync_worker.ts) | `GET /admin/kb/sync` | GitHub → KB ingestion (commits, wikis) | Postgres `pgvector` tables: KB documents + embeddings |

### `kb_sync_worker`

Triggered by `GET /admin/kb/sync` ([admin_api.ts](../../apps/admin_api/admin_api.ts):1187) — returns `200 { status: "running" }` immediately while the worker runs in the background. Uses dynamic imports to load KB code only when running (keeps the admin-API boot light).

Reads config via `getGitHubConfigFromEnv()` (requires `GITHUB_APP_ID`, `GITHUB_APP_PRIVATE_KEY`, `GITHUB_APP_INSTALLATION_ID`), instantiates `GitHubIngestionService`, and runs a full sync.

**Mutates:**

- KB Postgres / pgvector tables owned by [apps/kb](./kb.md): documents, chunks, embeddings. The trigger endpoint is gated on the `FabricatorReadOnly` or `Admin` policy because it is served by the same admin API.

This is the only `startWorkerProcess` caller today. The pattern is available for future workers that need heavier dependencies than the generic `task_runner` setup.
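
For the configuration step, a fail-fast check on the three required environment variables could look like the sketch below. `readGitHubConfig` and `GitHubAppConfig` are hypothetical names used for illustration; this is not the implementation of `getGitHubConfigFromEnv()`, whose return shape is not documented here.

```typescript
// Hypothetical config shape; the real one may use different field names.
interface GitHubAppConfig {
  appId: string;
  privateKey: string;
  installationId: string;
}

const REQUIRED_VARS = [
  "GITHUB_APP_ID",
  "GITHUB_APP_PRIVATE_KEY",
  "GITHUB_APP_INSTALLATION_ID",
] as const;

// Reads the GitHub App settings and reports every missing variable at once.
function readGitHubConfig(env: Record<string, string | undefined>): GitHubAppConfig {
  const missing = REQUIRED_VARS.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing GitHub config: ${missing.join(", ")}`);
  }
  return {
    appId: env.GITHUB_APP_ID as string,
    privateKey: env.GITHUB_APP_PRIVATE_KEY as string,
    installationId: env.GITHUB_APP_INSTALLATION_ID as string,
  };
}
```

Inside a worker this would be called as `readGitHubConfig(process.env)` before any ingestion starts, so a misconfigured deployment fails with one clear message instead of part-way through a sync.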

## Other `apps/admin_api` Scripts (not workers)

These live in the same folder but are **not** worker threads. They are their own entry points or library modules, started manually or by external tooling. They are documented here for completeness because they ship alongside the admin-api package.

| Script | How it runs | What it does |
|--------|-------------|--------------|
| [`looper.ts`](../../apps/admin_api/looper.ts) | Standalone Node process (cron-style) | Logs in via `/auth/login`, then periodically hits `/admin/shop/orders/data` and `/admin/shop/orders/checklist` over HTTP. It's the external trigger that kicks off the `checklist` worker on a schedule. |
| [`discord_bot.ts`](../../apps/admin_api/discord_bot.ts) | Standalone Node process | Discord bot that answers questions via the KB (`hybridSearch`, `rerankWithCohere`, `generateResponse`). Separate lifecycle from admin_api's HTTP server. |
| [`beyond_db_migrate.ts`](../../apps/admin_api/beyond_db_migrate.ts) | One-shot script | Schema migrations for Fabricator tables. `ALTER TABLE … ADD COLUMN IF NOT EXISTS …` style. Run by hand / as part of deploys. |
| [`kb_db_setup.ts`](../../apps/admin_api/kb_db_setup.ts) | One-shot script | Creates the KB Postgres tables + pgvector extension. |
| [`check_kb.ts`](../../apps/admin_api/check_kb.ts) | One-shot script | KB diagnostic — verifies ingested content is searchable. |
| [`reingest_github_commits.ts`](../../apps/admin_api/reingest_github_commits.ts) | One-shot script | Force-reingests GitHub commits into the KB after a schema or chunking change. |
| [`reporting.ts`](../../apps/admin_api/reporting.ts) | Module (imported as a library) | Exports helpers used by reporting endpoints (entitlement/SKU snapshot lookups). |
| [`shippo_analytics.ts`](../../apps/admin_api/shippo_analytics.ts) | Module (imported as a library) | Exports the `ShippoAnalytics` class used by the `shippo_analytics` worker. |

## Redis Keys Used by Workers

Quick reference, since these are the only mutable state surfaces outside Postgres / DynamoDB that workers touch directly:

| Redis key | Type | TTL | Owner | Purpose |
|-----------|------|-----|-------|---------|
| `worker_process:readyToShipOrderIds` | hash | none (rewritten each run) | `shipper_sync` | BigOrders ready to ship, keyed by id → winning shipment group |
| `worker_process:shippoTransactions` | hash | none (rewritten each run) | `shippo_analytics` | Shippo transaction payloads keyed by tracking number |
| `worker_process:shippoAnalytics` | string (JSON) | 1 h | `shippo_analytics` | Computed analytics blob — status breakdown, time series, summary |
| `lock:syncBigOrders` | redlock | 5 s (default) | `sync_all_big_orders` | Serialise concurrent big-order syncs |

## Adding a New Worker

For tasks that fit the generic pattern (call method X on API module Y with the request payload):

1. Add a method on `FabricatorAdminApi` or `ShippingAdminApi` (or expand `API_MODULES` in [task_runner.ts](../../apps/admin_api/task_runner.ts) if it needs a new module).
2. Register it in `WORKER_TASKS` in [worker_tasks.ts](../../apps/admin_api/worker_tasks.ts).
3. Add an HTTP endpoint in [admin_api.ts](../../apps/admin_api/admin_api.ts) that calls `startTaskWorker("<your-key>", req)`.

For tasks that need custom bootstrapping (heavy deps, dynamic imports, non-standard lifecycle):

1. Create `apps/admin_api/<your_worker>.ts` that imports `parentPort, workerData` from `worker_threads`.
2. Add an endpoint that calls `startWorkerProcess("./<your_worker>.js", req)`.

Either way, the worker is automatically picked up by `GET /admin/workers/status` and participates in the Discord-notification + auto-cleanup lifecycle.
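
A standalone worker file for the second path might start from a skeleton like this one. The `WorkerMessage` shape and the `runJob` function are assumptions; check `setupWorkerTracking` in `admin_api.ts` for the message format the parent actually expects. `parentPort` and `workerData` are the real exports of Node's `worker_threads` module.

```typescript
import { parentPort, workerData } from "node:worker_threads";

// Hypothetical completion message; the real parent may expect other fields.
interface WorkerMessage {
  status: "completed" | "failed";
  detail: string;
}

// The job itself. Replace the body with the real work, loading heavy
// dependencies here (for example via dynamic import) rather than at boot.
async function runJob(data: unknown): Promise<string> {
  return `processed payload of type ${typeof data}`;
}

async function main(): Promise<WorkerMessage> {
  let message: WorkerMessage;
  try {
    message = { status: "completed", detail: await runJob(workerData) };
  } catch (err) {
    message = {
      status: "failed",
      detail: err instanceof Error ? err.message : String(err),
    };
  }
  // parentPort is null when the file runs outside a worker thread.
  parentPort?.postMessage(message);
  return message;
}

const finished = main();
```

Catching errors inside `main` and posting a `failed` message means the parent hears about failures through the same channel as successes, rather than only through the thread's `error` event.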

## Further reading

- Parent service → [services/admin-api.md](./admin-api.md)
- Fabricator handler detail → [libraries/api-handlers.md](../libraries/api-handlers.md)
- KB ingestion pipeline (destination of `kb_sync_worker`) → [services/kb.md](./kb.md)
