Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ name = "beyond-pg"
path = "src/main.rs"

[dependencies]
base64 = "0.22"
beyond-pg-core = { path = "./beyond-pg-core" }
clap = { version = "4", features = ["derive"] }
quinn = "0.11"
Expand Down
8 changes: 8 additions & 0 deletions beyond-pg-init/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,11 @@ serde_json = "1"
[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2"
handoff = { version = "0.1", package = "beyond-handoff" }
# The guest-ready handshake instd waits for is a single self-described vsock
# frame; we speak it directly (see src/substrate.rs) rather than depend on the
# Beyond workspace, so this repo builds standalone in CI. The wire contract is
# pinned by a fixture test.
nix = { version = "0.31", features = ["mount"] }
tokio = { version = "1", features = ["rt", "macros", "net", "time", "io-util"] }
tokio-vsock = "0.7"
rmp-serde = "1"
68 changes: 59 additions & 9 deletions beyond-pg-init/src/bootsetup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ use std::time::Duration;

const MMDS_ADDR: &str = "169.254.169.254:80";
const MMDS_MAX_ATTEMPTS: u32 = 30;
const MMDS_RETRY: Duration = Duration::from_millis(10);
const HTTP_TIMEOUT: Duration = Duration::from_millis(200);
const MMDS_RETRY: Duration = Duration::from_millis(200);
// Firecracker's MMDS can take up to ~1–2s to answer the first request after
// boot on this substrate; a 200ms read timeout raced that and every attempt
// died with EAGAIN (the connect succeeds — it's the response that's slow), so
// the whole loop fell back to "POSTGRES_PASSWORD not set" and panicked. guest-
// init happens to win the race with its smaller VM; we must not depend on luck.
const HTTP_TIMEOUT: Duration = Duration::from_secs(3);
const MAX_RESPONSE_BYTES: u64 = 64 * 1024;

/// Run the full PID 1 boot sequence. Bails (exit 1) on fatal failures.
Expand All @@ -41,6 +46,11 @@ pub fn run() {
mount_essential_filesystems();
setup_network();
fetch_mmds();
// instd records attached data volumes in MMDS; mount them (as root, before
// the supervisor child spawns) so postgres finds its data dir at
// /var/lib/postgresql. Tolerant: no volumes → no-op; a missing device is
// logged FATAL but never aborts the VM.
crate::volumes::mount_from_mmds();
setup_zram();
}

Expand Down Expand Up @@ -293,13 +303,17 @@ fn write_mmds_file(json: &serde_json::Value) {
fn poll_mmds() -> Result<serde_json::Value, ()> {
let token = get_mmds_token();
for attempt in 1..=MMDS_MAX_ATTEMPTS {
let t0 = std::time::Instant::now();
match fetch_mmds_metadata(token.as_deref()) {
Ok(Some(data)) => {
eprintln!("[init] MMDS data available (attempt {attempt})");
return Ok(data);
}
Ok(None) => {}
Err(e) => eprintln!("[init] MMDS fetch attempt {attempt} failed: {e}"),
Err(e) => eprintln!(
"[init] MMDS fetch attempt {attempt} failed after {}ms: {e}",
t0.elapsed().as_millis()
),
}
std::thread::sleep(MMDS_RETRY);
}
Expand Down Expand Up @@ -361,20 +375,56 @@ fn fetch_mmds_metadata(
}

fn http_roundtrip(request: &[u8]) -> std::io::Result<Vec<u8>> {
// Infallible: MMDS_ADDR is a compile-time-known SocketAddr literal.
let addr: std::net::SocketAddr = MMDS_ADDR.parse().unwrap();
// connect_timeout (non-blocking connect + poll) handles the post-boot window
// where the eth0 link-local neighbor isn't resolved yet.
let addr: std::net::SocketAddr = MMDS_ADDR.parse().expect("MMDS_ADDR is a literal");
let mut stream = TcpStream::connect_timeout(&addr, HTTP_TIMEOUT)?;
stream.set_write_timeout(Some(HTTP_TIMEOUT))?;
stream.set_read_timeout(Some(HTTP_TIMEOUT))?;
stream.write_all(request)?;

// Read headers, then exactly `Content-Length` body bytes — do NOT read to
// EOF. Firecracker's MMDS keeps the TCP connection OPEN after the response
// (it ignores `Connection: close`), so there is no EOF; `read_to_end` blocks
// for the full read timeout on every request and surfaces as EAGAIN. This is
// the same Content-Length-bounded read guest-init's MMDS client uses.
let mut buf = Vec::new();
// Propagate read errors (e.g. timeout mid-response) so the retry loop
// logs the underlying I/O failure instead of a downstream JSON parse
// error on a truncated body.
stream.take(MAX_RESPONSE_BYTES).read_to_end(&mut buf)?;
let mut chunk = [0u8; 4096];
loop {
if let Some(end) = find_headers_end(&buf) {
if let Some(len) = content_length(&buf[..end]) {
if buf.len() >= end + len {
buf.truncate(end + len);
break;
}
}
}
match stream.read(&mut chunk) {
Ok(0) => break, // EOF (Connection: close honored)
Ok(n) => buf.extend_from_slice(&chunk[..n]),
Err(e) => return Err(e), // timeout / error — surface to the retry loop
}
if buf.len() as u64 > MAX_RESPONSE_BYTES {
break;
}
}
Ok(buf)
}

/// Byte offset just past the `\r\n\r\n` header terminator, if fully buffered.
fn find_headers_end(buf: &[u8]) -> Option<usize> {
buf.windows(4).position(|w| w == b"\r\n\r\n").map(|p| p + 4)
}

/// Parse the `Content-Length` header (case-insensitive) from a header block.
fn content_length(headers: &[u8]) -> Option<usize> {
let text = std::str::from_utf8(headers).ok()?;
text.split("\r\n")
.filter_map(|line| line.split_once(':'))
.find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length"))
.and_then(|(_, value)| value.trim().parse().ok())
}

fn http_status(response: &[u8]) -> Option<u16> {
let line = response.split(|&b| b == b'\n').next()?;
let s = std::str::from_utf8(line).ok()?;
Expand Down
9 changes: 9 additions & 0 deletions beyond-pg-init/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
#[cfg(target_os = "linux")]
mod bootsetup;
#[cfg(target_os = "linux")]
mod substrate;
#[cfg(target_os = "linux")]
mod supervise;
#[cfg(target_os = "linux")]
mod volumes;

#[cfg(target_os = "linux")]
fn main() -> ! {
Expand All @@ -22,6 +26,11 @@ fn main() -> ! {
std::process::exit(1);
}
bootsetup::run();
// Report "guest ready" to the host substrate over vsock and keep the
// connection alive for the VM's lifetime. instd waits for this handshake
// before considering the create successful. Spawned after boot setup (so
// the network/MMDS are up) and before the supervise loop takes over.
substrate::spawn_handshake();
supervise::run();
}

Expand Down
173 changes: 173 additions & 0 deletions beyond-pg-init/src/substrate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
//! Substrate vsock "guest ready" handshake for `beyond-pg-init`.
//!
//! After spawning Firecracker, instd waits ~30s for the guest to report ready
//! over vsock; without it the VM's create never completes ("guest not ready:
//! timeout"). The contract is a single self-described frame on the host vsock
//! channel — we speak it directly here so this repo builds standalone (no
//! dependency on the Beyond workspace):
//!
//! ```text
//! connect: AF_VSOCK cid=2 (host) port=52
//! frame: [len: u32 BE = 1 + payload][type: u8 = 0x81 Ready][payload: MessagePack]
//! payload = rmp_serde::to_vec_named(ReadyPayload) // string keys
//! ```
//!
//! instd marks the guest ready the moment it reads the `Ready` frame; we then
//! hold the connection open for the VM's lifetime so the host keeps seeing the
//! guest as present. The frame layout mirrors `rustlib/vsock-protocol` in the
//! Beyond repo — kept honest by `tests::ready_frame_is_stable` below; if instd
//! ever changes the wire, that fixture must change in lockstep.
//!
//! Soft-fail throughout: no AF_VSOCK (e.g. a Docker test box) or a failed
//! connect is logged and the thread exits — never fatal. The supervise loop
//! owns shutdown via signalfd (instd also sends SIGTERM), so this thread never
//! powers the VM off.

use serde::Serialize;

/// Host vsock context id (`VMADDR_CID_HOST`).
const HOST_CID: u32 = 2;
/// Substrate vsock port instd listens on (`vsock_protocol::VSOCK_PORT`).
const SUBSTRATE_PORT: u32 = 52;
/// `Ready` message discriminator (`vsock_protocol::MessageType::Ready`).
const MSG_READY: u8 = 0x81;

/// Agent → host "ready after boot" payload. A field-compatible subset of
/// `vsock_protocol::ReadyPayload` — only the always-present fields; the rest are
/// `skip_serializing_if`/`default` on the host side and so may be omitted.
#[derive(Serialize)]
struct ReadyPayload {
agent_version: String,
boot_time_ms: u64,
reconnect: bool,
}

/// Encode the `Ready` frame: `[len: u32 BE][type][MessagePack payload]`.
/// Length covers the type byte + payload (not the 4 length bytes themselves).
fn encode_ready_frame(payload: &ReadyPayload) -> Result<Vec<u8>, rmp_serde::encode::Error> {
let body = rmp_serde::to_vec_named(payload)?;
let mut frame = ((body.len() as u32) + 1).to_be_bytes().to_vec();
frame.push(MSG_READY);
frame.extend_from_slice(&body);
Ok(frame)
}

/// Spawn the dedicated `substrate-vsock` thread that performs the guest-ready
/// handshake and then keeps the connection alive for the VM's lifetime.
///
/// Returns immediately; everything happens on the spawned thread. Soft-fail
/// throughout: a failed spawn or a failed connect is logged, never fatal.
pub fn spawn_handshake() {
let builder = std::thread::Builder::new().name("substrate-vsock".to_string());
if let Err(e) = builder.spawn(run) {
eprintln!("[init] WARNING: failed to spawn substrate-vsock thread: {e}");
}
}

fn run() {
// Current-thread runtime: this thread only hosts the single vsock
// connection + keep-alive read, so a multi-thread scheduler would just
// waste workers.
let runtime = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
eprintln!("[init] WARNING: substrate-vsock tokio runtime build failed: {e}");
return;
}
};
runtime.block_on(handshake());
}

async fn handshake() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_vsock::{VsockAddr, VsockStream};

let payload = ReadyPayload {
agent_version: format!("beyond-pg-init/{}", env!("CARGO_PKG_VERSION")),
boot_time_ms: read_uptime_ms(),
reconnect: false,
};
let frame = match encode_ready_frame(&payload) {
Ok(f) => f,
Err(e) => {
eprintln!("[init] WARNING: encode Ready frame failed: {e}");
return;
}
};

let mut conn = match VsockStream::connect(VsockAddr::new(HOST_CID, SUBSTRATE_PORT)).await {
Ok(c) => c,
Err(e) => {
// Soft-fail: no AF_VSOCK (Docker tests) or no host listener just
// means there's no substrate to report to. Let the thread exit.
eprintln!(
"[init] WARNING: substrate vsock connect failed; guest-ready unreported: {e}"
);
return;
}
};
if let Err(e) = conn.write_all(&frame).await {
eprintln!("[init] WARNING: substrate Ready write failed: {e}");
return;
}
let _ = conn.flush().await;
eprintln!("[init] substrate vsock handshake complete; guest reported ready");

// Hold the connection open for the VM's lifetime so the host keeps seeing
// the guest as present. We don't serve admin exec/ping here (the Postgres VM
// is reached over the VPC via pgbouncer, not the admin channel); inbound
// bytes (ReadyAck, heartbeats) are read and ignored. EOF/err → thread exits.
let mut buf = [0u8; 512];
loop {
match conn.read(&mut buf).await {
Ok(0) => break,
Ok(_) => {}
Err(_) => break,
}
}
}

/// Milliseconds since kernel boot, from `/proc/uptime`. Best-effort (0 on any
/// read/parse failure) — it's only telemetry in the Ready payload.
fn read_uptime_ms() -> u64 {
std::fs::read_to_string("/proc/uptime")
.ok()
.and_then(|s| s.split_whitespace().next().map(str::to_owned))
.and_then(|s| s.parse::<f64>().ok())
.map(|secs| (secs * 1000.0) as u64)
.unwrap_or(0)
}

#[cfg(test)]
mod tests {
use super::*;

/// Pins the on-wire `Ready` frame so it can't silently drift from instd's
/// `rustlib/vsock-protocol` decoder: a 4-byte BE length covering type +
/// payload, the `0x81` type byte, then a MessagePack *map* (string keys)
/// carrying the payload fields.
#[test]
fn ready_frame_is_stable() {
let p = ReadyPayload {
agent_version: "beyond-pg-init/0.1.0".to_string(),
boot_time_ms: 1234,
reconnect: false,
};
let frame = encode_ready_frame(&p).unwrap();

let len = u32::from_be_bytes(frame[0..4].try_into().unwrap()) as usize;
assert_eq!(len, frame.len() - 4, "length covers type + payload");
assert_eq!(frame[4], MSG_READY, "type byte is Ready (0x81)");

// Body round-trips to the named fields (string-keyed map = to_vec_named).
let body = &frame[5..];
let v: serde_json::Value = rmp_serde::from_slice(body).unwrap();
let obj = v.as_object().expect("Ready payload must be a map");
assert!(obj.contains_key("agent_version"));
assert!(obj.contains_key("boot_time_ms"));
assert!(obj.contains_key("reconnect"));
}
}
Loading