Report guest PSI memory pressure to the host over the substrate link.

Adds a GuestResourceStats (0xA2) agent-to-host frame to
beyond-pg-init/src/substrate.rs. On every RESOURCE_STATS_PERIOD (30 s) tick the
connection loop reads the `some` / `full` avg10 values from
/proc/pressure/memory and sends them as a named MessagePack map; a tick sends
nothing when the pressure file cannot be read or has no `some` avg10 value.
A failed write breaks the loop, while the flush result is ignored. Two unit
tests pin the frame layout and the PSI parser.

NOTE(review): `tokio::time::interval` completes its first tick immediately, so
one report goes out as soon as the loop starts, not after the first 30 s.

NOTE(review): this patch text was rebuilt from a whitespace-collapsed copy in
which generic arguments had been stripped. Indentation follows rustfmt defaults
and may not match the target file byte for byte; use
`git apply --ignore-whitespace` if a hunk is rejected. The first context line of
the third hunk (construction of the log channel) lost a type argument that
cannot be recovered from the patch itself, so that line was trimmed and the hunk
header moved from -164,9 +223,24 to -165,8 +224,23.

diff --git a/beyond-pg-init/src/substrate.rs b/beyond-pg-init/src/substrate.rs
index 024c46f..128a8b0 100644
--- a/beyond-pg-init/src/substrate.rs
+++ b/beyond-pg-init/src/substrate.rs
@@ -34,8 +34,11 @@ const MSG_READY: u8 = 0x81; // Agent → host: ready after boot.
 const MSG_HEARTBEAT: u8 = 0x02; // Host → agent: liveness probe.
 const MSG_HEARTBEAT_RESP: u8 = 0x82; // Agent → host: heartbeat reply.
 const MSG_SHUTDOWN: u8 = 0x04; // Host → agent: shutdown requested.
+const MSG_GUEST_RESOURCE_STATS: u8 = 0xA2; // Agent → host: periodic resource stats.
 /// Frame length ceiling — sanity bound so a corrupt length can't allocate wild.
 const MAX_FRAME: u32 = 16 * 1024 * 1024;
+/// How often to report guest memory pressure to the host.
+const RESOURCE_STATS_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
 
 /// Agent → host "ready after boot" payload. A field-compatible subset of
 /// `vsock_protocol::ReadyPayload` — only the always-present fields; the rest are
@@ -53,6 +56,62 @@ struct HeartbeatPayload {
     timestamp: u64,
 }
 
+/// Agent → host periodic resource stats. A field-compatible subset of
+/// `vsock_protocol::GuestResourceStatsPayload`: we report only PSI memory
+/// pressure, so the host's memory controller can right-size this VM. `seq` and
+/// `disk_used_bytes` are required by the host struct (sent as 0); we omit
+/// `disk_total_bytes` so the host skips disk billing for this report (Postgres
+/// disk usage is not tracked here). Keys must match the host decoder exactly.
+#[derive(Serialize)]
+struct GuestResourceStatsPayload {
+    seq: u64,
+    disk_used_bytes: u64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    psi_mem_some_avg10: Option<f64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    psi_mem_full_avg10: Option<f64>,
+}
+
+/// Read Linux PSI memory pressure `(some.avg10, full.avg10)` from
+/// `/proc/pressure/memory`. `None` if PSI is unavailable (kernel without
+/// `CONFIG_PSI` / not booted with `psi=1`) or the file can't be read.
+fn read_memory_pressure() -> Option<(f64, f64)> {
+    let raw = std::fs::read_to_string("/proc/pressure/memory").ok()?;
+    parse_memory_pressure(&raw)
+}
+
+/// Parse `some.avg10` / `full.avg10` from `/proc/pressure/memory` text. Split
+/// out from the file read so it's testable without `/proc`.
+fn parse_memory_pressure(raw: &str) -> Option<(f64, f64)> {
+    let mut some = None;
+    let mut full = None;
+    for line in raw.lines() {
+        let mut fields = line.split_ascii_whitespace();
+        let kind = fields.next();
+        let avg10 = fields.find_map(|f| f.strip_prefix("avg10=")?.parse::<f64>().ok());
+        match kind {
+            Some("some") => some = avg10,
+            Some("full") => full = avg10,
+            _ => {}
+        }
+    }
+    Some((some?, full.unwrap_or(0.0)))
+}
+
+/// Encode a `GuestResourceStats` frame carrying current PSI memory pressure, or
+/// `None` if PSI is unavailable this tick.
+fn encode_resource_stats_frame() -> Option<Vec<u8>> {
+    let (some, full) = read_memory_pressure()?;
+    let payload = GuestResourceStatsPayload {
+        seq: 0,
+        disk_used_bytes: 0,
+        psi_mem_some_avg10: Some(some),
+        psi_mem_full_avg10: Some(full),
+    };
+    let body = rmp_serde::to_vec_named(&payload).ok()?;
+    Some(encode_frame(MSG_GUEST_RESOURCE_STATS, &body))
+}
+
 /// Frame a message: `[len: u32 BE = 1 + payload][type][MessagePack payload]`.
 fn encode_frame(msg_type: u8, body: &[u8]) -> Vec<u8> {
     let mut frame = ((body.len() as u32) + 1).to_be_bytes().to_vec();
@@ -165,8 +224,23 @@ where
     spawn_log_sink(log_tx);
 
+    // Periodic guest memory-pressure (PSI) report for the host memory
+    // controller. Fire-and-forget like heartbeats; a write error breaks the
+    // loop and the thread exits (same as any vsock failure).
+    let mut psi_interval = tokio::time::interval(RESOURCE_STATS_PERIOD);
+    psi_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
     let mut len_buf = [0u8; 4];
     loop {
         tokio::select! {
+            // Periodic PSI report.
+            _ = psi_interval.tick() => {
+                if let Some(frame) = encode_resource_stats_frame() {
+                    if conn.write_all(&frame).await.is_err() {
+                        break;
+                    }
+                    let _ = conn.flush().await;
+                }
+            }
             // Inbound substrate frames (heartbeat / shutdown / ignored).
             read = conn.read_exact(&mut len_buf) => {
                 if read.is_err() {
@@ -325,4 +399,46 @@ mod tests {
         assert!(obj.contains_key("boot_time_ms"));
         assert!(obj.contains_key("reconnect"));
     }
+
+    /// Pins the `GuestResourceStats` (0xA2) frame so it can't drift from instd's
+    /// `vsock_protocol::GuestResourceStatsPayload` decoder: BE length, the type
+    /// byte, then a MessagePack map whose keys match the host struct. `seq` and
+    /// `disk_used_bytes` are required host fields; `disk_total_bytes` is omitted
+    /// so the host skips disk billing; the PSI keys carry the signal.
+    #[test]
+    fn resource_stats_frame_is_stable() {
+        let payload = GuestResourceStatsPayload {
+            seq: 0,
+            disk_used_bytes: 0,
+            psi_mem_some_avg10: Some(12.34),
+            psi_mem_full_avg10: Some(0.5),
+        };
+        let body = rmp_serde::to_vec_named(&payload).unwrap();
+        let frame = encode_frame(MSG_GUEST_RESOURCE_STATS, &body);
+
+        let len = u32::from_be_bytes(frame[0..4].try_into().unwrap()) as usize;
+        assert_eq!(len, frame.len() - 4, "length covers type + payload");
+        assert_eq!(frame[4], MSG_GUEST_RESOURCE_STATS, "type byte is 0xA2");
+
+        let v: serde_json::Value = rmp_serde::from_slice(&frame[5..]).unwrap();
+        let obj = v.as_object().expect("payload must be a map");
+        assert!(obj.contains_key("seq"));
+        assert!(obj.contains_key("disk_used_bytes"));
+        assert!(obj.contains_key("psi_mem_some_avg10"));
+        assert!(obj.contains_key("psi_mem_full_avg10"));
+        assert!(
+            !obj.contains_key("disk_total_bytes"),
+            "disk_total omitted so the host skips disk billing"
+        );
+    }
+
+    #[test]
+    fn parses_psi_memory() {
+        let raw = "\
+some avg10=12.34 avg60=5.00 avg300=1.20 total=461476658
+full avg10=0.50 avg60=0.10 avg300=0.00 total=422631474
+";
+        assert_eq!(parse_memory_pressure(raw), Some((12.34, 0.50)));
+        assert_eq!(parse_memory_pressure(""), None);
+    }
 }