diff --git a/crates/spt-daemon/src/broker.rs b/crates/spt-daemon/src/broker.rs index 87cf45f..6508e07 100644 --- a/crates/spt-daemon/src/broker.rs +++ b/crates/spt-daemon/src/broker.rs @@ -68,6 +68,22 @@ use crate::nethost::{NetHost, NET_EFFECT_SESSION}; use crate::translation::{key_to_bytes, InjectFloor, KeyCmd, ToBinary, TranslationChild}; use crate::transport::{recv_hello, DaemonTransport, LocalSocketTransport}; +// ── DIAGNOSTIC-ONLY: sustained-input-wedge instrumentation (SPT_WEDGE_TRACE=1) ── +// Off by default; behaviour-neutral; never ships a logic change. Measures whether +// a concurrent attach's `subscribe` starves on the OutputLog mutex behind the +// drain thread's flood-output `append` (REQ-HAZARD-EFFECT-JOURNAL-PTY-WEDGE / +// REQ-HAZARD-PTY-INPUT-WRITER-WEDGE root-cause hunt). Run the inject_control_wedge +// gates on a forkpty box with the env set and read the WEDGE_TRACE lines. +fn wedge_trace_on() -> bool { + use std::sync::OnceLock; + static ON: OnceLock = OnceLock::new(); + *ON.get_or_init(|| std::env::var_os("SPT_WEDGE_TRACE").is_some()) +} +/// Drain-thread OutputLog-lock pressure counters (only touched when tracing). +static WT_DRAIN_APPENDS: AtomicU64 = AtomicU64::new(0); +static WT_DRAIN_WAIT_US: AtomicU64 = AtomicU64::new(0); +static WT_DRAIN_WAIT_MAX_US: AtomicU64 = AtomicU64::new(0); + /// The shared, serialized send side of one brain connection. Output frames, /// replay frames, command acks, and exit/error events all write through this. /// `pub(crate)`: the net host's stream logs (D4b) live-send through the same @@ -1448,7 +1464,27 @@ impl Broker { let log_drain = Arc::clone(&log); let drain = session .drain(move |chunk| { - let job = log_drain.lock().unwrap().append(chunk); + // DIAGNOSTIC (SPT_WEDGE_TRACE): time how long THIS drain thread waits + // to (re)acquire the OutputLog mutex per flood-output chunk, and how + // often — the pressure side of the subscribe-starvation hypothesis. + let job = if wedge_trace_on() { + let t0 = std::time::Instant::now(); + let mut g = log_drain.lock().unwrap(); + let w = t0.elapsed().as_micros() as u64; + let n = WT_DRAIN_APPENDS.fetch_add(1, Ordering::Relaxed) + 1; + WT_DRAIN_WAIT_US.fetch_add(w, Ordering::Relaxed); + WT_DRAIN_WAIT_MAX_US.fetch_max(w, Ordering::Relaxed); + if n % 2000 == 0 { + eprintln!( + "WEDGE_TRACE drain appends={n} acquire_wait_total_us={} acquire_wait_max_us={}", + WT_DRAIN_WAIT_US.load(Ordering::Relaxed), + WT_DRAIN_WAIT_MAX_US.load(Ordering::Relaxed), + ); + } + g.append(chunk) + } else { + log_drain.lock().unwrap().append(chunk) + }; if let Some(job) = job { if let Err(epoch) = job.deliver() { log_drain.lock().unwrap().mark_controller_gone(epoch); @@ -1574,20 +1610,58 @@ impl Broker { // post-removal subscribe gets a prompt "no such session" error — both // non-silent. The rc-side status-gate (a) + first-event backstop (b) cover // the rest. + // DIAGNOSTIC (SPT_WEDGE_TRACE): this is the concurrent attach's subscribe. + // Time the two locks it must take — the brief `sessions` map lock (suspect c, + // shared with input/drain) and then the OutputLog mutex (suspect b, held by + // the drain per flood-output chunk) — plus the in-lock resolve+replay. If + // `log_lock_wait_us` blows up while `drain_appends_so_far` is large, the + // subscribe is starving on the OutputLog mutex behind the flood drain. + let traced = wedge_trace_on(); + let sid = req.session_id; let log = { + let ts = std::time::Instant::now(); let sessions = self.sessions.lock().unwrap(); + if traced { + eprintln!( + "WEDGE_TRACE subscribe sid={sid} sessions_lock_wait_us={}", + ts.elapsed().as_micros() + ); + } let h = sessions .get(&req.session_id) .ok_or_else(|| format!("no such session {}", req.session_id))?; Arc::clone(&h.log) }; - let outcome = log.lock().unwrap().resolve_subscribe( - Arc::clone(send), - req.from_seq, - req.intent, - req.by, - ); + let outcome = if traced { + let t0 = std::time::Instant::now(); + let mut g = log.lock().unwrap(); + let log_lock_wait_us = t0.elapsed().as_micros(); + let t1 = std::time::Instant::now(); + let o = g.resolve_subscribe(Arc::clone(send), req.from_seq, req.intent, req.by); + drop(g); + eprintln!( + "WEDGE_TRACE subscribe sid={sid} log_lock_wait_us={log_lock_wait_us} \ + resolve_us={} drain_appends_so_far={}", + t1.elapsed().as_micros(), + WT_DRAIN_APPENDS.load(Ordering::Relaxed), + ); + o + } else { + log.lock().unwrap().resolve_subscribe( + Arc::clone(send), + req.from_seq, + req.intent, + req.by, + ) + }; + let t_reply = std::time::Instant::now(); send_frame(send, &subscribed_envelope(req.session_id, outcome)); + if traced { + eprintln!( + "WEDGE_TRACE subscribe sid={sid} reply_send_us={}", + t_reply.elapsed().as_micros() + ); + } Ok(req.session_id) }