STORM-2359: Add progress-based tuple timeout (topology.message.progress.timeout.secs) #8787
Open
Gowtham-Gowts wants to merge 7 commits into
@GGraziadei would appreciate your look for a review here too. Thx.
Adds topology.message.progress.timeout.secs as a new optional topology configuration. When set, tuple trees will only be expired if no bolt has acked any tuple in the tree for the configured number of seconds, rather than expiring on wall-clock time since emit. Default is null (disabled), preserving existing behavior for all topologies that do not set this config.
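As an illustration of the "null means disabled" semantics described above, here is a minimal sketch of how the setting could be read from a topology configuration map. The class and method names (`ProgressTimeoutConfigSketch`, `progressTimeoutMs`) are hypothetical and not taken from the PR; only the config key comes from it. The conversion from seconds to milliseconds is an assumption based on the `progressTimeoutMs` field name mentioned in the commits.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, not part of the PR: shows one way to treat an unset
// (null) value as "feature disabled".
final class ProgressTimeoutConfigSketch {
    // Config key as named in the PR.
    static final String PROGRESS_TIMEOUT_KEY = "topology.message.progress.timeout.secs";

    /** Returns the progress timeout in milliseconds, or empty when the key is unset. */
    static OptionalLong progressTimeoutMs(Map<String, Object> topoConf) {
        Object raw = topoConf.get(PROGRESS_TIMEOUT_KEY);
        if (raw == null) {
            return OptionalLong.empty(); // default: existing wall-clock behavior
        }
        // Assumption: the value is numeric seconds, as the ".secs" suffix suggests.
        long secs = ((Number) raw).longValue();
        return OptionalLong.of(secs * 1000L);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(progressTimeoutMs(conf).isPresent()); // false
        conf.put(PROGRESS_TIMEOUT_KEY, 30);
        System.out.println(progressTimeoutMs(conf).getAsLong()); // 30000
    }
}
```

How non-positive values should be handled is not stated in the PR, so the sketch leaves them unvalidated.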
Adds a package-private method, peekOldestBucket(), to RotatingMap. It returns an unmodifiable view of the oldest (soonest-to-expire) bucket without removing it or triggering the expiry callback. Acker.rescueRecentlyActiveEntries() uses it to inspect which tuple trees are about to be evicted on the next rotate(), so that trees with recent bolt activity can be re-inserted (rescued) before rotation removes them. Callers must not retain references to the returned map across calls to rotate() or put().
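The peek semantics can be sketched with a simplified bucketed map. `MiniRotatingMap` below is a hypothetical stand-in written for this example, not Storm's `RotatingMap`; it omits the expiry callback and only models `put()`, `rotate()`, and a read-only view of the oldest bucket.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;

// Simplified, hypothetical stand-in for a rotating bucket map.
final class MiniRotatingMap<K, V> {
    // Head of the list is the newest bucket, tail is the oldest.
    private final LinkedList<HashMap<K, V>> buckets = new LinkedList<>();

    MiniRotatingMap(int numBuckets) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(new HashMap<>());
        }
    }

    /** Inserts into the head bucket and removes the key from all older buckets. */
    void put(K key, V value) {
        Iterator<HashMap<K, V>> it = buckets.iterator();
        it.next().put(key, value);
        while (it.hasNext()) {
            it.next().remove(key);
        }
    }

    /** Drops the oldest bucket, adds a fresh head bucket, returns the dropped entries. */
    Map<K, V> rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }

    /** Read-only view of the bucket that the next rotate() would drop. */
    Map<K, V> peekOldestBucket() {
        return Collections.unmodifiableMap(buckets.getLast());
    }
}
```

Because the returned map is a view over the live bucket, a later `put()` for the same key removes it from the view, which is why callers should not hold on to the reference.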
Core implementation of STORM-2359.

Changes to Acker:
- AckObject gains a lastProgressTime field (8 bytes per in-flight tree), initialized to startTime and updated on every updateAck() call. It records the wall-clock time of the most recent partial bolt ack for the tree.
- prepare() reads topology.message.progress.timeout.secs from topoConf. When set, progressTimeoutEnabled=true and progressTimeoutMs is set.
- execute() on tick: when progressTimeoutEnabled, calls rescueRecentlyActiveEntries() before pending.rotate().
- rescueRecentlyActiveEntries(): scans the oldest bucket (the one about to be evicted by rotate()), collects entries whose lastProgressTime is within the progress window, and re-inserts them via pending.put(). put() moves the key to the head bucket, rescuing it from eviction. Keys are collected into a list first to avoid a ConcurrentModificationException.

AckObject visibility changed from private static to package-private static to allow direct access from unit tests.

When topology.message.progress.timeout.secs is null (default), behavior is identical to the original implementation: rescueRecentlyActiveEntries() is never called and there is no overhead on the tick path.

Memory cost: +8 bytes per in-flight tuple tree (lastProgressTime long). At 1M concurrent trees that is ~8 MB, within normal heap budgets.

No Thrift IDL changes. No new stream IDs. No bolt/spout API changes. Fully backward compatible.

Related: apache#6141 (STORM-2359)
Related: apache#7296 (STORM-3514)
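The selection step of the rescue logic described above could look roughly like the following. This is a sketch under stated assumptions, not the PR's code: `RescueSketch`, `Tree`, and `keysToRescue` are hypothetical names, the clock is passed in explicitly instead of being read from Storm's time utilities, and "within the progress window" is assumed to mean strictly less than the timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RescueSketch {
    /** Hypothetical stand-in for AckObject: only the fields the rescue step needs. */
    static final class Tree {
        long xor;               // running XOR of acked tuple ids
        final long startTime;   // when the tree was first seen
        long lastProgressTime;  // time of the most recent partial ack

        Tree(long startTime) {
            this.startTime = startTime;
            this.lastProgressTime = startTime; // initialized to startTime, as in the PR text
        }

        void updateAck(long value, long nowMs) {
            xor ^= value;
            lastProgressTime = nowMs;
        }
    }

    /**
     * Collects keys from the oldest bucket whose trees made progress recently.
     * Keys go into a separate list so the caller can re-insert them afterwards
     * without mutating the bucket while iterating over it.
     */
    static <K> List<K> keysToRescue(Map<K, Tree> oldestBucket, long nowMs, long progressTimeoutMs) {
        List<K> keys = new ArrayList<>();
        for (Map.Entry<K, Tree> e : oldestBucket.entrySet()) {
            if (nowMs - e.getValue().lastProgressTime < progressTimeoutMs) {
                keys.add(e.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Tree> oldest = new HashMap<>();
        Tree stale = new Tree(0L);
        Tree active = new Tree(0L);
        active.updateAck(42L, 8_000L); // a bolt acked part of this tree at t = 8 s
        oldest.put("stale", stale);
        oldest.put("active", active);
        System.out.println(keysToRescue(oldest, 10_000L, 5_000L)); // [active]
    }
}
```

In the flow the commit describes, each returned key would then be passed back to `pending.put()` before `pending.rotate()` runs, so only trees without recent progress remain in the bucket that gets evicted.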
Adds a WARN log when topology.enable.message.timeouts=false is combined with a positive topology.max.spout.pending on a spout executor. This combination causes orphaned tuple trees from dead workers to accumulate in the spout pending map forever. Once the map fills to max.spout.pending, nextTuple() is never called again — permanently stalling the spout. This behavior was reported in STORM-3514 (closed Won't Fix) and re-surfaced in the dev@ mailing list thread from 2026. The warning fires at executor setup time so operators see it in the worker logs on topology startup, before the problem manifests. See: apache#7296 (STORM-3514)
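The condition that triggers the warning can be expressed as a small predicate over the topology configuration. The sketch below is illustrative only: `SpoutStallCheckSketch` and `isStallProne` are hypothetical names, and the PR's actual check and log message are not shown in this conversation. The two config keys are the ones named in the commit message.

```java
import java.util.HashMap;
import java.util.Map;

final class SpoutStallCheckSketch {
    /**
     * True when message timeouts are explicitly disabled while the spout's
     * pending map is bounded, the combination the commit message warns about.
     */
    static boolean isStallProne(Map<String, Object> topoConf) {
        Object timeouts = topoConf.get("topology.enable.message.timeouts");
        Object maxPending = topoConf.get("topology.max.spout.pending");
        boolean timeoutsDisabled = Boolean.FALSE.equals(timeouts);
        boolean boundedPending =
            maxPending instanceof Number && ((Number) maxPending).longValue() > 0;
        return timeoutsDisabled && boundedPending;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.enable.message.timeouts", false);
        conf.put("topology.max.spout.pending", 1000);
        System.out.println(isStallProne(conf)); // true

        conf.put("topology.enable.message.timeouts", true);
        System.out.println(isStallProne(conf)); // false
    }
}
```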
AckerProgressTimeoutTest (5 tests):
- testFeatureDisabledByDefault_treeExpiresOnWallClockRotation: verifies no behavior change when config is unset
- testProgressRescuesActiveTreeFromExpiry: tree with recent bolt ack must survive rotation
- testNoProgressExpiresTree: tree with no bolt activity must still expire (orphan reclamation preserved)
- testFullAckCompletesTreeNormally: normal completion (XOR==0) must still emit ACK to spout with progress timeout enabled
- testProgressTimeoutLargerThanWallClockIsEffectivelyNoop: setting progress timeout > wall-clock timeout does not break anything

RotatingMapPeekTest (6 tests):
- testPeekOldestBucketEmptyOnCreation
- testPeekOldestBucketDoesNotContainFreshlyPutKey
- testPeekOldestBucketContainsKeyAfterRotations
- testPeekOldestBucketAfterReinsertion: verifies rescue pattern works
- testPeekOldestBucketIsUnmodifiable: returned map must throw on mutation
- testPeekOldestBucketAfterEviction: evicted key not visible

All tests use Time.SimulatedTime for deterministic time-based assertions, following Storm's testing best practice (DEVELOPER.md).
Acker (org.apache.storm.daemon) calls peekOldestBucket() on RotatingMap (org.apache.storm.utils). Package-private visibility does not work across packages, so the method must be public.
- conf/defaults.yaml: add null default entry with explanatory comment
- docs/Guaranteeing-message-processing.md: add new section explaining the progress-based timeout feature, when to use it, configuration example, and the STORM-3514 dangerous config warning
eb98f66 to 05506e7
Summary
Adds topology.message.progress.timeout.secs, an opt-in config that changes tuple expiry from wall-clock-since-emit to time-since-last-bolt-progress.
When set, a tuple tree is only expired if no bolt has acked any tuple in the tree for N seconds. Tuple trees actively being processed (even if queued under backpressure) are rescued from RotatingMap rotation as long as forward progress is being made.
topology.message.timeout.secs remains the hard upper bound, unchanged.
Motivation
Raised on dev@ (June 2026):
https://lists.apache.org/thread/tjcso52j1tjk6d344vjfmkd9ngy6k2rb
Users face an inherent tradeoff:
This config dissolves that tradeoff. Discussed with rzo1 on the issue tracker:
#6141
Changes
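| File | Change |
|---|---|
| Config.java | TOPOLOGY_MESSAGE_PROGRESS_TIMEOUT_SECS constant |
| Acker.java | lastProgressTime in AckObject; rescueRecentlyActiveEntries() |
| RotatingMap.java | peekOldestBucket() method (public, because Acker lives in a different package) |
| Executor.java | WARN log when topology.enable.message.timeouts=false is combined with a positive topology.max.spout.pending |
| AckerProgressTimeoutTest.java | 5 tests for the progress-based timeout in Acker |
| RotatingMapPeekTest.java | 6 tests for peekOldestBucket() |
| conf/defaults.yaml | null default entry with explanatory comment |
| docs/Guaranteeing-message-processing.md | new section on the progress-based timeout and the STORM-3514 config warning |

Backward Compatibility
- topology.message.progress.timeout.secs is null by default. Zero behavior change for all existing topologies.
- Memory cost: +8 bytes per in-flight tuple tree (one long field)
Related Issues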