diff --git a/AGENTS.md b/AGENTS.md index 65842f8..279b3f9 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -6,6 +6,8 @@ Harness-independent core for an agent ecosystem (messaging, live-agent lifecycle For maintainer/debug update work, also read `docs/DEBUG-ROLLOUT.md` before touching the update-set or rollout path. +For work needing Linux tests and proof, this machine is authorized to ssh into a Linux box: `reavus@kitsubito` + ## Requirement traceability (binding) This project uses `traceable-reqs` (`traceable-reqs.toml` = the authoritative `REQ-*` registry). The full contract is `docs/TRACEABILITY.md`. The rules you must follow: @@ -25,5 +27,7 @@ This project uses `traceable-reqs` (`traceable-reqs.toml` = the authoritative `R - Docs are dual-audience (human + AI dev-agent) per `docs/DOCS-STRATEGY.md`; doc generation is CI-gated against drift. - Commit messages end with the project's Co-Authored-By trailer. -If you finish a significant body of work without need for user intervention, or if your context gets too high, prepare for the next session: +If your context gets too high, prepare for the next session: - Create a JIT plan for the next immediate body of work, if it isn't already planned +- /commune with immediate next steps and broad summary of the project's status + end goal +- prompt the operator to /clear into a new session. you will resume there \ No newline at end of file diff --git a/crates/spt-daemon/src/broker.rs b/crates/spt-daemon/src/broker.rs index 87cf45f..b4df574 100644 --- a/crates/spt-daemon/src/broker.rs +++ b/crates/spt-daemon/src/broker.rs @@ -68,6 +68,24 @@ use crate::nethost::{NetHost, NET_EFFECT_SESSION}; use crate::translation::{key_to_bytes, InjectFloor, KeyCmd, ToBinary, TranslationChild}; use crate::transport::{recv_hello, DaemonTransport, LocalSocketTransport}; +// ── DIAGNOSTIC-ONLY: sustained-input-wedge instrumentation (SPT_WEDGE_TRACE=1) ── +// Off by default; behaviour-neutral; never ships a logic change. 
Measures whether +// a concurrent attach's `subscribe` starves on the OutputLog mutex behind the +// drain thread's flood-output `append` (REQ-HAZARD-EFFECT-JOURNAL-PTY-WEDGE / +// REQ-HAZARD-PTY-INPUT-WRITER-WEDGE root-cause hunt). Run the inject_control_wedge +// gates on a forkpty box with the env set and read the WEDGE_TRACE lines. +pub(crate) fn wedge_trace_on() -> bool { + use std::sync::OnceLock; + static ON: OnceLock<bool> = OnceLock::new(); + *ON.get_or_init(|| std::env::var_os("SPT_WEDGE_TRACE").is_some()) +} +/// Drain-thread OutputLog-lock pressure counters (only touched when tracing). +/// `pub(crate)` so the net layer can cross-reference flood progress (drain appends) +/// against net-stream subscribe/append events. +pub(crate) static WT_DRAIN_APPENDS: AtomicU64 = AtomicU64::new(0); +static WT_DRAIN_WAIT_US: AtomicU64 = AtomicU64::new(0); +static WT_DRAIN_WAIT_MAX_US: AtomicU64 = AtomicU64::new(0); + /// The shared, serialized send side of one brain connection. Output frames, /// replay frames, command acks, and exit/error events all write through this. /// `pub(crate)`: the net host's stream logs (D4b) live-send through the same @@ -1241,8 +1259,24 @@ impl Broker { let (mut recv, send) = conn.split(); let send: SharedSend = Arc::new(Mutex::new(send)); + // DIAGNOSTIC (SPT_WEDGE_TRACE): when does THIS conn's handler thread start + + // clear the handshake, vs flood progress (drain appends)? If the attacher's + // conn_open/conn_hello only print near flood-end, the SERVE accept loop or the + // handshake is starved behind the flood — that is the head-of-line point. + if wedge_trace_on() { + eprintln!( + "WEDGE_TRACE conn_open drain_appends={}", + WT_DRAIN_APPENDS.load(Ordering::Relaxed) + ); + } // Version handshake: this connection must be a brain (2.3). 
recv_hello(&mut recv, Role::Brain)?; + if wedge_trace_on() { + eprintln!( + "WEDGE_TRACE conn_hello drain_appends={}", + WT_DRAIN_APPENDS.load(Ordering::Relaxed) + ); + } let mut my_subs: Vec = Vec::new(); let mut my_stream_subs: Vec = Vec::new(); @@ -1254,6 +1288,18 @@ impl Broker { // Any read failure (EOF included) ends the connection. Err(_) => break, }; + // DIAGNOSTIC (SPT_WEDGE_TRACE): a subscribe FRAME was just read off this + // conn — at what flood progress? read-late (≈flood-end appends) means this + // handler thread was starved from reading for the whole flood (the HOL). + if wedge_trace_on() + && (env.kind == KIND_SUBSCRIBE || env.kind == KIND_NET_STREAM_SUBSCRIBE) + { + eprintln!( + "WEDGE_TRACE conn_read kind={} drain_appends_at_read={}", + env.kind, + WT_DRAIN_APPENDS.load(Ordering::Relaxed) + ); + } match env.kind.as_str() { KIND_SPAWN => match self.dispatch_spawn(env, &send) { Ok(id) => my_subs.push(id), @@ -1448,7 +1494,27 @@ impl Broker { let log_drain = Arc::clone(&log); let drain = session .drain(move |chunk| { - let job = log_drain.lock().unwrap().append(chunk); + // DIAGNOSTIC (SPT_WEDGE_TRACE): time how long THIS drain thread waits + // to (re)acquire the OutputLog mutex per flood-output chunk, and how + // often — the pressure side of the subscribe-starvation hypothesis. 
+ let job = if wedge_trace_on() { + let t0 = std::time::Instant::now(); + let mut g = log_drain.lock().unwrap(); + let w = t0.elapsed().as_micros() as u64; + let n = WT_DRAIN_APPENDS.fetch_add(1, Ordering::Relaxed) + 1; + WT_DRAIN_WAIT_US.fetch_add(w, Ordering::Relaxed); + WT_DRAIN_WAIT_MAX_US.fetch_max(w, Ordering::Relaxed); + if n % 2000 == 0 { + eprintln!( + "WEDGE_TRACE drain appends={n} acquire_wait_total_us={} acquire_wait_max_us={}", + WT_DRAIN_WAIT_US.load(Ordering::Relaxed), + WT_DRAIN_WAIT_MAX_US.load(Ordering::Relaxed), + ); + } + g.append(chunk) + } else { + log_drain.lock().unwrap().append(chunk) + }; if let Some(job) = job { if let Err(epoch) = job.deliver() { log_drain.lock().unwrap().mark_controller_gone(epoch); @@ -1574,20 +1640,58 @@ impl Broker { // post-removal subscribe gets a prompt "no such session" error — both // non-silent. The rc-side status-gate (a) + first-event backstop (b) cover // the rest. + // DIAGNOSTIC (SPT_WEDGE_TRACE): this is the concurrent attach's subscribe. + // Time the two locks it must take — the brief `sessions` map lock (suspect c, + // shared with input/drain) and then the OutputLog mutex (suspect b, held by + // the drain per flood-output chunk) — plus the in-lock resolve+replay. If + // `log_lock_wait_us` blows up while `drain_appends_so_far` is large, the + // subscribe is starving on the OutputLog mutex behind the flood drain. 
+ let traced = wedge_trace_on(); + let sid = req.session_id; let log = { + let ts = std::time::Instant::now(); let sessions = self.sessions.lock().unwrap(); + if traced { + eprintln!( + "WEDGE_TRACE subscribe sid={sid} sessions_lock_wait_us={}", + ts.elapsed().as_micros() + ); + } let h = sessions .get(&req.session_id) .ok_or_else(|| format!("no such session {}", req.session_id))?; Arc::clone(&h.log) }; - let outcome = log.lock().unwrap().resolve_subscribe( - Arc::clone(send), - req.from_seq, - req.intent, - req.by, - ); + let outcome = if traced { + let t0 = std::time::Instant::now(); + let mut g = log.lock().unwrap(); + let log_lock_wait_us = t0.elapsed().as_micros(); + let t1 = std::time::Instant::now(); + let o = g.resolve_subscribe(Arc::clone(send), req.from_seq, req.intent, req.by); + drop(g); + eprintln!( + "WEDGE_TRACE subscribe sid={sid} log_lock_wait_us={log_lock_wait_us} \ + resolve_us={} drain_appends_so_far={}", + t1.elapsed().as_micros(), + WT_DRAIN_APPENDS.load(Ordering::Relaxed), + ); + o + } else { + log.lock().unwrap().resolve_subscribe( + Arc::clone(send), + req.from_seq, + req.intent, + req.by, + ) + }; + let t_reply = std::time::Instant::now(); send_frame(send, &subscribed_envelope(req.session_id, outcome)); + if traced { + eprintln!( + "WEDGE_TRACE subscribe sid={sid} reply_send_us={}", + t_reply.elapsed().as_micros() + ); + } Ok(req.session_id) } @@ -2106,8 +2210,27 @@ impl Broker { let req: NetStreamSubscribeReq = serde_json::from_value(env.payload) .map_err(|e| format!("bad net-stream-subscribe payload: {e}"))?; let host = self.net.get().ok_or("net disabled")?; - host.subscribe_stream(req.stream_id, Arc::clone(send), req.from_seq) - .map_err(|e| e.to_string())?; + // DIAGNOSTIC (SPT_WEDGE_TRACE): this is the ATTACHER's real subscribe. Time + // enter->exit vs flood progress. 
If subscribe_stream returns fast but the + // attacher still never sees subscribed=true, the wedge is delivery/throughput + // (Theory 2), not this call (Theory 1 would show a huge wait INSIDE). + if wedge_trace_on() { + let t0 = std::time::Instant::now(); + let enter_appends = WT_DRAIN_APPENDS.load(Ordering::Relaxed); + let r = host.subscribe_stream(req.stream_id, Arc::clone(send), req.from_seq); + eprintln!( + "WEDGE_TRACE net_stream_subscribe stream={} enter_appends={enter_appends} \ + exit_appends={} subscribe_stream_us={} ok={}", + req.stream_id, + WT_DRAIN_APPENDS.load(Ordering::Relaxed), + t0.elapsed().as_micros(), + r.is_ok(), + ); + r.map_err(|e| e.to_string())?; + } else { + host.subscribe_stream(req.stream_id, Arc::clone(send), req.from_seq) + .map_err(|e| e.to_string())?; + } Ok(req.stream_id) } diff --git a/crates/spt-daemon/src/nethost.rs b/crates/spt-daemon/src/nethost.rs index c02d443..3c00ca1 100644 --- a/crates/spt-daemon/src/nethost.rs +++ b/crates/spt-daemon/src/nethost.rs @@ -42,6 +42,8 @@ use spt_net::net::mesh::seedproof::ProofRole; use spt_proto::identity::{Identity, PublicKey}; use crate::broker::SharedSend; +#[allow(unused_imports)] +use crate::broker::{wedge_trace_on, WT_DRAIN_APPENDS}; use crate::codec::write_frame; use crate::seedproofx::{prove_membership, MembershipSource, RosterExchange}; use crate::msg::{ @@ -49,6 +51,30 @@ use crate::msg::{ NetPresenceEvent, NetStreamInfo, PRESENCE_CONNECTED, PRESENCE_DISCONNECTED, }; +// DIAGNOSTIC (SPT_WEDGE_TRACE): StreamLog-lock pressure from the serve_attach +// forward — the Theory-1 suspect (the attacher's subscribe attach() starving on the +// StreamLog Mutex behind the per-chunk forward append). Off by default. +static WT_STREAM_APPENDS: AtomicU64 = AtomicU64::new(0); +static WT_STREAM_APPEND_WAIT_US: AtomicU64 = AtomicU64::new(0); +static WT_STREAM_APPEND_WAIT_MAX_US: AtomicU64 = AtomicU64::new(0); + +/// DIAGNOSTIC (SPT_WEDGE_TRACE): the stream read-pump's StreamLog append, timed. 
+/// Measures the lock-acquire wait the forward incurs per chunk — the attacher's +/// `subscribe_stream` attach() contends on the SAME StreamLog mutex (Theory 1). +fn wt_stream_append(log: &Arc<Mutex<StreamLog>>, bytes: &[u8]) { + if wedge_trace_on() { + let t0 = std::time::Instant::now(); + let mut g = log.lock().unwrap(); + let w = t0.elapsed().as_micros() as u64; + WT_STREAM_APPENDS.fetch_add(1, Ordering::Relaxed); + WT_STREAM_APPEND_WAIT_US.fetch_add(w, Ordering::Relaxed); + WT_STREAM_APPEND_WAIT_MAX_US.fetch_max(w, Ordering::Relaxed); + g.append(bytes); + } else { + log.lock().unwrap().append(bytes); + } +} + /// The reserved [`crate::effect::EffectKey`] session namespace for net-scoped /// effects (a dial has no PTY session). Broker session ids are minted from 1 /// (`Broker::next_id` starts at 1), so `0` can never collide with a real session. @@ -534,7 +560,7 @@ fn register_stream( RecvHalf::Quic(mut recv) => loop { await_ring_room(&log, &room).await; match recv.read_chunk(STREAM_READ_CHUNK).await { - Ok(Some(chunk)) => log.lock().unwrap().append(&chunk.bytes), + Ok(Some(chunk)) => wt_stream_append(&log, &chunk.bytes), Ok(None) | Err(_) => { log.lock().unwrap().finish(); break; @@ -550,7 +576,7 @@ fn register_stream( log.lock().unwrap().finish(); break; } - Ok(n) => log.lock().unwrap().append(&buf[..n]), + Ok(n) => wt_stream_append(&log, &buf[..n]), } } } @@ -1215,6 +1241,44 @@ impl NetHost { sub: SharedSend, from_seq: u64, ) -> io::Result<()> { + // DIAGNOSTIC (SPT_WEDGE_TRACE): the attacher's net-stream subscribe. Time the + // two locks it takes — the brief `streams` map lock, then the per-stream + // StreamLog lock for attach(). Theory 1 = StreamLog log_lock_wait_us is HUGE + // (attach starves behind the serve_attach forward's per-chunk append on the + // same StreamLog mutex). Theory 2 = both waits ~tiny (the lock is not it). 
+ if wedge_trace_on() { + let ts = std::time::Instant::now(); + let log = { + let streams = self.shared.streams.lock().unwrap(); + let streams_lock_wait_us = ts.elapsed().as_micros(); + let e = streams.get(&stream_id).ok_or_else(|| { + io::Error::new(io::ErrorKind::NotFound, format!("no such stream {stream_id}")) + })?; + let log = Arc::clone(&e.log); + eprintln!( + "WEDGE_TRACE subscribe_stream stream={stream_id} \ + streams_lock_wait_us={streams_lock_wait_us} drain_appends={} \ + stream_appends={}", + WT_DRAIN_APPENDS.load(Ordering::Relaxed), + WT_STREAM_APPENDS.load(Ordering::Relaxed), + ); + log + }; + let tl = std::time::Instant::now(); + let mut g = log.lock().unwrap(); + let log_lock_wait_us = tl.elapsed().as_micros(); + let ta = std::time::Instant::now(); + g.attach(sub, from_seq); + eprintln!( + "WEDGE_TRACE subscribe_stream stream={stream_id} \ + streamlog_log_lock_wait_us={log_lock_wait_us} attach_us={} \ + stream_append_wait_total_us={} stream_append_wait_max_us={}", + ta.elapsed().as_micros(), + WT_STREAM_APPEND_WAIT_US.load(Ordering::Relaxed), + WT_STREAM_APPEND_WAIT_MAX_US.load(Ordering::Relaxed), + ); + return Ok(()); + } let log = { let streams = self.shared.streams.lock().unwrap(); let e = streams.get(&stream_id).ok_or_else(|| { diff --git a/crates/spt-daemon/tests/inject_control_wedge.rs b/crates/spt-daemon/tests/inject_control_wedge.rs index cea8cec..cfd6d2c 100644 --- a/crates/spt-daemon/tests/inject_control_wedge.rs +++ b/crates/spt-daemon/tests/inject_control_wedge.rs @@ -499,14 +499,12 @@ fn a_journaled_input_wedge_does_not_starve_a_concurrent_rc_attach() { .spawn_session(flood_spawn_req("wedge-jrnl-ep")) .expect("spawn flood child"); - // ── The WEDGE driver: on its OWN connection, pump journaled input - // (op_id-carrying send_effect → broker dispatch_input → journal.apply_once - // runs write_input INSIDE the global journal lock). On Unix the full input - // buffer parks write_input, holding the lock; on Windows ConPTY absorbs it. 
- // The driver fires a BOUNDED burst of MODEST-sized ops (so a backed-up - // broker IPC never blocks the driver's own socket write — the burst always - // completes and the thread exits cleanly, never hanging the test process) - // and then idles, checking the stop flag. ── + // ── The WEDGE driver: on its OWN connection, pump journaled input via the REAL + // operator seam (op_id-carrying send_effect_no_ack → broker dispatch_input → + // journal.apply_once enqueues the PtyWrite exactly-once). A sustained flood of + // MODEST-sized ops keeps a journaled write in-flight; the pump carrier drains + // the driver's own socket so a backed-up broker never blocks its writes, and + // the thread exits cleanly on the stop flag. ── let stop = Arc::new(AtomicBool::new(false)); let pumped = Arc::new(AtomicU64::new(0)); let driver_name = name.clone(); @@ -529,30 +527,30 @@ fn a_journaled_input_wedge_does_not_starve_a_concurrent_rc_attach() { } let chunk = vec![b'W'; 16 * 1024]; let mut op = 1u64; - // Sustain journaled input: keep ops in flight (and, on Unix, parked under - // the journal lock) until teardown. Each send_effect writes a frame; the - // pump reader drains replies concurrently so the write never deadlocks. + // Sustain journaled input in-flight until teardown, via the REAL operator/rc + // seam: send_effect_NO_ACK. The operator/rc drive floods input fire-and- + // forward (attach.rs:205 `brain.send_effect_no_ack`) precisely so the broker + // never writes an ACK back onto the very conn it is flooding — one ACK per op + // self-contends with the controller flood-OUTPUT on that conn's Mutex + // and wedges the per-conn dispatch handler, starving a concurrent attach's + // subscribe (root-caused via gdb: the dispatch thread parks in the ACK + // send_frame on the SendHalf mutex, NOT the OutputLog mutex). 
The earlier + // acked `send_effect` here was a TEST MIS-MODEL: no production path sustain- + // floods the acked seam (its acked callers are bounded one-shot / one-at-a- + // time), and acked-flood self-wedging is the documented hazard. no_ack stays + // JOURNALED + exactly-once (op_id => the broker's journal.apply_once runs + // regardless of ack), so facet B is non-vacuous: still a sustained journaled + // flood, just via the seam the real operator actually uses. + // [int->REQ-HAZARD-INPUT-ACK-BACKPRESSURE] while !driver_stop.load(std::sync::atomic::Ordering::Relaxed) { - if w.send_effect(op, &chunk).is_err() { + if w.send_effect_no_ack(op, &chunk).is_err() { break; } driver_pumped.fetch_add(1, std::sync::atomic::Ordering::Relaxed); op += 1; - // Drain queued replies (acks/output) so the carrier stays healthy, then - // PACE with an explicit floor sleep. The doc above says "sustain a write - // in-flight", but the bare `while !stop` pumped ~6094 ops and pegged the - // core, out-competing the SIBLING attacher thread for scheduling under - // contention so its subscribe was never serviced in 30s (the kitsubito - // reliability victim, subscribed=false). A small sleep keeps a journaled - // write continuously in-flight (facet-B's load is intact) while yielding - // the CPU the attacher needs — sustained, not storming. + // Keep the carrier healthy by draining session output; with no_ack there + // is no ACK frame to read and none to self-contend on the SendHalf. let _ = w.read_event_until(Some(std::time::Instant::now() + Duration::from_millis(20))); - // Unix-only: the CPU storm is a forkpty problem. On Windows ConPTY - // ABSORBS the journaled write, so there is no storm to pace and the sleep - // only adds a timing variable (p0 flaked on the SHARED Windows runner) — - // skip it there; Windows p0 was green pre-pacing. 
- #[cfg(unix)] - std::thread::sleep(Duration::from_millis(5)); } }); @@ -1785,19 +1783,24 @@ fn p0_paste_wedge_parked_write_does_not_starve_attach_or_wedge_broker() { // FIFO holds, so saturation (and DROP) is guaranteed once the writer parks. let chunk = vec![b'P'; 4 * 1024]; let mut op = 1u64; + // Flood paste-shaped input via the REAL operator seam (send_effect_no_ack): + // the rc/operator paste drive is fire-and-forward (attach.rs:205) so the + // broker never ACKs back onto the flooded conn. The earlier acked send_effect + // was a test mis-model — one ACK per op self-contends with the controller + // flood-output on that conn's Mutex and starves the concurrent + // attach's subscribe (REQ-HAZARD-INPUT-ACK-BACKPRESSURE). no_ack stays + // journaled+exactly-once (op_id => journal.apply_once), so the paste-wedge + // gate is unchanged: still a sustained journaled paste flood that must NOT + // wedge the concurrent attach. + // [int->REQ-HAZARD-INPUT-ACK-BACKPRESSURE] while !driver_stop.load(Ordering::Relaxed) { - if w.send_effect(op, &chunk).is_err() { + if w.send_effect_no_ack(op, &chunk).is_err() { break; } driver_pumped.fetch_add(1, Ordering::Relaxed); op += 1; - // Pace the pump (see the journaled-wedge driver): keep a paste write - // in-flight without a CPU storm that starves the concurrent attacher. + // Drain session output to keep the carrier healthy; no ACK frame with no_ack. let _ = w.read_event_until(Some(Instant::now() + Duration::from_millis(15))); - // Unix-only (see the journaled-wedge driver): Windows ConPTY absorbs the - // paste, so there is no CPU storm to pace here. - #[cfg(unix)] - std::thread::sleep(Duration::from_millis(5)); } });