Add jitter control loop and JitterAwareStreamGrouping #8593
# Conflicts: # storm-client/src/jvm/org/apache/storm/Config.java # storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java # storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java # storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@GGraziadei can this one be closed?
Hi @reiabreu, I am currently working asynchronously on a code change aimed at stabilizing jitter without reducing throughput or increasing latency. Unlike in the discrete event simulation results, the feedback tuple currently degrades performance when evaluated with storm-perf. To address this, I am trying to create a feedback tick stream to replace the current ratio-based mechanism. If performance remains degraded even after this modification, I will post an update and either close the PR or ask for further help on the dev mailing list.
Hi @reiabreu, I can confirm that the progress on this PR hasn't been lost. I successfully reproduced these results locally using the storm-perf/FileReadWordCountTopo on a Storm development cluster. I am currently refining the solution and will commit the changes to convert this draft into a formal PR soon. Apologies for the delay; it took some time to verify, but I am confident that using the FeedbackStream tick is the right approach to propagate the control signal without significantly reducing throughput or increasing latency.
ec2950b to d8e7cda
JitterAwareStreamGrouping Benchmark
To validate this PR I executed a couple of benchmarks to be sure:
I executed the benchmark with this configuration:
I asked an LLM to prepare the attached report, given the benchmark outcome. The raw data (storm-perf results + Grafana metrics) are also attached to this comment.
Experimental Results- Impact of FeedbackTuple on Jitter-Aware Stream Grouping.pdf
rzo1 left a comment
Hi @GGraziadei, thanks for sticking with this one. The test coverage is solid (the suffixed-gauge regression guard and the Kryo round-trip tests catch exactly the failure modes that would otherwise silently fall back to round-robin), and the tick mechanism follows the existing metrics-tick pattern, which makes it easy to follow.
I have concerns about the benchmark results and a handful of implementation issues that I think need to be addressed before we can merge this.
Benchmark
Looking closer at the numbers from your last comment, I don't think they show what we need them to show:
- Both runs are operating way past saturation. The baseline reports complete latencies of 29-39s, which is at or above the default topology.message.timeout.secs of 30, so a large fraction of baseline tuples time out and fail (acks are about 3% of transferred). With max.spout.pending=4000 the ack throughput is simply 4000 / complete_latency: 4000/29 is roughly 138/s vs. your measured 169, and 4000/16.7 is roughly 240/s vs. your measured 264. So the +56% at the 120s mark mostly reflects that one run crossed the timeout threshold and the other didn't.
- The trend reverses later in the run. The feedback run decays from 3547 to 2615 to 716 msg/s and ends up worse than the baseline in the 240s window (716 vs. 1205). That pattern needs an explanation, see also my comment on the routing policy below.
- The baseline should be LoadAwareShuffleGrouping rather than round-robin. That's what users would actually run here, and it already steers away from loaded tasks. Beating plain round-robin on deliberately skewed workers is the easiest possible win.
- The skew in the benchmark doesn't really exercise the jitter signal. JitteryWorkerBolt gives task i a delay of i*base + uniform(0, base), so the mean differs per task but the noise width (which is what RFC-1889 jitter measures) is the same for all of them. Whatever signal the grouping picks up comes indirectly from queueing-induced variance under overload. If the goal is routing to the least loaded task, latency level or queue depth seems like the more appropriate signal than jitter.
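To illustrate the last point, here is a minimal sketch of an RFC 1889 style estimator (J += (|D| - J) / 16, where D is the difference between consecutive delays) fed with the delay model described above. The class and method names are made up for this example, and the PR's actual EWMA gauge may use a different gain or input, so treat it as an approximation of the argument rather than the PR's code.

```java
import java.util.Random;

// Illustrative only: not part of the PR. Delay model from the review:
// delay(task i) = i * base + uniform(0, base).
final class JitterSkewSketch {

    /** Returns {mean delay, RFC 1889 style jitter estimate} for one task. */
    public static double[] simulate(int taskIndex, double baseMs, long seed, int samples) {
        Random rnd = new Random(seed);
        double sum = 0.0;
        double jitter = 0.0;
        double prev = Double.NaN;
        for (int k = 0; k < samples; k++) {
            double delay = taskIndex * baseMs + rnd.nextDouble() * baseMs;
            sum += delay;
            if (k > 0) {
                // The constant offset i * base cancels in the difference of consecutive delays.
                jitter += (Math.abs(delay - prev) - jitter) / 16.0;
            }
            prev = delay;
        }
        return new double[] {sum / samples, jitter};
    }

    public static void main(String[] args) {
        for (int task = 0; task < 4; task++) {
            double[] r = simulate(task, 10.0, 42L, 10_000);
            System.out.printf("task %d: mean=%.2f ms, jitter=%.3f ms%n", task, r[0], r[1]);
        }
    }
}
```

With a shared seed, the jitter estimates of all tasks agree up to floating-point rounding while the means differ by multiples of base, which is why the per-task offset alone gives the grouping nothing to distinguish.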
Could you rerun at an operating point where complete latency stays well below the message timeout, with a few repetitions, and against load-aware shuffle? One more thing: in the table from June 6, avg and max complete latency are identical in both columns, which looks like the same field was read twice. Worth double-checking the extraction.
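The saturation arithmetic from the first bullet can be reproduced with a few lines. This is a sketch of the bound only (Little's law with at most max.spout.pending tuples in flight), assuming a single spout task as the numbers in the review imply; the class name is invented for illustration.

```java
// Illustrative only: upper bound on ack throughput when the spout is capped
// at maxSpoutPending in-flight tuples and each takes completeLatencySecs to ack.
final class AckThroughputBound {

    public static double acksPerSecond(int maxSpoutPending, double completeLatencySecs) {
        if (completeLatencySecs <= 0) {
            throw new IllegalArgumentException("complete latency must be positive");
        }
        return maxSpoutPending / completeLatencySecs;
    }

    public static void main(String[] args) {
        // Latencies quoted in the review: 29 s (baseline) and 16.7 s (feedback run).
        System.out.printf("baseline bound: %.0f acks/s%n", acksPerSecond(4000, 29.0));
        System.out.printf("feedback bound: %.0f acks/s%n", acksPerSecond(4000, 16.7));
    }
}
```

Both bounds (about 138 and 240 acks/s) sit close to the measured 169 and 264, so the measured gap is largely explained by the latency difference once the pending cap is hit.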
Routing policy
chooseTasks currently returns the single lowest-jitter task for every tuple between feedback updates (which are 2-10s apart). All upstream tasks independently pick the same target, overload it, and stampede to the next one on the following report. That's bang-bang control and would explain the decaying throughput above. Power-of-two-choices among the best candidates, or weighted selection, would avoid the herd. LoadAwareShuffleGrouping already implements that shape and is a good template.
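A minimal sketch of power-of-two-choices follows, assuming the grouping holds one jitter value per target task. It is not taken from LoadAwareShuffleGrouping or from the PR; the class, method and array layout are hypothetical and only show how sampling two candidates spreads load instead of pinning a single winner.

```java
import java.util.Random;

// Illustrative only: power-of-two-choices over per-task jitter values.
final class TwoChoicesSketch {

    /** Samples two distinct task indices uniformly and returns the one with lower jitter. */
    public static int pick(double[] jitterByTask, Random rnd) {
        int n = jitterByTask.length;
        if (n == 0) {
            throw new IllegalArgumentException("no target tasks");
        }
        if (n == 1) {
            return 0;
        }
        int a = rnd.nextInt(n);
        int b = rnd.nextInt(n - 1);
        if (b >= a) {
            b++; // shift so that b is never equal to a
        }
        return jitterByTask[a] <= jitterByTask[b] ? a : b;
    }

    /** Counts how often each task index is picked. */
    public static int[] histogram(double[] jitterByTask, int picks, long seed) {
        Random rnd = new Random(seed);
        int[] counts = new int[jitterByTask.length];
        for (int i = 0; i < picks; i++) {
            counts[pick(jitterByTask, rnd)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = histogram(new double[] {1.0, 2.0, 3.0, 4.0}, 60_000, 7L);
        for (int i = 0; i < counts.length; i++) {
            System.out.println("task " + i + ": " + counts[i]);
        }
    }
}
```

The lowest-jitter task still receives the largest share, but the next-best tasks keep receiving traffic and the worst one is avoided, so a stale report cannot send every upstream task to the same target.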
Implementation issues (details inline)
- System components take part in the feedback loop. Ackers send useless feedback to all user tasks, and metrics-consumer bolts (added to the system topology after addUpstreamFeedback) emit on an undeclared stream, which throws in local mode via the TupleImpl sanity check and kills the executor.
- ConfigUtils.upstreamFeedbackStreamId has no default (ObjectReader.getString(null) throws) and is resolved unconditionally in the Executor constructor, even with the feature disabled. The javadoc promises a __feedback default.
- The stream id config is only validated as a string. Setting it to e.g. "default" makes addUpstreamFeedback overwrite a user-declared stream's output fields on every component. I'd just make it a constant.
- The Kryo registration is inserted mid-list and shifts the registration ids of ConsList, BackPressureStatus, NodeInfo and friends. Please append at the end.
- The feature silently depends on topology.stats.ewma.enable, which defaults to false. Without it, all records are VOID and the grouping round-robins forever. We should at least log a warning, better fail validation.
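For the last item, a fail-fast check could look roughly like the sketch below. The key topology.stats.ewma.enable is the one named above; the feedback enable key is hypothetical, since the flag's real name is not shown in this review, and the check is written against a plain map rather than Storm's validation framework.

```java
import java.util.Map;

// Illustrative only: rejects a conf that enables feedback without EWMA stats.
final class FeedbackConfigCheck {

    static final String EWMA_ENABLE = "topology.stats.ewma.enable";
    // Hypothetical key, used here only for illustration.
    static final String FEEDBACK_ENABLE = "topology.upstream.feedback.enable";

    public static void validate(Map<String, Object> conf) {
        boolean feedback = Boolean.TRUE.equals(conf.get(FEEDBACK_ENABLE));
        boolean ewma = Boolean.TRUE.equals(conf.get(EWMA_ENABLE));
        if (feedback && !ewma) {
            throw new IllegalArgumentException(
                FEEDBACK_ENABLE + " requires " + EWMA_ENABLE + "=true; "
                    + "without it every feedback record is VOID");
        }
    }
}
```

Failing at submission time turns a silent fallback to round-robin into an error the user sees immediately.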
I also looked at this from a security angle since it adds a new wire type and message path. The new Kryo type is a record of three doubles, no gadget potential, and the receive path does proper instanceof checks instead of casting, good. One thing for the docs: this turns data-plane tuples into a routing control signal, so with storm.messaging.netty.authentication=false (the default) forged feedback could deterministically steer a topology's traffic to a single task. That's within Storm's existing trust model, but the config docs should recommend netty auth/TLS when enabling this.
Happy to take another look once the system-component and config issues are fixed and there's a benchmark against load-aware shuffle at a non-saturated operating point.
    setupTicks(false);
    setupMetrics();
    if (upstreamFeedbackEnabled) {
        scheduleUpstreamFeedbackTick(upstreamFeedbackFreqSecs);
This schedules the feedback tick for every bolt executor, including system components. Two problems with that:
- The acker's upstream sources are every spout and bolt in the topology, so each acker task sends an all-VOID feedback tuple (ackers have no jitter gauges) to every user task. Pure overhead.
- Metrics-consumer bolts are added to the system topology after addUpstreamFeedback, so they never get the __feedback stream declared. When their tick fires, Task.getTuple creates a TupleImpl on an undeclared stream. In local mode (doSanityCheck) the getComponentOutputFields lookup throws and kills the executor.
Guarding with !Utils.isSystemId(componentId) here should fix both.
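The shape of that guard, as a standalone sketch: isSystemId below is a local stand-in that mirrors the "__" prefix convention for system component ids, and shouldScheduleFeedbackTick is a hypothetical helper, not a method of the executor.

```java
// Illustrative only: decides whether an executor should schedule the feedback tick.
final class FeedbackTickGuard {

    /** Local stand-in for the system-id check: system component ids start with "__". */
    public static boolean isSystemId(String id) {
        return id.startsWith("__");
    }

    public static boolean shouldScheduleFeedbackTick(boolean upstreamFeedbackEnabled,
                                                     String componentId) {
        return upstreamFeedbackEnabled && !isSystemId(componentId);
    }

    public static void main(String[] args) {
        System.out.println(shouldScheduleFeedbackTick(true, "__acker")); // system component
        System.out.println(shouldScheduleFeedbackTick(true, "count"));   // user bolt
    }
}
```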
    public static String upstreamFeedbackStreamId(Map<String, Object> conf) {
        Object value = conf.get(Config.TOPOLOGY_UPSTREAM_FEEDBACK_STREAM_ID);
        return ObjectReader.getString(value);
ObjectReader.getString(value) throws on null, and this is called unconditionally in the Executor constructor even when the feature is disabled, so any conf map without the key crashes every executor at startup. The javadoc in Config promises a __feedback default, so this should be
    return ObjectReader.getString(value, "__feedback");
Though see my comment on Config.java, I'd prefer dropping the config entirely.
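The intended behaviour, sketched in plain Java without calling ObjectReader: a missing key falls back to the documented default instead of throwing. The class name is invented for this example, and the key string is the one from the Config diff.

```java
import java.util.Map;

// Illustrative only: null-safe lookup with the documented "__feedback" default.
final class FeedbackStreamIdLookup {

    static final String KEY = "topology.upstream.feedback.stream";
    static final String DEFAULT_STREAM_ID = "__feedback";

    public static String upstreamFeedbackStreamId(Map<String, Object> conf) {
        Object value = conf.get(KEY);
        if (value == null) {
            return DEFAULT_STREAM_ID; // key absent: feature may simply be disabled
        }
        if (value instanceof String) {
            return (String) value;
        }
        throw new IllegalArgumentException(KEY + " must be a string, got " + value.getClass());
    }
}
```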
     * Defaults to "__feedback" if not explicitly configured.
     */
    @IsString
    public static final String TOPOLOGY_UPSTREAM_FEEDBACK_STREAM_ID = "topology.upstream.feedback.stream";
User stream ids can't start with __, but this config is only @IsString. If someone sets it to "default", addUpstreamFeedback calls put_to_streams("default", ...) on every component and silently replaces user-declared output fields topology-wide. Is there a use case for making this configurable at all? FEEDBACK_TICK_STREAM_ID is already a constant, I'd put this one next to it in Constants. If it has to stay configurable, validate the __ prefix.
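If the id does stay configurable, the prefix check could be as small as the sketch below. It is a hypothetical standalone helper, not a Storm validator class, and only shows the rule that a configured feedback stream id must live in the "__" namespace that user streams cannot use.

```java
// Illustrative only: rejects feedback stream ids that could collide with user streams.
final class FeedbackStreamIdValidation {

    public static String requireSystemStreamId(String streamId) {
        if (streamId == null || !streamId.startsWith("__")) {
            throw new IllegalArgumentException(
                "feedback stream id must start with \"__\", got: " + streamId);
        }
        return streamId;
    }

    public static void main(String[] args) {
        System.out.println(requireSystemStreamId("__feedback"));
    }
}
```

With this in place, a value such as "default" is rejected before addUpstreamFeedback can touch a user-declared stream.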
    k.register(Values.class);
    k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
    k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
    k.register(org.apache.storm.executor.EwmaFeedbackRecord.class);
Registering here shifts the sequential Kryo ids of everything below (ConsList, BackPressureStatus, NodeInfo, ...). BackPressureStatus crosses the wire between workers, and if two communicating JVMs ever disagree on the list (mixed-version supervisors during a rolling upgrade) deserialization corrupts silently. Please append new registrations at the end of the list.
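A toy model of why the position matters: when ids are handed out sequentially in registration order, inserting an entry mid-list renumbers everything after it, while appending leaves existing ids untouched. The sketch does not use Kryo itself; it starts counting at 0 and uses simple class names purely for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: sequential id assignment in registration order.
final class SequentialIdSketch {

    public static Map<String, Integer> assignIds(List<String> registrationOrder) {
        Map<String, Integer> ids = new LinkedHashMap<>();
        for (String name : registrationOrder) {
            ids.put(name, ids.size()); // next free id
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> before = List.of("TaskInfo", "ConsList", "BackPressureStatus", "NodeInfo");

        List<String> midList = new ArrayList<>(before);
        midList.add(1, "EwmaFeedbackRecord"); // inserted right after TaskInfo

        List<String> appended = new ArrayList<>(before);
        appended.add("EwmaFeedbackRecord"); // appended at the end

        System.out.println("before:   " + assignIds(before));
        System.out.println("mid-list: " + assignIds(midList));
        System.out.println("appended: " + assignIds(appended));
    }
}
```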
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
Winner-take-all between feedback updates means every upstream task pins the same lowest-jitter target for the whole feedback interval, overloads it, and the herd moves on at the next report. Consider power-of-two-choices or weighted selection, see LoadAwareShuffleGrouping.
Small doc nit: the comment says "averaged value" but EwmaFeedbackRecord.aggregate takes the max across per-source gauges.
    import java.util.Set;
    import org.apache.storm.Config;
    import org.apache.storm.Thrift;
    import org.apache.storm.executor.ChildEwmaStats;
This file only adds an unused import, leftover from an earlier revision? Please drop.
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.storm.Config;
    import org.apache.storm.executor.ChildEwmaStats;
Same here, unused import. Please drop.
     * each subsequent task is progressively slower. This mimics real-world conditions (GC pressure,
     * I/O, resource contention) where downstream tasks diverge in responsiveness. With upstream
     * feedback enabled, {@link JitterAwareStreamGrouping} routes more tuples to the fastest tasks,
     * improving throughput and reducing complete latency by ≥10% compared to plain round-robin.
The javadoc claims a 10%+ improvement over round-robin. Let's keep performance claims out of the code until the benchmark questions are settled. Also, a missing input.file currently fails with a bare NPE from new FileInputStream(null), a short usage message would help.
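For the usage message, a small guard before the file is opened would be enough. How the topology actually reads input.file is not shown here, so the helper below is hypothetical and only demonstrates replacing the bare NPE with a readable error.

```java
// Illustrative only: validates the input.file setting before opening the file.
final class InputFileArg {

    public static String requireInputFile(String path) {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException(
                "Missing required setting 'input.file': "
                    + "set it to the path of the text file the spout should read");
        }
        return path;
    }

    public static void main(String[] args) {
        String path = args.length > 0 ? args[0] : null;
        try {
            System.out.println("reading from " + requireInputFile(path));
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
        }
    }
}
```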
What is the purpose of the change
This PR introduces an optional jitter control loop that propagates EWMA jitter measurements to upstream components in a thread-safe manner without impacting throughput or latency.
This control signal is utilized by JitterAwareStreamGrouping to dynamically route traffic through cluster components based on real-time task jitter.
How was the change tested
storm-perf/JitterAwareGroupingTopology (see results in the comment)
In the context of #8583