diff --git a/AGENTS.md b/AGENTS.md index 65842f8..279b3f9 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -6,6 +6,8 @@ Harness-independent core for an agent ecosystem (messaging, live-agent lifecycle For maintainer/debug update work, also read `docs/DEBUG-ROLLOUT.md` before touching the update-set or rollout path. +For work needing Linux tests and proof, this machine is authorized to ssh into a Linux box: `reavus@kitsubito` + ## Requirement traceability (binding) This project uses `traceable-reqs` (`traceable-reqs.toml` = the authoritative `REQ-*` registry). The full contract is `docs/TRACEABILITY.md`. The rules you must follow: @@ -25,5 +27,7 @@ This project uses `traceable-reqs` (`traceable-reqs.toml` = the authoritative `R - Docs are dual-audience (human + AI dev-agent) per `docs/DOCS-STRATEGY.md`; doc generation is CI-gated against drift. - Commit messages end with the project's Co-Authored-By trailer. -If you finish a significant body of work without need for user intervention, or if your context gets too high, prepare for the next session: +If your context gets too high, prepare for the next session: - Create a JIT plan for the next immediate body of work, if it isn't already planned +- /commune with immediate next steps and broad summary of the project's status + end goal +- prompt the operator to /clear into a new session. you will resume there \ No newline at end of file diff --git a/crates/spt-daemon/src/broker.rs b/crates/spt-daemon/src/broker.rs index 87cf45f..21e451f 100644 --- a/crates/spt-daemon/src/broker.rs +++ b/crates/spt-daemon/src/broker.rs @@ -68,6 +68,22 @@ use crate::nethost::{NetHost, NET_EFFECT_SESSION}; use crate::translation::{key_to_bytes, InjectFloor, KeyCmd, ToBinary, TranslationChild}; use crate::transport::{recv_hello, DaemonTransport, LocalSocketTransport}; +// ── DIAGNOSTIC-ONLY: sustained-input-wedge instrumentation (SPT_WEDGE_TRACE=1) ── +// Off by default; behaviour-neutral; never ships a logic change. Measures whether +// a concurrent attach's `subscribe` starves on the OutputLog mutex behind the +// drain thread's flood-output `append` (REQ-HAZARD-EFFECT-JOURNAL-PTY-WEDGE / +// REQ-HAZARD-PTY-INPUT-WRITER-WEDGE root-cause hunt). Run the inject_control_wedge +// gates on a forkpty box with the env set and read the WEDGE_TRACE lines. +fn wedge_trace_on() -> bool { + use std::sync::OnceLock; + static ON: OnceLock = OnceLock::new(); + *ON.get_or_init(|| std::env::var_os("SPT_WEDGE_TRACE").is_some()) +} +/// Drain-thread OutputLog-lock pressure counters (only touched when tracing). +static WT_DRAIN_APPENDS: AtomicU64 = AtomicU64::new(0); +static WT_DRAIN_WAIT_US: AtomicU64 = AtomicU64::new(0); +static WT_DRAIN_WAIT_MAX_US: AtomicU64 = AtomicU64::new(0); + /// The shared, serialized send side of one brain connection. Output frames, /// replay frames, command acks, and exit/error events all write through this. /// `pub(crate)`: the net host's stream logs (D4b) live-send through the same @@ -1241,8 +1257,24 @@ impl Broker { let (mut recv, send) = conn.split(); let send: SharedSend = Arc::new(Mutex::new(send)); + // DIAGNOSTIC (SPT_WEDGE_TRACE): when does THIS conn's handler thread start + + // clear the handshake, vs flood progress (drain appends)? If the attacher's + // conn_open/conn_hello only print near flood-end, the SERVE accept loop or the + // handshake is starved behind the flood — that is the head-of-line point. + if wedge_trace_on() { + eprintln!( + "WEDGE_TRACE conn_open drain_appends={}", + WT_DRAIN_APPENDS.load(Ordering::Relaxed) + ); + } // Version handshake: this connection must be a brain (2.3). recv_hello(&mut recv, Role::Brain)?; + if wedge_trace_on() { + eprintln!( + "WEDGE_TRACE conn_hello drain_appends={}", + WT_DRAIN_APPENDS.load(Ordering::Relaxed) + ); + } let mut my_subs: Vec = Vec::new(); let mut my_stream_subs: Vec = Vec::new(); @@ -1254,6 +1286,18 @@ impl Broker { // Any read failure (EOF included) ends the connection. Err(_) => break, }; + // DIAGNOSTIC (SPT_WEDGE_TRACE): a subscribe FRAME was just read off this + // conn — at what flood progress? read-late (≈flood-end appends) means this + // handler thread was starved from reading for the whole flood (the HOL). + if wedge_trace_on() + && (env.kind == KIND_SUBSCRIBE || env.kind == KIND_NET_STREAM_SUBSCRIBE) + { + eprintln!( + "WEDGE_TRACE conn_read kind={} drain_appends_at_read={}", + env.kind, + WT_DRAIN_APPENDS.load(Ordering::Relaxed) + ); + } match env.kind.as_str() { KIND_SPAWN => match self.dispatch_spawn(env, &send) { Ok(id) => my_subs.push(id), @@ -1448,7 +1492,27 @@ impl Broker { let log_drain = Arc::clone(&log); let drain = session .drain(move |chunk| { - let job = log_drain.lock().unwrap().append(chunk); + // DIAGNOSTIC (SPT_WEDGE_TRACE): time how long THIS drain thread waits + // to (re)acquire the OutputLog mutex per flood-output chunk, and how + // often — the pressure side of the subscribe-starvation hypothesis. + let job = if wedge_trace_on() { + let t0 = std::time::Instant::now(); + let mut g = log_drain.lock().unwrap(); + let w = t0.elapsed().as_micros() as u64; + let n = WT_DRAIN_APPENDS.fetch_add(1, Ordering::Relaxed) + 1; + WT_DRAIN_WAIT_US.fetch_add(w, Ordering::Relaxed); + WT_DRAIN_WAIT_MAX_US.fetch_max(w, Ordering::Relaxed); + if n % 2000 == 0 { + eprintln!( + "WEDGE_TRACE drain appends={n} acquire_wait_total_us={} acquire_wait_max_us={}", + WT_DRAIN_WAIT_US.load(Ordering::Relaxed), + WT_DRAIN_WAIT_MAX_US.load(Ordering::Relaxed), + ); + } + g.append(chunk) + } else { + log_drain.lock().unwrap().append(chunk) + }; if let Some(job) = job { if let Err(epoch) = job.deliver() { log_drain.lock().unwrap().mark_controller_gone(epoch); @@ -1574,20 +1638,58 @@ impl Broker { // post-removal subscribe gets a prompt "no such session" error — both // non-silent. The rc-side status-gate (a) + first-event backstop (b) cover // the rest. + // DIAGNOSTIC (SPT_WEDGE_TRACE): this is the concurrent attach's subscribe. + // Time the two locks it must take — the brief `sessions` map lock (suspect c, + // shared with input/drain) and then the OutputLog mutex (suspect b, held by + // the drain per flood-output chunk) — plus the in-lock resolve+replay. If + // `log_lock_wait_us` blows up while `drain_appends_so_far` is large, the + // subscribe is starving on the OutputLog mutex behind the flood drain. + let traced = wedge_trace_on(); + let sid = req.session_id; let log = { + let ts = std::time::Instant::now(); let sessions = self.sessions.lock().unwrap(); + if traced { + eprintln!( + "WEDGE_TRACE subscribe sid={sid} sessions_lock_wait_us={}", + ts.elapsed().as_micros() + ); + } let h = sessions .get(&req.session_id) .ok_or_else(|| format!("no such session {}", req.session_id))?; Arc::clone(&h.log) }; - let outcome = log.lock().unwrap().resolve_subscribe( - Arc::clone(send), - req.from_seq, - req.intent, - req.by, - ); + let outcome = if traced { + let t0 = std::time::Instant::now(); + let mut g = log.lock().unwrap(); + let log_lock_wait_us = t0.elapsed().as_micros(); + let t1 = std::time::Instant::now(); + let o = g.resolve_subscribe(Arc::clone(send), req.from_seq, req.intent, req.by); + drop(g); + eprintln!( + "WEDGE_TRACE subscribe sid={sid} log_lock_wait_us={log_lock_wait_us} \ + resolve_us={} drain_appends_so_far={}", + t1.elapsed().as_micros(), + WT_DRAIN_APPENDS.load(Ordering::Relaxed), + ); + o + } else { + log.lock().unwrap().resolve_subscribe( + Arc::clone(send), + req.from_seq, + req.intent, + req.by, + ) + }; + let t_reply = std::time::Instant::now(); send_frame(send, &subscribed_envelope(req.session_id, outcome)); + if traced { + eprintln!( + "WEDGE_TRACE subscribe sid={sid} reply_send_us={}", + t_reply.elapsed().as_micros() + ); + } Ok(req.session_id) } diff --git a/crates/spt-daemon/tests/inject_control_wedge.rs b/crates/spt-daemon/tests/inject_control_wedge.rs index cea8cec..cfd6d2c 100644 --- a/crates/spt-daemon/tests/inject_control_wedge.rs +++ b/crates/spt-daemon/tests/inject_control_wedge.rs @@ -499,14 +499,12 @@ fn a_journaled_input_wedge_does_not_starve_a_concurrent_rc_attach() { .spawn_session(flood_spawn_req("wedge-jrnl-ep")) .expect("spawn flood child"); - // ── The WEDGE driver: on its OWN connection, pump journaled input - // (op_id-carrying send_effect → broker dispatch_input → journal.apply_once - // runs write_input INSIDE the global journal lock). On Unix the full input - // buffer parks write_input, holding the lock; on Windows ConPTY absorbs it. - // The driver fires a BOUNDED burst of MODEST-sized ops (so a backed-up - // broker IPC never blocks the driver's own socket write — the burst always - // completes and the thread exits cleanly, never hanging the test process) - // and then idles, checking the stop flag. ── + // ── The WEDGE driver: on its OWN connection, pump journaled input via the REAL + // operator seam (op_id-carrying send_effect_no_ack → broker dispatch_input → + // journal.apply_once enqueues the PtyWrite exactly-once). A sustained flood of + // MODEST-sized ops keeps a journaled write in-flight; the pump carrier drains + // the driver's own socket so a backed-up broker never blocks its writes, and + // the thread exits cleanly on the stop flag. ── let stop = Arc::new(AtomicBool::new(false)); let pumped = Arc::new(AtomicU64::new(0)); let driver_name = name.clone(); @@ -529,30 +527,30 @@ fn a_journaled_input_wedge_does_not_starve_a_concurrent_rc_attach() { } let chunk = vec![b'W'; 16 * 1024]; let mut op = 1u64; - // Sustain journaled input: keep ops in flight (and, on Unix, parked under - // the journal lock) until teardown. Each send_effect writes a frame; the - // pump reader drains replies concurrently so the write never deadlocks. + // Sustain journaled input in-flight until teardown, via the REAL operator/rc + // seam: send_effect_NO_ACK. The operator/rc drive floods input fire-and- + // forward (attach.rs:205 `brain.send_effect_no_ack`) precisely so the broker + // never writes an ACK back onto the very conn it is flooding — one ACK per op + // self-contends with the controller flood-OUTPUT on that conn's Mutex + // and wedges the per-conn dispatch handler, starving a concurrent attach's + // subscribe (root-caused via gdb: the dispatch thread parks in the ACK + // send_frame on the SendHalf mutex, NOT the OutputLog mutex). The earlier + // acked `send_effect` here was a TEST MIS-MODEL: no production path sustain- + // floods the acked seam (its acked callers are bounded one-shot / one-at-a- + // time), and acked-flood self-wedging is the documented hazard. no_ack stays + // JOURNALED + exactly-once (op_id => the broker's journal.apply_once runs + // regardless of ack), so facet B is non-vacuous: still a sustained journaled + // flood, just via the seam the real operator actually uses. + // [int->REQ-HAZARD-INPUT-ACK-BACKPRESSURE] while !driver_stop.load(std::sync::atomic::Ordering::Relaxed) { - if w.send_effect(op, &chunk).is_err() { + if w.send_effect_no_ack(op, &chunk).is_err() { break; } driver_pumped.fetch_add(1, std::sync::atomic::Ordering::Relaxed); op += 1; - // Drain queued replies (acks/output) so the carrier stays healthy, then - // PACE with an explicit floor sleep. The doc above says "sustain a write - // in-flight", but the bare `while !stop` pumped ~6094 ops and pegged the - // core, out-competing the SIBLING attacher thread for scheduling under - // contention so its subscribe was never serviced in 30s (the kitsubito - // reliability victim, subscribed=false). A small sleep keeps a journaled - // write continuously in-flight (facet-B's load is intact) while yielding - // the CPU the attacher needs — sustained, not storming. + // Keep the carrier healthy by draining session output; with no_ack there + // is no ACK frame to read and none to self-contend on the SendHalf. let _ = w.read_event_until(Some(std::time::Instant::now() + Duration::from_millis(20))); - // Unix-only: the CPU storm is a forkpty problem. On Windows ConPTY - // ABSORBS the journaled write, so there is no storm to pace and the sleep - // only adds a timing variable (p0 flaked on the SHARED Windows runner) — - // skip it there; Windows p0 was green pre-pacing. - #[cfg(unix)] - std::thread::sleep(Duration::from_millis(5)); } }); @@ -1785,19 +1783,24 @@ fn p0_paste_wedge_parked_write_does_not_starve_attach_or_wedge_broker() { // FIFO holds, so saturation (and DROP) is guaranteed once the writer parks. let chunk = vec![b'P'; 4 * 1024]; let mut op = 1u64; + // Flood paste-shaped input via the REAL operator seam (send_effect_no_ack): + // the rc/operator paste drive is fire-and-forward (attach.rs:205) so the + // broker never ACKs back onto the flooded conn. The earlier acked send_effect + // was a test mis-model — one ACK per op self-contends with the controller + // flood-output on that conn's Mutex and starves the concurrent + // attach's subscribe (REQ-HAZARD-INPUT-ACK-BACKPRESSURE). no_ack stays + // journaled+exactly-once (op_id => journal.apply_once), so the paste-wedge + // gate is unchanged: still a sustained journaled paste flood that must NOT + // wedge the concurrent attach. + // [int->REQ-HAZARD-INPUT-ACK-BACKPRESSURE] while !driver_stop.load(Ordering::Relaxed) { - if w.send_effect(op, &chunk).is_err() { + if w.send_effect_no_ack(op, &chunk).is_err() { break; } driver_pumped.fetch_add(1, Ordering::Relaxed); op += 1; - // Pace the pump (see the journaled-wedge driver): keep a paste write - // in-flight without a CPU storm that starves the concurrent attacher. + // Drain session output to keep the carrier healthy; no ACK frame with no_ack. let _ = w.read_event_until(Some(Instant::now() + Duration::from_millis(15))); - // Unix-only (see the journaled-wedge driver): Windows ConPTY absorbs the - // paste, so there is no CPU storm to pace here. - #[cfg(unix)] - std::thread::sleep(Duration::from_millis(5)); } });