diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 01e8c5f..c2ed6c9 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -177,15 +177,20 @@ Each pack is self-describing — the block index is a footer (trailer → index ``` {db_path}/ -└── exports/{s3_prefix}/ ← ContentStore root (shared by exports + bless) - ├── manifests/{export_name} ← VolumeManifest GLVM (chunk_idx → [pack_ids]) - ├── manifests/{tag_name} ← Named manifest tag (same format, arbitrary name) - ├── manifests/bases/{image_name} ← Blessed base image VolumeManifest (glidefs bless) - ├── snapshots/{export_name}/{sequence:020} ← Versioned VolumeManifest (zero-padded sequence) - └── chunks/{chunk_idx:04}/ - └── {pack_id:016x}.pack ← GLPK pack (self-describing: header+index+data) +├── exports/{s3_prefix}/ ← ContentStore root (shared by exports + bless) +│ ├── manifests/{export_name} ← VolumeManifest GLVM (chunk_idx → [pack_ids]) +│ ├── manifests/{tag_name} ← Named manifest tag (same format, arbitrary name) +│ ├── manifests/bases/{image_name} ← Blessed base image VolumeManifest (glidefs bless) +│ ├── snapshots/{export_name}/{sequence:020} ← Versioned VolumeManifest (zero-padded sequence) +│ └── chunks/{chunk_idx:04}/ +│ └── {pack_id:016x}.pack ← GLPK pack (self-describing: header+index+data) +└── index/ ← Logical→physical resolution (name-keyed, prefix-independent) + ├── images/{image_name}.json ← image: → {pool, manifest} + └── snapshots/{volume}@{seq}.json ← snapshot: → {pool, volume, sequence, parent} ``` +(The volume index is `exports/{name}/export.json` itself — name-keyed and pool-independent — so it doubles as both the export definition and the `volume:` resolver.) + Chunk directories use 4-digit zero-padded indices (`chunks/0000/`, `chunks/0001/`, ...). A 1 TB device has 8,192 chunks (128 MiB each). A compacted chunk has exactly 1 pack file; an uncompacted chunk may have up to `DEFAULT_COMPACTION_THRESHOLD` (16) packs. 
**Manifest size by scenario:** @@ -462,14 +467,18 @@ GC reconcile_prefix(): ``` PUT /api/exports/fork-vm - { "manifest_name": "prod-vm", "snapshot_sequence": 42, "size_gb": 10 } + { "from": "snapshot:prod-vm@42", "size_gb": 10 } ← logical: no pool, no manifest, no sequence + │ + ▼ +resolve_source(Snapshot("prod-vm@42")) ← GET index/snapshots/prod-vm@42.json + → ResolvedSource { pool: "prod-vm", manifest_name: "prod-vm", snapshot_sequence: 42 } │ ▼ -router.create_export(config, readonly=false, manifest_name=Some("prod-vm"), snapshot_sequence=Some(42)) +router.create_export(config{s3_prefix="prod-vm"}, readonly=false, manifest_name=Some("prod-vm"), snapshot_sequence=Some(42)) │ ├── content_store.get_snapshot("prod-vm", 42) ← GET snapshots/prod-vm/00000000000000000042 ├── VolumeManifest::deserialize() - ├── ContentStore::put_manifest("fork-vm", ...) ← PUT manifests/fork-vm + ├── ContentStore::put_manifest("fork-vm", ...) ← PUT manifests/fork-vm (in prod-vm's pool, CoW) └── WriteCache::open_fresh_active(config) ← empty local block map ``` @@ -701,18 +710,20 @@ HTTP REST API for orchestrators (scale-to-zero, live migration). (`api.rs`) | Endpoint | Method | Purpose | |----------|--------|---------| -| `/api/exports/{name}` | `PUT` | Create or resize export (idempotent). With `manifest_name` + optional `snapshot_sequence`: fork from parent or specific snapshot. | +| `/api/exports/{name}` | `PUT` | Create, fork, re-attach, or resize a volume by **name alone** (idempotent). Body is fully logical — no `s3_prefix`/`manifest_name`/`snapshot_sequence`. To fork, set `from` to `"image:{name}"`, `"volume:{name}"`, or `"snapshot:{volume}@{seq}"`; GlideFS resolves it to a pool + manifest and places the new volume in the source's pool for CoW. Omit `from` for a blank volume. | +| `/api/resolve/{name}` | `GET` | Resolve a volume's physical location (`{s3_prefix, manifest_name, size_gb,…}`) from the durable name-keyed index, reading `export.json` straight from S3. 
Works on **any** node — even one that has never attached or discovered the volume. The primitive behind dead-node recovery. | +| `/api/images/{name}` | `GET` | Resolve a blessed image's location (`{name, pool, manifest}`) from the logical image index. | | `/api/exports/{name}` | `GET` | Get export info (size, readonly, transport, device path) | | `/api/exports/{name}` | `DELETE` | Remove export. `?purge=true` also deletes local cache and all S3 snapshots. | | `/api/exports` | `GET` | List all active exports | -| `/api/exports/{name}/snapshot` | `POST` | Flush dirty blocks → S3, upload versioned manifest. Optional body `{"tag":"name"}` also publishes named alias. Returns `{sequence, manifest_etag}`. | +| `/api/exports/{name}/snapshot` | `POST` | Flush dirty blocks → S3, upload versioned manifest, and register the snapshot in the logical index. Optional body `{"tag":"name"}` also publishes a named alias. Returns `{snapshot_id, sequence, manifest_etag}` — fork from it via `from: "snapshot:{volume}@{seq}"`. | | `/api/exports/{name}/snapshots` | `GET` | List snapshot sequences in ascending order | | `/api/exports/{name}/snapshots/{seq}` | `DELETE` | Delete a specific snapshot (idempotent) | | `/api/exports/{name}/tag` | `POST` | Publish current manifest under a named alias without re-flushing. Body: `{"tag":"name"}`. | | `/api/manifests/{s3_prefix}/{name}` | `HEAD` | Check manifest existence (200/404). No data transfer, no running export required. | | `/api/exports/{name}/drain` | `POST` | Flush all dirty blocks to S3 (no versioned snapshot) | | `/api/exports/{name}/promote` | `POST` | Toggle readonly → read-write | -| `/api/exports/{name}/promote-base` | `POST` | Publish a snapshot's manifest under `bases/{base_name}` (no data re-upload). Body: `{"base_name":"...","sequence":N}`. Idempotent; the promoted base is forkable and profileable like a blessed one. 
| +| `/api/exports/{name}/promote-base` | `POST` | Publish a snapshot's manifest under `bases/{base_name}` (no data re-upload) and register it in the image index. Body: `{"base_name":"...","sequence":N}`. Idempotent; the promoted base is forkable (`from: "image:{base_name}"`) and profileable like a blessed one. | | `/api/profile/{s3_prefix}/{name}` | `POST` | Start a background boot-set profile of `bases/{name}` (202). Body (all optional): `{"cmd","seed_paths","fs_type","runs","timeout_secs","force","untrusted","max_blocks"}`. `seed_paths` are faulted under the tracer before the entrypoint. 503 when the server has no `[profile]` config. | | `/api/profile/{s3_prefix}/{name}` | `GET` | Profile status: `{"state":"running"}` in-flight; `{"state":"complete"}` when `.boot-set.meta` exists; 404 when neither (never profiled, or last attempt failed). | | `/api/exports/{name}/metrics` | `GET` | Per-export metrics snapshot (JSON) | @@ -724,6 +735,37 @@ HTTP REST API for orchestrators (scale-to-zero, live migration). (`api.rs`) Export definitions are saved to S3 as `{db_path}/exports/{name}/export.json`. On startup, `discover_exports()` lists all `export.json` files under the `exports/` prefix and loads them 32-wide parallel, then `create_export()` recovers each from local WAL 16-wide parallel. No S3 writes on the recovery path. (`router.rs:save_export`, `router.rs:discover_exports`, `cli/server.rs`) +### Logical Naming & Resolution (GlideFS owns the logical→physical mapping) + +Callers address everything by **stable logical name** and never supply a physical +`s3_prefix` or `manifest_name`. GlideFS owns three durable, name-keyed, prefix-independent +indexes — read on every resolve so **any node** can locate data from a name alone (the +basis for dead-node recovery: kill the node holding a mapping, the bytes stay addressable). 
+ +| Index | Key | S3 location | Resolves a… | Written by | +|-------|-----|-------------|-------------|------------| +| **Volume** | volume name | `{db_path}/exports/{name}/export.json` | `volume:` → `(pool, manifests/{name})` | every create/fork/re-attach (`save_export`) | +| **Image** | image name | `{db_path}/index/images/{name}.json` | `image:` → `(pool, bases/{name})` | bless (HTTP + `glidefs bless` CLI) and `promote-base` (`index_image` / `registry::put_image_entry`) | +| **Snapshot** | `{volume}@{seq}` | `{db_path}/index/snapshots/{id}.json` | `snapshot:` → `(pool, volume, sequence)` | `snapshot_export` (`save_snapshot_entry`) | + +A create/fork request carries a logical `from` ref (`FromRef`, `block/registry.rs`); the +router's `resolve_source()` turns it into the physical coordinates the existing fork +machinery needs, and **places the new volume in the source's pool so CoW pack sharing +works**. The physical S3 layout is unchanged — only the *addressing* moved from the caller +into GlideFS. Lineage (`ExportConfig::source`, and a snapshot entry's `parent`) records the +`from` ref so GlideFS, not the caller, owns the parent/child graph. + +Re-attach: a `PUT` for a volume not held locally consults the volume index first; if it +exists, GlideFS adopts the persisted pool + geometry and attaches the real data instead of +creating a fresh empty volume at the wrong pool. (`router.rs:resolve_export`, +`router.rs:resolve_source`, `api.rs:create_or_attach_volume`.) + +**Remaining physical surface (build-time admin only).** A few endpoints still take a +`{s3_prefix}` path segment — `HEAD /api/manifests/{s3_prefix}/{name}` and +`POST|GET /api/profile/{s3_prefix}/{name}`. These are image-authoring/admin operations, not +the volume create/fork data path; the orchestrator's runtime volume lifecycle uses logical +names exclusively. + ## Observability Per-export Prometheus metrics exposed at `/metrics`. 
Latency histograms are sampled 1:64 to reduce mutex contention at high IOPS. (`metrics.rs`) diff --git a/README.md b/README.md index ced5bae..d162e7f 100644 --- a/README.md +++ b/README.md @@ -77,13 +77,13 @@ curl -X PUT localhost:8080/api/exports/my-vm \ -d '{"size_gb": 500}' # → {"name":"my-vm","size_bytes":500000000000,"readonly":false,"transport":"nbd","device":"/dev/nbd0"} -# Fork from the current state of an existing export +# Fork from the current state of an existing export — by logical name alone curl -X PUT localhost:8080/api/exports/my-vm-fork \ - -d '{"size_gb": 500, "manifest_name": "my-vm"}' + -d '{"size_gb": 500, "from": "volume:my-vm"}' -# Fork from a specific snapshot (returns sequence from POST /snapshot) +# Fork from a specific snapshot (snapshot_id comes from POST /snapshot) curl -X PUT localhost:8080/api/exports/my-vm-fork \ - -d '{"size_gb": 500, "manifest_name": "my-vm", "snapshot_sequence": 42}' + -d '{"size_gb": 500, "from": "snapshot:my-vm@42"}' # Use ublk transport (Linux 6.0+, requires --features ublk) curl -X PUT localhost:8080/api/exports/my-vm \ @@ -112,7 +112,9 @@ PUT is idempotent. Same size → returns current state. Larger size → grows th | Endpoint | Method | Description | |----------|--------|-------------| | `/api/exports` | GET | List exports (includes transport + device path) | -| `/api/exports/{name}` | PUT | Create or resize export. `manifest_name` + optional `snapshot_sequence` to fork. | +| `/api/exports/{name}` | PUT | Create or resize export by name. To fork, set `from` to `"image:"`, `"volume:"`, or `"snapshot:"`. | +| `/api/resolve/{name}` | GET | Resolve a volume's location (`s3_prefix`, `manifest_name`, …) by name — reads S3 directly, works on any node. | +| `/api/images/{name}` | GET | Resolve a blessed image's location (`pool`, `manifest`) by name. | | `/api/exports/{name}` | GET | Get export info | | `/api/exports/{name}` | DELETE | Remove export. `?purge=true` deletes local cache and all S3 snapshots. 
| | `/api/exports/{name}/drain` | POST | Flush all dirty blocks to S3 (no snapshot created) | @@ -137,11 +139,11 @@ glidefs bless --image ubuntu-22.04.raw --name ubuntu-22.04-v1 --s3-prefix bases Exports forked from base images share blocks via content addressing. Identical data is stored once. -Fork from a blessed image using `manifest_name: "bases/{name}"`: +Fork from a blessed image using `from: "image:{name}"`: ```sh curl -X PUT localhost:8080/api/exports/vm-1 \ - -d '{"size_gb": 50, "manifest_name": "bases/ubuntu-22.04-v1"}' + -d '{"size_gb": 50, "from": "image:ubuntu-22.04-v1"}' ``` ### Boot-set profiling (faster cold start) @@ -180,7 +182,7 @@ curl -sX POST localhost:8080/api/exports/prod/snapshot \ # 2. Fork — instant CoW, no data copied curl -X PUT localhost:8080/api/exports/vm-deploy-7 \ - -d '{"size_gb": 50, "manifest_name": "prod"}' + -d '{"size_gb": 50, "from": "volume:prod"}' # → {"device": "/dev/nbd1", ...} # 3. Mount + sync code + start @@ -229,7 +231,7 @@ if [ "$STATUS" -eq 200 ]; then else # Miss: fork from base, run setup, tag result curl -X PUT localhost:8080/api/exports/setup-work \ - -d '{"size_gb": 50, "manifest_name": "bases/ubuntu-24.04-v1"}' + -d '{"size_gb": 50, "from": "image:ubuntu-24.04-v1"}' mount /dev/nbd1 /mnt mise install node@22 && npm ci --prefix /mnt/app @@ -242,9 +244,9 @@ else SOURCE="setup-${SETUP_HASH}" fi -# Fork from setup state, sync code, deploy +# Fork from setup state (a tag is forkable as an image), sync code, deploy curl -X PUT localhost:8080/api/exports/vm-deploy-8 \ - -d "{\"size_gb\": 50, \"manifest_name\": \"${SOURCE}\"}" + -d "{\"size_gb\": 50, \"from\": \"image:${SOURCE}\"}" ``` Same `IMAGE_ID + LOCKFILE_HASH` next deploy → HEAD returns 200 → setup is skipped entirely. 
@@ -405,15 +407,15 @@ SEQ=$(curl -sX POST localhost:8080/api/exports/my-vm/snapshot | jq .sequence) curl localhost:8080/api/exports/my-vm/snapshots # → [1, 5, 42] -# Fork a new export from snapshot 42 (read-only parent blocks, CoW overlay for writes) +# Fork a new export from snapshot $SEQ (read-only parent blocks, CoW overlay for writes) curl -X PUT localhost:8080/api/exports/my-vm-test \ - -d "{\"size_gb\": 500, \"manifest_name\": \"my-vm\", \"snapshot_sequence\": $SEQ}" + -d "{\"size_gb\": 500, \"from\": \"snapshot:my-vm@$SEQ\"}" # Delete a snapshot when done curl -X DELETE localhost:8080/api/exports/my-vm/snapshots/5 ``` -`snapshot_sequence` is optional. Omit it to fork from the current state. +To fork from the live current state instead of a snapshot, use `from: "volume:my-vm"`. **GC and snapshots**: GC scans all snapshot manifests before deleting any pack. Packs referenced by a snapshot are kept alive even if they're no longer in the current manifest. Deleting a snapshot unpins its exclusive packs — they become eligible for GC after the grace period (default 24h). @@ -425,7 +427,7 @@ curl -X DELETE localhost:8080/api/exports/my-vm # 2. Fork from the target snapshot into the same name curl -X PUT localhost:8080/api/exports/my-vm \ - -d '{"size_gb": 500, "manifest_name": "my-vm", "snapshot_sequence": 5}' + -d '{"size_gb": 500, "from": "snapshot:my-vm@5"}' ``` No data is copied — the new export reads parent blocks from the existing S3 packs via the CoW overlay. 
@@ -434,7 +436,7 @@ No data is copied — the new export reads parent blocks from the existing S3 pa ```sh curl -X PUT localhost:8080/api/exports/my-vm-rollback \ - -d '{"size_gb": 500, "manifest_name": "my-vm", "snapshot_sequence": 5}' + -d '{"size_gb": 500, "from": "snapshot:my-vm@5"}' # verify my-vm-rollback, then swap at the load balancer ``` diff --git a/glidefs/src/block/api.rs b/glidefs/src/block/api.rs index f3447a5..afd6a21 100644 --- a/glidefs/src/block/api.rs +++ b/glidefs/src/block/api.rs @@ -5,6 +5,7 @@ //! Used by orchestrators for microVM scale-to-zero and live migration. use crate::block::metrics::prometheus_header; +use crate::block::registry::FromRef; use crate::block::router::{ExportInfo, ExportRouter, RouterError}; use crate::config::ExportConfig; use crate::task; @@ -25,20 +26,23 @@ use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; use url::form_urlencoded; -/// Request to create or update an export (PUT /api/exports/{name}). -/// Name comes from URL path, not body. +/// Request to create, fork, re-attach, or resize a volume +/// (`PUT /api/exports/{name}`). Name comes from the URL path, not the body. +/// +/// Fully logical: callers never supply an `s3_prefix` or `manifest_name`. To +/// fork, set `from` to a logical ref (`"image:"`, `"volume:"`, +/// `"snapshot:"`); GlideFS resolves it to a pool + manifest internally and +/// places the new volume in the source's pool for CoW pack sharing. #[derive(Debug, Deserialize)] -pub struct PutExportRequest { +pub struct CreateVolumeRequest { pub size_gb: f64, + /// Logical source to fork from. Omit (or `null`) for a fresh blank volume. #[serde(default)] - pub s3_prefix: Option, + pub from: Option, #[serde(default)] pub readonly: bool, #[serde(default)] pub block_size: Option, - /// If set, fork this export from the named S3 manifest. - #[serde(default)] - pub manifest_name: Option, /// Blocks per S3 pack (default: inherit from global config). 0 = manual mode. 
#[serde(default)] pub flush_threshold: Option, @@ -48,9 +52,6 @@ pub struct PutExportRequest { /// Block device transport: "nbd" (default) or "ublk" (Linux 6.0+). #[serde(default)] pub transport: Option, - /// If set (with manifest_name), fork from this specific snapshot sequence. - #[serde(default)] - pub snapshot_sequence: Option, /// Cooldown compaction window in flush cycles (0/unset = disabled). Defers /// dead-ratio compaction of a chunk until it has been idle this many cycles; /// cuts S3 PUT write-amp on overwrite-heavy DB volumes. Typical value: 8. @@ -116,6 +117,36 @@ pub struct ListExportsResponse { pub exports: Vec, } +/// Response for `GET /api/resolve/{name}`: where a volume's data physically +/// lives, resolved from the durable name-keyed index (`export.json`). Lets any +/// node locate a volume given only its stable name — no `s3_prefix` required. +#[derive(Debug, Serialize, Deserialize)] +pub struct ResolveResponse { + pub name: String, + /// Effective S3 prefix (pool) holding this volume's packs + manifests. + pub s3_prefix: String, + /// Manifest name within the pool (equals the volume name). + pub manifest_name: String, + pub size_gb: f64, + #[serde(skip_serializing_if = "Option::is_none")] + pub block_size: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub transport: Option, +} + +impl ResolveResponse { + fn from_config(name: &str, cfg: &ExportConfig) -> Self { + ResolveResponse { + name: name.to_string(), + s3_prefix: cfg.s3_prefix().to_string(), + manifest_name: name.to_string(), + size_gb: cfg.size_gb, + block_size: cfg.block_size, + transport: cfg.transport.clone(), + } + } +} + /// Request to bless an OCI image (POST /api/bless/{s3_prefix}/{name}). #[derive(Debug, Deserialize)] pub struct BlessRequest { @@ -289,6 +320,169 @@ where handle_request(router, req).await } +/// Create, fork, re-attach, or resize a volume — the body of +/// `PUT /api/exports/{name}`. 
Inputs are fully logical: `req` carries no +/// physical coordinates and `from` is the parsed logical source ref. This +/// function owns the create/fork/re-attach/resize decision and feeds the +/// existing physical machinery (`create_export`) the coordinates it resolved. +async fn create_or_attach_volume( + router: &Arc, + name: &str, + req: &CreateVolumeRequest, + from: &FromRef, +) -> Response { + // Already attached on this node → resize-or-noop (idempotent create). + if let Some(export) = router.get_export_info(name).await { + let current_size_gb = export.size as f64 / 1_073_741_824.0; + if req.size_gb > current_size_gb { + return match router.resize_export(name, req.size_gb).await { + Ok(()) => { + let transport = export.transport.as_str(); + #[cfg(target_os = "linux")] + if let Err(e) = router.register_device(name, transport).await { + warn!(export = %name, error = %e, "device re-registration after resize failed"); + } + let _ = transport; + match router.get_export_info(name).await { + Some(info) => { + json_response(StatusCode::OK, &ExportInfoResponse::from(info)) + } + None => error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Export resized but not found in map", + ), + } + } + Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), + }; + } + return json_response(StatusCode::OK, &ExportInfoResponse::from(export)); + } + + // Not attached here. Resolve the durable, name-keyed index (export.json): + // if this volume already exists in S3 (created on another node, or before + // this node booted), recover its pool and ATTACH the real data — never + // create a fresh empty volume at the wrong pool. Resolve-by-name. 
+ let resolved = match router.resolve_export(name).await { + Ok(r) => r, + Err(e) => { + return error_response( + StatusCode::INTERNAL_SERVER_ERROR, + &format!("failed to resolve export '{}': {}", name, e), + ); + } + }; + + let (config, fork_manifest, fork_seq) = if let Some(existing) = resolved { + // RE-ATTACH: adopt the persisted pool + on-disk geometry (these MUST + // match the stored data); honor runtime prefs; never re-fork, never + // shrink. `from` is ignored — the volume already exists. + let config = ExportConfig { + name: name.to_string(), + size_gb: req.size_gb.max(existing.size_gb), + s3_prefix: existing.s3_prefix.clone(), + block_size: req.block_size.or(existing.block_size), + flush_threshold: req.flush_threshold.or(existing.flush_threshold), + flush_mode: req.flush_mode.clone().or(existing.flush_mode.clone()), + transport: req.transport.clone().or(existing.transport.clone()), + compaction_cooldown: req.compaction_cooldown.or(existing.compaction_cooldown), + source: existing.source.clone(), + }; + info!( + export = %name, + s3_prefix = %config.s3_prefix(), + "re-attaching volume from persisted index" + ); + (config, None, None) + } else { + // CREATE or FORK: resolve the logical source to physical coordinates. + // The source's pool becomes the new volume's pool so CoW pack sharing + // works; blank volumes get their own pool (= their name). 
+ let src = match router.resolve_source(from).await { + Ok(s) => s, + Err(RouterError::SourceNotFound(s)) => { + return error_response(StatusCode::NOT_FOUND, &format!("source not found: {s}")); + } + Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), + }; + let config = ExportConfig { + name: name.to_string(), + size_gb: req.size_gb, + s3_prefix: src.pool, + block_size: req.block_size, + flush_threshold: req.flush_threshold, + flush_mode: req.flush_mode.clone(), + transport: req.transport.clone(), + compaction_cooldown: req.compaction_cooldown, + source: from.as_source(), + }; + (config, src.manifest_name, src.snapshot_sequence) + }; + + let transport = config.transport.as_deref().unwrap_or("nbd").to_string(); + let is_fork = fork_manifest.is_some(); + + let t_handler = Instant::now(); + let t_create = Instant::now(); + let create_result = router + .create_export(config.clone(), req.readonly, fork_manifest.as_deref(), fork_seq) + .await; + let create_ms = t_create.elapsed().as_millis() as u64; + + match create_result { + Ok(()) => { + // Persist the index entry and register the device concurrently. + // save_export MUST succeed; register_device is best-effort. + let t_io = Instant::now(); + #[cfg(target_os = "linux")] + let (save_result, register_result) = tokio::join!( + router.save_export(&config), + router.register_device(name, &transport), + ); + #[cfg(not(target_os = "linux"))] + let save_result = router.save_export(&config).await; + + if let Err(e) = save_result { + // Without teardown, the in-memory entry silently shadows the + // missing S3 export.json — retries hit create_export's + // idempotency check and never re-attempt the S3 write. 
+ router.cleanup_failed_create(name, &transport).await; + return error_response( + StatusCode::SERVICE_UNAVAILABLE, + &format!( + "Export definition not persisted to S3 ({e}); \ + in-memory state cleaned up, retry the request" + ), + ); + } + #[cfg(target_os = "linux")] + if let Err(e) = register_result { + warn!(export = %name, error = %e, "device registration failed"); + } + let _ = transport; + + tracing::info!( + target: "glidefs.timing", + export = %name, + fork = is_fork, + create_export_ms = create_ms, + io_ms = t_io.elapsed().as_millis() as u64, + total_ms = t_handler.elapsed().as_millis() as u64, + "PUT /api/exports timing" + ); + + match router.get_export_info(name).await { + Some(info) => json_response(StatusCode::CREATED, &ExportInfoResponse::from(info)), + None => error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Export created but not found in map", + ), + } + } + Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), + } +} + /// Handle API requests. async fn handle_request( router: Arc, @@ -347,7 +541,7 @@ where Err(e) => return Ok(error_response(StatusCode::BAD_REQUEST, &e.to_string())), }; - let put_req: PutExportRequest = match serde_json::from_slice(&body) { + let put_req: CreateVolumeRequest = match serde_json::from_slice(&body) { Ok(r) => r, Err(e) => { return Ok(error_response( @@ -412,168 +606,16 @@ where } } - if let Some(ref prefix) = put_req.s3_prefix - && (prefix.contains("..") || prefix.starts_with('/')) - { - return Ok(error_response( - StatusCode::BAD_REQUEST, - "Invalid s3_prefix: must not contain '..' 
or start with '/'", - )); - } - - if put_req.snapshot_sequence.is_some() && put_req.manifest_name.is_none() { - return Ok(error_response( - StatusCode::BAD_REQUEST, - "snapshot_sequence requires manifest_name to be set", - )); - } - - // Check if export already exists (direct lookup, not a full scan) - let existing = router.get_export_info(name).await; - - match existing { - Some(export) => { - // Export exists - check if resize needed - let current_size_gb = export.size as f64 / 1_073_741_824.0; - if put_req.size_gb > current_size_gb { - // Need to grow - match router.resize_export(name, put_req.size_gb).await { - Ok(()) => { - // Re-register device after resize (device was removed) - let transport = export.transport.as_str(); - #[cfg(target_os = "linux")] - if let Err(e) = router.register_device(name, transport).await { - warn!(export = %name, error = %e, "device re-registration after resize failed"); - } - let _ = transport; // suppress unused warning on non-Linux - match router.get_export_info(name).await { - Some(info) => json_response( - StatusCode::OK, - &ExportInfoResponse::from(info), - ), - None => error_response( - StatusCode::INTERNAL_SERVER_ERROR, - "Export resized but not found in map", - ), - } - } - Err(e) => { - error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()) - } - } - } else { - // Already at or above requested size - return current state - json_response(StatusCode::OK, &ExportInfoResponse::from(export)) - } + // Parse the logical source ref (`from`). GlideFS resolves it to a + // pool + manifest internally — callers never supply physical coords. 
+ let from = match FromRef::parse(put_req.from.as_deref()) { + Ok(f) => f, + Err(e) => { + return Ok(error_response(StatusCode::BAD_REQUEST, &e.to_string())); } - None => { - // Export doesn't exist - create it - let transport = put_req.transport.as_deref().unwrap_or("nbd").to_string(); - let config = ExportConfig { - name: name.to_string(), - size_gb: put_req.size_gb, - s3_prefix: put_req.s3_prefix, - block_size: put_req.block_size, - flush_threshold: put_req.flush_threshold, - flush_mode: put_req.flush_mode, - transport: put_req.transport, - compaction_cooldown: put_req.compaction_cooldown, - }; - - let t_handler = Instant::now(); - let t_create = Instant::now(); - let create_result = router - .create_export( - config.clone(), - put_req.readonly, - put_req.manifest_name.as_deref(), - put_req.snapshot_sequence, - ) - .await; - let t_create_ms = t_create.elapsed().as_millis(); - match create_result - { - Ok(()) => { - // Run S3 persist and device registration concurrently. - // save_export must succeed; register_device is best-effort. 
- let t_join = Instant::now(); - #[cfg(target_os = "linux")] - let (save_result, register_result) = tokio::join!( - async { - let t = Instant::now(); - let r = router.save_export(&config).await; - (r, t.elapsed().as_millis()) - }, - async { - let t = Instant::now(); - let r = router.register_device(name, &transport).await; - (r, t.elapsed().as_millis()) - }, - ); - #[cfg(not(target_os = "linux"))] - let save_result = { - let t = Instant::now(); - let r = router.save_export(&config).await; - (r, t.elapsed().as_millis()) - }; - let t_join_ms = t_join.elapsed().as_millis(); - #[cfg(target_os = "linux")] - let (save_result, t_save_ms) = save_result; - #[cfg(target_os = "linux")] - let (register_result, t_register_ms) = register_result; - #[cfg(not(target_os = "linux"))] - let (save_result, t_save_ms) = save_result; - #[cfg(not(target_os = "linux"))] - let t_register_ms: u128 = 0; - - tracing::info!( - target: "glidefs.timing", - export = %name, - fork = put_req.manifest_name.is_some(), - total_ms = t_handler.elapsed().as_millis() as u64, - create_export_ms = t_create_ms as u64, - join_ms = t_join_ms as u64, - save_export_ms = t_save_ms as u64, - register_device_ms = t_register_ms as u64, - "PUT /api/exports timing" - ); - - if let Err(e) = save_result { - // Without teardown, the in-memory entry silently - // shadows the missing S3 export.json — retries - // hit `create_export`'s idempotency check and - // never re-attempt the S3 write. 
- router.cleanup_failed_create(name, &transport).await; - return Ok(error_response( - StatusCode::SERVICE_UNAVAILABLE, - &format!( - "Export definition not persisted to S3 ({e}); \ - in-memory state cleaned up, retry the request" - ), - )); - } + }; - #[cfg(target_os = "linux")] - if let Err(e) = register_result { - warn!(export = %name, error = %e, "device registration failed"); - } - let _ = transport; // suppress unused warning on non-Linux - - match router.get_export_info(name).await { - Some(info) => json_response( - StatusCode::CREATED, - &ExportInfoResponse::from(info), - ), - None => error_response( - StatusCode::INTERNAL_SERVER_ERROR, - "Export created but not found in map", - ), - } - } - Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), - } - } - } + create_or_attach_volume(&router, name, &put_req, &from).await } // GET /api/exports/{name} - Get export info @@ -591,6 +633,49 @@ where } } + // GET /api/resolve/{name} - Resolve a volume's physical location by its + // stable logical name. Reads export.json directly from S3, so it works on + // ANY node — even one that has never attached or discovered the volume. + // This is the durable logical→physical mapping GlideFS owns. + (Method::GET, ["api", "resolve", name]) => { + if !is_valid_export_name(name) { + return Ok(error_response( + StatusCode::BAD_REQUEST, + &format!("Invalid name '{}'", name), + )); + } + match router.resolve_export(name).await { + Ok(Some(cfg)) => { + json_response(StatusCode::OK, &ResolveResponse::from_config(name, &cfg)) + } + Ok(None) => error_response( + StatusCode::NOT_FOUND, + &format!("Export '{}' not found", name), + ), + Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), + } + } + + // GET /api/images/{name} - Resolve a logical image's physical location + // from the durable image index. Lets any node locate an image (and thus + // fork from it via `from: "image:"`) by name alone. 
+ (Method::GET, ["api", "images", name]) => { + if !is_valid_export_name(name) { + return Ok(error_response( + StatusCode::BAD_REQUEST, + &format!("Invalid name '{}'", name), + )); + } + match router.load_image_entry(name).await { + Ok(Some(entry)) => json_response(StatusCode::OK, &entry), + Ok(None) => error_response( + StatusCode::NOT_FOUND, + &format!("Image '{}' not found", name), + ), + Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), + } + } + // POST /api/exports/{name}/drain - Drain export to S3 (Method::POST, ["api", "exports", name, "drain"]) => { match router.drain_export(name).await { @@ -1556,6 +1641,156 @@ mod tests { assert_eq!(resp.status(), StatusCode::OK); } + // ========================================================================= + // Phase 1: resolve-by-name + re-attach across nodes (shared S3, fresh node) + // ========================================================================= + + async fn body_json(resp: Response) -> serde_json::Value { + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + serde_json::from_slice(&bytes).unwrap() + } + + /// Seed a persisted volume binding directly (simulating a forked volume + /// whose CoW pool differs from its name), bypassing the device layer. + fn custom_pool_config(name: &str, pool: &str) -> ExportConfig { + ExportConfig { + name: name.to_string(), + size_gb: 0.01, + s3_prefix: Some(pool.to_string()), + block_size: None, + flush_threshold: None, + flush_mode: None, + transport: None, + compaction_cooldown: None, + source: Some(format!("volume:{pool}")), + } + } + + /// A volume living in a CUSTOM pool (name != pool, as forks do) must be + /// resolvable by name alone on a fresh node that never attached or + /// discovered it. + #[tokio::test] + async fn test_resolve_by_name_custom_pool_fresh_node() { + let shared: Arc = + Arc::new(object_store::memory::InMemory::new()); + + // Node A persists the binding (name=vol1 lives in pool "custompool"). 
+ let temp_a = TempDir::new().unwrap(); + let node_a = create_test_router_with_store(&temp_a, Arc::clone(&shared)).await; + node_a + .save_export(&custom_pool_config("vol1", "custompool")) + .await + .unwrap(); + + // Node B: fresh node, same bucket, never saw vol1. + let temp_b = TempDir::new().unwrap(); + let node_b = create_test_router_with_store(&temp_b, Arc::clone(&shared)).await; + let resp = request(&node_b, Method::GET, "/api/resolve/vol1", None).await; + assert_eq!(resp.status(), StatusCode::OK); + let v = body_json(resp).await; + assert_eq!(v["s3_prefix"], "custompool"); + assert_eq!(v["manifest_name"], "vol1"); + } + + /// Re-attaching by name on a fresh node must adopt the persisted custom + /// pool — NOT create a fresh empty volume at the wrong pool and clobber + /// export.json. + #[tokio::test] + async fn test_reattach_by_name_adopts_persisted_pool() { + let shared: Arc = + Arc::new(object_store::memory::InMemory::new()); + + let temp_a = TempDir::new().unwrap(); + let node_a = create_test_router_with_store(&temp_a, Arc::clone(&shared)).await; + node_a + .save_export(&custom_pool_config("vol1", "custompool")) + .await + .unwrap(); + + // Node B re-attaches by name alone (no physical coords exist anymore). + let temp_b = TempDir::new().unwrap(); + let node_b = create_test_router_with_store(&temp_b, Arc::clone(&shared)).await; + let resp = request( + &node_b, + Method::PUT, + "/api/exports/vol1", + Some(r#"{"size_gb": 0.01}"#), + ) + .await; + assert_eq!(resp.status(), StatusCode::CREATED); + + // The binding survived: a THIRD fresh node still resolves to custompool, + // proving node B adopted the pool instead of overwriting export.json. 
+ let temp_c = TempDir::new().unwrap(); + let node_c = create_test_router_with_store(&temp_c, Arc::clone(&shared)).await; + let resp = request(&node_c, Method::GET, "/api/resolve/vol1", None).await; + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(body_json(resp).await["s3_prefix"], "custompool"); + } + + /// A blank volume (no `from`) gets its own pool (= its name). + #[tokio::test] + async fn test_create_blank_volume_owns_pool() { + let temp = TempDir::new().unwrap(); + let router = create_test_router(&temp).await; + let resp = request( + &router, + Method::PUT, + "/api/exports/v2", + Some(r#"{"size_gb": 0.01}"#), + ) + .await; + assert_eq!(resp.status(), StatusCode::CREATED); + let resp = request(&router, Method::GET, "/api/resolve/v2", None).await; + assert_eq!(body_json(resp).await["s3_prefix"], "v2"); + } + + /// Forking from a logical source that doesn't exist → 404. + #[tokio::test] + async fn test_create_from_unknown_image_returns_404() { + let temp = TempDir::new().unwrap(); + let router = create_test_router(&temp).await; + let resp = request( + &router, + Method::PUT, + "/api/exports/v3", + Some(r#"{"size_gb": 0.01, "from": "image:ghost"}"#), + ) + .await; + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + /// A malformed `from` ref → 400. 
+ #[tokio::test] + async fn test_create_from_invalid_ref_returns_400() { + let temp = TempDir::new().unwrap(); + let router = create_test_router(&temp).await; + let resp = request( + &router, + Method::PUT, + "/api/exports/v4", + Some(r#"{"size_gb": 0.01, "from": "bogus:x"}"#), + ) + .await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn test_resolve_missing_returns_404() { + let temp = TempDir::new().unwrap(); + let router = create_test_router(&temp).await; + let resp = request(&router, Method::GET, "/api/resolve/ghost", None).await; + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn test_resolve_invalid_name_returns_400() { + let temp = TempDir::new().unwrap(); + let router = create_test_router(&temp).await; + let resp = request(&router, Method::GET, "/api/resolve/-bad", None).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + /// Wraps `InMemory` and fails `put_opts` when armed. Used to simulate /// the S3 outage that makes `save_export` return `Err` mid-PUT. struct PutFailingStore { @@ -2114,11 +2349,11 @@ mod tests { } #[tokio::test] - async fn test_api_fork_from_snapshot_sequence() { + async fn test_api_fork_from_snapshot() { let temp = TempDir::new().unwrap(); let router = create_test_router(&temp).await; - // Create source export + // Create source volume. request( &router, Method::PUT, @@ -2127,35 +2362,111 @@ mod tests { ) .await; - // Snapshot source + // Snapshot it → GlideFS assigns a stable snapshot id. 
let resp = request(&router, Method::POST, "/api/exports/source/snapshot", None).await; assert_eq!(resp.status(), StatusCode::OK); - let body = resp.into_body().collect().await.unwrap().to_bytes(); - let snap: serde_json::Value = serde_json::from_slice(&body).unwrap(); - let seq = snap["sequence"].as_u64().unwrap(); - - // Fork from snapshot_sequence via API - let body = format!( - r#"{{"size_gb": 0.01, "s3_prefix": "source", "manifest_name": "source", "snapshot_sequence": {}}}"#, - seq - ); + let snap = body_json(resp).await; + let id = snap["snapshot_id"] + .as_str() + .expect("snapshot response carries a stable snapshot_id") + .to_string(); + + // Fork a new volume from the snapshot by logical id alone — no pool, + // no manifest, no sequence. + let body = format!(r#"{{"size_gb": 0.01, "from": "snapshot:{}"}}"#, id); let resp = request(&router, Method::PUT, "/api/exports/fork1", Some(&body)).await; assert_eq!(resp.status(), StatusCode::CREATED); + + // fork1 was placed in source's pool for CoW sharing. + let resp = request(&router, Method::GET, "/api/resolve/fork1", None).await; + assert_eq!(body_json(resp).await["s3_prefix"], "source"); } #[tokio::test] - async fn test_api_snapshot_sequence_without_manifest_name_returns_400() { + async fn test_api_fork_from_image() { let temp = TempDir::new().unwrap(); let router = create_test_router(&temp).await; + // Create + snapshot a source volume, then promote the snapshot to a + // named image — this registers the logical image index entry. 
+ request( + &router, + Method::PUT, + "/api/exports/golden", + Some(r#"{"size_gb": 0.01}"#), + ) + .await; + let resp = request(&router, Method::POST, "/api/exports/golden/snapshot", None).await; + let seq = body_json(resp).await["sequence"].as_u64().unwrap(); + let resp = request( + &router, + Method::POST, + "/api/exports/golden/promote-base", + Some(&format!( + r#"{{"sequence": {}, "base_name": "ubuntu"}}"#, + seq + )), + ) + .await; + assert_eq!(resp.status(), StatusCode::OK); + + // The image is now resolvable by logical name. + let resp = request(&router, Method::GET, "/api/images/ubuntu", None).await; + assert_eq!(resp.status(), StatusCode::OK); + let img = body_json(resp).await; + assert_eq!(img["pool"], "golden"); + assert_eq!(img["manifest"], "bases/ubuntu"); + + // Fork a new volume from the image by logical name alone. let resp = request( &router, Method::PUT, - "/api/exports/vol1", - Some(r#"{"size_gb": 0.01, "snapshot_sequence": 1}"#), + "/api/exports/vm1", + Some(r#"{"size_gb": 0.01, "from": "image:ubuntu"}"#), ) .await; - assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_eq!(resp.status(), StatusCode::CREATED); + + // vm1 was placed in the image's pool (CoW sharing) and records lineage. + let resp = request(&router, Method::GET, "/api/resolve/vm1", None).await; + assert_eq!(body_json(resp).await["s3_prefix"], "golden"); + } + + #[tokio::test] + async fn test_tag_is_forkable_as_image() { + let temp = TempDir::new().unwrap(); + let router = create_test_router(&temp).await; + + // Create a volume and tag its current manifest under a name. + request( + &router, + Method::PUT, + "/api/exports/work", + Some(r#"{"size_gb": 0.01}"#), + ) + .await; + let resp = request( + &router, + Method::POST, + "/api/exports/work/tag", + Some(r#"{"tag": "setup-v1"}"#), + ) + .await; + assert_eq!(resp.status(), StatusCode::OK); + + // The tag is registered in the image index and resolvable by name. 
+ let resp = request(&router, Method::GET, "/api/images/setup-v1", None).await; + assert_eq!(resp.status(), StatusCode::OK); + + // And forkable via `from: "image:"`. + let resp = request( + &router, + Method::PUT, + "/api/exports/deploy", + Some(r#"{"size_gb": 0.01, "from": "image:setup-v1"}"#), + ) + .await; + assert_eq!(resp.status(), StatusCode::CREATED); } // ========================================================================= diff --git a/glidefs/src/block/mod.rs b/glidefs/src/block/mod.rs index 7bec497..a1ad4ec 100644 --- a/glidefs/src/block/mod.rs +++ b/glidefs/src/block/mod.rs @@ -13,6 +13,7 @@ pub mod manifest; pub mod pack; pub mod pack_index_cache; pub mod readahead; +pub mod registry; pub mod scrubber; pub mod metrics; diff --git a/glidefs/src/block/registry.rs b/glidefs/src/block/registry.rs new file mode 100644 index 0000000..3d0bb3c --- /dev/null +++ b/glidefs/src/block/registry.rs @@ -0,0 +1,218 @@ +//! Logical → physical resolution: GlideFS owns the volume / image / snapshot +//! name → location mapping, so callers address everything by stable logical +//! names and never supply an `s3_prefix` or `manifest_name`. +//! +//! The physical S3 layout is unchanged. These durable, name-keyed index objects +//! live under `{db_path}/index/` (a sibling of `{db_path}/exports/`, where +//! `export.json` — the volume index — already lives), and are read on every +//! resolve so any node can locate a volume/image/snapshot from its name alone. + +use serde::{Deserialize, Serialize}; + +/// A logical source reference for creating or forking a volume. Parsed from the +/// `from` field of a create request. Its canonical string form is persisted as +/// the resulting volume's lineage (`ExportConfig::source`). +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FromRef { + /// Fresh blank volume — no backing manifest. + Blank, + /// Fork from a blessed image by logical name. + Image(String), + /// Fork from another volume's current committed state. 
+ Volume(String), + /// Fork from a specific snapshot by its stable id. + Snapshot(String), +} + +/// Error parsing a `from` ref. +#[derive(Debug, thiserror::Error)] +pub enum FromRefError { + #[error( + "invalid `from` ref '{0}': expected 'image:', 'volume:', or 'snapshot:'" + )] + Invalid(String), + #[error("`from` ref '{0}' has an empty target name")] + EmptyTarget(String), +} + +impl FromRef { + /// Parse a `from` field. `None` or empty/whitespace → [`FromRef::Blank`]. + pub fn parse(s: Option<&str>) -> Result { + let Some(s) = s.map(str::trim).filter(|s| !s.is_empty()) else { + return Ok(FromRef::Blank); + }; + let (kind, target) = s + .split_once(':') + .ok_or_else(|| FromRefError::Invalid(s.to_string()))?; + if target.is_empty() { + return Err(FromRefError::EmptyTarget(s.to_string())); + } + match kind { + "image" => Ok(FromRef::Image(target.to_string())), + "volume" => Ok(FromRef::Volume(target.to_string())), + "snapshot" => Ok(FromRef::Snapshot(target.to_string())), + _ => Err(FromRefError::Invalid(s.to_string())), + } + } + + /// Canonical string form, persisted as a volume's lineage. `Blank` → `None`. + pub fn as_source(&self) -> Option { + match self { + FromRef::Blank => None, + FromRef::Image(n) => Some(format!("image:{n}")), + FromRef::Volume(n) => Some(format!("volume:{n}")), + FromRef::Snapshot(id) => Some(format!("snapshot:{id}")), + } + } + + /// The target name/id this ref points at (`None` for `Blank`). + pub fn target(&self) -> Option<&str> { + match self { + FromRef::Blank => None, + FromRef::Image(n) | FromRef::Volume(n) | FromRef::Snapshot(n) => Some(n), + } + } +} + +/// Physical coordinates a [`FromRef`] resolves to, fed into the existing fork +/// machinery (`create_export`'s `s3_prefix` / `manifest_name` / +/// `snapshot_sequence`). +#[derive(Debug, Clone)] +pub struct ResolvedSource { + /// Pool (`s3_prefix`) the new volume must live in so CoW pack sharing works. + /// `None` for a blank volume (it gets its own pool = its name). 
+ pub pool: Option, + /// Manifest name within the pool to fork from. `None` for a blank volume. + pub manifest_name: Option, + /// Snapshot sequence, when forking from a snapshot. + pub snapshot_sequence: Option, +} + +impl ResolvedSource { + /// A blank volume: its own pool, no backing manifest. + pub fn blank() -> Self { + ResolvedSource { + pool: None, + manifest_name: None, + snapshot_sequence: None, + } + } +} + +/// Durable image-index entry: where a blessed image's manifest physically lives. +/// Written when an image is blessed/promoted; read when a volume forks from +/// `image:`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ImageEntry { + /// Logical image name. + pub name: String, + /// Pool holding the image's packs + manifest. + pub pool: String, + /// Manifest name within the pool (e.g. `"bases/"`). + pub manifest: String, +} + +/// Durable snapshot-index entry: resolves a stable snapshot id to its physical +/// `(pool, owning volume, sequence)` and parent lineage. Written by +/// `snapshot_export`; read when a volume forks from `snapshot:`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SnapshotEntry { + /// Stable snapshot id (`"{volume}@{sequence}"`). + pub id: String, + /// Pool holding the snapshot manifest + the volume's packs. + pub pool: String, + /// Volume the snapshot was taken from (`snapshot_s3_key` uses this name). + pub volume: String, + /// Monotonic sequence assigned by `snapshot_export`. + pub sequence: u64, + /// Optional human label supplied at snapshot time. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub label: Option, + /// Lineage: the source the owning volume was forked from, if any. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub parent: Option, +} + +/// Stable snapshot id from `(volume, sequence)`. Unique by construction +/// (sequences are monotonic and never reused for a volume). 
+pub fn snapshot_id(volume: &str, sequence: u64) -> String { + format!("{volume}@{sequence}") +} + +/// S3 path for an image-index entry, derived from `db_path` + name alone. +pub fn image_index_path(db_path: &str, name: &str) -> object_store::path::Path { + object_store::path::Path::from(format!("{db_path}/index/images/{name}.json")) +} + +/// S3 path for a snapshot-index entry, derived from `db_path` + id alone. +pub fn snapshot_index_path(db_path: &str, id: &str) -> object_store::path::Path { + object_store::path::Path::from(format!("{db_path}/index/snapshots/{id}.json")) +} + +/// Write an image-index entry to S3 (idempotent). Free function so both the +/// daemon (`ExportRouter`) and the `glidefs bless` CLI register images in the +/// same logical index — every bless path, HTTP or CLI, is resolvable by name. +pub async fn put_image_entry( + object_store: &std::sync::Arc, + db_path: &str, + entry: &ImageEntry, +) -> Result<(), object_store::Error> { + let path = image_index_path(db_path, &entry.name); + let json = serde_json::to_vec(entry).map_err(|e| object_store::Error::Generic { + store: "image-index", + source: Box::new(e), + })?; + object_store + .put(&path, bytes::Bytes::from(json).into()) + .await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_blank() { + assert_eq!(FromRef::parse(None).unwrap(), FromRef::Blank); + assert_eq!(FromRef::parse(Some("")).unwrap(), FromRef::Blank); + assert_eq!(FromRef::parse(Some(" ")).unwrap(), FromRef::Blank); + } + + #[test] + fn parse_kinds() { + assert_eq!( + FromRef::parse(Some("image:ubuntu")).unwrap(), + FromRef::Image("ubuntu".into()) + ); + assert_eq!( + FromRef::parse(Some("volume:inst1/vol0")).unwrap(), + FromRef::Volume("inst1/vol0".into()) + ); + assert_eq!( + FromRef::parse(Some("snapshot:vol0@7")).unwrap(), + FromRef::Snapshot("vol0@7".into()) + ); + } + + #[test] + fn parse_rejects_unknown_and_empty_target() { + assert!(FromRef::parse(Some("bogus:x")).is_err()); + 
assert!(FromRef::parse(Some("noscheme")).is_err()); + assert!(FromRef::parse(Some("image:")).is_err()); + } + + #[test] + fn roundtrip_as_source() { + for s in ["image:ubuntu", "volume:v1", "snapshot:v1@3"] { + let r = FromRef::parse(Some(s)).unwrap(); + assert_eq!(r.as_source().as_deref(), Some(s)); + } + assert_eq!(FromRef::Blank.as_source(), None); + } + + #[test] + fn snapshot_id_format() { + assert_eq!(snapshot_id("vol0", 7), "vol0@7"); + } +} diff --git a/glidefs/src/block/router.rs b/glidefs/src/block/router.rs index 1a7085a..53739fa 100644 --- a/glidefs/src/block/router.rs +++ b/glidefs/src/block/router.rs @@ -9,6 +9,7 @@ use crate::block::content_store::ContentStore; use crate::block::flush_scheduler::flush_scheduler; use crate::block::handler::BlockHandler; use crate::block::pack_index_cache::PackIndexCache; +use crate::block::registry::{FromRef, ImageEntry, ResolvedSource, SnapshotEntry}; use crate::block::metrics::{ExportMetrics, MetricsSnapshot}; use crate::block::state::Active; use crate::block::volume_manifest::VolumeManifest; @@ -39,6 +40,17 @@ use tracing::{debug, error, info, trace, warn}; /// volumes — see `VolumeManifest::size_estimate` for the math. pub const DEFAULT_MANIFEST_CACHE_BYTES: usize = 64 * 1024 * 1024; +/// Per-transport device-map filenames in `cache_dir`. Each maps +/// `export_name → dev_id` and is rewritten on every device add/remove, so it +/// names exactly the exports this node currently owns a device for. Boot uses +/// these (via `discover_local_exports`) to recover only the node's working set +/// instead of every `export.json` in the shared bucket. Kept in sync with +/// `block::ublk::DEVICE_MAP_FILE` and `block::nbd::DEVICE_MAP_FILE` (private to +/// those modules; the `ublk` one is feature-gated, so we don't reference them +/// across the boundary). 
+const UBLK_DEVICE_MAP_FILE: &str = "ublk_devices.json"; +const NBD_DEVICE_MAP_FILE: &str = "nbd_devices.json"; + /// Encodes a base-manifest key for the foyer manifest cache. fn base_manifest_cache_key(s3_prefix: &str, manifest_name: &str) -> String { @@ -124,6 +136,12 @@ pub enum RouterError { current: usize, max: usize, }, + + #[error("Invalid `from` ref: {0}")] + InvalidFromRef(String), + + #[error("Source not found: {0}")] + SourceNotFound(String), } /// Status of an in-flight OCI bless operation. @@ -207,6 +225,10 @@ pub struct AggregateStats { pub struct SnapshotResponse { pub manifest_etag: Option, pub sequence: u64, + /// Stable logical snapshot id (`"{volume}@{sequence}"`). Callers fork from + /// this via `from: "snapshot:"` — they never handle the raw sequence or + /// pool. Recorded in the durable snapshot index (when `snapshot_persisted`). + pub snapshot_id: String, /// Whether the versioned snapshot was persisted to S3. /// `false` means the manifest was saved but the versioned snapshot key wasn't. pub snapshot_persisted: bool, @@ -1002,6 +1024,138 @@ impl ExportRouter { } } + /// Resolve a volume's persisted physical location by its stable logical name. + /// + /// Reads `export.json` (the name-keyed, prefix-independent index) directly + /// from S3, so ANY node can learn where a volume's data lives given only its + /// name — even one that never attached or discovered it. This is the + /// durable logical→physical mapping GlideFS owns: callers no longer need to + /// remember `s3_prefix`. Returns `None` if no such volume was ever persisted. + pub async fn resolve_export(&self, name: &str) -> Result, RouterError> { + validate_export_name(name)?; + self.load_export(name).await + } + + // ========================================================================= + // Logical registry: image + snapshot indexes (sibling of export.json). 
+ // + // These durable, name-keyed JSON objects let GlideFS resolve a logical + // `from` ref (`image:`/`volume:`/`snapshot:`) to physical coordinates, so + // callers never supply an `s3_prefix` or `manifest_name`. See + // `crate::block::registry`. + // ========================================================================= + + /// S3 path for an image-index entry. + fn index_image_path(&self, name: &str) -> Path { + crate::block::registry::image_index_path(&self.db_path, name) + } + + /// S3 path for a snapshot-index entry. + fn index_snapshot_path(&self, id: &str) -> Path { + crate::block::registry::snapshot_index_path(&self.db_path, id) + } + + /// Persist an image-index entry (idempotent). + pub async fn save_image_entry(&self, entry: &ImageEntry) -> Result<(), RouterError> { + let path = self.index_image_path(&entry.name); + let json = serde_json::to_vec(entry)?; + self.object_store + .put(&path, Bytes::from(json).into()) + .await?; + debug!("Saved image index entry: {}", path); + Ok(()) + } + + /// Load an image-index entry by logical name. + pub async fn load_image_entry(&self, name: &str) -> Result, RouterError> { + let path = self.index_image_path(name); + match self.object_store.get(&path).await { + Ok(result) => { + let data = result.bytes().await?; + Ok(Some(serde_json::from_slice(&data)?)) + } + Err(object_store::Error::NotFound { .. }) => Ok(None), + Err(e) => Err(e.into()), + } + } + + /// Persist a snapshot-index entry (idempotent). + pub async fn save_snapshot_entry(&self, entry: &SnapshotEntry) -> Result<(), RouterError> { + let path = self.index_snapshot_path(&entry.id); + let json = serde_json::to_vec(entry)?; + self.object_store + .put(&path, Bytes::from(json).into()) + .await?; + debug!("Saved snapshot index entry: {}", path); + Ok(()) + } + + /// Load a snapshot-index entry by stable id. 
+ pub async fn load_snapshot_entry( + &self, + id: &str, + ) -> Result, RouterError> { + let path = self.index_snapshot_path(id); + match self.object_store.get(&path).await { + Ok(result) => { + let data = result.bytes().await?; + Ok(Some(serde_json::from_slice(&data)?)) + } + Err(object_store::Error::NotFound { .. }) => Ok(None), + Err(e) => Err(e.into()), + } + } + + /// Resolve a logical [`FromRef`] to the physical coordinates the existing + /// fork machinery needs. This is the heart of "callers speak names only": + /// the source's pool becomes the new volume's pool (so CoW pack sharing + /// works), and the source manifest/sequence drive the fork. + pub async fn resolve_source( + &self, + from: &FromRef, + ) -> Result { + match from { + FromRef::Blank => Ok(ResolvedSource::blank()), + FromRef::Image(name) => { + let entry = self + .load_image_entry(name) + .await? + .ok_or_else(|| RouterError::SourceNotFound(format!("image:{name}")))?; + Ok(ResolvedSource { + pool: Some(entry.pool), + manifest_name: Some(entry.manifest), + snapshot_sequence: None, + }) + } + FromRef::Volume(name) => { + let cfg = self + .load_export(name) + .await? + .ok_or_else(|| RouterError::SourceNotFound(format!("volume:{name}")))?; + // Fork from the volume's current committed manifest, which lives + // at `manifests/{name}` within the volume's pool. + Ok(ResolvedSource { + pool: Some(cfg.s3_prefix().to_string()), + manifest_name: Some(name.clone()), + snapshot_sequence: None, + }) + } + FromRef::Snapshot(id) => { + let entry = self + .load_snapshot_entry(id) + .await? + .ok_or_else(|| RouterError::SourceNotFound(format!("snapshot:{id}")))?; + // Snapshots are read via `get_snapshot(volume, sequence)`, so the + // fork "manifest name" is the owning volume's name. + Ok(ResolvedSource { + pool: Some(entry.pool), + manifest_name: Some(entry.volume), + snapshot_sequence: Some(entry.sequence), + }) + } + } + } + /// Delete export definition from S3 (idempotent). 
async fn delete_export_definition(&self, name: &str) -> Result<(), RouterError> { let path = self.export_json_path(name); @@ -1018,9 +1172,14 @@ impl ExportRouter { } } - /// Discover all exports from S3. + /// Discover ALL exports under the global `{db_path}/exports/` prefix. /// - /// Lists the `{db_path}/exports/` prefix and loads each `export.json` in parallel. + /// Lists the prefix and loads each `export.json` in parallel. Boot no + /// longer uses this (it would resurrect every export on a shared bucket as + /// a live device — see [`discover_local_exports`]); retained for tests and + /// potential admin/debug enumeration, hence `allow(dead_code)` in the + /// binary build where it currently has no non-test caller. + #[allow(dead_code)] pub async fn discover_exports(&self) -> Result, RouterError> { use futures::stream::{self, StreamExt}; @@ -1066,6 +1225,73 @@ impl ExportRouter { Ok(exports) } + /// Discover only the exports **this node** currently owns a device for. + /// + /// Unlike [`discover_exports`], which lists every `export.json` under the + /// global `{db_path}/exports/` prefix — and would resurrect every export on + /// a shared bucket as a live kernel device — this reads the node-local + /// device maps (`ublk_devices.json` / `nbd_devices.json` in `cache_dir`, + /// rewritten on every device add/remove) and loads only those exports' + /// configs. A node thus recovers only its working set on boot; everything + /// else stays dormant in S3 and attaches on demand by name + /// (`resolve_export` / `PUT /api/exports/{name}`). A fresh node (no maps) + /// recovers nothing — correct, it attaches by name when a caller asks. + pub async fn discover_local_exports(&self) -> Result, RouterError> { + use futures::stream::{self, StreamExt}; + + // Union of export names this node has a (ublk or nbd) device for. + // Values are dev ids we don't need here, so accept any JSON value. 
+ let mut names: std::collections::HashSet = std::collections::HashSet::new(); + for file in [UBLK_DEVICE_MAP_FILE, NBD_DEVICE_MAP_FILE] { + let path = self.cache_dir.join(file); + match std::fs::read_to_string(&path) { + Ok(data) => { + match serde_json::from_str::>(&data) { + Ok(map) => names.extend(map.into_keys()), + Err(e) => warn!("corrupt device map {}, ignoring: {}", file, e), + } + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => warn!("failed to read device map {}: {}", file, e), + } + } + + if names.is_empty() { + info!("No local device-map entries — recovering no exports (fresh node / clean state)"); + return Ok(Vec::new()); + } + + let total = names.len(); + let exports: Vec = stream::iter(names) + .map(|name| async move { + match self.load_export(&name).await { + Ok(Some(config)) => Some(config), + Ok(None) => { + // Stale map entry: the device was persisted but the + // export.json was purged. Skip — the map is rewritten + // without it on the next persist_devices. + debug!("Local device-map entry '{}' has no export.json, skipping", name); + None + } + Err(e) => { + warn!("Failed to load local export '{}': {}", name, e); + None + } + } + }) + .buffer_unordered(32) + .filter_map(|x| async { x }) + .collect() + .await; + + info!( + "Recovered {}/{} local export(s) from device map", + exports.len(), + total + ); + Ok(exports) + } + /// Create a new export. /// /// If `readonly` is true, the export will reject writes. Used during live migration to @@ -1563,17 +1789,20 @@ impl ExportRouter { // Clone Arc'd components from the per-shard guard, then drop the // guard so we don't hold it across .await on the snapshot below. 
- let (cache, content_store, pack_index_cache, volume_manifest) = { + let (cache, content_store, pack_index_cache, volume_manifest, pool) = { let entry = self .exports .get(name) .ok_or_else(|| RouterError::ExportNotFound(name.to_string()))?; let s = entry.value(); + // Effective pool: the volume's s3_prefix, or its name by default. + let pool = s.s3_prefix.clone().unwrap_or_else(|| name.to_string()); ( Arc::clone(&s.cache), Arc::clone(&s.content_store), Arc::clone(&s.pack_index_cache), Arc::clone(&s.volume_manifest), + pool, ) }; @@ -1623,12 +1852,43 @@ impl ExportRouter { .put_manifest(tag, result.manifest_bytes.clone(), None) .await .map_err(RouterError::ContentStore)?; + // Make the tag forkable by `from: "image:"`. + self.index_image(tag, &pool, tag).await; info!("Tagged snapshot of '{}' as '{}'", name, tag); } + // Record the snapshot in the durable logical index so it is forkable by + // a stable id (`snapshot:`) from any node — GlideFS owns the + // snapshot→location mapping, not the caller. Only index a snapshot whose + // versioned key was actually persisted. + let snapshot_id = crate::block::registry::snapshot_id(name, result.sequence); + if result.snapshot_persisted { + // Lineage: the parent volume's own source, if any. + let parent = self.load_export(name).await.ok().flatten().and_then(|c| c.source); + let entry = SnapshotEntry { + id: snapshot_id.clone(), + pool: pool.clone(), + volume: name.to_string(), + sequence: result.sequence, + label: tag.map(|t| t.to_string()), + parent, + }; + if let Err(e) = self.save_snapshot_entry(&entry).await { + // Non-fatal: the snapshot bytes are durable; only the logical + // index entry failed. Surface it but don't fail the snapshot. 
+ warn!( + export = %name, + snapshot_id = %snapshot_id, + error = %e, + "failed to persist snapshot index entry" + ); + } + } + Ok(SnapshotResponse { manifest_etag: result.manifest_etag, sequence: result.sequence, + snapshot_id, snapshot_persisted: result.snapshot_persisted, tag: tag.map(|t| t.to_string()), }) @@ -1644,7 +1904,7 @@ impl ExportRouter { validate_export_name(tag)?; // Clone the Arc'd content_store + serialize manifest under the // shard guard; drop the guard before the .await on put_manifest. - let (manifest_bytes, content_store) = { + let (manifest_bytes, content_store, pool) = { let entry = self .exports .get(name) @@ -1655,12 +1915,16 @@ impl ExportRouter { .read() .serialize() .map_err(|e| RouterError::Manifest(e.to_string()))?; - (manifest_bytes, Arc::clone(&s.content_store)) + let pool = s.s3_prefix.clone().unwrap_or_else(|| name.to_string()); + (manifest_bytes, Arc::clone(&s.content_store), pool) }; content_store .put_manifest(tag, manifest_bytes, None) .await .map_err(RouterError::ContentStore)?; + // A tag is a named, forkable manifest — register it in the image index + // so it resolves via `from: "image:"`. + self.index_image(tag, &pool, tag).await; info!("Tagged export '{}' as '{}'", name, tag); Ok(()) } @@ -1683,12 +1947,14 @@ impl ExportRouter { ) -> Result { validate_export_name(name)?; validate_export_name(base_name)?; - let content_store = { + let (content_store, pool) = { let entry = self .exports .get(name) .ok_or_else(|| RouterError::ExportNotFound(name.to_string()))?; - Arc::clone(&entry.value().content_store) + let s = entry.value(); + let pool = s.s3_prefix.clone().unwrap_or_else(|| name.to_string()); + (Arc::clone(&s.content_store), pool) }; let manifest_name = format!("bases/{}", base_name); @@ -1702,6 +1968,9 @@ impl ExportRouter { base = %base_name, "promote: base already exists, skipping" ); + // Ensure the logical image index points at the existing base, so + // `image:` resolves even if the entry predates indexing. 
+ self.index_image(base_name, &pool, &manifest_name).await; return Ok(false); } @@ -1718,6 +1987,9 @@ impl ExportRouter { .put_manifest(&manifest_name, snapshot_bytes, None) .await .map_err(RouterError::ContentStore)?; + // Register the blessed base in the logical image index so it is forkable + // by `from: "image:"` from any node. + self.index_image(base_name, &pool, &manifest_name).await; info!( export = %name, sequence, @@ -1727,6 +1999,22 @@ impl ExportRouter { Ok(true) } + /// Record (idempotently, best-effort) an image-index entry mapping a logical + /// image name to its physical `(pool, manifest)`. Best-effort: a failed + /// index write is logged but does not fail the publish — the base bytes are + /// durable; a missing index entry only affects logical resolution and is + /// backfilled on the next promote/bless. + async fn index_image(&self, name: &str, pool: &str, manifest: &str) { + let entry = ImageEntry { + name: name.to_string(), + pool: pool.to_string(), + manifest: manifest.to_string(), + }; + if let Err(e) = self.save_image_entry(&entry).await { + warn!(image = %name, error = %e, "failed to persist image index entry"); + } + } + /// Check if a manifest exists in S3 (HEAD request, no data transfer). /// /// Does not require a running export — resolves the manifest path from @@ -2166,6 +2454,9 @@ impl ExportRouter { .put_manifest(&manifest_key, manifest_data, None) .await .map_err(RouterError::ContentStore)?; + // Register the blessed image in the logical index so it is forkable by + // `from: "image:"` from any node. 
+ self.index_image(name, s3_prefix, &manifest_key).await; if !boot_blocks.is_empty() { let data = crate::block::manifest::serialize_block_list(&boot_blocks); content_store.put_boot_set(name, data).await.map_err(RouterError::ContentStore)?; @@ -2767,6 +3058,10 @@ impl ExportRouter { // Remove export (preserves cache files) self.remove_export(name, false).await?; + // Preserve lineage across resize: remove_export(purge=false) left + // export.json in S3, so recover its `source` ref before recreating. + let prior_source = self.load_export(name).await.ok().flatten().and_then(|c| c.source); + // Recreate with new size, loading from the manifest we just drained. // This preserves access to pre-resize data via the block_map. let config = ExportConfig { @@ -2779,6 +3074,7 @@ impl ExportRouter { transport: Some(transport), // Preserve the cooldown knob across resize (0 if it was disabled). compaction_cooldown: (compaction_cooldown != 0).then_some(compaction_cooldown), + source: prior_source, }; self.create_export(config.clone(), readonly, Some(name), None) @@ -3408,6 +3704,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router.create_export(make("a"), false, None, None).await.unwrap(); router.create_export(make("b"), false, None, None).await.unwrap(); @@ -3496,6 +3793,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, } } @@ -3947,6 +4245,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }, ExportConfig { name: "discover-vol2".to_string(), @@ -3957,6 +4256,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }, ExportConfig { name: "discover-vol3".to_string(), @@ -3967,6 +4267,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }, ]; @@ -3984,6 +4285,47 @@ mod tests { assert!(names.contains(&"discover-vol3")); } + #[tokio::test] + async fn 
test_discover_local_exports_only_mapped() { + let temp_dir = TempDir::new().unwrap(); + let router = create_test_router(&temp_dir).await; + + // S3 holds three exports (as if created across the shared bucket)... + for name in ["local-a", "local-b", "global-c"] { + router.save_export(&test_export_config(name)).await.unwrap(); + } + + // ...but THIS node's device maps own only local-a (ublk) + local-b + // (nbd), plus a stale "ghost" entry whose export.json was purged. + std::fs::write( + temp_dir.path().join("ublk_devices.json"), + r#"{"local-a":1,"ghost":99}"#, + ) + .unwrap(); + std::fs::write(temp_dir.path().join("nbd_devices.json"), r#"{"local-b":0}"#).unwrap(); + + let discovered = router.discover_local_exports().await.unwrap(); + let mut names: Vec<_> = discovered.iter().map(|c| c.name.clone()).collect(); + names.sort(); + // Only mapped exports with an export.json: not global-c (in S3 but not + // owned here), not ghost (mapped but no export.json). + assert_eq!(names, vec!["local-a".to_string(), "local-b".to_string()]); + } + + #[tokio::test] + async fn test_discover_local_exports_empty_without_maps() { + let temp_dir = TempDir::new().unwrap(); + let router = create_test_router(&temp_dir).await; + // export.json exists in S3, but no device maps on disk → a fresh node + // recovers nothing (it attaches by name on demand instead). 
+ router + .save_export(&test_export_config("orphan")) + .await + .unwrap(); + let discovered = router.discover_local_exports().await.unwrap(); + assert!(discovered.is_empty()); + } + #[tokio::test] async fn test_create_export_persists_to_s3() { let temp_dir = TempDir::new().unwrap(); @@ -4183,6 +4525,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("source"), None) @@ -4245,6 +4588,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("bases/rootfs-abc123"), None) @@ -4316,6 +4660,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("src"), None) @@ -4369,6 +4714,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("src"), None) @@ -4403,6 +4749,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; let result = router @@ -4442,6 +4789,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(b_config, false, Some("a"), None) @@ -4464,6 +4812,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(c_config, false, Some("b"), None) @@ -4583,6 +4932,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("parent"), None) @@ -4882,6 +5232,7 @@ mod tests { flush_mode: Some("manual".to_string()), transport: None, compaction_cooldown: None, + source: None, }; router.create_export(config, false, None, None).await.unwrap(); @@ -4932,6 +5283,7 @@ mod tests { flush_mode: Some("manual".to_string()), transport: None, compaction_cooldown: None, + source: None, 
}; router.create_export(config, false, None, None).await.unwrap(); @@ -4976,6 +5328,7 @@ mod tests { flush_mode: Some("manual".to_string()), transport: None, compaction_cooldown: None, + source: None, }; assert_eq!( manual.flush_threshold_or(500), @@ -4992,6 +5345,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; assert_eq!(custom.flush_threshold_or(500), 1000, "export override wins"); @@ -5004,6 +5358,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; assert_eq!( default.flush_threshold_or(500), @@ -5027,6 +5382,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: Some(8), + source: None, }; router.save_export(&config).await.unwrap(); @@ -5325,6 +5681,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; fork_router .create_export(fork_config, false, Some("parent"), None) @@ -5402,6 +5759,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }).await.unwrap(); drop(child); @@ -5437,6 +5795,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }, false, Some("child"), @@ -5551,6 +5910,7 @@ mod tests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("parent"), None) diff --git a/glidefs/src/block/ublk/worker_pool.rs b/glidefs/src/block/ublk/worker_pool.rs index f92fac9..b5ffea7 100644 --- a/glidefs/src/block/ublk/worker_pool.rs +++ b/glidefs/src/block/ublk/worker_pool.rs @@ -790,11 +790,26 @@ async fn run_worker_loop( state.executor.tick(); } - // Drive io_uring. Block in the kernel for up to WORKER_IDLE_NSEC - // unless an SQE completes or eventfd fires. With nothing hosted, - // we still spin a short timeout so Shutdown via channel-close is - // promptly noticed. - let to_wait = if state.executor.all_done() { 0 } else { 1 }; + // Drive io_uring. 
ALWAYS block in the kernel for up to + // WORKER_IDLE_NSEC, waking immediately when an SQE completes or the + // eventfd fires. + // + // The eventfd watcher is a *daemon* task (`spawn_daemon` above) that + // keeps a `PollAdd` permanently armed on the eventfd and re-arms it + // forever, independent of any hosted queue. Every `WorkerHandle::send` + // (AddQueue / RemoveQueue / Shutdown) writes the eventfd → PollAdd CQE + // → `io_uring_enter` returns at once. So a queue-less worker has + // nothing to gain from busy-polling: no I/O can target it until a + // queue is assigned, and that assignment wakes it via the eventfd. + // + // Previously this was `if all_done() { 0 } else { 1 }` — i.e. a worker + // with no hosted queues used `to_wait = 0`, making `io_uring_enter` + // return instantly and busy-spin a full core. With N idle workers + // (N = pool size − workers hosting queues) that burned ~N cores doing + // nothing; on a host running few VMs that's most of the pool. Channel + // close (sender dropped with no final message) is still noticed within + // one WORKER_IDLE_NSEC tick, which is fine for shutdown latency. + let to_wait = 1; let ts = io_uring::types::Timespec::new().nsec(WORKER_IDLE_NSEC); let submit_result = ring.with_mut(|r| { let args = io_uring::types::SubmitArgs::new().timespec(&ts); diff --git a/glidefs/src/cli/bless.rs b/glidefs/src/cli/bless.rs index 2b1cdaa..4d2f1e2 100644 --- a/glidefs/src/cli/bless.rs +++ b/glidefs/src/cli/bless.rs @@ -113,6 +113,26 @@ async fn record_profile_sidecars( } } +/// Register a freshly-blessed base in the logical image index so it is forkable +/// by `from: "image:"` from any node (best-effort; the base bytes are +/// already durable, a missing index entry is backfilled on the next bless). 
+async fn register_image_index( + object_store: &Arc, + db_path: &str, + name: &str, + s3_prefix: &str, + manifest_key: &str, +) { + let entry = crate::block::registry::ImageEntry { + name: name.to_string(), + pool: s3_prefix.to_string(), + manifest: manifest_key.to_string(), + }; + if let Err(e) = crate::block::registry::put_image_entry(object_store, db_path, &entry).await { + tracing::warn!(image = %name, error = %e, "failed to write image index entry"); + } +} + pub async fn run_bless( image_path: PathBuf, name: String, @@ -167,6 +187,7 @@ pub async fn run_bless( .put_manifest(&manifest_key, volume_manifest.serialize()?, None) .await .context("Failed to upload manifest")?; + register_image_index(&object_store, &db_path, &name, &s3_prefix, &manifest_key).await; let elapsed = start.elapsed(); @@ -405,6 +426,7 @@ pub async fn run_bless_oci( .put_manifest(&manifest_key, manifest_data, None) .await .map_err(|e| anyhow::anyhow!("failed to upload manifest: {e}"))?; + register_image_index(&object_store, &db_path, &name, &s3_prefix, &manifest_key).await; // The entrypoint to profile (image default, overridable via GLIDEFS_PROFILE_CMD). let (mut argv, env, workdir) = crate::oci::boot_set::run_command(&resolved.config); @@ -669,6 +691,7 @@ pub async fn run_bless_oci_erofs( .put_manifest(&manifest_key, manifest_data, None) .await .map_err(|e| anyhow::anyhow!("failed to upload manifest: {e}"))?; + register_image_index(&object_store, &db_path, &name, &s3_prefix, &manifest_key).await; // The entrypoint to profile (image default, overridable via GLIDEFS_PROFILE_CMD). let (mut argv, env, workdir) = crate::oci::boot_set::run_command(&resolved.config); diff --git a/glidefs/src/cli/server.rs b/glidefs/src/cli/server.rs index 4da3946..089d1bb 100644 --- a/glidefs/src/cli/server.rs +++ b/glidefs/src/cli/server.rs @@ -166,13 +166,15 @@ pub async fn build_router_only(config_path: PathBuf) -> Result { nbd_config.max_exports(), ); - // Discover exports from S3. 
- info!("Discovering exports from S3..."); - let discovered_count = match router.discover_exports().await { + // Recover only THIS node's working set — the exports it owns a device for + // (per the local device maps), not every export.json in the shared bucket. + // Anything else attaches on demand by name (resolve-by-name). + info!("Recovering this node's exports from local device map..."); + let discovered_count = match router.discover_local_exports().await { Ok(discovered) => { use futures::stream::{self, StreamExt}; let total = discovered.len(); - info!("Found {} export(s) in S3, recovering in parallel...", total); + info!("Found {} local export(s), recovering in parallel...", total); let s3_prefixes: std::collections::HashSet = discovered .iter() diff --git a/glidefs/src/config.rs b/glidefs/src/config.rs index cdbd6ae..4670229 100644 --- a/glidefs/src/config.rs +++ b/glidefs/src/config.rs @@ -434,6 +434,14 @@ pub struct ExportConfig { /// every dead-ratio crossing, the default). Typical enabled value: 8. #[serde(skip_serializing_if = "Option::is_none", default)] pub compaction_cooldown: Option, + + /// Lineage: the logical source this volume was created from, as a `FromRef` + /// string (`"image:"`, `"volume:"`, `"snapshot:"`). `None` + /// for a blank volume or a volume created before lineage tracking. Recorded + /// in the durable name-keyed index so GlideFS — not the caller — owns the + /// parent/child graph. See `block::registry`. 
+ #[serde(skip_serializing_if = "Option::is_none", default)] + pub source: Option, } impl ExportConfig { @@ -584,6 +592,7 @@ impl NbdConfig { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }]; } @@ -597,6 +606,7 @@ impl NbdConfig { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }] } } @@ -920,6 +930,7 @@ impl Settings { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }], device_name: None, device_size_gb: None, diff --git a/glidefs/tests/blktests.rs b/glidefs/tests/blktests.rs index c52012d..743769e 100644 --- a/glidefs/tests/blktests.rs +++ b/glidefs/tests/blktests.rs @@ -223,6 +223,7 @@ mod blktests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(nbd_config, false, None, None) @@ -252,6 +253,7 @@ mod blktests { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(ublk_config, false, None, None) diff --git a/glidefs/tests/docker_integration/bless_api.rs b/glidefs/tests/docker_integration/bless_api.rs index 14bc735..de95e49 100644 --- a/glidefs/tests/docker_integration/bless_api.rs +++ b/glidefs/tests/docker_integration/bless_api.rs @@ -387,6 +387,7 @@ async fn test_bless_api_full_flow() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, Some("bases/myimage"), None) diff --git a/glidefs/tests/docker_integration/data_integrity.rs b/glidefs/tests/docker_integration/data_integrity.rs index e24a2e3..a4d9965 100644 --- a/glidefs/tests/docker_integration/data_integrity.rs +++ b/glidefs/tests/docker_integration/data_integrity.rs @@ -129,6 +129,7 @@ transport_test! 
{ flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; let result = server2 .router diff --git a/glidefs/tests/docker_integration/integrity_suite.rs b/glidefs/tests/docker_integration/integrity_suite.rs index 1473dbf..541baec 100644 --- a/glidefs/tests/docker_integration/integrity_suite.rs +++ b/glidefs/tests/docker_integration/integrity_suite.rs @@ -1446,6 +1446,7 @@ async fn fork_chain_integrity() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; server .router @@ -1554,6 +1555,7 @@ async fn fork_deep_chain_read_through() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; server .router @@ -2609,6 +2611,7 @@ async fn soak_mixed_operations() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; server .router @@ -2993,6 +2996,7 @@ async fn soak_fork_chain_churn() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; server .router diff --git a/glidefs/tests/docker_integration/mod.rs b/glidefs/tests/docker_integration/mod.rs index 1d458b7..7072091 100644 --- a/glidefs/tests/docker_integration/mod.rs +++ b/glidefs/tests/docker_integration/mod.rs @@ -753,6 +753,7 @@ impl TestServer { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; self.router .create_export(config, false, None, None) @@ -773,6 +774,7 @@ impl TestServer { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; self.router.save_export(&config).await.unwrap(); } @@ -788,6 +790,7 @@ impl TestServer { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; self.router .create_export(config, false, Some(name), None) @@ -808,6 +811,7 @@ impl TestServer { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; self.router .create_export(config, false, Some(name), None) @@ -833,6 +837,7 @@ impl TestServer { flush_mode: None, transport: None, 
compaction_cooldown: None, + source: None, }; self.router .create_export(config, false, Some(source_manifest), None) @@ -859,6 +864,7 @@ impl TestServer { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; self.router .create_export(config, false, Some(source_manifest), Some(snapshot_sequence)) @@ -879,6 +885,7 @@ impl TestServer { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; self.router .create_export(config, true, Some(source_manifest), None) diff --git a/glidefs/tests/fio_bench.rs b/glidefs/tests/fio_bench.rs index cbbfb93..bb9136a 100644 --- a/glidefs/tests/fio_bench.rs +++ b/glidefs/tests/fio_bench.rs @@ -79,6 +79,7 @@ mod fio_bench { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router.create_export(config, false, None, None).await.unwrap(); diff --git a/glidefs/tests/fio_verify.rs b/glidefs/tests/fio_verify.rs index aed73c8..73fb1fe 100644 --- a/glidefs/tests/fio_verify.rs +++ b/glidefs/tests/fio_verify.rs @@ -106,6 +106,7 @@ mod fio_verify { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, None, None) @@ -377,6 +378,7 @@ mod fio_verify { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router2 .create_export(config, false, Some("verify"), None) @@ -512,6 +514,7 @@ mod fio_verify { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router2 .create_export(config, false, Some("verify"), None) @@ -624,6 +627,7 @@ mod fio_verify { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router2 .create_export(config, false, Some("verify"), None) diff --git a/glidefs/tests/integration/chunk_prefetch.rs b/glidefs/tests/integration/chunk_prefetch.rs index 68d1af0..c083f19 100644 --- a/glidefs/tests/integration/chunk_prefetch.rs +++ b/glidefs/tests/integration/chunk_prefetch.rs @@ -51,6 +51,7 @@ async fn 
test_prefetch_chunk_metas_cold_start() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router1 .create_export(config.clone(), false, None, None) diff --git a/glidefs/tests/integration/data_safety.rs b/glidefs/tests/integration/data_safety.rs index f5d0112..e05431c 100644 --- a/glidefs/tests/integration/data_safety.rs +++ b/glidefs/tests/integration/data_safety.rs @@ -2217,6 +2217,7 @@ async fn test_cold_wake_stress_concurrent_writes() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router1.create_export(config, false, None, None).await.unwrap(); @@ -2287,6 +2288,7 @@ async fn test_cold_wake_stress_concurrent_writes() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router2.create_export(config2, false, Some("vol1"), None).await.unwrap(); @@ -2567,6 +2569,7 @@ async fn test_concurrent_sub_block_writes_same_not_present_block() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router1 .create_export(config1, false, None, None) @@ -2625,6 +2628,7 @@ async fn test_concurrent_sub_block_writes_same_not_present_block() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router2 .create_export(config2, false, Some("vol"), None) diff --git a/glidefs/tests/integration/flush_ordering.rs b/glidefs/tests/integration/flush_ordering.rs index 0b837e4..47d1444 100644 --- a/glidefs/tests/integration/flush_ordering.rs +++ b/glidefs/tests/integration/flush_ordering.rs @@ -67,6 +67,7 @@ fn test_export_config(name: &str) -> ExportConfig { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, } } diff --git a/glidefs/tests/integration/flush_safety.rs b/glidefs/tests/integration/flush_safety.rs index 890d547..0d0d3e7 100644 --- a/glidefs/tests/integration/flush_safety.rs +++ b/glidefs/tests/integration/flush_safety.rs @@ -150,6 +150,7 @@ fn test_export_config(name: &str) -> ExportConfig { flush_mode: 
None, transport: None, compaction_cooldown: None, + source: None, } } @@ -1028,6 +1029,7 @@ async fn test_readahead_prefetches_next_pack_index() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config.clone(), false, None, None) diff --git a/glidefs/tests/integration/snapshots.rs b/glidefs/tests/integration/snapshots.rs index 369122f..6f5183e 100644 --- a/glidefs/tests/integration/snapshots.rs +++ b/glidefs/tests/integration/snapshots.rs @@ -157,6 +157,7 @@ async fn test_fork_from_snapshot() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, None, None) @@ -182,6 +183,7 @@ async fn test_fork_from_snapshot() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("vm1"), Some(snap1.sequence)) @@ -343,6 +345,7 @@ async fn test_purge_export_deletes_snapshots() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, None, None) @@ -523,6 +526,7 @@ async fn test_remove_without_purge_preserves_snapshots() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, None, None) @@ -584,6 +588,7 @@ async fn test_snapshot_tag_and_fork() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router.create_export(config, false, None, None).await.unwrap(); @@ -607,6 +612,7 @@ async fn test_snapshot_tag_and_fork() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("setup-abc123"), None) @@ -656,6 +662,7 @@ async fn test_standalone_tag() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router.create_export(config, false, None, None).await.unwrap(); @@ -679,6 +686,7 @@ async fn test_standalone_tag() { flush_mode: None, 
transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("my-tag"), None) @@ -1151,6 +1159,7 @@ async fn test_fork_from_snapshot_zero_overwrite_sees_original() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, None, None) @@ -1190,6 +1199,7 @@ async fn test_fork_from_snapshot_zero_overwrite_sees_original() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("vm1"), Some(snap1.sequence)) @@ -1220,6 +1230,7 @@ async fn test_fork_from_snapshot_zero_overwrite_sees_original() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork2_config, false, Some("vm1"), Some(_snap2.sequence)) @@ -1445,6 +1456,7 @@ async fn test_fork_parent_deleted_child_survives() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(parent_config, false, None, None) @@ -1473,6 +1485,7 @@ async fn test_fork_parent_deleted_child_survives() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(child_config, false, Some("parent"), None) @@ -1532,6 +1545,7 @@ async fn test_fork_parent_deleted_child_survives() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router2 .create_export(restore_config, false, Some("child"), None) @@ -1619,6 +1633,7 @@ async fn test_fork_concurrent_parent_write_during_fork() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, None, None) @@ -1659,6 +1674,7 @@ async fn test_fork_concurrent_parent_write_during_fork() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("parent"), None) @@ -1735,6 +1751,7 @@ async fn 
test_fork_inherit_then_overwrite_all() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, None, None) @@ -1760,6 +1777,7 @@ async fn test_fork_inherit_then_overwrite_all() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("parent"), None) @@ -1808,6 +1826,7 @@ async fn test_fork_inherit_then_overwrite_all() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router2 .create_export(restore_config, false, Some("child"), None) @@ -1868,6 +1887,7 @@ async fn test_fork_from_snapshot_during_active_writes() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, None, None) @@ -1904,6 +1924,7 @@ async fn test_fork_from_snapshot_during_active_writes() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(fork_config, false, Some("vm1"), Some(snap.sequence)) @@ -1969,6 +1990,7 @@ async fn test_fork_both_children_from_same_parent() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(parent_config, false, None, None) @@ -1993,6 +2015,7 @@ async fn test_fork_both_children_from_same_parent() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, Some("parent"), None) diff --git a/glidefs/tests/zc_glidefs.rs b/glidefs/tests/zc_glidefs.rs index e533e91..18702f6 100644 --- a/glidefs/tests/zc_glidefs.rs +++ b/glidefs/tests/zc_glidefs.rs @@ -108,6 +108,7 @@ async fn setup_router_full( flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router .create_export(config, false, None, None) @@ -481,6 +482,7 @@ async fn zc_glidefs_multi_device_soak() { flush_mode: None, transport: None, compaction_cooldown: None, + source: None, }; router 
.create_export(cfg, false, None, None)