feat: S3 object storage offloading for V3 bucket data #673
Renames all class, function, type, and collection accessor names in the duplicated v5 storage implementation from V3→V5:
- MongoBucketBatchV3 → MongoBucketBatchV5
- MongoChecksumsV3 → MongoChecksumsV5
- MongoCompactorV3 → MongoCompactorV5
- MongoParameterCompactorV3 → MongoParameterCompactorV5
- MongoParameterLookupV3 → MongoParameterLookupV5
- MongoSyncBucketStorageV3 → MongoSyncBucketStorageV5
- PersistedBatchV3 → PersistedBatchV5
- SingleBucketStoreV3 → SingleBucketStoreV5
- SourceRecordStoreV3 → SourceRecordStoreV5
- VersionedPowerSyncMongoV3 → VersionedPowerSyncMongoV5
Also adds compressedBucketStorage to StorageConfig and wires up MongoSyncBucketStorageV5 selection in createMongoSyncBucketStorage. This is a pure mechanical rename with no behavior changes.
Change BucketDataDocumentV5 to store arrays of operations per document:
- Add BucketOperationV5 interface with per-op fields including op_id
- Add aggregated fields: min_op, checksum, count, size
- Implement serializeBucketDataV5() to group ops and compute aggregates
- Implement loadBucketDataDocumentV5() as generator yielding from ops array
Add chunking logic in PersistedBatchV5.flushBucketData():
- Group operations by bucket then chunk by 1MB size threshold
- Single-op chunks remain valid for backward compatibility
Update read path in MongoSyncBucketStorageV5 to iterate merged docs. Update SingleBucketStoreV5 for new generator-based load function.
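The chunking step in this commit could look roughly like the sketch below. It is an illustration only, not the PR's code: `SketchOp`, `estimateSize`, and `chunkOpsByBucket` are hypothetical names, and approximating op size by the length of its data string is an assumption.

```typescript
// Hypothetical, simplified op shape; the real BucketOperationV5 has more fields.
interface SketchOp {
  op_id: number;
  bucket: string;
  data: string | null;
}

// 1MB threshold, as stated in the commit message.
const CHUNK_SIZE_THRESHOLD = 1024 * 1024;

// Assumption: size is approximated by the length of the data string.
function estimateSize(op: SketchOp): number {
  return op.data === null ? 0 : op.data.length;
}

// Group ops by bucket, then split each group into chunks whose accumulated
// size stays at or under the threshold. An op larger than the threshold
// ends up alone in its own chunk.
function chunkOpsByBucket(
  ops: SketchOp[],
  threshold: number = CHUNK_SIZE_THRESHOLD
): Map<string, SketchOp[][]> {
  const byBucket = new Map<string, SketchOp[]>();
  for (const op of ops) {
    const group = byBucket.get(op.bucket);
    if (group === undefined) {
      byBucket.set(op.bucket, [op]);
    } else {
      group.push(op);
    }
  }

  const chunksByBucket = new Map<string, SketchOp[][]>();
  for (const [bucket, group] of byBucket) {
    const chunks: SketchOp[][] = [];
    let current: SketchOp[] = [];
    let currentSize = 0;
    for (const op of group) {
      const size = estimateSize(op);
      // Start a new chunk only if the current one is non-empty and would overflow.
      if (current.length > 0 && currentSize + size > threshold) {
        chunks.push(current);
        current = [];
        currentSize = 0;
      }
      current.push(op);
      currentSize += size;
    }
    if (current.length > 0) {
      chunks.push(current);
    }
    chunksByBucket.set(bucket, chunks);
  }
  return chunksByBucket;
}
```

Each inner array would then become one document, with the aggregates (min_op, checksum, count, size) computed over its ops.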
Overrides compactSingleBucket in MongoCompactorV5 to handle the compressed bucket storage model:
1. Reads all documents in a bucket sorted by _id.o ascending
2. Loads all ops via loadBucketDataDocumentV5()
3. Filters superseded operations using the same row_id tracking logic as v3 (newest-to-oldest pass, keeps only latest PUT/REMOVE per row)
4. Re-chunks surviving ops by 1MB data-size threshold
5. Replaces old documents with new chunked docs in a transaction
6. Updates bucket_state with recomputed checksums, counts, and bytes
Unlike v3, v5 does not create MOVE/CLEAR ops during compaction. Instead, superseded ops are dropped and surviving ops are fully restructured into new documents.
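Step 3 above (the newest-to-oldest pass that keeps only the latest PUT/REMOVE per row) can be sketched as follows. This is a simplified illustration under assumptions: `CompactableOp` and `dropSupersededOps` are hypothetical names, and the real row_id tracking in the compactor may carry more state.

```typescript
type CompactableOpType = 'PUT' | 'REMOVE';

// Hypothetical, simplified op shape for illustration.
interface CompactableOp {
  op_id: number;
  op: CompactableOpType;
  row_id: string;
}

// Walk the ops from newest to oldest. The first op seen for a row is the
// latest one and survives; any older op for the same row is superseded.
function dropSupersededOps(opsAscending: CompactableOp[]): CompactableOp[] {
  const seenRows = new Set<string>();
  const surviving: CompactableOp[] = [];
  for (let i = opsAscending.length - 1; i >= 0; i--) {
    const op = opsAscending[i];
    if (seenRows.has(op.row_id)) {
      continue; // a newer op for this row already survived
    }
    seenRows.add(op.row_id);
    surviving.push(op);
  }
  // Restore ascending op_id order before re-chunking.
  surviving.reverse();
  return surviving;
}
```

The surviving ops keep their original op_id values, which is what allows them to be re-chunked into new documents without emitting MOVE/CLEAR ops.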
…egation and activate v5 in test matrix
- Override MongoChecksumsV5.computePartialChecksumsForCollection to use document-level checksum field instead of expanding ops arrays
- Handle partial ranges correctly by filtering ops when start > min_op
- Fix getBucketDataBatchV5 to respect op-level limits instead of document limits
- Update PowerSyncMongo.versioned to create VersionedPowerSyncMongoV5 for v5
- Add STORAGE_VERSION_5 to SUPPORTED_STORAGE_VERSIONS and STORAGE_VERSION_CONFIG
- Update getMongoStorageConfig to enable compressedBucketStorage for v5
- Fix v3-specific tests to only run on storageVersion == 3
…tractMongoSyncBucketStorage and MongoSyncBucketStorageBase → MongoSyncBucketStorage
…ter to MongoParameterCompactor base class
Make collectionFilter() and deleteFilter() concrete in the base class
with the V3/V5 implementation (returns {} and {lookup, _id, key}
respectively). Remove the abstract keyword from the base class.
Delete the now-redundant V3 and V5 parameter compactor subclasses:
- v3/MongoParameterCompactorV3.ts
- v5/MongoParameterCompactorV5.ts
Update MongoSyncBucketStorageV3 and V5 to instantiate MongoParameterCompactor
directly, passing the collection lister callback inline.
…acks interface to separate file
- Create common/MongoSyncBucketStorageCallbacks.ts with the full interface
- Replace inline MongoSyncBucketStorageBaseCallbacks in MongoSyncBucketStorageBase.ts
- Type _versionCallbacks as MongoSyncBucketStorageCallbacks in AbstractMongoSyncBucketStorage
- Update v3 and v5 implementations to import from the new file
- Use 'any' for createCompactor's storage parameter to avoid circular imports
Move getParameterSetsShared, getBucketDataBatchSharedWrapper, getDataBucketChangesShared, and getParameterBucketChangesShared from bucket-operations/storage-operations.ts into MongoSyncBucketStorageBase as private method implementations. Eliminate the context object pattern by accessing this.callbacks and this.group_id directly. Flatten the getBucketDataBatchShared -> getBucketDataBatchSharedWrapper chain into a single getBucketDataBatchImpl method. Delete the now-unused bucket-operations/storage-operations.ts.
… MongoCompactorV3 and V5
Extract identical types from v3/models.ts and v5/models.ts into a shared common/models.ts without version suffixes:
- CurrentBucket
- RecordedLookup
- CurrentDataDocument
- BucketParameterDocument
- SourceTableDocument
- BucketStateDocument
- taggedBucketParameterDocumentToTagged
Update v3/models.ts and v5/models.ts to re-export from common/models.ts, keeping only version-specific exports (BucketDataDocumentV3/V5, etc.). Update all imports across the codebase to use non-suffixed names from common/models.ts or version-specific names where appropriate. Update storage-index.ts to use explicit exports to avoid naming conflicts with v1/models.ts and models.ts.
…-export wrappers Remove unnecessary re-export files that aliased serializeParameterLookup with V3/V5 suffixes. Update PersistedBatchV3 and PersistedBatchV5 to import serializeParameterLookup directly from document-formats/parameter-lookup.js.
…ageBase.ts to MongoSyncBucketStorage.ts
- Rename ambiguous variables for clarity (context, seen, survivingOps)
- Extract inline MongoDB aggregation pipeline into named builder with stage comments
- Split large methods into focused private helpers
- Fix hidden state coupling in bucketStateUpdates via explicit return values
- Replace declare keyword abuse with proper getter narrowing
- Centralize scattered type casts into validation helpers with comments
- Rename misleading type alias FetchPartialBucketChecksumV3
- Add shared abstraction for listBucketDataCollections
- Add JSDoc comments for chunking policy and format methods
- Add validation helper for partial document field extraction
The three files in document-formats/ (bucket-document-format, chunking, parameter-lookup) are V3-only — no V1 consumer exists. Move them into the v3/ subtree so the module boundary is clear.
Collapse single-use abstractions and remove structural overhead added during compressed-bucket-storage development. Every change preserves behavior; the diff from main now shows only genuine new functionality.

## Removals
- Delete 7 dead functions/types: serializeBucketDataV3, loadBucketDataV3, taggedBucketParameterDocumentToV3, buildBucketDataQuery, listParameterIndexCollectionsV3, listBucketDataCollectionsV3, bucketDataV3 (zero callers)
- Delete BucketDataKeyV3 type alias (identical to BucketDataKey)
- Delete BucketDataContextParams and BucketStateLookup interfaces
- Delete 2 redundant PersistedBatchV3 wrappers: serializeParameterLookup and taggedBucketParameterDocumentToTagged

## Inlines
- Inline 9 composite wrapper functions into their sole class methods: normalizeBatch, fetchPreStates, computePartialChecksumsDirectByDefinition, computePartialChecksumsInternal → MongoChecksumsV3; dirtyBucketBatches, dirtyBucketBatchForChecksums, computeChecksumsForBuckets, bucketStateFilter, resolveBucketDefinitionId → MongoCompactorV3
- Inline 4 collection accessor wrappers from PersistedBatchV3 (parameterIndex, sourceTables, sourceRecords, bucketState)
- Inline extractRowsFromDocument's loadDocument param (one caller, one impl)
- Inline UpstreamType/StorageConfigType aliases

## Removals (classes/adapters)
- Remove BucketDocumentFormatAdapter class (wrapped 4 standalone functions); inline into SingleBucketStoreV3, PersistedBatchV3, and MongoSyncBucketStorageV3
- Replace 6 generic <T> collection accessors on VersionedPowerSyncMongoV3 with concrete V3-typed methods; delete duplicate *V3() methods
- Unexport createBucketFilter (internal-only)
- Revert unnecessary protected→public on MongoCompactor's dirtyBucketBatchesForCollection and dirtyBucketBatchForChecksumsForCollection

## Restorations
- Restore 3 V1 files to exact main-branch content (MongoParameterCompactorV1, SingleBucketStoreV1, VersionedPowerSyncMongoV1; only had cosmetic diffs)

## Renames and consolidations
- Rename BucketDataDocument → BucketDataDocumentV3 per reviewer feedback; delete obsolete single-op BucketDataDocumentV3 wrapper
- Merge parameter-lookup.ts (12 lines) → v3/models.ts
- Flatten document-formats/ subdirectory: move chunking.ts up to v3/, rename bucket-document-format.ts → bucket-format.ts and move to v3/
- Fix inline type imports → top-level imports

Net: 20 files, -333 lines. 2 files deleted (parameter-lookup.ts, document-formats/bucket-document-format.ts). 1 subdirectory eliminated. 356/356 tests pass.
Resolve 6 merge conflicts (all mechanical — import reshuffling and rename cascades). Fix modify/delete conflict on MongoParameterLookupV3.ts (accept our deletion; upstream changes were import-source only). Add explicit re-export of BucketDefinitionId and ParameterIndexId in BucketDefinitionMapping.ts (upstream removed them; our V3 files still import from local path). Fix stale loadBucketDataDocumentV3 references reintroduced by auto-merge. Remove generic type param from test bucketData() calls after accessor consolidation.
…ssor pattern Fix loadBucketDataDocumentV3→loadBucketDataDocument, bucketState<BucketStateDocument> and bucketData<BucketDataDocumentV3> stale references from auto-merge. Revert db/storage/checksums accessor pattern in PersistedBatch, MongoChecksums, and MongoCompactor base classes to protected readonly constructor params. Replace subclass getter overrides with declare in V1 and V3 subclasses. Restore MongoCompactorV1 to upstream/main content.
…etStorage Revert rename to match upstream/main shape. Revert db and checksums from private+getter to readonly field (set in constructor body via factory methods). Update V1/V3 subclasses from getter overrides to declare. Together with prior parent class reverts, 8 of 10 V1 files now match upstream/main exactly. refactor: revert MongoBucketBatch _db pattern to readonly db field Replace protected _db + public get db() with readonly db field. Update V1/V3 subclasses from get db() getter to declare.
Replace 13 any usages with concrete types across 5 source files:
- PersistedBatchV3: 6 AnyBulkWriteOperation<any>/any[] → CurrentDataDocumentV3, BucketDataDoc, BucketDataDocumentV3, SourceTableDocumentV3, BucketStateDocumentV3
- MongoChecksumsV3: aggregate: any[] → bson.Document[]
- MongoSyncBucketStorageV3: doc: any → bson.Document, Collection<any> → Collection<BucketParameterDocumentV3>
- db.ts: Collection<any>[] → Collection<BucketDataDocumentV3>[], collectionsByPrefix<any> → <BucketDataDocumentV3>
One as-unknown-as cast retained in SingleBucketStoreV3 (required by TypeScript: BucketDataDocumentGeneric does not overlap with BucketDataDocumentV3). 356/356 tests pass.
…on code Replace storageConfig.compressedBucketStorage || storageConfig.incrementalReprocessing with storageConfig.version >= storage.STORAGE_VERSION_3 in both decision points (db.ts versioned() and createMongoSyncBucketStorage). Remove the flag, its doc comment, and the COMPRESSED_BUCKET_STORAGE_VERSION constant from models.ts. Test infrastructure (TestStorageConfig.compressedBucketStorage) is unchanged — the flag there is already set to storageVersion >= 3 at call sites. 356/356 tests pass.
Delete the suffix-dropped BucketStateDocument from common/models.ts and use BucketStateDocumentV3 from v3/models.ts throughout MongoCompactorV3. Remove now-unused BucketStateDocumentBase import from common/models.ts. 355/356 tests pass (1 pre-existing flaky snapshot).
…DataDocument/SourceTableDocument
All four suffix-dropped types in common/models.ts were V3 types renamed without the suffix. Restore V3 names throughout:
- CurrentBucket → CurrentBucketV3
- RecordedLookup → RecordedLookupV3
- CurrentDataDocument → CurrentDataDocumentV3
- SourceTableDocument → SourceTableDocumentV3
Move taggedBucketParameterDocumentToTagged to v3/models.ts and return BucketParameterDocument → BucketParameterDocumentV3. Redirect models.ts and storage-index.ts imports to use v3/models.ts. Delete now-empty common/models.ts. 355/356 tests pass (1 pre-existing flaky snapshot).
When a V3 document contains ops both below and above the compaction horizon (maxOpId), the compactor previously dropped ops above the horizon. The document was deleted (because it had at least one relevant op) but only ops <= maxOpId were re-inserted. Fix: collect all ops from processable documents into batchOps via candidateOps, deferring the push until processability is determined. Ops > maxOpId bypass the dedup loop (no seen-map entry, no MOVE conversion) and pass through to surviving unchanged. Add comment section headers to compactSingleBucket for readability. Add tests for mixed-document preservation and range overlap invariant. Update existing maxOpId filtering test to reflect new pass-through behavior.
Add a verification aggregate inside each batch transaction that checks the documents being deleted haven't changed since the read phase. The aggregate runs as the first operation in the transaction, anchoring the snapshot. It compares document count, checksum sum, and op count sum against expected values computed from the read. If a concurrent compaction modified or deleted the same documents: - Before transaction start: aggregate catches checksum/count mismatch - During transaction: MongoDB write conflict aborts the write Fixes a race condition where two concurrent compact jobs could delete the same documents and each insert their own replacements, producing duplicate ops.
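The comparison performed by the verification aggregate can be illustrated in plain TypeScript. This sketch does not reproduce the MongoDB aggregate itself; `DocSummary`, `BatchFingerprint`, `computeFingerprint`, and `verifyBeforeWrite` are hypothetical names, and summing checksums without wraparound is an assumption.

```typescript
// Hypothetical per-document summary captured during the read phase.
interface DocSummary {
  checksum: number;
  count: number; // number of ops in the document
}

// The three values the commit message says are compared.
interface BatchFingerprint {
  docCount: number;
  checksumSum: number;
  opCountSum: number;
}

function computeFingerprint(docs: DocSummary[]): BatchFingerprint {
  let checksumSum = 0;
  let opCountSum = 0;
  for (const doc of docs) {
    checksumSum += doc.checksum;
    opCountSum += doc.count;
  }
  return { docCount: docs.length, checksumSum, opCountSum };
}

// Intended to run as the first step inside the write transaction: abort if
// the documents about to be deleted no longer match what was read.
function verifyBeforeWrite(expected: BatchFingerprint, currentDocs: DocSummary[]): void {
  const actual = computeFingerprint(currentDocs);
  const unchanged =
    actual.docCount === expected.docCount &&
    actual.checksumSum === expected.checksumSum &&
    actual.opCountSum === expected.opCountSum;
  if (!unchanged) {
    throw new Error('Documents changed since read phase; aborting compaction batch');
  }
}
```

A sum-based fingerprint is cheap to compute server-side, but it is a heuristic guard: the write-conflict detection mentioned above still covers changes made while the transaction is open.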
The import path ../../src/storage/implementation/common/models.js does not exist. CurrentBucket has been in ../models.ts and extended by CurrentBucketV3 in v3/models.ts which includes the 'def' property used in this test.
Collapse the leading contiguous sequence of MOVE/REMOVE/CLEAR ops at the start of a bucket into a single CLEAR op, reducing op count for clients syncing the bucket. Read: paginated ascending from minId up to lastNotPut. Three per-doc cases: past region (break), boundary inside doc (split and accumulate survivors), fully cleared (accumulate all). Write: single atomic transaction with verify aggregate guard against concurrent compaction, then delete all cleared docs, insert one CLEAR doc, and insert surviving ops from any boundary doc split. Guard CLEAR with opsSincePut >= 2. Single-op sequences don't reduce op count, so skip them. Adjust existing MOVE-pass tests to expect CLEAR ops instead of raw MOVE tombstones where the leading sequence qualifies for collapse.
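The collapse rule and its `opsSincePut >= 2` guard can be sketched on an in-memory op list. This ignores pagination, document boundaries, and checksums; `LeadingOp` and `collapseLeadingNonPuts` are hypothetical names, and giving the CLEAR op the op_id of the last collapsed op is an assumption.

```typescript
type LeadingOpType = 'PUT' | 'MOVE' | 'REMOVE' | 'CLEAR';

// Hypothetical, simplified op shape for illustration.
interface LeadingOp {
  op_id: number;
  op: LeadingOpType;
}

// Collapse the leading run of non-PUT ops into a single CLEAR op.
// Runs shorter than 2 are left alone because replacing one op with one
// CLEAR does not reduce the op count.
function collapseLeadingNonPuts(ops: LeadingOp[]): LeadingOp[] {
  let leadingCount = 0;
  while (leadingCount < ops.length && ops[leadingCount].op !== 'PUT') {
    leadingCount++;
  }
  if (leadingCount < 2) {
    return ops;
  }
  // Assumption: the CLEAR op takes the op_id of the last op it replaces.
  const lastCleared = ops[leadingCount - 1];
  const clearOp: LeadingOp = { op_id: lastCleared.op_id, op: 'CLEAR' };
  return [clearOp, ...ops.slice(leadingCount)];
}
```

Ops after the first PUT are untouched, which corresponds to the "boundary inside doc" case where survivors are split off and re-inserted.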
…cument Yield doc.target_op ?? null instead of hardcoded null. This fixes two issues: the CLEAR pass now correctly accumulates target_op from collapsed MOVEs, and the sync path now surfaces target_op in the SyncBucketDataChunk response for checkpoint invalidation.
Upstream test used db.sourceRecordsV3() which was renamed to db.sourceRecords() during the V3 suffix removal refactor.
Replace TRequest generic with concrete FetchPartialBucketChecksumByBucket in createBucketFilter, buildPartialChecksumPipeline, and normalizePartialChecksumResults. Call createBucketFilter directly in buildPartialChecksumPipeline instead of threading it as a parameter. Drop the unused createFilter parameter from the V3 override of computePartialChecksumsForCollection. TypeScript allows method overrides to have fewer parameters than the base, and the V3 body no longer uses it. Callers still pass createBucketFilter to satisfy the base class contract, but it is silently stripped. The two generics TRequest and TBucketDataDocument on the override must stay — MongoDB Collection<T> is invariant and V1 callers pass types narrower than the concrete alternatives.
Add failing tests in storage_s3_writing.test.ts that exercise the MemoryObjectStorage helper and confirm the S3 write path guard condition works. Thread the objectStorage option through the storage stack (MongoBucketStorage, MongoSyncBucketStorage, MongoBucketBatch, PersistedBatch) so it is available for future implementation. Model changes: make ops optional in BucketDataDocumentV3 to support storage_ref-only documents. Add StorageRef type and loadBucketDataDocument guard for empty ops. Add S3ObjectStorage config type and object_storage config field. Add @aws-sdk/client-s3 and @mongodb-js/zstd dependencies. Update existing compacting tests to use non-null assertions on ops since it is now optional.
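For orientation, a put/get/delete storage contract with an in-memory test double might look like the sketch below. The method signatures are assumptions, and `ObjectStorageSketch` and `InMemoryObjectStorageSketch` are hypothetical names rather than the PR's ObjectStorage interface or MemoryObjectStorage helper.

```typescript
// Hypothetical contract: the real ObjectStorage interface may differ.
interface ObjectStorageSketch {
  put(path: string, data: Uint8Array): Promise<void>;
  get(path: string): Promise<Uint8Array | null>;
  delete(path: string): Promise<void>;
}

// In-memory implementation for tests. Data is copied on the way in and out
// so callers cannot mutate stored objects by accident.
class InMemoryObjectStorageSketch implements ObjectStorageSketch {
  private readonly objects = new Map<string, Uint8Array>();

  async put(path: string, data: Uint8Array): Promise<void> {
    this.objects.set(path, new Uint8Array(data));
  }

  async get(path: string): Promise<Uint8Array | null> {
    const stored = this.objects.get(path);
    return stored === undefined ? null : new Uint8Array(stored);
  }

  async delete(path: string): Promise<void> {
    this.objects.delete(path);
  }

  // Test helper: number of stored objects.
  get objectCount(): number {
    return this.objects.size;
  }
}
```

Keeping the contract this small is what lets the optional objectStorage value be threaded through the storage stack without the core logic depending on S3.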
Implement S3 offloading in PersistedBatchV3: BSON-serialize, zstd-compress and upload bucket data chunks to objectStorage. Insert metadata shells with storage_ref in MongoDB instead of inline ops. Update Phase 2b test assertions with non-null accessors now that the write path works. Add storage_s3_reading.test.ts with 3 failing tests for the S3 read path: round-trip write/read, missing S3 object handling, and mixed inline+S3 batch reads. All 3 must fail until the read path fetches from S3.
…test Pre-fetch and decompress S3 objects for storage_ref docs during getBucketDataBatch so ops from S3-backed documents are included in bucket data responses and size tracking. Add red test for S3-aware compaction (Phase 2d): verifies that compacted_state is populated correctly, S3 objects are cleaned, MongoDB docs are replaced, and read path survives compaction. This test fails because compactSingleBucket does not yet fetch ops from S3-backed storage_ref documents.
Compaction now pre-fetches S3-backed ops before decode, uploads new S3 objects after rechunking, and cleans up old storage_refs after transaction commit. Batch size calculation accounts for storage_ref.compressed_size. S3ObjectStorage implements the ObjectStorage interface using @aws-sdk/client-s3, wired through MongoStorageProvider when config specifies object_storage.type: s3.
- Align S3 path format: write and compact both use maxOp (_id.o) suffix (minOp-maxOp-maxOp), not minOp
- Scale compaction batch size by compressed_size * 3 for S3-backed docs, matching the read path multiplier
- clearBucketLeading(): upload CLEAR doc and boundary survivors to S3 when objectStorage is configured, with old ref cleanup after the transaction
- Fix compaction test: allow S3 path reuse when op ranges don't change after dedup
- Remove dead `compression` field from StorageRef interface and all sites
- Add comments explaining compressed_size * 3 heuristic for byte tracking
- Simplify S3 paths from ${minOp}-${maxOp}-${maxOp} to ${minOp}-${maxOp}
- Invert objectStorage guards: inline path first, S3 as else branch
- loadBucketDataDocument() now throws on undefined ops (empty arrays still ok)
- Set doc.ops = [] in S3 fetch error catch blocks for graceful skip
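Two of the points above, the simplified path format and the stricter loadBucketDataDocument() guard, can be sketched together. This is illustrative only: `buildStoragePath` and `loadOpsOrThrow` are hypothetical names, the document shape is simplified, and the path layout follows the `bucket-data/<group>/<def>/<bucket>/<minOp>-<maxOp>` form given in the PR summary.

```typescript
// Hypothetical, simplified shapes for illustration.
interface StorageRefSketch {
  path: string;
  compressed_size: number;
}

interface OpSketch {
  op_id: number;
}

interface BucketDataDocSketch {
  ops?: OpSketch[];
  storage_ref?: StorageRefSketch;
}

// Build the object path from the op range covered by the chunk.
function buildStoragePath(
  group: number | string,
  def: string,
  bucket: string,
  minOp: number,
  maxOp: number
): string {
  return `bucket-data/${group}/${def}/${bucket}/${minOp}-${maxOp}`;
}

// Yield ops from a document. Undefined ops means the S3 pre-fetch never
// populated the document, which is an error; an empty array is valid.
// As a generator, the error surfaces when iteration starts.
function* loadOpsOrThrow(doc: BucketDataDocSketch): Generator<OpSketch> {
  if (doc.ops === undefined) {
    throw new Error(`ops missing for document (storage_ref: ${doc.storage_ref?.path ?? 'none'})`);
  }
  for (const op of doc.ops) {
    yield op;
  }
}
```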
rkistner
left a comment
This looks quite promising, and I like the structure.
Some initial high-level comments:
- Currently there are various places in the code doing the same compression/decompression and serialize/deserialize logic. Should we perhaps do this in a wrapper class for ObjectStorage? E.g. a BucketDataObjectStorage that wraps ObjectStorage and does that logic?
- NodeJS now has built-in zstd support. But I haven't checked how the APIs and performance compare with @mongodb/zstd. Since we're already using @mongodb/zstd implicitly, that should be fine.
- We do need a threshold for inlining ops directly in mongodb storage, before we can merge & release this: S3 has too much overhead for storing say individual 100-byte operations.
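One possible shape for the wrapper suggested in the first comment is sketched below. Everything here is an assumption: `RawObjectStorage`, `OpsCodec`, and `BucketDataObjectStorageSketch` are hypothetical names, and the codec is left abstract rather than committing to a specific BSON or zstd API.

```typescript
// Minimal storage contract assumed for this sketch (hypothetical signatures).
interface RawObjectStorage {
  put(path: string, data: Uint8Array): Promise<void>;
  get(path: string): Promise<Uint8Array | null>;
}

// Serialization and compression are injected, so they live in one place.
// In the PR this would correspond to BSON plus zstd.
interface OpsCodec<TOp> {
  encode(ops: TOp[]): Uint8Array;
  decode(bytes: Uint8Array): TOp[];
}

class BucketDataObjectStorageSketch<TOp> {
  private readonly storage: RawObjectStorage;
  private readonly codec: OpsCodec<TOp>;

  constructor(storage: RawObjectStorage, codec: OpsCodec<TOp>) {
    this.storage = storage;
    this.codec = codec;
  }

  // Encode and upload ops; returns the stored byte size so the caller can
  // record it on the metadata shell.
  async putOps(path: string, ops: TOp[]): Promise<number> {
    const bytes = this.codec.encode(ops);
    await this.storage.put(path, bytes);
    return bytes.byteLength;
  }

  // Fetch and decode ops. A missing object is a hard error rather than an
  // empty result, so callers cannot silently drop data.
  async getOps(path: string): Promise<TOp[]> {
    const bytes = await this.storage.get(path);
    if (bytes === null) {
      throw new Error(`Object not found: ${path}`);
    }
    return this.codec.decode(bytes);
  }
}
```

With this shape, the write path, read path, and compactor would all call `putOps`/`getOps` instead of repeating the encode and decode steps.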
await session.endSession();
}

// After commit: delete old S3 objects (best-effort)
Not a blocker for initial testing, but it could be problematic if we leave orphaned documents in the bucket indefinitely (either from the delete request failing, or from say a process crash/restart between the commit and the delete).
Is there some way we can ensure these are cleaned up eventually? Maybe persisting a "delete queue" in mongodb, or running a periodic cleanup job (maybe part of the compact job)?
// Track sizes: for S3 docs multiply compressed_size by 3 as a rough
// decompressed estimate to keep chunk byte tracking bounded. Without a
// multiplier, metadata shells (~200 bytes) would let thousands of
// S3-backed docs pack into a single chunk before splitting.
We already have the size on the mongodb document - could we use that instead of the estimate?
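The two options under discussion can be compared in a short sketch. It assumes the MongoDB document carries a `size` field alongside `storage_ref.compressed_size`; `SizedDocSketch`, `estimateBytesHeuristic`, and `estimateBytesFromDoc` are hypothetical names.

```typescript
// Hypothetical, simplified document shape.
interface SizedDocSketch {
  size?: number; // assumed: uncompressed size recorded on the document
  storage_ref?: { compressed_size: number };
}

// Multiplier from the code comment under review.
const DECOMPRESSED_SIZE_MULTIPLIER = 3;

// Current approach: estimate the decompressed size of S3-backed docs from
// the compressed size; inline docs fall back to their recorded size.
function estimateBytesHeuristic(doc: SizedDocSketch): number {
  if (doc.storage_ref !== undefined) {
    return doc.storage_ref.compressed_size * DECOMPRESSED_SIZE_MULTIPLIER;
  }
  return doc.size ?? 0;
}

// Reviewer's suggestion: prefer the size stored on the document, and only
// fall back to the heuristic when it is absent.
function estimateBytesFromDoc(doc: SizedDocSketch): number {
  if (doc.size !== undefined) {
    return doc.size;
  }
  return estimateBytesHeuristic(doc);
}
```

If the recorded size is always present on metadata shells, the multiplier becomes a fallback only, which removes the guesswork from chunk byte tracking.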
this.logger.warn(`Failed to fetch/decompress S3 object ${doc.storage_ref?.path}: ${err}`);
doc.ops = [];
This should be a hard error - setting doc.ops = [] may result in data inconsistencies.
Summary
Offload `BucketDataDocumentV3.ops[]` arrays to object storage (S3), keeping only a metadata shell in MongoDB. The service reads S3 objects and streams ops to clients using the existing wire protocol, with no protocol changes. Object storage is optional at configuration level; when not configured, ops remain inline in MongoDB as today.

Changes
- `put/get/delete` contract, decouples storage backend from core logic
- `@aws-sdk/client-s3` (static import)
- `object_storage:` section in `MongoStorageConfig` with type `s3`
- `flushBucketData()` uploads zstd-compressed BSON ops to S3, inserts metadata shell in MongoDB
- `getBucketDataBatchImpl()` parallel pre-fetches S3 objects, patches `doc.ops` before existing decode loop
- `compactSingleBucket()` and `clearBucketLeading()` are S3-aware (read/write/cleanup old refs)
- `objectStorage?` threaded through the full chain, all optional fields, zero breaking changes

Design Decisions
- `bucket-data/<group>/<def>/<bucket>/<minOp>-<maxOp>`
- `compressed_size * 3` heuristic keeps batch memory bounded for S3-backed docs

Manual Verification [TODO]
S3ObjectStorage is not exercised in CI. To manually validate before shipping:
- `minio server /tmp/minio-data --console-address :9001`
- `object_storage` with `endpoint: http://localhost:9000` and S3 type
- `mc ls`
`forcePathStyle: true` is already set when `endpoint` is present (required by MinIO).