Add jitter control loop and JitterAwareStreamGrouping#8593

Open
GGraziadei wants to merge 27 commits into apache:master from GGraziadei:8538-jitter-control-loop

Conversation

@GGraziadei GGraziadei commented May 10, 2026

Contributor

What is the purpose of the change

This PR introduces an optional jitter control loop that propagates EWMA jitter measurements to upstream components in a thread-safe manner, without impacting throughput or latency.
This control signal is utilized by JitterAwareStreamGrouping to dynamically route traffic through cluster components based on real-time task jitter.

  • Reduces backpressure: Dynamically steers traffic away from bottlenecked or high-jitter tasks.
  • Improves cluster stability: Minimizes systemic task jitter by balancing the load more intelligently.
  • Boosts overall perf: Optimizes general system throughput and resource utilization.

How was the change tested

  • Unit test
  • Smoke test on dev-cluster
  • Benchmark: storm-perf/JitterAwareGroupingTopology (see results in the comment)

In the context of #8583

# Conflicts:
#	storm-client/src/jvm/org/apache/storm/Config.java
#	storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
#	storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
#	storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java

reiabreu commented Jun 1, 2026

Contributor

@GGraziadei can this one be closed?

@GGraziadei

Contributor Author

Hi @reiabreu

I am currently working asynchronously to propose a code change aimed at stabilizing jitter without impacting throughput or increasing latency.

Unlike the discrete event simulation results, the feedback tuple being evaluated with storm perf is currently degrading performance.

To address this, I am trying to create a feedback tick stream to replace the current ratio-based mechanism.

If performance remains degraded even after this modification, I will post an update and either close the PR or ask for further help on the dev mailing list.

@GGraziadei

Contributor Author

Hi @reiabreu, I can confirm that the progress on this PR hasn't been lost.

I reproduced the results below locally by running storm-perf/FileReadWordCountTopo on a Storm development cluster. I am currently refining the solution and will commit the changes to convert this draft into a formal PR soon.

Apologies for the delay. It took some time to verify, but I am confident that the FeedbackStream tick is the right approach to propagate the control signal without significantly reducing throughput or increasing latency.

| Metric | No FeedbackTuple (Reference) | FeedbackTuple | Variance |
| --- | --- | --- | --- |
| Elapsed Time (sec) | 120 | 120 | 0.00% |
| Workers | 2 | 2 | 0.00% |
| Tasks | 7 | 7 | 0.00% |
| Executors | 7 | 7 | 0.00% |
| Transferred (messages) | 35,338,000 | 34,518,000 | -2.32% |
| Transfer Rate (messages/s) | 588,966 | 575,300 | -2.32% |
| Spout Executors | 1 | 1 | 0.00% |
| Spout Transferred (messages) | 4,444,000 | 4,388,000 | -1.26% |
| Spout Acks | 4,412,000 | 4,298,000 | -2.58% |
| Spout Throughput (acks/s) | 74,066 | 73,133 | -1.26% |
| Spout Avg Complete Latency (ms) | 457.837 | 467.308 | +2.07% |
| Spout Max Complete Latency (ms) | 457.837 | 467.308 | +2.07% |

@GGraziadei GGraziadei force-pushed the 8538-jitter-control-loop branch from ec2950b to d8e7cda Compare June 7, 2026 16:36
@GGraziadei GGraziadei changed the title jitter control loop for predictive backpressure scheduling Add jitter control loop and JitterAwareStreamGrouping Jun 12, 2026

GGraziadei commented Jun 12, 2026

Contributor Author

Benchmark

To validate this PR I ran two benchmarks to check for:

  • No throughput/latency degradation
  • A real advantage from using JitterAwareStreamGrouping in a stressed environment

The benchmark is available in storm-perf/JitterAwareGroupingTopology.

I executed the bench with this configuration:

topology.upstream.feedback.freq.secs: 2

I asked an LLM to prepare the attached report, given the benchmark outcome.

Attached to this comment are also the raw data (storm-perf result + Grafana metrics)

(baseline)
------------------------------------------------------------------------------------------------------------------
elapsed (sec),workers,tasks,executors,transferred (messages),transfer rate (messages/s),spout_executors,spout_transferred (messages),spout_acks,spout_throughput (acks/s),spout_avg_complete_latency(ms),spout_max_complete_latency(ms)
------------------------------------------------------------------------------------------------------------------
60,2,10,10,0,0,0,0,0,0,NaN,0.000
120,2,10,10,136220,2270,1,10180,4800,169,29085.146,29085.146
180,2,10,10,74860,1247,1,5060,1640,84,35036.022,35036.022
240,2,10,10,72340,1205,1,4580,1680,76,39128.936,39128.936

Grafana graphs: https://snapshots.raintank.io/dashboard/snapshot/9258h5bMeFXXgdCJ5pcRDZpewUvaGt0P

(feedback control loop)
------------------------------------------------------------------------------------------------------------------
elapsed (sec),workers,tasks,executors,transferred (messages),transfer rate (messages/s),spout_executors,spout_transferred (messages),spout_acks,spout_throughput (acks/s),spout_avg_complete_latency(ms),spout_max_complete_latency(ms)
------------------------------------------------------------------------------------------------------------------
60,2,10,10,0,0,0,0,0,0,NaN,0.000
120,2,10,10,212840,3547,1,15880,11960,264,16683.642,16683.642
180,2,10,10,156920,2615,1,10800,8800,180,16439.405,16683.642
240,2,10,10,42980,716,1,5140,1180,85,15821.700,16683.642

Grafana graphs:
https://snapshots.raintank.io/dashboard/snapshot/t5D0t0X5FDbdUCUJut1bsdvTIIpdWepu

Experimental Results- Impact of FeedbackTuple on Jitter-Aware Stream Grouping.pdf

@GGraziadei GGraziadei marked this pull request as ready for review June 12, 2026 17:01

@rzo1 rzo1 left a comment

Contributor

Hi @GGraziadei, thanks for sticking with this one. The test coverage is solid (the suffixed-gauge regression guard and the Kryo round-trip tests catch exactly the failure modes that would otherwise silently fall back to round-robin), and the tick mechanism follows the existing metrics-tick pattern, which makes it easy to follow.

I have concerns about the benchmark results and a handful of implementation issues that I think need to be addressed before we can merge this.

Benchmark

Looking closer at the numbers from your last comment, I don't think they show what we need them to show:

  1. Both runs are operating way past saturation. The baseline reports complete latencies of 29-39s, which is at or above the default topology.message.timeout.secs of 30, so a large fraction of baseline tuples time out and fail (acks are about 3% of transferred). With max.spout.pending=4000 the ack throughput is simply 4000 / complete_latency: 4000/29 is roughly 138/s vs. your measured 169, and 4000/16.7 is roughly 240/s vs. your measured 264. So the +56% at the 120s mark mostly reflects that one run crossed the timeout threshold and the other didn't.
  2. The trend reverses later in the run. The feedback run decays from 3547 to 2615 to 716 msg/s and ends up worse than the baseline in the 240s window (716 vs. 1205). That pattern needs an explanation, see also my comment on the routing policy below.
  3. The baseline should be LoadAwareShuffleGrouping rather than round-robin. That's what users would actually run here, and it already steers away from loaded tasks. Beating plain round-robin on deliberately skewed workers is the easiest possible win.
  4. The skew in the benchmark doesn't really exercise the jitter signal. JitteryWorkerBolt gives task i a delay of i*base + uniform(0, base), so the mean differs per task but the noise width (which is what RFC-1889 jitter measures) is the same for all of them. Whatever signal the grouping picks up comes indirectly from queueing-induced variance under overload. If the goal is routing to the least loaded task, latency level or queue depth seems like the more appropriate signal than jitter.
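
Point 4 can be checked with a small simulation. The sketch below applies the RFC 1889 interarrival jitter update, J += (|D| - J)/16, to delays drawn from the i*base + uniform(0, base) model described above. Treating the per-tuple processing delay as the transit time, and reusing the same noise seed for every task, are assumptions made for illustration. The class and method names are hypothetical and this is not the PR's EWMA implementation.

```java
import java.util.Arrays;
import java.util.Random;

/** Illustrative sketch: RFC 1889 style jitter over simulated per-tuple delays. */
class JitterSketch {

    /** Running estimate J += (|D| - J) / 16, with D the difference of consecutive delays. */
    static double rfc1889Jitter(double[] delays) {
        double jitter = 0.0;
        for (int k = 1; k < delays.length; k++) {
            double d = Math.abs(delays[k] - delays[k - 1]);
            jitter += (d - jitter) / 16.0;
        }
        return jitter;
    }

    /** Delay model from the review: taskIndex * base + uniform(0, base). */
    static double[] simulateDelays(int taskIndex, double baseMs, int samples, long seed) {
        Random rnd = new Random(seed); // same seed => same noise for every task (assumption)
        double[] delays = new double[samples];
        for (int k = 0; k < samples; k++) {
            delays[k] = taskIndex * baseMs + rnd.nextDouble() * baseMs;
        }
        return delays;
    }

    public static void main(String[] args) {
        for (int task = 0; task < 4; task++) {
            double[] delays = simulateDelays(task, 10.0, 10_000, 42L);
            double mean = Arrays.stream(delays).average().orElse(Double.NaN);
            System.out.printf("task %d: mean delay %.2f ms, jitter %.2f ms%n",
                    task, mean, rfc1889Jitter(delays));
        }
    }
}
```

Running main should show the mean delay growing with the task index while the jitter estimate stays the same, because the constant per-task offset cancels in the difference of consecutive delays.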

Could you rerun at an operating point where complete latency stays well below the message timeout, with a few repetitions, and against load-aware shuffle? One more thing: in the table from June 6, avg and max complete latency are identical in both columns, which looks like the same field was read twice. Worth double-checking the extraction.
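
The back-of-the-envelope estimate in point 1 is Little's law: with a fixed number of tuples in flight, ack rate is roughly in-flight count divided by complete latency. A minimal sketch of that arithmetic, using the rounded latencies quoted above (29 s and 16.7 s); the class and method names are hypothetical:

```java
/** Illustrative sketch: ack rate implied by max.spout.pending and complete latency. */
class SpoutPendingEstimate {

    /** Little's law: throughput = tuples in flight / time each tuple spends in the system. */
    static double estimatedAckRate(int maxSpoutPending, double completeLatencySec) {
        if (completeLatencySec <= 0) {
            throw new IllegalArgumentException("complete latency must be positive");
        }
        return maxSpoutPending / completeLatencySec;
    }

    public static void main(String[] args) {
        // Rounded complete latencies from the 120 s rows of the two runs.
        System.out.printf("baseline: %.0f acks/s%n", estimatedAckRate(4000, 29.0));
        System.out.printf("feedback: %.0f acks/s%n", estimatedAckRate(4000, 16.7));
    }
}
```

The estimate only holds while the spout is pinned at its pending limit, which is the saturated regime the review describes.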

Routing policy

chooseTasks currently returns the single lowest-jitter task for every tuple between feedback updates (which are 2-10s apart). All upstream tasks independently pick the same target, overload it, and stampede to the next one on the following report. That's bang-bang control and would explain the decaying throughput above. Power-of-two-choices among the best candidates, or weighted selection, would avoid the herd. LoadAwareShuffleGrouping already implements that shape and is a good template.
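
As a rough illustration of the power-of-two-choices idea, the sketch below samples two distinct downstream tasks at random and sends the tuple to the one with the lower reported jitter. It samples from all tasks rather than only the best candidates, and the method name, signature, and array-based inputs are hypothetical; it is not the code of either grouping.

```java
import java.util.Random;

/** Illustrative sketch: power-of-two-choices routing over per-task jitter scores. */
class TwoChoicesSketch {

    /** Samples two distinct candidates uniformly and returns the one with the lower jitter. */
    static int chooseTask(int[] taskIds, double[] jitter, Random rnd) {
        if (taskIds.length == 0 || taskIds.length != jitter.length) {
            throw new IllegalArgumentException("taskIds and jitter must be non-empty and equal in length");
        }
        if (taskIds.length == 1) {
            return taskIds[0];
        }
        int a = rnd.nextInt(taskIds.length);
        int b = rnd.nextInt(taskIds.length - 1);
        if (b >= a) {
            b++; // shift past a so the two candidates are distinct
        }
        return jitter[a] <= jitter[b] ? taskIds[a] : taskIds[b];
    }

    public static void main(String[] args) {
        int[] taskIds = {11, 12, 13, 14};
        double[] jitter = {1.0, 2.0, 3.0, 4.0};
        int[] hits = new int[taskIds.length];
        Random rnd = new Random(1L);
        for (int n = 0; n < 100_000; n++) {
            hits[chooseTask(taskIds, jitter, rnd) - 11]++;
        }
        for (int i = 0; i < hits.length; i++) {
            System.out.printf("task %d: %.1f%% of tuples%n", taskIds[i], hits[i] / 1000.0);
        }
    }
}
```

With four tasks the lowest-jitter task wins only when it is one of the two sampled candidates, so it receives about half of the traffic instead of all of it, and the highest-jitter task receives none. Load stays spread across several targets between feedback reports, which is what dampens the herd effect.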

Implementation issues (details inline)

  1. System components take part in the feedback loop. Ackers send useless feedback to all user tasks, and metrics-consumer bolts (added to the system topology after addUpstreamFeedback) emit on an undeclared stream, which throws in local mode via the TupleImpl sanity check and kills the executor.
  2. ConfigUtils.upstreamFeedbackStreamId has no default (ObjectReader.getString(null) throws) and is resolved unconditionally in the Executor constructor, even with the feature disabled. The javadoc promises a __feedback default.
  3. The stream id config is only validated as a string. Setting it to e.g. "default" makes addUpstreamFeedback overwrite a user-declared stream's output fields on every component. I'd just make it a constant.
  4. The Kryo registration is inserted mid-list and shifts the registration ids of ConsList, BackPressureStatus, NodeInfo and friends. Please append at the end.
  5. The feature silently depends on topology.stats.ewma.enable, which defaults to false. Without it, all records are VOID and the grouping round-robins forever. We should at least log a warning, better fail validation.

I also looked at this from a security angle since it adds a new wire type and message path. The new Kryo type is a record of three doubles, no gadget potential, and the receive path does proper instanceof checks instead of casting, good. One thing for the docs: this turns data-plane tuples into a routing control signal, so with storm.messaging.netty.authentication=false (the default) forged feedback could deterministically steer a topology's traffic to a single task. That's within Storm's existing trust model, but the config docs should recommend netty auth/TLS when enabling this.

Happy to take another look once the system-component and config issues are fixed and there's a benchmark against load-aware shuffle at a non-saturated operating point.

setupTicks(false);
setupMetrics();
if (upstreamFeedbackEnabled) {
scheduleUpstreamFeedbackTick(upstreamFeedbackFreqSecs);

Contributor

This schedules the feedback tick for every bolt executor, including system components. Two problems with that:

  1. The acker's upstream sources are every spout and bolt in the topology, so each acker task sends an all-VOID feedback tuple (ackers have no jitter gauges) to every user task. Pure overhead.
  2. Metrics-consumer bolts are added to the system topology after addUpstreamFeedback, so they never get the __feedback stream declared. When their tick fires, Task.getTuple creates a TupleImpl on an undeclared stream. In local mode (doSanityCheck) the getComponentOutputFields lookup throws and kills the executor.

Guarding with !Utils.isSystemId(componentId) here should fix both.
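
A minimal sketch of that guard, kept self-contained by using a local stand-in for Utils.isSystemId (which checks for the "__" prefix). The helper name shouldScheduleFeedbackTick and the example component ids in the comments are hypothetical:

```java
/** Illustrative sketch: skip the feedback tick for system components. */
class FeedbackTickGuard {

    /** Local stand-in for org.apache.storm.utils.Utils.isSystemId (ids starting with "__"). */
    static boolean isSystemId(String id) {
        return id.startsWith("__");
    }

    /** Only user components with the feature enabled should get the feedback tick. */
    static boolean shouldScheduleFeedbackTick(boolean upstreamFeedbackEnabled, String componentId) {
        return upstreamFeedbackEnabled && !isSystemId(componentId);
    }

    public static void main(String[] args) {
        System.out.println(shouldScheduleFeedbackTick(true, "word-count"));  // user bolt
        System.out.println(shouldScheduleFeedbackTick(true, "__acker"));     // system component
        System.out.println(shouldScheduleFeedbackTick(false, "word-count")); // feature disabled
    }
}
```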


public static String upstreamFeedbackStreamId(Map<String, Object> conf) {
Object value = conf.get(Config.TOPOLOGY_UPSTREAM_FEEDBACK_STREAM_ID);
return ObjectReader.getString(value);

Contributor

ObjectReader.getString(value) throws on null, and this is called unconditionally in the Executor constructor even when the feature is disabled, so any conf map without the key crashes every executor at startup. The javadoc in Config promises a __feedback default, so this should be

return ObjectReader.getString(value, "__feedback");

Though see my comment on Config.java, I'd prefer dropping the config entirely.

* Defaults to "__feedback" if not explicitly configured.
*/
@IsString
public static final String TOPOLOGY_UPSTREAM_FEEDBACK_STREAM_ID = "topology.upstream.feedback.stream";

Contributor

User stream ids can't start with __, but this config is only @IsString. If someone sets it to "default", addUpstreamFeedback calls put_to_streams("default", ...) on every component and silently replaces user-declared output fields topology-wide. Is there a use case for making this configurable at all? FEEDBACK_TICK_STREAM_ID is already a constant, I'd put this one next to it in Constants. If it has to stay configurable, validate the __ prefix.
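
If the setting stays configurable, the prefix check could look roughly like the sketch below. It is a standalone helper with a hypothetical name, not wired into Storm's ConfigValidation, and it assumes an unset value is acceptable because a default would apply.

```java
/** Illustrative sketch: reject feedback stream ids that could collide with user streams. */
class FeedbackStreamIdCheck {

    /** Accepts null (unset) or a string starting with "__"; rejects everything else. */
    static void validateFeedbackStreamId(String name, Object value) {
        if (value == null) {
            return; // unset: assume the default applies
        }
        if (!(value instanceof String) || !((String) value).startsWith("__")) {
            throw new IllegalArgumentException(
                    "Field " + name + " must be a string starting with \"__\", got: " + value);
        }
    }

    public static void main(String[] args) {
        validateFeedbackStreamId("topology.upstream.feedback.stream", "__feedback"); // accepted
        try {
            validateFeedbackStreamId("topology.upstream.feedback.stream", "default");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```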

k.register(Values.class);
k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
k.register(org.apache.storm.executor.EwmaFeedbackRecord.class);

Contributor

Registering here shifts the sequential Kryo ids of everything below (ConsList, BackPressureStatus, NodeInfo, ...). BackPressureStatus crosses the wire between workers, and if two communicating JVMs ever disagree on the list (mixed-version supervisors during a rolling upgrade) deserialization corrupts silently. Please append new registrations at the end of the list.
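
The id shift can be illustrated without Kryo itself. The sketch below models sequential registration as a position in a list, using an abbreviated version of the class list named above; it is a simulation under that assumption, not a call into the Kryo API, and the helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch: how insertion position affects sequentially assigned ids. */
class SequentialIdSketch {

    /** Id a type would get if ids simply follow registration order. */
    static int idOf(List<String> registrationOrder, String type) {
        return registrationOrder.indexOf(type);
    }

    /** Returns a copy of the registration list with one more type added at the given index. */
    static List<String> withInsertedAt(List<String> base, int index, String type) {
        List<String> copy = new ArrayList<>(base);
        copy.add(index, type);
        return copy;
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList(
                "Values", "DataPoint", "TaskInfo", "ConsList", "BackPressureStatus", "NodeInfo");
        List<String> midList = withInsertedAt(before, 3, "EwmaFeedbackRecord");
        List<String> appended = withInsertedAt(before, before.size(), "EwmaFeedbackRecord");
        for (String type : before) {
            System.out.printf("%-20s before=%d mid-list=%d appended=%d%n",
                    type, idOf(before, type), idOf(midList, type), idOf(appended, type));
        }
    }
}
```

Appending leaves every existing id unchanged, while the mid-list insert moves ConsList, BackPressureStatus and NodeInfo up by one.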

}

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {

Contributor

Winner-take-all between feedback updates means every upstream task pins the same lowest-jitter target for the whole feedback interval, overloads it, and the herd moves on at the next report. Consider power-of-two-choices or weighted selection, see LoadAwareShuffleGrouping.

Small doc nit: the comment says "averaged value" but EwmaFeedbackRecord.aggregate takes the max across per-source gauges.

import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.Thrift;
import org.apache.storm.executor.ChildEwmaStats;

Contributor

This file only adds an unused import, leftover from an earlier revision? Please drop.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.Config;
import org.apache.storm.executor.ChildEwmaStats;

Contributor

Same here, unused import. Please drop.

* each subsequent task is progressively slower. This mimics real-world conditions (GC pressure,
* I/O, resource contention) where downstream tasks diverge in responsiveness. With upstream
* feedback enabled, {@link JitterAwareStreamGrouping} routes more tuples to the fastest tasks,
* improving throughput and reducing complete latency by &ge;10% compared to plain round-robin.

Contributor

The javadoc claims a 10%+ improvement over round-robin. Let's keep performance claims out of the code until the benchmark questions are settled. Also, a missing input.file currently fails with a bare NPE from new FileInputStream(null), a short usage message would help.
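
One possible shape for the usage check, assuming input.file is read from a configuration map before the file is opened. The helper name and the message wording are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch: fail fast with a readable message when input.file is missing. */
class InputFileArg {

    /** Returns the configured path, or throws with a usage hint instead of a bare NPE. */
    static String requireInputFile(Map<String, Object> conf) {
        Object value = conf.get("input.file");
        if (value == null || value.toString().trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required setting 'input.file': set it to the path of the benchmark input file");
        }
        return value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        try {
            requireInputFile(conf);
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
        }
    }
}
```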
