Skip to content

Work-stealing imbalance causing idle workers in short-duration parallel for_each tasks on Android #792

@yijianshuangdiao

Description

@yijianshuangdiao

Description

We observed an issue where some worker threads remain completely idle while others execute all iterations during for_each_index parallel execution. This happens across all three partitioner types (Static, Dynamic, Guided) on Android devices.

Environment

  • Platform: Android (ARM64, real device)
  • Taskflow version: v4.0.0
  • Compiler: Clang (via Android NDK)
  • C++ Standard: C++20
  • Executor configuration: 3 workers

Test Setup

We created a diagnostic test that:

  1. Uses for_each_index to parallelize N tasks across 3 workers
  2. Each task performs a small workload (~1.1ms per iteration on our test device)
  3. Tracks which worker thread executes each iteration via ObserverInterface and a custom ThreadTracker
  4. Reports frames where any worker has 0 iterations (completely idle)

Test parameters:

  • Tasks per frame: 6
  • Total frames: 5000
  • Partitioner types tested: Static, Dynamic, Guided

Results

Observer set_up: 3 workers
=== Detailed Partitioner Comparison ===
Workers: 3
Tasks per frame: 6
Frames: 5000
[Static Partitioner]
Frame 1: IDLE T2 | iters=[2,4,0] obs=[1,2,0]
Frame 2576: IDLE T2 | iters=[4,2,0] obs=[0,2,1]
Frame 4545: IDLE T2 | iters=[4,2,0] obs=[0,2,1]
Idle frames: 3/5000
[Dynamic Partitioner]
Frame 32: IDLE T2 | iters=[3,3,0] obs=[0,2,1]
Frame 436: IDLE T2 | iters=[3,3,0] obs=[2,1,0]
Frame 1444: IDLE T2 | iters=[3,3,0] obs=[0,1,2]
Frame 1581: IDLE T2 | iters=[3,3,0] obs=[2,0,1]
Frame 1582: IDLE T2 | iters=[3,3,0] obs=[2,0,1]
Frame 1693: IDLE T2 | iters=[3,3,0] obs=[1,1,1]
Idle frames: 6/5000
[Guided Partitioner]
Frame 2750: IDLE T2 | iters=[3,3,0] obs=[0,2,1]
Frame 2752: IDLE T2 | iters=[3,3,0] obs=[1,2,0]
Idle frames: 2/5000
--- Comparison ---
Static  idle: 3/5000
Dynamic idle: 6/5000
Guided  idle: 2/5000

Example log from an idle frame:

Frame 1: IDLE T2 | iters=[2,4,0] obs=[1,2,0]

This indicates:

  • Worker 2 executed 0 iterations (completely idle)
  • Workers 0 and 1 executed all 6 iterations
  • Observer shows uneven task distribution

Questions

  1. Is this expected behavior for short-duration parallel tasks? We understand that work-stealing has overhead, but complete worker idleness seems suboptimal.

  2. Are there recommended configurations or patterns for scenarios with:

    • Small per-iteration workload (~1ms)
    • Low task count (not enough to amortize scheduling overhead)
    • Real-time/interactive applications on mobile devices

Minimal Reproducible Example

#include <jni.h>
#include <string>
#include <atomic>
#include <array>
#include <thread>
#include <vector>
#include <mutex>
#include <map>
#include <chrono>
#include <algorithm>
#include <functional>

#include <android/log.h>

#if __cplusplus < 202002L
// Fallback definitions of two C++20 <bit> utilities, compiled only when the
// translation unit is built with a pre-C++20 language level.
// NOTE(review): presumably these exist so the Taskflow headers included after
// this block can resolve std::bit_ceil / std::bit_width on older standards --
// confirm. Adding declarations to namespace std is formally undefined
// behaviour, so treat this as a reproducer-only workaround.
namespace std {
    // Returns the smallest power of two that is >= x; yields 1 for x <= 1.
    // Does not terminate meaningfully if that power is not representable in T.
    template<typename T>
    constexpr T bit_ceil(T x) noexcept {
        T power = T(1);
        for (; power < x; power <<= 1) {
        }
        return power;
    }
    // Returns how many right shifts by one are needed to bring x to zero,
    // i.e. the number of bits needed to represent x; 0 when x == 0.
    template<typename T>
    constexpr int bit_width(T x) noexcept {
        int bits = 0;
        for (; x != 0; x >>= 1) {
            ++bits;
        }
        return bits;
    }
}
#endif

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/for_each.hpp>

// Tag passed to __android_log_print by the logging macros below.
#define LOG_TAG "TaskflowDemo"
// printf-style logging helpers: forward the format string and arguments to
// __android_log_print with LOG_TAG at WARN (LOGW) or ERROR (LOGE) priority.
#define LOGW(...) __android_log_print(ANDROID_LOG_WARN, LOG_TAG, __VA_ARGS__)
#define LOGE(...) __android_log_print(ANDROID_LOG_ERROR, LOG_TAG, __VA_ARGS__)

static constexpr size_t NUM_WORKERS = 3;

// Tallies how many times record_iteration() was called by each distinct
// calling thread.
//
// A thread is given a slot the first time it is seen after construction or
// after the most recent reset(); slots are handed out in arrival order
// (0, 1, 2, ...). A slot number therefore identifies "the k-th thread to show
// up since the last reset", not a fixed thread, and a thread that never calls
// in never receives a slot.
struct ThreadTracker {
    std::mutex mtx;                                   // guards thread_id_map and slot assignment
    std::map<std::thread::id, size_t> thread_id_map;  // thread id -> assigned slot
    std::atomic<size_t> next_slot{0};                 // next slot number to hand out

    std::array<std::atomic<size_t>, NUM_WORKERS> iteration_counts;  // per-slot tallies

    // Starts with every per-slot tally at zero.
    ThreadTracker() {
        for (auto& counter : iteration_counts) {
            counter.store(0, std::memory_order_relaxed);
        }
    }

    // Returns the slot of the calling thread, assigning the next free slot
    // number if this thread has not been seen since the last reset().
    size_t get_or_assign_slot() {
        const std::thread::id self = std::this_thread::get_id();
        std::lock_guard<std::mutex> guard(mtx);
        const auto known = thread_id_map.find(self);
        if (known == thread_id_map.end()) {
            const size_t fresh = next_slot.fetch_add(1, std::memory_order_relaxed);
            thread_id_map[self] = fresh;
            return fresh;
        }
        return known->second;
    }

    // Adds one to the calling thread's tally.
    void record_iteration() {
        const size_t slot = get_or_assign_slot();
        if (slot >= NUM_WORKERS) {
            // More distinct threads than counters: this call is not tallied.
            return;
        }
        iteration_counts[slot].fetch_add(1, std::memory_order_relaxed);
    }

    // Zeroes every tally, then (under the mutex) forgets all thread-to-slot
    // assignments so numbering restarts at 0.
    void reset() {
        for (auto& counter : iteration_counts) {
            counter.store(0, std::memory_order_relaxed);
        }
        std::lock_guard<std::mutex> guard(mtx);
        thread_id_map.clear();
        next_slot.store(0, std::memory_order_relaxed);
    }

    // Returns the tally stored for `slot`; the index is not range-checked.
    size_t get_count(size_t slot) const {
        const auto& counter = iteration_counts[slot];
        return counter.load(std::memory_order_relaxed);
    }
};

struct DemoObserver : public tf::ObserverInterface {
    std::array<std::atomic<size_t>, NUM_WORKERS> worker_task_counts;

    DemoObserver() {
        for (size_t i = 0; i < NUM_WORKERS; i++) {
            worker_task_counts[i].store(0, std::memory_order_relaxed);
        }
    }

    void set_up(size_t nw) override {
        LOGW("Observer set_up: %zu workers", nw);
    }

    void on_entry(tf::WorkerView wv, tf::TaskView) override {
        size_t wid = wv.id();
        if (wid < NUM_WORKERS) {
            worker_task_counts[wid].fetch_add(1, std::memory_order_relaxed);
        }
    }

    void on_exit(tf::WorkerView, tf::TaskView) override {}

    void reset() {
        for (size_t i = 0; i < NUM_WORKERS; i++) {
            worker_task_counts[i].store(0, std::memory_order_relaxed);
        }
    }

    size_t get_count(size_t wid) const {
        return worker_task_counts[wid].load(std::memory_order_relaxed);
    }
};

// Runs the idle-worker diagnostic for the Static, Dynamic and Guided
// partitioners and logs the results.
//
// For each partitioner, `num_frames` frames are executed. Every frame builds a
// fresh taskflow containing one for_each_index over [0, num_tasks), runs it on
// a NUM_WORKERS-thread executor, and waits for completion. A frame is counted
// as "idle" when at least one ThreadTracker slot recorded zero iterations.
//
// How to read the per-frame log line:
//   * iters[i] is indexed by ThreadTracker slot. Slots are reassigned every
//     frame (tracker.reset()) in the order threads first record an iteration,
//     so slot i is NOT a stable worker id, and a thread that ran nothing never
//     gets a slot -- zero counts always show up in the highest slots.
//   * obs[i] is indexed by the executor's worker id as seen by DemoObserver.
//   * Index i in iters[] and obs[] therefore need not refer to the same thread.
//   * When several slots are idle, only the last one found is named after "T".
//
// @param num_tasks   number of loop iterations scheduled per frame
// @param num_frames  number of frames to run for each partitioner
static void runParallelForDetailedTest(int num_tasks, int num_frames) {
    // The per-frame report below prints exactly three counters per array.
    static_assert(NUM_WORKERS == 3,
                  "the IDLE log line formats exactly three per-slot counters");

    tf::Executor executor(NUM_WORKERS);
    auto observer = executor.make_observer<DemoObserver>();
    ThreadTracker tracker;

    LOGW("=== Detailed Partitioner Comparison ===");
    LOGW("Workers: %zu", NUM_WORKERS);
    LOGW("Tasks per frame: %d", num_tasks);
    LOGW("Frames: %d", num_frames);

    // Runs all frames with the given partitioner and returns how many of them
    // had at least one slot with zero recorded iterations.
    auto test_partitioner = [&](const char* name, auto partitioner) -> int {
        int idle_frames = 0;
        LOGW("[%s Partitioner]", name);

        for (int frame = 0; frame < num_frames; frame++) {
            // Start every frame from clean counters and fresh slot numbering.
            tracker.reset();
            observer->reset();

            tf::Taskflow taskflow;
            taskflow.for_each_index(0, num_tasks, 1,
                [&tracker](int) {
                    // Synthetic per-iteration workload. The accumulator is
                    // volatile so the loop is not optimised away, and unsigned
                    // because the running sum (about 1.25e11) exceeds INT_MAX:
                    // with a signed int that overflow is undefined behaviour,
                    // whereas unsigned arithmetic wraps. A plain assignment is
                    // used because compound assignment on a volatile object is
                    // deprecated in C++20.
                    volatile unsigned int sink = 0;
                    for (unsigned int j = 0; j < 500000u; j++) {
                        sink = sink + j;
                    }
                    tracker.record_iteration();
                }, partitioner);

            executor.run(taskflow).wait();

            size_t iter_counts[NUM_WORKERS];
            size_t obs_counts[NUM_WORKERS];
            size_t active = 0;
            size_t idle_slot = NUM_WORKERS;  // sentinel: no idle slot seen yet

            for (size_t w = 0; w < NUM_WORKERS; w++) {
                iter_counts[w] = tracker.get_count(w);
                obs_counts[w] = observer->get_count(w);
                if (iter_counts[w] > 0) active++;
                else idle_slot = w;
            }

            if (active < NUM_WORKERS) {
                idle_frames++;
                LOGW("  Frame %d: IDLE T%zu | iters=[%zu,%zu,%zu] obs=[%zu,%zu,%zu]",
                     frame, idle_slot,
                     iter_counts[0], iter_counts[1], iter_counts[2],
                     obs_counts[0], obs_counts[1], obs_counts[2]);
            }
        }

        LOGW("  Idle frames: %d/%d", idle_frames, num_frames);
        return idle_frames;
    };

    int static_idle = test_partitioner("Static", tf::StaticPartitioner());
    int dynamic_idle = test_partitioner("Dynamic", tf::DynamicPartitioner());
    int guided_idle = test_partitioner("Guided", tf::GuidedPartitioner());

    LOGW("--- Comparison ---");
    LOGW("Static  idle: %d/%d", static_idle, num_frames);
    LOGW("Dynamic idle: %d/%d", dynamic_idle, num_frames);
    LOGW("Guided  idle: %d/%d", guided_idle, num_frames);
}

// JNI entry point exported with C linkage. The symbol name follows the JNI
// naming scheme for a native method runDetailedTest on
// com.example.taskflowdemo.MainActivity. The JNIEnv and jobject arguments are
// unused; num_tasks and num_frames are forwarded unchanged to
// runParallelForDetailedTest.
extern "C" JNIEXPORT void JNICALL
Java_com_example_taskflowdemo_MainActivity_runDetailedTest(
        JNIEnv *, jobject,
        jint num_tasks, jint num_frames) {
    runParallelForDetailedTest(num_tasks, num_frames);
}

Thank you for your time and assistance.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions