Skip to content

[FEAT]: pytest-xdist support for distributed test execution#73

Open
nina-msft wants to merge 11 commits into
microsoft:mainfrom
nina-msft:dev/nina-msft/8259
Open

[FEAT]: pytest-xdist support for distributed test execution#73
nina-msft wants to merge 11 commits into
microsoft:mainfrom
nina-msft:dev/nina-msft/8259

Conversation

@nina-msft

@nina-msft nina-msft commented Jun 4, 2026

Copy link
Copy Markdown
Contributor

Description

Summary

Adds first-class pytest-xdist support to RAMPART so attack/probe sessions can shard across worker processes and still produce a single coherent report. Worker processes serialize their results through xdist's workeroutput channel; the controller deserializes, merges, and writes the final report.

What's included

New: _xdist.py

  • Versioned schema (rampart.xdist.v1) for worker → controller payloads
  • _sanitize JSON-safe coercion at the trust boundary with depth limit, byte-size cap, and ANSI-escape stripping on deserialize to defend against terminal injection from worker output
  • SizeLimitError raised by finalize_worker when the serialized payload exceeds the configured cap; controller logs and continues
  • Configurable cap via new pytest CLI option --rampart-xdist-max-bytes / ini option rampart_xdist_max_bytes (default 64 MiB), validated positive
  • Sink resolution re-raises KeyboardInterrupt/SystemExit, catches Exception

Trial-group aggregation across workers

  • Trial metadata is captured as data on RampartSession at collection time and shipped through the rampart.xdist.v1 payload (trial_specs), so the controller aggregates trial groups from merged worker data instead of session.items — which is not reliably populated with trial clones on the controller at session finish. Without this, trial-group FAIL/PASS verdicts silently disappeared under xdist even though per-clone results were present.
  • trial_specs is back-compatible: payloads without trials emit an empty list; malformed entries are skipped and non-finite thresholds are clamped on deserialize.

Deterministic result ordering & sink resolution (review feedback)

  • Merged results sort by a stable key (nodeid, result_index, source_worker) so aggregated reports are reproducible regardless of worker completion order. nodeid/result_index are assigned authoritatively (absorb-time enumerate; deserialize-time outer-key + enumerate); source_worker is tagged on handle_testnodedown.
  • New pytest_rampart_sinks hook (_hooks.py) is the authoritative sink source — keyed on hook-impl existence, not truthiness — and short-circuits the fixture path so projects defining both don't double-register. The rampart_sinks fixture remains the single-process fallback.
  • Controller-side discovery now unwraps a @pytest.fixture-wrapped parameterless rampart_sinks to its underlying function (_get_wrapped_function() with _fixture_function/__wrapped__ fallbacks) and calls it directly, restoring the documented session-fixture fallback under -n. Fixtures that depend on other fixtures still warn and point at the hook.
  • Shared strip_ansi extracted to rampart/common/text.py (CSI/OSC/DCS/NF + C0/C1 stripping) and reused by both the plugin and the xdist boundary.
  • Incomplete-run reasons are surfaced as a terminal warning when workers crash or report partial data.

Updated plugin hooks (plugin.py, _session.py)

  • pytest_addoption registers the new CLI/ini option; pytest_addhooks registers pytest_rampart_sinks
  • Worker, controller, and non-xdist branches dispatched cleanly; worker branch wraps finalize_worker to surface size-limit truncation as a warning

Reporting

  • JsonFileReportSink now projects report.metadata, so xdist run-mode info (worker_count, dist_mode, incomplete reasons) lands in the emitted JSON file

Tests

  • 82 unit tests in test_xdist.py (serialization, sanitization, size limits, ANSI stripping, controller detection, sink resolution/unwrap, option parsing, ordering determinism, trial-spec round-trip/merge)
  • 13 unit tests in test_text.py for shared strip_ansi
  • test_plugin.py extended (57 tests) for sink-hook resolution, OSC sanitizing, and the incomplete-warning path
  • test_json_file.py extended (14 tests) for metadata projection
  • Integration test_xdist_aggregation.py (13 tests) exercising end-to-end aggregation across workers, including trial-group verdicts under both --dist=loadgroup and --dist=load

Docs

  • New xdist.md user guide, incl. the pytest_rampart_sinks hook, sink precedence, and durability limitations
  • Configuration, CI-integration, pytest-integration, results-and-reporting, and API reference pages updated to describe the new option (under a dedicated "Pytest Options" section, not under env vars)
  • Architecture page notes the controller/worker split

Incidental

  • onedrive.py: file-level pyright directive suppressing the Unknown* cascade caused by msgraph-sdk shipping without type stubs. No behavior change.

Security notes

The xdist boundary treats worker output as untrusted:

  • All values pass through _sanitize → JSON-safe primitives only
  • Strings are stripped of ANSI escape sequences on deserialize (including nested strings inside dicts/lists) to prevent terminal injection from a compromised/misbehaving worker
  • Total payload size is capped to prevent controller memory exhaustion; truncation is recorded with a marker and raised as SizeLimitError
  • Depth cap (MAX_METADATA_DEPTH = 6) prevents pathological nesting

Validation

  • uv run ruff check rampart tests → all checks pass; ruff format --check clean
  • uv run pyright rampart → 0 errors, 0 warnings
  • uv run pytest tests/unit → 594 passed
  • uv run pytest tests/integration/test_xdist_aggregation.py → 13 passed (trial-group verdicts verified under --dist=loadgroup and --dist=load)
  • uv run --group docs mkdocs build --strict → no cross-reference warnings (remaining output is Windows-only symlink noise from the font-download plugin; clean on CI/Linux)
  • Ran pytest -n 4 --dist=loadgroup against the HelpDesk Bot example in rampart-examples: 1 report generated. Running with xdist took ~99.66s versus ~230.73s serial (yay for speed!)

Breaking changes

None

Checklist

  • pre-commit run --all-files passes
  • Tests added or updated for changes
  • Documentation updated

@nina-msft nina-msft requested review from a team and bashirpartovi June 4, 2026 01:35
The controller's _aggregate_trial_results iterated session.items looking for a _rampart_trial_base attribute set on cloned items at collection time. Under pytest-xdist that attribute is not reliably reachable on the controller at pytest_sessionfinish, so trial-group verdicts silently disappeared from the terminal summary and the JSON report -- per-clone results were present but no aggregate FAIL/PASS line was emitted.

Decouple aggregation from pytest.Item state:

- Store trial metadata as data on RampartSession._trial_specs (a dict[clone_nodeid, TrialSpec]) at collection time on every process.

- Ship 	rial_specs through the existing 
ampart.xdist.v1 worker payload (back-compatible: missing/empty list is treated as no trials).

- Controller merges specs from each worker and aggregates from the merged data instead of session.items.

Also fixes a related JSON-sink gap: JsonFileReportSink now projects 
eport.metadata, so xdist run-mode info (worker_count, dist_mode, incomplete reasons) actually lands in the emitted file.

Tests:

- New TestTrialSpecs unit class (round-trip, malformed entries, non-finite thresholds, idempotent merge, first-writer-wins).

- handle_testnodedown test covering trial-spec merge.

- Strengthened 	ests/integration/test_xdist_aggregation.py to assert the trial-group line is present and correct under --dist=loadgroup and --dist=load (the prior tests only checked per-clone counts and would have missed this regression).

- JSON-sink metadata projection unit tests.

565 unit tests + 13 xdist integration tests pass; ruff clean.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
spencrr

This comment was marked as resolved.

@bashirpartovi

Copy link
Copy Markdown
Contributor

Great job @nina-msft, really a solid PR. The overall architecture makes sense to me, which is workers collect, the controller merges, and only the controller emits. I also like that the worker output path treats the boundary seriously rather than just passing objects around and hoping for the best. I feel like we should definitely keep that part even if some of the surrounding design changes.

The first thing I would fix before merge is ordering. build_report() sorts by r.metadata.get("test_name", ""), but test_name is derived from node.nodeid.split("::")[-1], so it is only the trailing test name. That collides for same-named tests in different files, parametrized cases, and trial clones. Once two results have the same sort key, insertion order wins, and under xdist insertion order is basically worker completion order. So the output can still move around between runs. I think the fix is just to keep the full nodeid and sort on that. Maybe it is worth adding a small regression test that merges the same two worker payloads in reverse order and gets identical report output.

I am a little bit worried about the sink discovery piece. discover_sinks_from_conftest() / _resolve_sink_candidate() end up doing a partial version of pytest's fixture resolution which is to find a rampart_sinks attribute, unwrap fixture internals, inspect the signature, and call it only if it is parameterless. That works for the happy path, but if someone has a normal fixture like rampart_sinks(tmp_path_factory) or rampart_sinks(my_config), xdist mode skips that sink with a warning. If that fixture is their only sink configuration, the controller has no sinks to emit to, so the report they expected would be missing. I think that's a pretty silent problem, the tests could all be green, but the report never gets offloaded.

This would be cleaner as an explicit hook instead of fixture reflection. The key difference is that the existing rampart_sinks is a fixture, so it only gets resolved when something inside the fixture system asks for it. That works in single-process mode because the bootstrap fixture can call request.getfixturevalue("rampart_sinks"). In xdist though, the controller is the process that needs to emit the final report, and the controller does not run tests or fixtures. That is why this PR has to scan conftest modules and manually unwrap the fixture function.

A hook would be different:

def pytest_rampart_sinks(config):
    return [JsonFileReportSink(output_dir=Path(".report"))]

That would be a hook implementation, not another fixture. RAMPART would register the hook spec from its pytest plugin, and users would implement the hook from a root conftest.py or an installed plugin, the same way they implement other pytest hooks. Then the controller can call config.pluginmanager.hook.pytest_rampart_sinks(config=config) without running any fixtures. It still would not support fixture dependencies, but at least that limitation would be explicit instead of discovered through reflection.

The one thing to be careful about is having two sink configuration surfaces. If both pytest_rampart_sinks and the existing rampart_sinks fixture are present, the plugin should have a clear rule, probably something like "the hook is authoritative, the fixture is the legacy single-process fallback", and defining both generates a warning rather than emitting it twice. Without that precedence rule, this could get confusing quickly. With that rule documented, I would much rather support the hook as the xdist-compatible path than maintain a partial fixture resolver. I think for the longer term, it may be cleaner to deprecate the fixture-based sink registration entirely and make the hook the one supported sink API, once users have had time to migrate.

I think we need to put more throughts into whether to use workeroutput transport. I am not saying it is wrong, there is a good argument for it, since it is the native xdist channel and can support remote workers without requiring a shared filesystem. But it does mean results only get to the controller at worker shutdown. If a worker gets killed before pytest_sessionfinish, the run is correctly marked incomplete, but all results from that worker are gone, including tests that already finished. For small suites that may be fine. For long agent suites, that could be a lot of data to lose.

The size cap has a similar issue. If one worker payload goes over --rampart-xdist-max-bytes, we replace the whole payload with a truncation marker. So one huge transcript can make us lose the other results from that worker too. I do not think that has to block this PR, but I would call it out clearly in the docs. Longer term, a per-test stream or per-worker shard file would be more durable. workeroutput is simpler and works better for remote workers, so I mostly just want that tradeoff to be explicit.

One maintainability, there are now two serializers for roughly the same data. _xdist.py has _serialize_result / _deserialize_result, while JsonFileReportSink still has its own report serialization. Since the xdist path already has to round-trip Result, it might be worth pulling the shared bits into something like rampart/reporting/serde.py and making the JSON sink use it too; otherwise every future Result field has to be remembered in two places.

Two small notes

  1. the onedrive.py pyright suppression looks unrelated to xdist, so I would probably split that out unless it is needed to get this PR green.
  2. I am glad incomplete / incomplete_reasons are in the metadata, but I would think about surfacing incomplete runs somewhere CI users are likely to notice. The awkward case is all tests passing while the report is marked incomplete.

I like the direction. The two things I would most like to see tightened are the full-nodeid ordering and replacing the controller-side sink fixture scanning with an explicit hook or some similar explicit controller-safe sink registration path.

@spencrr spencrr left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Nina! I like:

  • one (de)serialization spot in _xdist.py with sanitization and schme versioning.
  • using trial_specs over worker channel so that --dist-load works correctly
  • workers disable _emit_sinks and controller is the single source of truth

Other small AI Finds

  • Worker-side register_trial_spec is dead work. Workers run pytest_collection_modifyitems, register specs, ship them across, and never aggregate or evaluate. That's fine for correctness, but it means the spec is computed twice (once on the worker, once on the controller's own collection pass, which the merge then drops via setdefault). Worth a one-line comment in pytest_collection_modifyitems clarifying why we register on every process even though only the controller acts on it - otherwise future readers will be tempted to "optimize" the worker path and break the fallback.
  • xdist_group marker is added unconditionally to every trial clone. Correct behavior (xdist sees it; pytest without xdist treats it as an unknown marker). If the repo ever turns on --strict-markers you'll need a conditional addinivalue_line("markers", "xdist_group: ...") somewhere; not a problem today.
  • _session.py and _xdist.py "documented deviations" notes. The architecture doc was actually updated in this PR to describe _session.py/_xdist.py as first-class members of the plugin package - so the Note: ... documented deviation from the architecture blocks at the top of those modules are now stale and can come out. Minor.
  • Trial-spec aggregation runs on every controller pytest_sessionfinish call, including when only one worker reported. That's fine for correctness; just observing that worker-result completeness isn't a precondition for trial aggregation, which is the right call - missing-worker should mark incomplete, not block the report.
  • _emit_sinks background-task path. pre-existing - when an event loop is already running at session-finish (e.g., some pytest-asyncio configurations) the emission becomes fire-and-forget with no awaiter. Not changed by this PR, but the new xdist controller path is the first place I'd actually worry about this in production, because the controller's sink emission carries cross-worker data that didn't exist before. Worth at least filing as a follow-up.

Comment thread tests/unit/pytest_plugin/test_xdist_aggregation.py
Comment thread rampart/pytest_plugin/plugin.py
Comment thread rampart/pytest_plugin/plugin.py
Comment thread rampart/pytest_plugin/_xdist.py Outdated
Comment thread docs/usage/xdist.md
Comment thread rampart/pytest_plugin/_xdist.py Outdated
Comment thread rampart/pytest_plugin/_xdist.py
nina-msft and others added 6 commits June 16, 2026 15:08
Integrate Bashir's and spencrr's review feedback on PR microsoft#73:

- Deterministic ordering: sort the report by (nodeid, result_index,
  source_worker); set nodeid/result_index authoritatively on absorb and
  on worker-payload deserialize, with source_worker as the final
  tie-breaker so the same nodeid from multiple workers stays totally
  ordered. Also sort trial-group terminal lines.
- Sink discovery: add an explicit pytest_rampart_sinks hook resolved on
  the controller; demote the rampart_sinks fixture to a single-process
  fallback (hook authoritative, no double-register), and drop the
  private fixture-reflection path.
- Harden terminal-injection defense: one shared strip_ansi
  (rampart/common/text.py) covering CSI/OSC/DCS/NF + C1 controls, used by
  the xdist transport, terminal summary, and incomplete-run warning.
- Fix is_xdist_controller for -d/--tx runs (detect via dist mode plus
  endpoints, not numprocesses alone).
- Surface incomplete runs with a terminal warning emitted before the
  has_results guard.
- Document the workeroutput durability and size-cap limitations in
  docs/usage/xdist.md (durable per-worker transport is a stacked PR).
- Keep the two Result serializers separate, with doc-comments explaining
  why they must not be naively merged.
- Remove now-stale "documented deviation" module notes for _session.py
  and _xdist.py, and clarify why trial specs register on every process.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Restore the documented session-fixture fallback under pytest-xdist.
Controller-side sink discovery rejected a @pytest.fixture-wrapped
rampart_sinks (a FixtureFunctionDefinition, for which inspect.isfunction
is False), so no sink was registered and no aggregated report was written
under -n. Unwrap the fixture to its underlying function via
_get_wrapped_function() (with _fixture_function/__wrapped__ fallbacks) and
call it directly; fixtures that depend on other fixtures still warn and
point at the pytest_rampart_sinks hook.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Use plain inline code instead of an mkdocs-autorefs link target, since no
intersphinx inventory is configured for pytest. Resolves the strict-build
warning: Could not find cross-reference target 'pytest.Config'.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Both modes produce an identical, correct aggregated report; the choice is
about execution, not correctness. Default --dist=load spreads trial clones
across all workers and is usually fastest; --dist=loadgroup pins a group to
one worker and matters only when trials share per-group worker state. Adds a
'Choosing loadgroup vs load' guide (with an illustrative benchmark) in
xdist.md and aligns the ci-integration and pytest-integration references.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Hoist _aggregate_trial_results/_evaluate_gates above the controller vs
single-process branch so the shared steps run once and only the
xdist-specific metadata/sink-discovery is guarded by is_xdist_controller
(addresses review nit). Drop the now-vestigial session parameter from
_aggregate_trial_results: trial aggregation reads entirely from the merged
RampartSession (trial_specs), not session.items.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@nina-msft

nina-msft commented Jun 17, 2026

Copy link
Copy Markdown
Contributor Author

(In response to #73 (comment) above)

Thanks for the thorough read, @bashirpartovi — this shaped the last round of changes. Status on each point:

Ordering (full nodeid): Fixed. build_report() now sorts on (nodeid, result_index, source_worker) with the nodeid assigned authoritatively (not the trailing test_name), so same-named/parametrized/trial cases no longer collide. Added the reverse-order merge regression test you suggested — it merges the same two worker payloads in both orders and asserts identical report output.

Sink discovery → explicit hook: Done. Added a pytest_rampart_sinks(config) hookspec; the controller resolves it via config.pluginmanager.hook with no fixture machinery. Precedence is exactly what you proposed: the hook is authoritative, the rampart_sinks fixture is the legacy single-process fallback, and when a hook implementation exists the fixture path is skipped entirely (no double-registration). Documented in xdist.md. The fixture fallback still works on the controller for the common parameterless case (unwrapped to its underlying function), but anything with fixture deps now warns and points at the hook instead of silently disappearing.

workeroutput durability & size-cap-drops-whole-payload: Called out explicitly in a new "Durability limitations" section — a worker killed before sessionfinish loses its already-finished results, and an over-cap payload is dropped wholesale (worker marked incomplete). I agree the longer-term answer is a durable per-worker shard; I'm splitting that into a stacked follow-up PR on top of this one (incremental JSONL shards that survive a killed worker, with the cap applied per-record) so this PR stays focused.

Two serializers: I looked at extracting a shared module and decided to hold off for now. The two projections deliberately differ — the xdist one is a full-fidelity Result round-trip with sanitization + size handling, the JSON-sink one is the flatter public report shape — so unifying them today would couple things that legitimately diverge. I've added explicit "must not be naively merged" doc-comments on both and filed a follow-up to revisit once the shared surface is real. Open to doing it now if you feel strongly.

onedrive pyright suppression: Kept intentionally — it's a file-level pyright directive silencing the Unknown* cascade from msgraph-sdk shipping without type stubs (plus a reportMissingImports ignore on the import). No behavior change, and pyright/CI is red without it. Happy to pull it into its own PR if you'd prefer, but it's load-bearing for green CI on this branch.

Surfacing incomplete runs: Added a terminal warning at session end listing incomplete_reasons, so the "all tests green but report incomplete" case is visible in the pytest output, not just buried in metadata.

@nina-msft

nina-msft commented Jun 17, 2026

Copy link
Copy Markdown
Contributor Author

(In response to #73 (review) above)

Thanks @spencrr — addressed the small stuff:

  • Worker-side register_trial_spec dead work: Added the clarifying comment in pytest_collection_modifyitems explaining why specs register on every process even though only the controller acts on them (so nobody "optimizes" the worker path and breaks the fallback).
  • Stale "documented deviation" notes: Removed from _session.py and _xdist.py now that architecture.md lists them as first-class. (Kept the separate handler-factory note in plugin.py — different deviation.)
  • _emit_sinks fire-and-forget: Agreed this is the first place it matters in production. Filed as a follow-up — out of scope for this PR since it's pre-existing.
  • xdist_group + --strict-markers: Noted; not a problem today, will add the conditional addinivalue_line if/when we enable strict markers.
  • Trial aggregation per sessionfinish: Right — worker completeness isn't a precondition; missing workers mark incomplete rather than block the report. Intentional.

(Replied inline on the individual threads with specifics.)

…sion diagnostic

Address PR review follow-ups:

- Move test_xdist_aggregation.py from tests/integration/ to
  tests/unit/pytest_plugin/ and mark it `slow`. These are pytester
  subprocess tests with no live external dependency, so the integration
  tier (reserved for live-LLM tests) was the wrong home; they now run in
  CI/coverage, which only collect tests/unit.

- Remove the worker `package_version` stamp and the controller-side
  version-skew warning: a log-only, untested diagnostic for mixed-version
  remote workers. SCHEMA_VERSION already rejects incompatible payloads,
  making it redundant.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants