Skip to content

feat: S3 object storage offloading for V3 bucket data#673

Draft
Sleepful wants to merge 98 commits into
mainfrom
s3-offloading
Draft

feat: S3 object storage offloading for V3 bucket data#673
Sleepful wants to merge 98 commits into
mainfrom
s3-offloading

Conversation

@Sleepful

@Sleepful Sleepful commented Jun 11, 2026

Copy link
Copy Markdown
Collaborator

Summary

Offload BucketDataDocumentV3.ops[] arrays to object storage (S3), keeping only a metadata shell in MongoDB. The service reads S3 objects and streams ops to clients using the existing wire protocol — no protocol changes. Object storage is optional at configuration level; when not configured, ops remain inline in MongoDB as today.

Changes

  • ObjectStorage interfaceput/get/delete contract, decouples storage backend from core logic
  • S3ObjectStorage — production implementation using @aws-sdk/client-s3 (static import)
  • MemoryObjectStorage — in-memory test double (no Docker/S3 needed in CI)
  • Config — optional object_storage: section in MongoStorageConfig with type s3
  • Write pathflushBucketData() uploads zstd-compressed BSON ops to S3, inserts metadata shell in MongoDB
  • Read pathgetBucketDataBatchImpl() parallel pre-fetches S3 objects, patches doc.ops before existing decode loop
  • CompactioncompactSingleBucket() and clearBucketLeading() are S3-aware (read/write/cleanup old refs)
  • InjectionobjectStorage? threaded through the full chain, all optional fields, zero breaking changes

Design Decisions

  • No inline threshold — all documents offload to S3 when configured. A general threshold is deferred to a follow-up.
  • S3 path formatbucket-data/<group>/<def>/<bucket>/<minOp>-<maxOp>
  • Zstd whole-object compression — entire BSON ops array compressed as one blob
  • Batch sizingcompressed_size * 3 heuristic keeps batch memory bounded for S3-backed docs

Manual Verification [TODO]

S3ObjectStorage is not exercised in CI. To manually validate before shipping:

  1. Start MinIO: minio server /tmp/minio-data --console-address :9001
  2. Configure object_storage with endpoint: http://localhost:9000 and S3 type
  3. Write/read/compact via client API, verify with mc ls

forcePathStyle: true is already set when endpoint is present (required by MinIO).

Sleepful added 30 commits April 28, 2026 17:57
Renames all class, function, type, and collection accessor names in
the duplicated v5 storage implementation from V3→V5:
- MongoBucketBatchV3 → MongoBucketBatchV5
- MongoChecksumsV3 → MongoChecksumsV5
- MongoCompactorV3 → MongoCompactorV5
- MongoParameterCompactorV3 → MongoParameterCompactorV5
- MongoParameterLookupV3 → MongoParameterLookupV5
- MongoSyncBucketStorageV3 → MongoSyncBucketStorageV5
- PersistedBatchV3 → PersistedBatchV5
- SingleBucketStoreV3 → SingleBucketStoreV5
- SourceRecordStoreV3 → SourceRecordStoreV5
- VersionedPowerSyncMongoV3 → VersionedPowerSyncMongoV5

Also adds compressedBucketStorage to StorageConfig and wires up
MongoSyncBucketStorageV5 selection in createMongoSyncBucketStorage.

This is a pure mechanical rename with no behavior changes.
Change BucketDataDocumentV5 to store arrays of operations per document:
- Add BucketOperationV5 interface with per-op fields including op_id
- Add aggregated fields: min_op, checksum, count, size
- Implement serializeBucketDataV5() to group ops and compute aggregates
- Implement loadBucketDataDocumentV5() as generator yielding from ops array

Add chunking logic in PersistedBatchV5.flushBucketData():
- Group operations by bucket then chunk by 1MB size threshold
- Single-op chunks remain valid for backward compatibility

Update read path in MongoSyncBucketStorageV5 to iterate merged docs.
Update SingleBucketStoreV5 for new generator-based load function.
Overrides compactSingleBucket in MongoCompactorV5 to handle the
compressed bucket storage model:

1. Reads all documents in a bucket sorted by _id.o ascending
2. Loads all ops via loadBucketDataDocumentV5()
3. Filters superseded operations using the same row_id tracking
   logic as v3 (newest-to-oldest pass, keeps only latest PUT/REMOVE
   per row)
4. Re-chunks surviving ops by 1MB data-size threshold
5. Replaces old documents with new chunked docs in a transaction
6. Updates bucket_state with recomputed checksums, counts, and bytes

Unlike v3, v5 does not create MOVE/CLEAR ops during compaction.
Instead, superseded ops are dropped and surviving ops are fully
restructured into new documents.
…egation and activate v5 in test matrix

- Override MongoChecksumsV5.computePartialChecksumsForCollection to use
document-level checksum field instead of expanding ops arrays
- Handle partial ranges correctly by filtering ops when start > min_op
- Fix getBucketDataBatchV5 to respect op-level limits instead of document limits
- Update PowerSyncMongo.versioned to create VersionedPowerSyncMongoV5 for v5
- Add STORAGE_VERSION_5 to SUPPORTED_STORAGE_VERSIONS and STORAGE_VERSION_CONFIG
- Update getMongoStorageConfig to enable compressedBucketStorage for v5
- Fix v3-specific tests to only run on storageVersion == 3
…tractMongoSyncBucketStorage and MongoSyncBucketStorageBase → MongoSyncBucketStorage
…ter to MongoParameterCompactor base class

Make collectionFilter() and deleteFilter() concrete in the base class
with the V3/V5 implementation (returns {} and {lookup, _id, key}
respectively). Remove the abstract keyword from the base class.

Delete the now-redundant V3 and V5 parameter compactor subclasses:
- v3/MongoParameterCompactorV3.ts
- v5/MongoParameterCompactorV5.ts

Update MongoSyncBucketStorageV3 and V5 to instantiate MongoParameterCompactor
directly, passing the collection lister callback inline.
…acks interface to separate file

- Create common/MongoSyncBucketStorageCallbacks.ts with the full interface
- Replace inline MongoSyncBucketStorageBaseCallbacks in MongoSyncBucketStorageBase.ts
- Type _versionCallbacks as MongoSyncBucketStorageCallbacks in AbstractMongoSyncBucketStorage
- Update v3 and v5 implementations to import from the new file
- Use 'any' for createCompactor's storage parameter to avoid circular imports
Move getParameterSetsShared, getBucketDataBatchSharedWrapper,
getDataBucketChangesShared, and getParameterBucketChangesShared from
bucket-operations/storage-operations.ts into MongoSyncBucketStorageBase as
private method implementations. Eliminate the context object pattern by
accessing this.callbacks and this.group_id directly. Flatten the
getBucketDataBatchShared -> getBucketDataBatchSharedWrapper chain into a
single getBucketDataBatchImpl method. Delete the now-unused
bucket-operations/storage-operations.ts.
Extract identical types from v3/models.ts and v5/models.ts into a shared
common/models.ts without version suffixes:
- CurrentBucket
- RecordedLookup
- CurrentDataDocument
- BucketParameterDocument
- SourceTableDocument
- BucketStateDocument
- taggedBucketParameterDocumentToTagged

Update v3/models.ts and v5/models.ts to re-export from common/models.ts,
keeping only version-specific exports (BucketDataDocumentV3/V5, etc.).

Update all imports across the codebase to use non-suffixed names from
common/models.ts or version-specific names where appropriate.

Update storage-index.ts to use explicit exports to avoid naming conflicts
with v1/models.ts and models.ts.
…-export wrappers

Remove unnecessary re-export files that aliased serializeParameterLookup
with V3/V5 suffixes. Update PersistedBatchV3 and PersistedBatchV5 to
import serializeParameterLookup directly from document-formats/parameter-lookup.js.
- Rename ambiguous variables for clarity (context, seen, survivingOps)
- Extract inline MongoDB aggregation pipeline into named builder with stage comments
- Split large methods into focused private helpers
- Fix hidden state coupling in bucketStateUpdates via explicit return values
- Replace declare keyword abuse with proper getter narrowing
- Centralize scattered type casts into validation helpers with comments
- Rename misleading type alias FetchPartialBucketChecksumV3
- Add shared abstraction for listBucketDataCollections
- Add JSDoc comments for chunking policy and format methods
- Add validation helper for partial document field extraction
Sleepful added 27 commits May 28, 2026 00:41
The three files in document-formats/ (bucket-document-format, chunking,
parameter-lookup) are V3-only — no V1 consumer exists. Move them into
the v3/ subtree so the module boundary is clear.
Collapse single-use abstractions and remove structural overhead added
during compressed-bucket-storage development. Every change preserves
behavior; the diff from main now shows only genuine new functionality.

## Removals
- Delete 7 dead functions/types: serializeBucketDataV3, loadBucketDataV3,
  taggedBucketParameterDocumentToV3, buildBucketDataQuery,
  listParameterIndexCollectionsV3, listBucketDataCollectionsV3,
  bucketDataV3 (zero callers)
- Delete BucketDataKeyV3 type alias (identical to BucketDataKey)
- Delete BucketDataContextParams and BucketStateLookup interfaces
- Delete 2 redundant PersistedBatchV3 wrappers: serializeParameterLookup
  and taggedBucketParameterDocumentToTagged

## Inlines
- Inline 9 composite wrapper functions into their sole class methods:
  normalizeBatch, fetchPreStates, computePartialChecksumsDirectByDefinition,
  computePartialChecksumsInternal → MongoChecksumsV3;
  dirtyBucketBatches, dirtyBucketBatchForChecksums,
  computeChecksumsForBuckets, bucketStateFilter,
  resolveBucketDefinitionId → MongoCompactorV3
- Inline 4 collection accessor wrappers from PersistedBatchV3
  (parameterIndex, sourceTables, sourceRecords, bucketState)
- Inline extractRowsFromDocument's loadDocument param (one caller,
  one impl)
- Inline UpstreamType/StorageConfigType aliases

## Removals (classes/adapters)
- Remove BucketDocumentFormatAdapter class (wrapped 4 standalone
  functions); inline into SingleBucketStoreV3, PersistedBatchV3,
  and MongoSyncBucketStorageV3
- Replace 6 generic <T> collection accessors on VersionedPowerSyncMongoV3
  with concrete V3-typed methods; delete duplicate *V3() methods
- Unexport createBucketFilter (internal-only)
- Revert unnecessary protected→public on MongoCompactor's
  dirtyBucketBatchesForCollection and dirtyBucketBatchForChecksumsForCollection

## Restorations
- Restore 3 V1 files to exact main-branch content
  (MongoParameterCompactorV1, SingleBucketStoreV1,
  VersionedPowerSyncMongoV1 — only had cosmetic diffs)

## Renames and consolidations
- Rename BucketDataDocument → BucketDataDocumentV3 per reviewer feedback;
  delete obsolete single-op BucketDataDocumentV3 wrapper
- Merge parameter-lookup.ts (12 lines) → v3/models.ts
- Flatten document-formats/ subdirectory: move chunking.ts up to v3/,
  rename bucket-document-format.ts → bucket-format.ts and move to v3/
- Fix inline type imports → top-level imports

Net: 20 files, -333 lines. 2 files deleted (parameter-lookup.ts,
document-formats/bucket-document-format.ts). 1 subdirectory eliminated.
356/356 tests pass.
Resolve 6 merge conflicts (all mechanical — import reshuffling and rename
cascades). Fix modify/delete conflict on MongoParameterLookupV3.ts
(accept our deletion; upstream changes were import-source only).

Add explicit re-export of BucketDefinitionId and ParameterIndexId in
BucketDefinitionMapping.ts (upstream removed them; our V3 files still
import from local path). Fix stale loadBucketDataDocumentV3 references
reintroduced by auto-merge. Remove generic type param from test
bucketData() calls after accessor consolidation.
…ssor pattern

Fix loadBucketDataDocumentV3→loadBucketDataDocument, bucketState<BucketStateDocument>
and bucketData<BucketDataDocumentV3> stale references from auto-merge.

Revert db/storage/checksums accessor pattern in PersistedBatch,
MongoChecksums, and MongoCompactor base classes to protected readonly
constructor params. Replace subclass getter overrides with declare in
V1 and V3 subclasses. Restore MongoCompactorV1 to upstream/main content.
…etStorage

Revert rename to match upstream/main shape. Revert db and checksums
from private+getter to readonly field (set in constructor body via
factory methods). Update V1/V3 subclasses from getter overrides
to declare.

Together with prior parent class reverts, 8 of 10 V1 files now
match upstream/main exactly.

refactor: revert MongoBucketBatch _db pattern to readonly db field

Replace protected _db + public get db() with readonly db field.
Update V1/V3 subclasses from get db() getter to declare.
Replace 13 any usages with concrete types across 5 source files:
- PersistedBatchV3: 6 AnyBulkWriteOperation<any>/any[] → CurrentDataDocumentV3,
  BucketDataDoc, BucketDataDocumentV3, SourceTableDocumentV3, BucketStateDocumentV3
- MongoChecksumsV3: aggregate: any[] → bson.Document[]
- MongoSyncBucketStorageV3: doc: any → bson.Document, Collection<any> → Collection<BucketParameterDocumentV3>
- db.ts: Collection<any>[] → Collection<BucketDataDocumentV3>[], collectionsByPrefix<any> → <BucketDataDocumentV3>

One as-unknown-as cast retained in SingleBucketStoreV3 (required by
TypeScript — BucketDataDocumentGeneric does not overlap with
BucketDataDocumentV3).

356/356 tests pass.
…on code

Replace storageConfig.compressedBucketStorage || storageConfig.incrementalReprocessing
with storageConfig.version >= storage.STORAGE_VERSION_3 in both decision points
(db.ts versioned() and createMongoSyncBucketStorage). Remove the flag, its
doc comment, and the COMPRESSED_BUCKET_STORAGE_VERSION constant from models.ts.

Test infrastructure (TestStorageConfig.compressedBucketStorage) is unchanged
— the flag there is already set to storageVersion >= 3 at call sites.

356/356 tests pass.
Delete the suffix-dropped BucketStateDocument from common/models.ts
and use BucketStateDocumentV3 from v3/models.ts throughout
MongoCompactorV3. Remove now-unused BucketStateDocumentBase import
from common/models.ts.

355/356 tests pass (1 pre-existing flaky snapshot).
…DataDocument/SourceTableDocument

All four suffix-dropped types in common/models.ts were V3 types renamed
without the suffix. Restore V3 names throughout:

  CurrentBucket     → CurrentBucketV3
  RecordedLookup    → RecordedLookupV3
  CurrentDataDocument → CurrentDataDocumentV3
  SourceTableDocument → SourceTableDocumentV3

Move taggedBucketParameterDocumentToTagged to v3/models.ts and
return BucketParameterDocument → BucketParameterDocumentV3.

Redirect models.ts and storage-index.ts imports to use v3/models.ts.
Delete now-empty common/models.ts.

355/356 tests pass (1 pre-existing flaky snapshot).
When a V3 document contains ops both below and above the compaction
horizon (maxOpId), the compactor previously dropped ops above the
horizon. The document was deleted (because it had at least one relevant
op) but only ops <= maxOpId were re-inserted.

Fix: collect all ops from processable documents into batchOps via
candidateOps, deferring the push until processability is determined.
Ops > maxOpId bypass the dedup loop (no seen-map entry, no MOVE
conversion) and pass through to surviving unchanged.

Add comment section headers to compactSingleBucket for readability.
Add tests for mixed-document preservation and range overlap invariant.
Update existing maxOpId filtering test to reflect new pass-through
behavior.
Add a verification aggregate inside each batch transaction that
checks the documents being deleted haven't changed since the read
phase. The aggregate runs as the first operation in the transaction,
anchoring the snapshot. It compares document count, checksum sum,
and op count sum against expected values computed from the read.

If a concurrent compaction modified or deleted the same documents:
- Before transaction start: aggregate catches checksum/count mismatch
- During transaction: MongoDB write conflict aborts the write

Fixes a race condition where two concurrent compact jobs could delete
the same documents and each insert their own replacements, producing
duplicate ops.
The import path ../../src/storage/implementation/common/models.js
does not exist. CurrentBucket has been in ../models.ts and extended
by CurrentBucketV3 in v3/models.ts which includes the 'def' property
used in this test.
Collapse the leading contiguous sequence of MOVE/REMOVE/CLEAR ops
at the start of a bucket into a single CLEAR op, reducing op count
for clients syncing the bucket.

Read: paginated ascending from minId up to lastNotPut. Three
per-doc cases: past region (break), boundary inside doc (split and
accumulate survivors), fully cleared (accumulate all). Write: single
atomic transaction with verify aggregate guard against concurrent
compaction, then delete all cleared docs, insert one CLEAR doc,
and insert surviving ops from any boundary doc split.

Guard CLEAR with opsSincePut >= 2. Single-op sequences don't reduce
op count, so skip them.

Adjust existing MOVE-pass tests to expect CLEAR ops instead of raw
MOVE tombstones where the leading sequence qualifies for collapse.
…cument

Yield doc.target_op ?? null instead of hardcoded null. This fixes
two issues: the CLEAR pass now correctly accumulates target_op from
collapsed MOVEs, and the sync path now surfaces target_op in the
SyncBucketDataChunk response for checkpoint invalidation.
Upstream test used db.sourceRecordsV3() which was renamed to
db.sourceRecords() during the V3 suffix removal refactor.
Replace TRequest generic with concrete FetchPartialBucketChecksumByBucket
in createBucketFilter, buildPartialChecksumPipeline, and
normalizePartialChecksumResults. Call createBucketFilter directly
in buildPartialChecksumPipeline instead of threading it as a parameter.

Drop the unused createFilter parameter from the V3 override of
computePartialChecksumsForCollection. TypeScript allows method
overrides to have fewer parameters than the base, and the V3 body
no longer uses it. Callers still pass createBucketFilter to satisfy
the base class contract, but it is silently stripped.

The two generics TRequest and TBucketDataDocument on the override
must stay — MongoDB Collection<T> is invariant and V1 callers
pass types narrower than the concrete alternatives.
Add failing tests in storage_s3_writing.test.ts that exercise the
MemoryObjectStorage helper and confirm the S3 write path guard condition
works. Thread the objectStorage option through the storage stack
(MongoBucketStorage, MongoSyncBucketStorage, MongoBucketBatch,
PersistedBatch) so it is available for future implementation.

Model changes: make ops optional in BucketDataDocumentV3 to support
storage_ref-only documents. Add StorageRef type and loadBucketDataDocument
guard for empty ops. Add S3ObjectStorage config type and object_storage
config field. Add @aws-sdk/client-s3 and @mongodb-js/zstd dependencies.

Update existing compacting tests to use non-null assertions on ops since
it is now optional.
Implement S3 offloading in PersistedBatchV3: BSON-serialize, zstd-compress
and upload bucket data chunks to objectStorage. Insert metadata shells
with storage_ref in MongoDB instead of inline ops. Update Phase 2b test
assertions with non-null accessors now that the write path works.

Add storage_s3_reading.test.ts with 3 failing tests for the S3 read path:
round-trip write/read, missing S3 object handling, and mixed inline+S3
batch reads. All 3 must fail until the read path fetches from S3.
…test

Pre-fetch and decompress S3 objects for storage_ref docs during
getBucketDataBatch so ops from S3-backed documents are included
in bucket data responses and size tracking.

Add red test for S3-aware compaction (Phase 2d): verifies that
compacted_state is populated correctly, S3 objects are cleaned,
MongoDB docs are replaced, and read path survives compaction.
This test fails because compactSingleBucket does not yet fetch
ops from S3-backed storage_ref documents.
Compaction now pre-fetches S3-backed ops before decode, uploads new S3
objects after rechunking, and cleans up old storage_refs after transaction
commit. Batch size calculation accounts for storage_ref.compressed_size.

S3ObjectStorage implements the ObjectStorage interface using
@aws-sdk/client-s3, wired through MongoStorageProvider when config specifies
object_storage.type: s3.
- Align S3 path format: write and compact both use maxOp (_id.o)
  suffix (minOp-maxOp-maxOp), not minOp
- Scale compaction batch size by compressed_size * 3 for S3-backed
  docs, matching the read path multiplier
- clearBucketLeading(): upload CLEAR doc and boundary survivors to
  S3 when objectStorage is configured, with old ref cleanup after
  the transaction
- Fix compaction test: allow S3 path reuse when op ranges don't
  change after dedup
- Remove dead `compression` field from StorageRef interface and all sites
- Add comments explaining compressed_size * 3 heuristic for byte tracking
- Simplify S3 paths from ${minOp}-${maxOp}-${maxOp} to ${minOp}-${maxOp}
- Invert objectStorage guards: inline path first, S3 as else branch
- loadBucketDataDocument() now throws on undefined ops (empty arrays still ok)
- Set doc.ops = [] in S3 fetch error catch blocks for graceful skip
@changeset-bot

changeset-bot Bot commented Jun 11, 2026

Copy link
Copy Markdown

⚠️ No Changeset found

Latest commit: e17627c

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@rkistner rkistner left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks quite promising, and I like the structure.

Some initial high-level comments:

  1. Currently there are various places in the code doing the same compression/decompression and serialize/deserialize logic. Should we perhaps do this in a wrapper class for ObjectStorage? E.g. a BucketDataObjectStorage that wraps ObjectStorage and does that logic?
  2. NodeJS now has built-in zstd support. But I haven't checked how the APIs and performance compares with @mongodb/zstd. Since we're already using @mongodb/zstd implicitly, that should be fine.
  3. We do need a threshold for inlining ops directly in mongodb storage, before we can merge & release this: S3 has too much overhead for storing say individual 100-byte operations.

await session.endSession();
}

// After commit: delete old S3 objects (best-effort)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker for initial testing, but it could be problematic if we leave orphaned documents in the bucket indefinitely (either from the delete request failing, or from say a process crash/restart between the commit and the delete).

Is there some way we can ensure these are cleaned up eventually? Maybe persisting a "delete queue" in mongodb, or running a periodic cleanup job (maybe part of the compact job)?

Comment on lines +500 to +503
// Track sizes: for S3 docs multiply compressed_size by 3 as a rough
// decompressed estimate to keep chunk byte tracking bounded. Without a
// multiplier, metadata shells (~200 bytes) would let thousands of
// S3-backed docs pack into a single chunk before splitting.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have the size on the mongodb document - could we use that instead of the estimate?

Comment on lines +492 to +493
this.logger.warn(`Failed to fetch/decompress S3 object ${doc.storage_ref?.path}: ${err}`);
doc.ops = [];

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a hard error - setting doc.ops = [] may result in data inconsistencies.

Base automatically changed from compressed-bucket-storage to main June 15, 2026 08:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants