Skip to content

STORM-2359: Add progress-based tuple timeout (topology.message.progress.timeout.secs)#8787

Open
Gowtham-Gowts wants to merge 7 commits into
apache:masterfrom
Gowtham-Gowts:STORM-2359
Open

STORM-2359: Add progress-based tuple timeout (topology.message.progress.timeout.secs)#8787
Gowtham-Gowts wants to merge 7 commits into
apache:masterfrom
Gowtham-Gowts:STORM-2359

Conversation

@Gowtham-Gowts

Copy link
Copy Markdown

Summary

Adds topology.message.progress.timeout.secs — an opt-in config that changes
tuple expiry from wall-clock-since-emit to time-since-last-bolt-progress.

When set, a tuple tree is only expired if no bolt has acked any tuple in the
tree for N seconds. Tuple trees actively being processed (even if queued under
backpressure) are rescued from RotatingMap rotation as long as forward progress
is being made.

topology.message.timeout.secs remains the hard upper-bound, unchanged.

Motivation

Raised on dev@ (June 2026):
https://lists.apache.org/thread/tjcso52j1tjk6d344vjfmkd9ngy6k2rb

Users face an inherent tradeoff:

  • Short timeout → orphans reclaimed quickly, but live tuples fail under backpressure
  • Long timeout → backpressure-safe, but orphan reclamation is slow after worker failure

This config dissolves that tradeoff. Discussed with rzo1 on the issue tracker:
#6141

Changes

File Change
Config.java New TOPOLOGY_MESSAGE_PROGRESS_TIMEOUT_SECS constant
Acker.java lastProgressTime in AckObject; rescueRecentlyActiveEntries()
RotatingMap.java peekOldestBucket() package-private method
Executor.java WARN log for STORM-3514 dangerous config combo
AckerProgressTimeoutTest.java 5 new unit tests
RotatingMapPeekTest.java 6 new unit tests
conf/defaults.yaml null default entry with comment
docs/Guaranteeing-message-processing.md New section

Backward Compatibility

  • Off by defaulttopology.message.progress.timeout.secs is null.
    Zero behavior change for all existing topologies.
  • No Thrift IDL changes
  • No new stream IDs
  • No bolt or spout API changes
  • Memory cost: +8 bytes per in-flight tuple tree (one long field)

Related Issues

@rzo1

rzo1 commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

@GGraziadei would appreciate your look for a review here too. Thx.

Adds topology.message.progress.timeout.secs as a new optional topology
configuration. When set, tuple trees will only be expired if no bolt
has acked any tuple in the tree for the configured number of seconds,
rather than expiring on wall-clock time since emit.

Default is null (disabled), preserving existing behavior for all
topologies that do not set this config.
Adds a package-private method that returns an unmodifiable view of the
oldest (soonest-to-expire) bucket without removing it or triggering
the expiry callback.

This is used by Acker.rescueRecentlyActiveEntries() to inspect which
tuple trees are about to be evicted on the next rotate(), so that
trees with recent bolt activity can be re-inserted (rescued) before
rotation removes them.

The returned map is unmodifiable and callers must not retain references
across calls to rotate() or put().
Core implementation of STORM-2359.

Changes to Acker:
- AckObject gains a lastProgressTime field (8 bytes per in-flight tree)
  initialized to startTime and updated on every updateAck() call. This
  records the wall-clock time of the most recent partial bolt ack for
  the tree.
- prepare() reads topology.message.progress.timeout.secs from topoConf.
  When set, progressTimeoutEnabled=true and progressTimeoutMs is set.
- execute() on tick: when progressTimeoutEnabled, calls
  rescueRecentlyActiveEntries() before pending.rotate().
- rescueRecentlyActiveEntries(): scans the oldest bucket (the one about
  to be evicted by rotate()), collects entries whose lastProgressTime is
  within the progress window, and re-inserts them via pending.put().
  put() moves the key to the head bucket, rescuing it from eviction.
  Keys are collected into a list first to avoid ConcurrentModification.

AckObject visibility changed from private static to package-private
static to allow direct access from unit tests.

When topology.message.progress.timeout.secs is null (default), behavior
is identical to the original implementation — rescueRecentlyActiveEntries
is never called and there is no overhead on the tick path.

Memory cost: +8 bytes per in-flight tuple tree (lastProgressTime long).
At 1M concurrent trees that is ~8MB, within normal heap budgets.

No Thrift IDL changes. No new stream IDs. No bolt/spout API changes.
Fully backward compatible.

Related: apache#6141 (STORM-2359)
Related: apache#7296 (STORM-3514)
Adds a WARN log when topology.enable.message.timeouts=false is combined
with a positive topology.max.spout.pending on a spout executor.

This combination causes orphaned tuple trees from dead workers to
accumulate in the spout pending map forever. Once the map fills to
max.spout.pending, nextTuple() is never called again — permanently
stalling the spout. This behavior was reported in STORM-3514 (closed
Won't Fix) and re-surfaced in the dev@ mailing list thread from 2026.

The warning fires at executor setup time so operators see it in the
worker logs on topology startup, before the problem manifests.

See: apache#7296 (STORM-3514)
AckerProgressTimeoutTest (5 tests):
- testFeatureDisabledByDefault_treeExpiresOnWallClockRotation: verifies
  no behavior change when config is unset
- testProgressRescuesActiveTreeFromExpiry: tree with recent bolt ack
  must survive rotation
- testNoProgressExpiresTree: tree with no bolt activity must still expire
  (orphan reclamation preserved)
- testFullAckCompletesTreeNormally: normal completion (XOR==0) must still
  emit ACK to spout with progress timeout enabled
- testProgressTimeoutLargerThanWallClockIsEffectivelyNoop: setting
  progress timeout > wall-clock timeout does not break anything

RotatingMapPeekTest (6 tests):
- testPeekOldestBucketEmptyOnCreation
- testPeekOldestBucketDoesNotContainFreshlyPutKey
- testPeekOldestBucketContainsKeyAfterRotations
- testPeekOldestBucketAfterReinsertion: verifies rescue pattern works
- testPeekOldestBucketIsUnmodifiable: returned map must throw on mutation
- testPeekOldestBucketAfterEviction: evicted key not visible

All tests use Time.SimulatedTime for deterministic time-based assertions,
following Storm's testing best practice (DEVELOPER.md).
Acker (org.apache.storm.daemon) calls peekOldestBucket() on RotatingMap
(org.apache.storm.utils). Package-private visibility does not work across
packages — method must be public.
- conf/defaults.yaml: add null default entry with explanatory comment
- docs/Guaranteeing-message-processing.md: add new section explaining
  the progress-based timeout feature, when to use it, configuration
  example, and the STORM-3514 dangerous config warning
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants