perf(webapp): parallelize Phase 2 streaming batch-item ingest (TRI-10273)#3777
Conversation
…273) Phase 2 of the v3 streaming batch API (POST /api/v3/batches/:batchId/items) processed streamed items strictly sequentially. For batches of many large payloads — each offloaded to object storage inline — this serialized N object-store round-trips inside one request, blowing past Node's default 300s server.requestTimeout. The webapp then returned 408, which the SDK reads as "408 terminated" and retries 5x, turning a slow ingest into a ~26-minute failure. Ingest now runs through p-map over the NDJSON async iterable with bounded concurrency (STREAMING_BATCH_INGEST_CONCURRENCY, default 10). p-map pulls lazily, so at most `concurrency` items are read/in-flight at once — bounding peak memory to roughly concurrency x STREAMING_BATCH_ITEM_MAXIMUM_SIZE while preserving stream backpressure. Set the env to 1 for fully sequential ingestion. Safe by construction: run order derives from each item's index (enqueue timestamp = batch.createdAt + index), and enqueueBatchItem dedups atomically per index — neither depends on processing order. The NDJSON parser now stamps oversized-item markers with their emit position, removing the consumer's sequential lastIndex assumption. The count-check + conditional seal path is unchanged. Tests: bounded-concurrency ingest of a 100-item batch, in-flight cap assertion, concurrent dedup on Phase 2 retry, and emit-position marker indexing. Full existing sealing/idempotency suite still green (42/42). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (3)
📜 Recent review details⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
WalkthroughThis pull request refactors the Phase 2 streaming batch ingest endpoint to process NDJSON items with bounded concurrency rather than sequentially. It introduces a configurable Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Preview deployment for your docs. Learn more about Mintlify Previews.
💡 Tip: Enable Workflows to automatically generate PRs for you. |
- Enforce positive STREAMING_BATCH_INGEST_CONCURRENCY in the env schema (.int().positive()) — p-map requires concurrency >= 1, so 0/negative would throw at runtime. - Apply the same out-of-range index guard to oversized-item markers as normal items, so an oversized item with index >= runCount returns a 4xx instead of creating a stray pre-failed run. Covered by a new test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Problem
Phase 2 of the v3 streaming batch API (
POST /api/v3/batches/:batchId/items) processed streamed items strictly sequentially. For a batch of many large payloads — each offloaded to object storage inline — this serialized N object-store round-trips inside a single request, exceeding Node's defaultserver.requestTimeout(300s). The webapp returned408, which the SDK reads as408 terminatedand retries 5×, turning a slow ingest into a ~26-minute failure (BatchTriggerError: Failed to stream items ... 408 terminated).Closes TRI-10273 — https://linear.app/triggerdotdev/issue/TRI-10273
Fix
Ingest now runs through
p-mapover the NDJSON async iterable with bounded concurrency (STREAMING_BATCH_INGEST_CONCURRENCY, default 10):p-mappulls lazily from the stream — at mostconcurrencyitems are read/in-flight at once, so peak memory is bounded to roughlyconcurrency × STREAMING_BATCH_ITEM_MAXIMUM_SIZEand request-body backpressure is preserved.1for fully sequential ingestion (escape hatch).env.server.ts; the service takes a requirednumber.Why this is safe (ordering/idempotency unchanged)
timestamp = batch.createdAt + index), not enqueue order.enqueueBatchItem.lastIndexassumption (the only order-dependent bit).Tests
webapptypecheck clean.Follow-ups (not in this PR)
application/storerefs instead of raw blobs) to remove object-store work from the request hot path and shrink the request body — bigger, protocol-level change.server.requestTimeoutbump as a safety net.🤖 Generated with Claude Code