Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions beyond-pg-init/src/substrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ const MSG_READY: u8 = 0x81; // Agent → host: ready after boot.
const MSG_HEARTBEAT: u8 = 0x02; // Host → agent: liveness probe.
const MSG_HEARTBEAT_RESP: u8 = 0x82; // Agent → host: heartbeat reply.
const MSG_SHUTDOWN: u8 = 0x04; // Host → agent: shutdown requested.
const MSG_GUEST_RESOURCE_STATS: u8 = 0xA2; // Agent → host: periodic resource stats (PSI memory pressure).
/// Frame length ceiling (16 MiB) — a sanity bound so that a corrupt length
/// prefix cannot drive an oversized allocation.
const MAX_FRAME: u32 = 16 * 1024 * 1024;
/// Interval between guest memory-pressure reports sent to the host (30 s).
const RESOURCE_STATS_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);

/// Agent → host "ready after boot" payload. A field-compatible subset of
/// `vsock_protocol::ReadyPayload` — only the always-present fields; the rest are
Expand All @@ -53,6 +56,62 @@ struct HeartbeatPayload {
timestamp: u64,
}

/// Agent → host periodic resource stats, carried as the MessagePack body of a
/// `MSG_GUEST_RESOURCE_STATS` frame. A field-compatible subset of
/// `vsock_protocol::GuestResourceStatsPayload`: we report only PSI memory
/// pressure, so the host's memory controller can right-size this VM. `seq` and
/// `disk_used_bytes` are required by the host struct (sent as 0); we omit
/// `disk_total_bytes` so the host skips disk billing for this report (Postgres
/// disk usage is not tracked here). Keys must match the host decoder exactly:
/// the payload is encoded with named fields, so each field name below is a
/// map key on the wire.
#[derive(Serialize)]
struct GuestResourceStatsPayload {
/// Required by the host struct; this agent always sends 0.
seq: u64,
/// Required by the host struct; always 0 because disk usage is not tracked here.
disk_used_bytes: u64,
/// `some avg10` from `/proc/pressure/memory`; the key is left out of the
/// encoded map when this is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
psi_mem_some_avg10: Option<f64>,
/// `full avg10` from `/proc/pressure/memory`; the key is left out of the
/// encoded map when this is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
psi_mem_full_avg10: Option<f64>,
}

/// Fetch the current Linux PSI memory pressure as `(some.avg10, full.avg10)`.
///
/// Reads `/proc/pressure/memory` and hands the text to
/// `parse_memory_pressure`. Yields `None` when the file cannot be read (for
/// example a kernel built without `CONFIG_PSI` or not booted with `psi=1`) or
/// when the parser finds no usable `some` line.
fn read_memory_pressure() -> Option<(f64, f64)> {
    std::fs::read_to_string("/proc/pressure/memory")
        .ok()
        .and_then(|text| parse_memory_pressure(&text))
}

/// Parse `some.avg10` / `full.avg10` from `/proc/pressure/memory` text. Split
/// out from the file read so it's testable without `/proc`.
///
/// Each line is expected to look like
/// `some avg10=12.34 avg60=5.00 avg300=1.20 total=461476658`: a kind word
/// (`some` or `full`) followed by whitespace-separated `key=value` fields.
/// Lines with any other leading word are ignored.
///
/// Returns `None` when no usable `some` value is present. A missing or
/// unusable `full` value is reported as `0.0`.
///
/// Non-finite values are treated as unparseable: `str::parse::<f64>` accepts
/// spellings such as `nan` and `inf`, and those must never be forwarded to the
/// host as a pressure reading.
fn parse_memory_pressure(raw: &str) -> Option<(f64, f64)> {
    let mut some = None;
    let mut full = None;
    for line in raw.lines() {
        let mut fields = line.split_ascii_whitespace();
        let kind = fields.next();
        // First `avg10=` field on the line that parses to a finite number.
        let avg10 = fields.find_map(|f| {
            f.strip_prefix("avg10=")?
                .parse::<f64>()
                .ok()
                .filter(|v| v.is_finite())
        });
        match kind {
            Some("some") => some = avg10,
            Some("full") => full = avg10,
            _ => {}
        }
    }
    Some((some?, full.unwrap_or(0.0)))
}

/// Build a complete `GuestResourceStats` frame from the current PSI memory
/// pressure reading.
///
/// Returns `None` when no pressure reading is available this tick or when the
/// payload fails to serialize; the caller simply skips the report. `seq` and
/// `disk_used_bytes` are always sent as 0.
fn encode_resource_stats_frame() -> Option<Vec<u8>> {
    let (some_avg10, full_avg10) = read_memory_pressure()?;
    let stats = GuestResourceStatsPayload {
        seq: 0,
        disk_used_bytes: 0,
        psi_mem_some_avg10: Some(some_avg10),
        psi_mem_full_avg10: Some(full_avg10),
    };
    rmp_serde::to_vec_named(&stats)
        .ok()
        .map(|body| encode_frame(MSG_GUEST_RESOURCE_STATS, &body))
}

/// Frame a message: `[len: u32 BE = 1 + payload][type][MessagePack payload]`.
fn encode_frame(msg_type: u8, body: &[u8]) -> Vec<u8> {
let mut frame = ((body.len() as u32) + 1).to_be_bytes().to_vec();
Expand Down Expand Up @@ -164,9 +223,24 @@ where
let (log_tx, mut log_rx) = tokio::sync::mpsc::channel::<LogFrameBytes>(1024);
spawn_log_sink(log_tx);

// Periodic guest memory-pressure (PSI) report for the host memory
// controller. Fire-and-forget like heartbeats; a write error breaks the
// loop and the thread exits (same as any vsock failure).
let mut psi_interval = tokio::time::interval(RESOURCE_STATS_PERIOD);
psi_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut len_buf = [0u8; 4];
loop {
tokio::select! {
// Periodic PSI report.
_ = psi_interval.tick() => {
if let Some(frame) = encode_resource_stats_frame() {
if conn.write_all(&frame).await.is_err() {
break;
}
let _ = conn.flush().await;
}
}
// Inbound substrate frames (heartbeat / shutdown / ignored).
read = conn.read_exact(&mut len_buf) => {
if read.is_err() {
Expand Down Expand Up @@ -325,4 +399,46 @@ mod tests {
assert!(obj.contains_key("boot_time_ms"));
assert!(obj.contains_key("reconnect"));
}

/// Locks down the wire shape of the `GuestResourceStats` (0xA2) frame so it
/// stays decodable by instd's `vsock_protocol::GuestResourceStatsPayload`:
/// a big-endian length prefix, the type byte, then a MessagePack map keyed by
/// the host struct's field names. `seq` and `disk_used_bytes` are required
/// host fields; `disk_total_bytes` must be absent so the host skips disk
/// billing; the two PSI keys carry the actual signal.
#[test]
fn resource_stats_frame_is_stable() {
    let body = rmp_serde::to_vec_named(&GuestResourceStatsPayload {
        seq: 0,
        disk_used_bytes: 0,
        psi_mem_some_avg10: Some(12.34),
        psi_mem_full_avg10: Some(0.5),
    })
    .unwrap();
    let frame = encode_frame(MSG_GUEST_RESOURCE_STATS, &body);

    // Split the 4-byte length prefix from everything it is supposed to cover.
    let (prefix, rest) = frame.split_at(4);
    let declared = u32::from_be_bytes(prefix.try_into().unwrap()) as usize;
    assert_eq!(declared, rest.len(), "length covers type + payload");
    assert_eq!(rest[0], MSG_GUEST_RESOURCE_STATS, "type byte is 0xA2");

    let decoded: serde_json::Value = rmp_serde::from_slice(&rest[1..]).unwrap();
    let map = decoded.as_object().expect("payload must be a map");
    assert!(map.contains_key("seq"));
    assert!(map.contains_key("disk_used_bytes"));
    assert!(map.contains_key("psi_mem_some_avg10"));
    assert!(map.contains_key("psi_mem_full_avg10"));
    assert!(
        !map.contains_key("disk_total_bytes"),
        "disk_total omitted so the host skips disk billing"
    );
}

/// Covers `parse_memory_pressure` on a realistic `/proc/pressure/memory`
/// sample, on empty input, and on the two partial-input paths: a missing
/// `full` line falls back to `0.0`, while a missing `some` line yields `None`.
#[test]
fn parses_psi_memory() {
    let raw = "\
some avg10=12.34 avg60=5.00 avg300=1.20 total=461476658
full avg10=0.50 avg60=0.10 avg300=0.00 total=422631474
";
    assert_eq!(parse_memory_pressure(raw), Some((12.34, 0.50)));
    assert_eq!(parse_memory_pressure(""), None);

    // Only `some` present: `full` defaults to 0.0.
    assert_eq!(
        parse_memory_pressure("some avg10=1.50 avg60=0.00 avg300=0.00 total=1\n"),
        Some((1.5, 0.0))
    );
    // Only `full` present: there is no signal to report.
    assert_eq!(
        parse_memory_pressure("full avg10=0.50 avg60=0.10 avg300=0.00 total=1\n"),
        None
    );
    // A `some` line whose avg10 does not parse is treated as absent.
    assert_eq!(
        parse_memory_pressure("some avg10=oops avg60=0.00 avg300=0.00 total=1\n"),
        None
    );
}
}