diff --git a/Cargo.lock b/Cargo.lock index 042b6f5..0ff590a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -275,6 +275,7 @@ dependencies = [ name = "beyond-pg" version = "0.1.0" dependencies = [ + "base64", "beyond-handoff", "beyond-pg-core", "clap", @@ -327,8 +328,12 @@ dependencies = [ "beyond-handoff", "beyond-pg-core", "libc", + "nix 0.31.2", + "rmp-serde", "serde", "serde_json", + "tokio", + "tokio-vsock", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c473ee3..e01b47d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ name = "beyond-pg" path = "src/main.rs" [dependencies] +base64 = "0.22" beyond-pg-core = { path = "./beyond-pg-core" } clap = { version = "4", features = ["derive"] } quinn = "0.11" diff --git a/beyond-pg-init/Cargo.toml b/beyond-pg-init/Cargo.toml index 93beb32..a70553f 100644 --- a/beyond-pg-init/Cargo.toml +++ b/beyond-pg-init/Cargo.toml @@ -17,3 +17,11 @@ serde_json = "1" [target.'cfg(target_os = "linux")'.dependencies] libc = "0.2" handoff = { version = "0.1", package = "beyond-handoff" } +# The guest-ready handshake instd waits for is a single self-described vsock +# frame; we speak it directly (see src/substrate.rs) rather than depend on the +# Beyond workspace, so this repo builds standalone in CI. The wire contract is +# pinned by a fixture test. +nix = { version = "0.31", features = ["mount"] } +tokio = { version = "1", features = ["rt", "macros", "net", "time", "io-util"] } +tokio-vsock = "0.7" +rmp-serde = "1" diff --git a/beyond-pg-init/src/bootsetup.rs b/beyond-pg-init/src/bootsetup.rs index 2597344..52a310d 100644 --- a/beyond-pg-init/src/bootsetup.rs +++ b/beyond-pg-init/src/bootsetup.rs @@ -15,8 +15,13 @@ use std::time::Duration; const MMDS_ADDR: &str = "169.254.169.254:80"; const MMDS_MAX_ATTEMPTS: u32 = 30; -const MMDS_RETRY: Duration = Duration::from_millis(10); -const HTTP_TIMEOUT: Duration = Duration::from_millis(200); +const MMDS_RETRY: Duration = Duration::from_millis(200); +// Firecracker's MMDS can take up to ~1–2s to answer the first request after +// boot on this substrate; a 200ms read timeout raced that and every attempt +// died with EAGAIN (the connect succeeds — it's the response that's slow), so +// the whole loop fell back to "POSTGRES_PASSWORD not set" and panicked. guest- +// init happens to win the race with its smaller VM; we must not depend on luck. +const HTTP_TIMEOUT: Duration = Duration::from_secs(3); const MAX_RESPONSE_BYTES: u64 = 64 * 1024; /// Run the full PID 1 boot sequence. Bails (exit 1) on fatal failures. @@ -41,6 +46,11 @@ pub fn run() { mount_essential_filesystems(); setup_network(); fetch_mmds(); + // instd records attached data volumes in MMDS; mount them (as root, before + // the supervisor child spawns) so postgres finds its data dir at + // /var/lib/postgresql. Tolerant: no volumes → no-op; a missing device is + // logged FATAL but never aborts the VM. + crate::volumes::mount_from_mmds(); setup_zram(); } @@ -293,13 +303,17 @@ fn write_mmds_file(json: &serde_json::Value) { fn poll_mmds() -> Result { let token = get_mmds_token(); for attempt in 1..=MMDS_MAX_ATTEMPTS { + let t0 = std::time::Instant::now(); match fetch_mmds_metadata(token.as_deref()) { Ok(Some(data)) => { eprintln!("[init] MMDS data available (attempt {attempt})"); return Ok(data); } Ok(None) => {} - Err(e) => eprintln!("[init] MMDS fetch attempt {attempt} failed: {e}"), + Err(e) => eprintln!( + "[init] MMDS fetch attempt {attempt} failed after {}ms: {e}", + t0.elapsed().as_millis() + ), } std::thread::sleep(MMDS_RETRY); } @@ -361,20 +375,56 @@ fn fetch_mmds_metadata( } fn http_roundtrip(request: &[u8]) -> std::io::Result> { - // Infallible: MMDS_ADDR is a compile-time-known SocketAddr literal. - let addr: std::net::SocketAddr = MMDS_ADDR.parse().unwrap(); + // connect_timeout (non-blocking connect + poll) handles the post-boot window + // where the eth0 link-local neighbor isn't resolved yet. + let addr: std::net::SocketAddr = MMDS_ADDR.parse().expect("MMDS_ADDR is a literal"); let mut stream = TcpStream::connect_timeout(&addr, HTTP_TIMEOUT)?; stream.set_write_timeout(Some(HTTP_TIMEOUT))?; stream.set_read_timeout(Some(HTTP_TIMEOUT))?; stream.write_all(request)?; + + // Read headers, then exactly `Content-Length` body bytes — do NOT read to + // EOF. Firecracker's MMDS keeps the TCP connection OPEN after the response + // (it ignores `Connection: close`), so there is no EOF; `read_to_end` blocks + // for the full read timeout on every request and surfaces as EAGAIN. This is + // the same Content-Length-bounded read guest-init's MMDS client uses. let mut buf = Vec::new(); - // Propagate read errors (e.g. timeout mid-response) so the retry loop - // logs the underlying I/O failure instead of a downstream JSON parse - // error on a truncated body. - stream.take(MAX_RESPONSE_BYTES).read_to_end(&mut buf)?; + let mut chunk = [0u8; 4096]; + loop { + if let Some(end) = find_headers_end(&buf) { + if let Some(len) = content_length(&buf[..end]) { + if buf.len() >= end + len { + buf.truncate(end + len); + break; + } + } + } + match stream.read(&mut chunk) { + Ok(0) => break, // EOF (Connection: close honored) + Ok(n) => buf.extend_from_slice(&chunk[..n]), + Err(e) => return Err(e), // timeout / error — surface to the retry loop + } + if buf.len() as u64 > MAX_RESPONSE_BYTES { + break; + } + } Ok(buf) } +/// Byte offset just past the `\r\n\r\n` header terminator, if fully buffered. +fn find_headers_end(buf: &[u8]) -> Option { + buf.windows(4).position(|w| w == b"\r\n\r\n").map(|p| p + 4) +} + +/// Parse the `Content-Length` header (case-insensitive) from a header block. +fn content_length(headers: &[u8]) -> Option { + let text = std::str::from_utf8(headers).ok()?; + text.split("\r\n") + .filter_map(|line| line.split_once(':')) + .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length")) + .and_then(|(_, value)| value.trim().parse().ok()) +} + fn http_status(response: &[u8]) -> Option { let line = response.split(|&b| b == b'\n').next()?; let s = std::str::from_utf8(line).ok()?; diff --git a/beyond-pg-init/src/main.rs b/beyond-pg-init/src/main.rs index c4d14ed..dfa0497 100644 --- a/beyond-pg-init/src/main.rs +++ b/beyond-pg-init/src/main.rs @@ -10,7 +10,11 @@ #[cfg(target_os = "linux")] mod bootsetup; #[cfg(target_os = "linux")] +mod substrate; +#[cfg(target_os = "linux")] mod supervise; +#[cfg(target_os = "linux")] +mod volumes; #[cfg(target_os = "linux")] fn main() -> ! { @@ -22,6 +26,11 @@ fn main() -> ! { std::process::exit(1); } bootsetup::run(); + // Report "guest ready" to the host substrate over vsock and keep the + // connection alive for the VM's lifetime. instd waits for this handshake + // before considering the create successful. Spawned after boot setup (so + // the network/MMDS are up) and before the supervise loop takes over. + substrate::spawn_handshake(); supervise::run(); } diff --git a/beyond-pg-init/src/substrate.rs b/beyond-pg-init/src/substrate.rs new file mode 100644 index 0000000..f711352 --- /dev/null +++ b/beyond-pg-init/src/substrate.rs @@ -0,0 +1,173 @@ +//! Substrate vsock "guest ready" handshake for `beyond-pg-init`. +//! +//! After spawning Firecracker, instd waits ~30s for the guest to report ready +//! over vsock; without it the VM's create never completes ("guest not ready: +//! timeout"). The contract is a single self-described frame on the host vsock +//! channel — we speak it directly here so this repo builds standalone (no +//! dependency on the Beyond workspace): +//! +//! ```text +//! connect: AF_VSOCK cid=2 (host) port=52 +//! frame: [len: u32 BE = 1 + payload][type: u8 = 0x81 Ready][payload: MessagePack] +//! payload = rmp_serde::to_vec_named(ReadyPayload) // string keys +//! ``` +//! +//! instd marks the guest ready the moment it reads the `Ready` frame; we then +//! hold the connection open for the VM's lifetime so the host keeps seeing the +//! guest as present. The frame layout mirrors `rustlib/vsock-protocol` in the +//! Beyond repo — kept honest by `tests::ready_frame_is_stable` below; if instd +//! ever changes the wire, that fixture must change in lockstep. +//! +//! Soft-fail throughout: no AF_VSOCK (e.g. a Docker test box) or a failed +//! connect is logged and the thread exits — never fatal. The supervise loop +//! owns shutdown via signalfd (instd also sends SIGTERM), so this thread never +//! powers the VM off. + +use serde::Serialize; + +/// Host vsock context id (`VMADDR_CID_HOST`). +const HOST_CID: u32 = 2; +/// Substrate vsock port instd listens on (`vsock_protocol::VSOCK_PORT`). +const SUBSTRATE_PORT: u32 = 52; +/// `Ready` message discriminator (`vsock_protocol::MessageType::Ready`). +const MSG_READY: u8 = 0x81; + +/// Agent → host "ready after boot" payload. A field-compatible subset of +/// `vsock_protocol::ReadyPayload` — only the always-present fields; the rest are +/// `skip_serializing_if`/`default` on the host side and so may be omitted. +#[derive(Serialize)] +struct ReadyPayload { + agent_version: String, + boot_time_ms: u64, + reconnect: bool, +} + +/// Encode the `Ready` frame: `[len: u32 BE][type][MessagePack payload]`. +/// Length covers the type byte + payload (not the 4 length bytes themselves). +fn encode_ready_frame(payload: &ReadyPayload) -> Result, rmp_serde::encode::Error> { + let body = rmp_serde::to_vec_named(payload)?; + let mut frame = ((body.len() as u32) + 1).to_be_bytes().to_vec(); + frame.push(MSG_READY); + frame.extend_from_slice(&body); + Ok(frame) +} + +/// Spawn the dedicated `substrate-vsock` thread that performs the guest-ready +/// handshake and then keeps the connection alive for the VM's lifetime. +/// +/// Returns immediately; everything happens on the spawned thread. Soft-fail +/// throughout: a failed spawn or a failed connect is logged, never fatal. +pub fn spawn_handshake() { + let builder = std::thread::Builder::new().name("substrate-vsock".to_string()); + if let Err(e) = builder.spawn(run) { + eprintln!("[init] WARNING: failed to spawn substrate-vsock thread: {e}"); + } +} + +fn run() { + // Current-thread runtime: this thread only hosts the single vsock + // connection + keep-alive read, so a multi-thread scheduler would just + // waste workers. + let runtime = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + eprintln!("[init] WARNING: substrate-vsock tokio runtime build failed: {e}"); + return; + } + }; + runtime.block_on(handshake()); +} + +async fn handshake() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio_vsock::{VsockAddr, VsockStream}; + + let payload = ReadyPayload { + agent_version: format!("beyond-pg-init/{}", env!("CARGO_PKG_VERSION")), + boot_time_ms: read_uptime_ms(), + reconnect: false, + }; + let frame = match encode_ready_frame(&payload) { + Ok(f) => f, + Err(e) => { + eprintln!("[init] WARNING: encode Ready frame failed: {e}"); + return; + } + }; + + let mut conn = match VsockStream::connect(VsockAddr::new(HOST_CID, SUBSTRATE_PORT)).await { + Ok(c) => c, + Err(e) => { + // Soft-fail: no AF_VSOCK (Docker tests) or no host listener just + // means there's no substrate to report to. Let the thread exit. + eprintln!( + "[init] WARNING: substrate vsock connect failed; guest-ready unreported: {e}" + ); + return; + } + }; + if let Err(e) = conn.write_all(&frame).await { + eprintln!("[init] WARNING: substrate Ready write failed: {e}"); + return; + } + let _ = conn.flush().await; + eprintln!("[init] substrate vsock handshake complete; guest reported ready"); + + // Hold the connection open for the VM's lifetime so the host keeps seeing + // the guest as present. We don't serve admin exec/ping here (the Postgres VM + // is reached over the VPC via pgbouncer, not the admin channel); inbound + // bytes (ReadyAck, heartbeats) are read and ignored. EOF/err → thread exits. + let mut buf = [0u8; 512]; + loop { + match conn.read(&mut buf).await { + Ok(0) => break, + Ok(_) => {} + Err(_) => break, + } + } +} + +/// Milliseconds since kernel boot, from `/proc/uptime`. Best-effort (0 on any +/// read/parse failure) — it's only telemetry in the Ready payload. +fn read_uptime_ms() -> u64 { + std::fs::read_to_string("/proc/uptime") + .ok() + .and_then(|s| s.split_whitespace().next().map(str::to_owned)) + .and_then(|s| s.parse::().ok()) + .map(|secs| (secs * 1000.0) as u64) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Pins the on-wire `Ready` frame so it can't silently drift from instd's + /// `rustlib/vsock-protocol` decoder: a 4-byte BE length covering type + + /// payload, the `0x81` type byte, then a MessagePack *map* (string keys) + /// carrying the payload fields. + #[test] + fn ready_frame_is_stable() { + let p = ReadyPayload { + agent_version: "beyond-pg-init/0.1.0".to_string(), + boot_time_ms: 1234, + reconnect: false, + }; + let frame = encode_ready_frame(&p).unwrap(); + + let len = u32::from_be_bytes(frame[0..4].try_into().unwrap()) as usize; + assert_eq!(len, frame.len() - 4, "length covers type + payload"); + assert_eq!(frame[4], MSG_READY, "type byte is Ready (0x81)"); + + // Body round-trips to the named fields (string-keyed map = to_vec_named). + let body = &frame[5..]; + let v: serde_json::Value = rmp_serde::from_slice(body).unwrap(); + let obj = v.as_object().expect("Ready payload must be a map"); + assert!(obj.contains_key("agent_version")); + assert!(obj.contains_key("boot_time_ms")); + assert!(obj.contains_key("reconnect")); + } +} diff --git a/beyond-pg-init/src/volumes.rs b/beyond-pg-init/src/volumes.rs new file mode 100644 index 0000000..659be10 --- /dev/null +++ b/beyond-pg-init/src/volumes.rs @@ -0,0 +1,325 @@ +//! Data-volume mounting for `beyond-pg-init`. +//! +//! instd attaches data volumes as block devices and records them in MMDS; +//! `beyond-pg-init` must mount them before the supervisor child is spawned so +//! postgres finds its data dir. The postgres data volume is recorded with +//! `path = /var/lib/postgresql`. +//! +//! Ported from guest-init's `mmds::read_attachments` + `mounts` (the canonical +//! mount-option behavior per storage class), adapted to beyond-pg-init's +//! `[init]` logging and made tolerant of a not-yet-present block device (a +//! brief retry) — a missing volume is logged FATAL but does not panic the VM, +//! so postgres can still surface the underlying error. +//! +//! Runs as root (PID 1) during [`crate::bootsetup::run`], after MMDS has been +//! fetched and before [`crate::supervise::run`] spawns the supervisor. + +use std::path::Path; +use std::time::{Duration, Instant}; + +use nix::mount::{MsFlags, mount}; +use serde_json::Value; + +/// Max time to wait for a declared block device to appear before giving up. +const DEVICE_WAIT: Duration = Duration::from_millis(500); +const DEVICE_POLL: Duration = Duration::from_millis(50); + +/// One data-volume attachment as written by instd into MMDS. +pub struct AttachmentMeta { + pub volume_id: String, + pub device: String, + pub path: String, + pub fstype: String, + pub readonly: bool, + pub storage_class: String, +} + +/// Read volume attachments from the already-written MMDS metadata file and +/// mount each one. Best-effort: returns immediately if there are no volumes. +pub fn mount_from_mmds() { + let attachments = read_attachments(); + if attachments.is_empty() { + return; + } + eprintln!("[init] mounting {} data volume(s) from MMDS", attachments.len()); + mount_data_volumes(&attachments); +} + +/// Read volume attachments from the MMDS metadata file written by +/// [`crate::bootsetup::fetch_mmds`]. +/// +/// Returns an empty vec if the file is absent, unparseable, or the +/// `volumes` array is missing — instances without data volumes see none. +fn read_attachments() -> Vec { + let path = beyond_pg_core::mmds::MMDS_PATH; + let raw = match std::fs::read(path) { + Ok(b) => b, + Err(_) => return vec![], + }; + let val: Value = match serde_json::from_slice(&raw) { + Ok(v) => v, + Err(_) => return vec![], + }; + parse_attachments(&val) +} + +/// Parse the `latest.meta-data.volumes` array out of an MMDS value. +fn parse_attachments(val: &Value) -> Vec { + let arr = match val["latest"]["meta-data"]["volumes"].as_array() { + Some(a) => a, + None => return vec![], + }; + arr.iter() + .filter_map(|v| { + Some(AttachmentMeta { + volume_id: v["volume_id"].as_str()?.to_string(), + device: v["device"].as_str()?.to_string(), + path: v["path"].as_str()?.to_string(), + fstype: v["fstype"].as_str().unwrap_or("ext4").to_string(), + readonly: v["readonly"].as_bool().unwrap_or(false), + storage_class: v["storage_class"].as_str().unwrap_or("standard").to_string(), + }) + }) + .collect() +} + +/// Mount each attachment. Idempotent (skips already-mounted paths) and tolerant +/// of a transiently-absent block device (brief retry). A volume that can't be +/// mounted is logged FATAL but does NOT abort the VM, so postgres can still +/// start and surface the underlying error. +fn mount_data_volumes(attachments: &[AttachmentMeta]) { + for a in attachments { + if let Err(e) = mount_one(a) { + eprintln!( + "[init] FATAL: failed to mount data volume {} ({} → {}): {e}; continuing", + a.volume_id, a.device, a.path + ); + } + } +} + +fn mount_one(a: &AttachmentMeta) -> Result<(), String> { + if !wait_for_device(&a.device) { + return Err(format!("device {} not present after {DEVICE_WAIT:?}", a.device)); + } + + ensure_dir(&a.path)?; + + if is_mounted(&a.path, &a.device)? { + eprintln!("[init] already mounted {}, skipping", a.path); + return Ok(()); + } + if let Some(other) = mounted_device(&a.path)? { + return Err(format!( + "path {} is already mounted with {} but expected {}", + a.path, other, a.device + )); + } + + let (flags, opts) = mount_params(&a.storage_class, a.readonly); + mount( + Some(a.device.as_str()), + a.path.as_str(), + Some(a.fstype.as_str()), + flags, + Some(opts.as_str()), + ) + .map_err(|e| format!("mount {} → {}: {}", a.device, a.path, e))?; + + eprintln!( + "[init] mounted {} at {} ({}{})", + a.device, + a.path, + opts, + if a.readonly { ",ro" } else { "" } + ); + Ok(()) +} + +/// Wait briefly for the block device to appear — instd attaches it around the +/// same time the guest boots, so it may not be visible the instant we look. +fn wait_for_device(device: &str) -> bool { + let dev = Path::new(device); + let deadline = Instant::now() + DEVICE_WAIT; + loop { + if dev.exists() { + return true; + } + if Instant::now() >= deadline { + return false; + } + std::thread::sleep(DEVICE_POLL); + } +} + +/// Returns (mount flags, mount options string) for a given storage class. +/// +/// The opts string is passed as the ext4-specific data argument to mount(2). +/// VFS-level flags (noatime, ro) MUST go in the flags bitfield — passing them +/// in the data string makes ext4 reject the mount with EINVAL. Mirrors +/// guest-init's `mounts::mount_params`. +fn mount_params(storage_class: &str, readonly: bool) -> (MsFlags, String) { + let mut flags = MsFlags::MS_NOATIME; + if readonly { + flags |= MsFlags::MS_RDONLY; + } + let opts = match storage_class { + "standard" => "commit=60", + s if s.starts_with("database:") => "data=ordered,commit=30,barrier=1", + "ephemeral" => "commit=120,nobarrier", + "scratch" => "commit=120,nobarrier,data=writeback", + _ => "commit=60", + }; + (flags, opts.to_string()) +} + +/// Read `/proc/self/mountinfo` to check whether `path` is mounted with `device`. +fn is_mounted(path: &str, device: &str) -> Result { + let info = read_mountinfo()?; + Ok(info.iter().any(|(mp, dev)| mp == path && dev == device)) +} + +/// Return the device currently mounted at `path`, or `None` if nothing is. +fn mounted_device(path: &str) -> Result, String> { + let info = read_mountinfo()?; + Ok(info.into_iter().find(|(mp, _)| mp == path).map(|(_, dev)| dev)) +} + +fn read_mountinfo() -> Result, String> { + let content = std::fs::read_to_string("/proc/self/mountinfo") + .map_err(|e| format!("read mountinfo: {e}"))?; + Ok(parse_mountinfo(&content)) +} + +/// Parse `/proc/self/mountinfo` into `(mount_point, source)` pairs. +fn parse_mountinfo(content: &str) -> Vec<(String, String)> { + let mut entries = Vec::new(); + for line in content.lines() { + // Format: id parent major:minor root mount-point mount-options ... - fstype source ... + // mount-point is field 4 (0-indexed); source is the 2nd token after "-". + let fields: Vec<&str> = line.splitn(10, ' ').collect(); + if fields.len() < 5 { + continue; + } + let mount_point = fields[4]; + let Some(dash_pos) = line.find(" - ") else { + continue; + }; + let after_dash: Vec<&str> = line[dash_pos + 3..].splitn(3, ' ').collect(); + if after_dash.len() < 2 { + continue; + } + entries.push((mount_point.to_string(), after_dash[1].to_string())); + } + entries +} + +fn ensure_dir(path: &str) -> Result<(), String> { + std::fs::create_dir_all(path).map_err(|e| format!("create dir {path}: {e}")) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_attachments_postgres_volume() { + let val = serde_json::json!({ + "latest": { "meta-data": { "volumes": [ + { + "volume_id": "vol-abc", + "device": "/dev/vdb", + "path": "/var/lib/postgresql", + "fstype": "ext4", + "readonly": false, + "storage_class": "database:postgres" + } + ] } } + }); + let atts = parse_attachments(&val); + assert_eq!(atts.len(), 1); + let a = &atts[0]; + assert_eq!(a.volume_id, "vol-abc"); + assert_eq!(a.device, "/dev/vdb"); + assert_eq!(a.path, "/var/lib/postgresql"); + assert_eq!(a.fstype, "ext4"); + assert!(!a.readonly); + assert_eq!(a.storage_class, "database:postgres"); + } + + #[test] + fn parse_attachments_applies_defaults() { + // fstype/readonly/storage_class omitted → defaults. + let val = serde_json::json!({ + "latest": { "meta-data": { "volumes": [ + { "volume_id": "v", "device": "/dev/vdc", "path": "/data" } + ] } } + }); + let atts = parse_attachments(&val); + assert_eq!(atts.len(), 1); + assert_eq!(atts[0].fstype, "ext4"); + assert!(!atts[0].readonly); + assert_eq!(atts[0].storage_class, "standard"); + } + + #[test] + fn parse_attachments_skips_entries_missing_required_fields() { + // device missing → entry dropped (filter_map on the `?`s). + let val = serde_json::json!({ + "latest": { "meta-data": { "volumes": [ + { "volume_id": "v", "path": "/data" } + ] } } + }); + assert!(parse_attachments(&val).is_empty()); + } + + #[test] + fn parse_attachments_empty_when_no_volumes_key() { + let val = serde_json::json!({ "latest": { "meta-data": { "hostname": "pg" } } }); + assert!(parse_attachments(&val).is_empty()); + } + + #[test] + fn parse_attachments_empty_on_garbage() { + let val = serde_json::json!({ "anything": 1 }); + assert!(parse_attachments(&val).is_empty()); + } + + #[test] + fn mount_params_database_uses_ordered_journaling() { + let (flags, opts) = mount_params("database:postgres", false); + assert!(flags.contains(MsFlags::MS_NOATIME)); + assert!(!flags.contains(MsFlags::MS_RDONLY)); + assert!(opts.contains("data=ordered")); + assert!(opts.contains("commit=30")); + assert!(opts.contains("barrier=1")); + } + + #[test] + fn mount_params_standard_and_readonly() { + let (flags, opts) = mount_params("standard", true); + assert!(flags.contains(MsFlags::MS_NOATIME)); + assert!(flags.contains(MsFlags::MS_RDONLY)); + assert!(opts.contains("commit=60")); + assert!(!opts.contains("data=ordered")); + } + + #[test] + fn mount_params_unknown_falls_back_to_standard() { + let (_flags, opts) = mount_params("weird_class", false); + assert!(opts.contains("commit=60") && !opts.contains("data=ordered")); + } + + #[test] + fn parse_mountinfo_extracts_mountpoint_and_source() { + let line = "36 35 8:1 / /var/lib/postgresql rw,noatime shared:1 - ext4 /dev/vdb rw,data=ordered"; + let entries = parse_mountinfo(line); + assert_eq!(entries, vec![("/var/lib/postgresql".to_string(), "/dev/vdb".to_string())]); + } + + #[test] + fn parse_mountinfo_skips_malformed_lines() { + // No " - " separator → skipped. + assert!(parse_mountinfo("garbage without dash").is_empty()); + } +} diff --git a/beyond-pg-sink/tests/e2e.rs b/beyond-pg-sink/tests/e2e.rs index 707e78a..828dd46 100644 --- a/beyond-pg-sink/tests/e2e.rs +++ b/beyond-pg-sink/tests/e2e.rs @@ -2857,7 +2857,14 @@ fn wal_gap_stalls_replica() { }; wait_http_ready(sink_port); - for _ in 0..60 { + // The sink streams from the consistent point it captures the moment it + // connects. We MUST confirm it is streaming (appears in + // pg_stat_replication) before writing the pre-backup rows below — otherwise + // a slow connect under CI load leaves the sink starting from a later LSN + // and the pre-backup segment is never archived. This wait is therefore + // fatal, and long enough (60s) to absorb a loaded runner. + let mut streaming = false; + for _ in 0..120 { if primary_client .query_opt( "SELECT 1 FROM pg_stat_replication WHERE application_name='wal_sink_gap'", @@ -2866,10 +2873,15 @@ fn wal_gap_stalls_replica() { .unwrap() .is_some() { + streaming = true; break; } std::thread::sleep(Duration::from_millis(500)); } + assert!( + streaming, + "sink never connected as a streaming standby (application_name='wal_sink_gap')" + ); // ── 3. pre-backup rows — archived ──────────────────────────────────────── primary_client @@ -2878,12 +2890,16 @@ fn wal_gap_stalls_replica() { INSERT INTO gap_test (v) SELECT 'pre-' || g FROM generate_series(1,50) g;", ) .unwrap(); + // pg_switch_wal pads the current segment to the boundary; the walsender + // streams the full padded segment to the (already-streaming) sink, which + // seals it once the write reaches 16 MiB. primary_client .execute("SELECT pg_switch_wal()", &[]) .unwrap(); - // Wait for the pre-backup segment to land in the sink. - let deadline = Instant::now() + Duration::from_secs(30); + // Wait for the pre-backup segment to land in the sink (60s to match the + // sibling crash test and absorb CI load). + let deadline = Instant::now() + Duration::from_secs(60); loop { let count = std::fs::read_dir(&sink_dir) .unwrap() diff --git a/packer/files/postgresql/pg_hba.conf b/packer/files/postgresql/pg_hba.conf index 260c591..44386ce 100644 --- a/packer/files/postgresql/pg_hba.conf +++ b/packer/files/postgresql/pg_hba.conf @@ -3,6 +3,14 @@ # See DESIGN.md "Authentication" for rationale. # TYPE DATABASE USER ADDRESS METHOD +# Co-located pgbouncer: connects over the unix socket as the `pgbouncer` +# auth_user (PASSWORD NULL) to run auth_query. It runs as the `postgres` OS user, +# so `peer` (which maps OS name -> DB name) rejects the `pgbouncer` DB user with +# "Peer authentication failed". Trust is safe here: unix socket only, same VM, +# and the role exists solely for the auth_query lookup. MUST precede the +# catch-all `local all all peer` (pg_hba is first-match). +local all pgbouncer trust + # Unix socket: peer auth for local admin and bootstrap scripts local all all peer diff --git a/src/boot.rs b/src/boot.rs index e756b47..6539f46 100644 --- a/src/boot.rs +++ b/src/boot.rs @@ -68,12 +68,36 @@ pub async fn run() { /// Idempotent boot-time setup. Called by `supervisor` before spawning Postgres. pub async fn do_boot(cfg: &MmdsConfig) -> Result<(), BootError> { + ensure_socket_dir(); match cfg.pg_tier { PgTier::Single | PgTier::Primary => do_boot_primary(cfg).await, PgTier::Replica => do_boot_replica(cfg).await, } } +/// Ensure the Postgres unix-socket directory exists and is owned by postgres. +/// +/// `/run/postgresql` (= `/var/run/postgresql`, [`pg::PG_SOCKET_DIR`]) is normally +/// materialized by systemd-tmpfiles, but the Beyond VM has no systemd — PID 1 is +/// `beyond-pg-init`. Without it, Postgres dies at startup with +/// `could not create lock file "/var/run/postgresql/.s.PGSQL..lock"`. +/// Idempotent: a no-op when the dir already exists with the right owner. +fn ensure_socket_dir() { + let dir = pg::PG_SOCKET_DIR; + if let Err(e) = std::fs::create_dir_all(dir) { + warn!("ensure_socket_dir: create {dir}: {e}"); + return; + } + match std::process::Command::new("chown") + .args(["postgres:postgres", dir]) + .status() + { + Ok(s) if s.success() => info!("socket dir {dir} ready (owner postgres)"), + Ok(s) => warn!("chown {dir} exited {s}; continuing"), + Err(e) => warn!("chown {dir} failed to spawn: {e}; continuing"), + } +} + async fn do_boot_primary(cfg: &MmdsConfig) -> Result<(), BootError> { info!("boot: step 1/8 maybe_initdb"); maybe_initdb(cfg).await?; @@ -186,6 +210,19 @@ async fn maybe_initdb(cfg: &MmdsConfig) -> Result<(), BootError> { std::fs::create_dir_all(PGDATA)?; } + // A fresh durable volume mounts root-owned and empty over the image's + // /var/lib/postgresql; chown the tree to postgres so the postgres-user + // initdb (and the cluster it creates) can write it. Idempotent / harmless + // when already postgres-owned (the image-baked ephemeral case). + match std::process::Command::new("chown") + .args(["-R", "postgres:postgres", "/var/lib/postgresql"]) + .status() + { + Ok(s) if s.success() => info!("chowned /var/lib/postgresql → postgres"), + Ok(s) => warn!("chown /var/lib/postgresql exited {s}; continuing"), + Err(e) => warn!("chown /var/lib/postgresql failed to spawn: {e}; continuing"), + } + run_initdb(&cfg.postgres_password).await?; Ok(()) } @@ -201,6 +238,11 @@ async fn run_initdb(password: &str) -> Result<(), BootError> { // Set 0600 before writing the password. std::fs::set_permissions(pwfile.path(), std::fs::Permissions::from_mode(0o600))?; std::fs::write(pwfile.path(), password)?; + // initdb runs as the postgres user (see pg::initdb); the pwfile is created + // root-owned 0600, so chown it to postgres so initdb can read it. + let _ = std::process::Command::new("chown") + .args(["postgres:postgres", pwfile.path().to_str().unwrap_or("")]) + .status(); let path_str = pwfile .path() @@ -707,8 +749,14 @@ async fn http_get(url: &str) -> Result, String> { fn write_config_files(cfg: &MmdsConfig, tls: &crate::tls::TlsConfig) -> Result<(), BootError> { use config::write_atomic; - // 00-beyond.conf — image opinions, overwritten every boot - write_atomic(Path::new(&config::beyond_conf_path()), config::BEYOND_CONF)?; + // 00-beyond.conf — image opinions, overwritten every boot. The + // shared_preload_libraries list is filtered to the extensions actually + // installed in this image so a missing module can't crash postgres at + // startup (see config::beyond_conf). + write_atomic( + Path::new(&config::beyond_conf_path()), + &config::beyond_conf(), + )?; // 05-tls.conf — resolved cert paths, overrides 00-beyond.conf's defaults // via alpha order under conf.d/. Numbered 05 so it lands after 04-replica. diff --git a/src/config.rs b/src/config.rs index c6ef753..c622421 100644 --- a/src/config.rs +++ b/src/config.rs @@ -24,6 +24,80 @@ use crate::pg::PGDATA; pub const BEYOND_CONF: &str = include_str!("../packer/files/postgresql/00-beyond.conf"); +/// Directory holding the PostgreSQL shared-object extension modules. Matches +/// the PG18 Debian-derived layout (`pg_config --pkglibdir`) used everywhere in +/// this image — sibling to the `/var/lib/postgresql/18/...` paths in `pg.rs`. +const PKGLIBDIR: &str = "/usr/lib/postgresql/18/lib"; + +/// `00-beyond.conf` with `shared_preload_libraries` filtered down to the +/// libraries actually installed in this image. +/// +/// `00-beyond.conf` lists every extension the platform *wants* preloaded, but +/// the standalone postgres primitive ships without the auth/queue milestone +/// (`beyond_auth`, `beyond_queue`) or pgdg's `pg_cron` (dropped on a version +/// pin). preloading a missing module makes postgres die at startup with +/// `FATAL: could not access file "": No such file or directory`. +/// +/// This makes the supervisor self-adapting: with the extensions installed it +/// preloads them; without, it drops them and postgres boots. `pg_stat_statements` +/// and `auto_explain` ship with core postgres so they always survive the filter. +pub fn beyond_conf() -> String { + filter_shared_preload_libraries(BEYOND_CONF, PKGLIBDIR) +} + +/// Returns true iff `{pkglibdir}/{lib}.so` exists. Core-postgres libraries +/// (`pg_stat_statements`, `auto_explain`) are present in any standard install. +fn library_installed(pkglibdir: &str, lib: &str) -> bool { + Path::new(pkglibdir).join(format!("{lib}.so")).exists() +} + +/// Post-process a `postgresql.conf` body: rewrite the +/// `shared_preload_libraries = '...'` line, keeping only libraries whose shared +/// object exists under `pkglibdir`. Lines without that key pass through +/// untouched. If every listed library is missing, the key is emitted empty +/// (`shared_preload_libraries = ''`) rather than dropped, so an operator can +/// still see the (now-empty) setting. +fn filter_shared_preload_libraries(conf: &str, pkglibdir: &str) -> String { + const KEY: &str = "shared_preload_libraries"; + let mut out = String::with_capacity(conf.len()); + for line in conf.lines() { + if let Some(filtered) = filter_preload_line(line, KEY, pkglibdir) { + out.push_str(&filtered); + } else { + out.push_str(line); + } + out.push('\n'); + } + out +} + +/// If `line` is a `shared_preload_libraries = '...'` assignment, return the +/// rewritten line with only installed libraries kept; otherwise `None`. +fn filter_preload_line(line: &str, key: &str, pkglibdir: &str) -> Option { + let trimmed = line.trim_start(); + // Don't touch comments. + if trimmed.starts_with('#') { + return None; + } + let rest = trimmed.strip_prefix(key)?; + // The next non-space char after the key must be '=' (avoid matching e.g. + // `shared_preload_libraries.foo`). + let after_key = rest.trim_start(); + let value_part = after_key.strip_prefix('=')?; + // Extract the single-quoted list value. + let value = value_part.trim(); + let inner = value.strip_prefix('\'')?.strip_suffix('\'')?; + + let kept: Vec<&str> = inner + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .filter(|lib| library_installed(pkglibdir, lib)) + .collect(); + + Some(format!("{key} = '{}'", kept.join(","))) +} + pub const PG_HBA_CONF: &str = include_str!("../packer/files/postgresql/pg_hba.conf"); pub const PGBOUNCER_INI_BASE: &str = include_str!("../packer/files/pgbouncer/pgbouncer.ini"); @@ -453,6 +527,76 @@ mod tests { use super::*; use crate::tls::{TlsConfig, TlsSource}; + #[test] + fn filter_preload_keeps_only_installed_libs() { + // Fake pkglibdir with only pg_stat_statements + auto_explain present. + let dir = tempfile::tempdir().unwrap(); + for lib in ["pg_stat_statements", "auto_explain"] { + std::fs::write(dir.path().join(format!("{lib}.so")), b"").unwrap(); + } + let pkglibdir = dir.path().to_str().unwrap(); + let conf = "shared_preload_libraries = 'pg_stat_statements,auto_explain,pg_cron,beyond_auth,beyond_queue'\nfoo = 1\n"; + let out = filter_shared_preload_libraries(conf, pkglibdir); + assert!( + out.contains("shared_preload_libraries = 'pg_stat_statements,auto_explain'"), + "missing libs should be dropped: {out}" + ); + assert!(!out.contains("pg_cron"), "pg_cron not installed: {out}"); + assert!(!out.contains("beyond_auth"), "beyond_auth not installed: {out}"); + assert!(!out.contains("beyond_queue"), "beyond_queue not installed: {out}"); + // Other lines untouched. + assert!(out.contains("foo = 1")); + } + + #[test] + fn filter_preload_all_present_is_unchanged() { + let dir = tempfile::tempdir().unwrap(); + for lib in ["pg_stat_statements", "auto_explain", "pg_cron"] { + std::fs::write(dir.path().join(format!("{lib}.so")), b"").unwrap(); + } + let pkglibdir = dir.path().to_str().unwrap(); + let conf = "shared_preload_libraries = 'pg_stat_statements,auto_explain,pg_cron'\n"; + let out = filter_shared_preload_libraries(conf, pkglibdir); + assert!(out.contains("shared_preload_libraries = 'pg_stat_statements,auto_explain,pg_cron'")); + } + + #[test] + fn filter_preload_all_missing_emits_empty_value() { + let dir = tempfile::tempdir().unwrap(); + let pkglibdir = dir.path().to_str().unwrap(); + let conf = "shared_preload_libraries = 'pg_cron,beyond_auth'\n"; + let out = filter_shared_preload_libraries(conf, pkglibdir); + assert!( + out.contains("shared_preload_libraries = ''"), + "all missing → empty value (key retained): {out}" + ); + } + + #[test] + fn filter_preload_ignores_comments_and_other_keys() { + let dir = tempfile::tempdir().unwrap(); + let pkglibdir = dir.path().to_str().unwrap(); + let conf = "# shared_preload_libraries = 'pg_cron'\nshared_preload_libraries.foo = 'bar'\n"; + let out = filter_shared_preload_libraries(conf, pkglibdir); + // Commented line passes through verbatim. + assert!(out.contains("# shared_preload_libraries = 'pg_cron'")); + // A different key (dotted) is not the assignment we rewrite. + assert!(out.contains("shared_preload_libraries.foo = 'bar'")); + } + + #[test] + fn filter_preload_handles_real_embedded_conf() { + // The embedded 00-beyond.conf must contain the key; filtering against a + // dir with only core libs must drop the milestone extensions. + let dir = tempfile::tempdir().unwrap(); + for lib in ["pg_stat_statements", "auto_explain"] { + std::fs::write(dir.path().join(format!("{lib}.so")), b"").unwrap(); + } + let out = filter_shared_preload_libraries(BEYOND_CONF, dir.path().to_str().unwrap()); + assert!(out.contains("shared_preload_libraries = 'pg_stat_statements,auto_explain'")); + assert!(!out.contains("'pg_stat_statements,auto_explain,pg_cron")); + } + #[test] fn tls_conf_platform_includes_ca() { let tls = TlsConfig { diff --git a/src/pg.rs b/src/pg.rs index 9a58449..0c7fd88 100644 --- a/src/pg.rs +++ b/src/pg.rs @@ -159,20 +159,24 @@ pub async fn reload() -> Result<(), PgError> { /// password (created by the caller as a `tempfile::NamedTempFile`). pub async fn initdb(pgdata: &str, pwfile_path: &str) -> Result<(), PgError> { debug!("running initdb in {pgdata}"); - let out = Command::new("initdb") - .args([ - "-D", - pgdata, - "--waldir", - "/var/lib/postgresql/18/wal", - "--auth=scram-sha-256", - "--encoding=UTF8", - "--locale=en_US.UTF-8", - &format!("--pwfile={pwfile_path}"), - ]) - .stderr(std::process::Stdio::piped()) - .output() - .await?; + let mut cmd = Command::new("initdb"); + cmd.args([ + "-D", + pgdata, + "--waldir", + "/var/lib/postgresql/18/wal", + "--auth=scram-sha-256", + "--encoding=UTF8", + "--locale=en_US.UTF-8", + &format!("--pwfile={pwfile_path}"), + ]) + .stderr(std::process::Stdio::piped()); + // initdb refuses to run as root; run it as the postgres OS user (the same + // user that will own the cluster and that postgres/psql drop to). Required + // when PGDATA is a fresh durable volume initialized at runtime (not baked + // into the image as ephemeral data). The caller chowns the tree first. + drop_to_postgres_user(&mut cmd); + let out = cmd.output().await?; if out.status.success() { debug!("initdb complete"); diff --git a/src/supervisor.rs b/src/supervisor.rs index bce762d..f1fd516 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -1722,9 +1722,24 @@ async fn post_start(cfg: &MmdsConfig) -> Result<(), Box> .await .map_err(|e| format!("failed to set up pgbouncer auth: {e}"))?; - // Required extensions — fail the supervisor if any are missing; the process - // manager will restart and retry rather than running in a degraded state. + // Required extensions — fail the supervisor if CREATE EXTENSION fails for + // an extension whose shared object IS installed; the process manager will + // restart and retry rather than running in a degraded state. + // + // But the standalone postgres primitive ships without the auth/queue + // milestone (beyond_auth/beyond_queue) and pgdg's pg_cron (version pin + // drift). When an extension's .so isn't installed, treat it as a warning, + // not a fatal — the same self-adapting posture as the + // shared_preload_libraries filter (see config::beyond_conf). With the + // extensions present, the behavior is unchanged (still hard-required). for ext in REQUIRED_EXTENSIONS { + if !extension_installed(ext) { + warn!( + "required extension {ext} not installed (no {ext}.so in {EXTENSION_PKGLIBDIR}); \ + skipping (auth/queue milestone not in this image)" + ); + continue; + } pg::psql(&format!("CREATE EXTENSION IF NOT EXISTS {ext}")) .await .map_err(|e| format!("required extension {ext} failed: {e}"))?; @@ -1798,6 +1813,20 @@ async fn setup_pgbouncer_auth() -> Result<(), crate::pg::PgError> { const REQUIRED_EXTENSIONS: &[&str] = &["beyond_auth", "beyond_queue", "pg_cron"]; +/// Directory holding PostgreSQL extension shared objects (PG18 Debian layout, +/// `pg_config --pkglibdir`). Mirrors `config`'s PKGLIBDIR. +const EXTENSION_PKGLIBDIR: &str = "/usr/lib/postgresql/18/lib"; + +/// True iff the extension's shared object is present in the image. An extension +/// listed in [`REQUIRED_EXTENSIONS`] but with no installed `.so` (e.g. the +/// future auth/queue milestone, or a dropped pgdg `pg_cron`) is downgraded from +/// fatal to a warning so the standalone primitive still boots. +fn extension_installed(ext: &str) -> bool { + std::path::Path::new(EXTENSION_PKGLIBDIR) + .join(format!("{ext}.so")) + .exists() +} + const OPTIONAL_EXTENSIONS: &[&str] = &[ "pg_stat_statements", "auto_explain", diff --git a/src/tls.rs b/src/tls.rs index 941c13d..25cd818 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -222,6 +222,9 @@ fn now_secs() -> u64 { // --------------------------------------------------------------------------- fn generate_cert(cert_path: &Path, key_path: &Path) -> Result<(), TlsError> { + // Ed25519: fast signing on every TLS handshake (the reason we picked it over + // P-256). The rootfs must ship an OpenSSL that can load Ed25519 keys — see + // the image build; Postgres loads the server key through OpenSSL. let key_pair = rcgen::KeyPair::generate_for(&rcgen::PKCS_ED25519)?; let mut params = CertificateParams::default(); @@ -246,15 +249,49 @@ fn generate_cert(cert_path: &Path, key_path: &Path) -> Result<(), TlsError> { // Key is written with mode 0o600 set *before* content. This closes the // window in which a chmod-after-write approach would leave the private // key briefly readable under a wider umask. - crate::config::write_atomic_bytes_with_mode( - key_path, - key_pair.serialize_pem().as_bytes(), - 0o600, - )?; + // + // rcgen (aws-lc-rs) serializes Ed25519 keys as PKCS#8 *v2* (OneAsymmetricKey + // with the trailing public-key field). OpenSSL 3.0.13 — the version on the + // ubuntu-noble base image, which Postgres links against — cannot decode v2 + // Ed25519 keys and rejects them with `unsupported`, so Postgres can't load + // its server key. Re-emit as standard PKCS#8 *v1* (identical Ed25519 key and + // performance; v2 only appends the public key, which the cert already + // carries). Falls back to rcgen's PEM if the key isn't the expected Ed25519 + // shape (e.g. a future algorithm change). + let key_pem = ed25519_pkcs8_v1_pem(&key_pair.serialize_der()) + .unwrap_or_else(|| key_pair.serialize_pem()); + crate::config::write_atomic_bytes_with_mode(key_path, key_pem.as_bytes(), 0o600)?; Ok(()) } +/// Re-encode an Ed25519 private key from rcgen's PKCS#8 v2 DER to the +/// universally-accepted PKCS#8 v1 PEM. Returns `None` if `der_v2` is not a +/// recognizable Ed25519 PKCS#8 (so the caller keeps rcgen's own encoding). +fn ed25519_pkcs8_v1_pem(der_v2: &[u8]) -> Option { + use base64::Engine as _; + // The privateKey field encodes the 32-byte seed identically in v1 and v2 as + // the byte run `04 22 04 20 ` (OCTET STRING { OCTET STRING(32) }). + const PRIV_PREFIX: [u8; 4] = [0x04, 0x22, 0x04, 0x20]; + let pos = der_v2.windows(4).position(|w| w == PRIV_PREFIX)?; + let seed = der_v2.get(pos + 4..pos + 4 + 32)?; + // Canonical 48-byte Ed25519 PKCS#8 v1: + // SEQ { INTEGER 0, SEQ { OID 1.3.101.112 }, OCTET STRING { OCTET STRING(32) seed } } + let mut der_v1 = vec![ + 0x30, 0x2e, 0x02, 0x01, 0x00, 0x30, 0x05, 0x06, 0x03, 0x2b, 0x65, 0x70, 0x04, 0x22, 0x04, + 0x20, + ]; + der_v1.extend_from_slice(seed); + let b64 = base64::engine::general_purpose::STANDARD.encode(&der_v1); + let mut pem = String::from("-----BEGIN PRIVATE KEY-----\n"); + for line in b64.as_bytes().chunks(64) { + pem.push_str(std::str::from_utf8(line).expect("base64 is ASCII")); + pem.push('\n'); + } + pem.push_str("-----END PRIVATE KEY-----\n"); + Some(pem) +} + /// Hostname for the cert CN. Reads `/etc/hostname` (written by `init::run()` /// from MMDS), falls back to `gethostname(2)`, then to `"localhost"`. fn hostname() -> String {