[FEAT]: pytest-xdist support for distributed test execution#73
[FEAT]: pytest-xdist support for distributed test execution#73nina-msft wants to merge 11 commits into
pytest-xdist support for distributed test execution#73Conversation
The controller's _aggregate_trial_results iterated session.items looking for a _rampart_trial_base attribute set on cloned items at collection time. Under pytest-xdist that attribute is not reliably reachable on the controller at pytest_sessionfinish, so trial-group verdicts silently disappeared from the terminal summary and the JSON report -- per-clone results were present but no aggregate FAIL/PASS line was emitted. Decouple aggregation from pytest.Item state: - Store trial metadata as data on RampartSession._trial_specs (a dict[clone_nodeid, TrialSpec]) at collection time on every process. - Ship rial_specs through the existing ampart.xdist.v1 worker payload (back-compatible: missing/empty list is treated as no trials). - Controller merges specs from each worker and aggregates from the merged data instead of session.items. Also fixes a related JSON-sink gap: JsonFileReportSink now projects eport.metadata, so xdist run-mode info (worker_count, dist_mode, incomplete reasons) actually lands in the emitted file. Tests: - New TestTrialSpecs unit class (round-trip, malformed entries, non-finite thresholds, idempotent merge, first-writer-wins). - handle_testnodedown test covering trial-spec merge. - Strengthened ests/integration/test_xdist_aggregation.py to assert the trial-group line is present and correct under --dist=loadgroup and --dist=load (the prior tests only checked per-clone counts and would have missed this regression). - JSON-sink metadata projection unit tests. 565 unit tests + 13 xdist integration tests pass; ruff clean. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Great job @nina-msft, really a solid PR. The overall architecture makes sense to me, which is workers collect, the controller merges, and only the controller emits. I also like that the worker output path treats the boundary seriously rather than just passing objects around and hoping for the best. I feel like we should definitely keep that part even if some of the surrounding design changes. The first thing I would fix before merge is ordering. I am a little bit worried about the sink discovery piece. This would be cleaner as an explicit hook instead of fixture reflection. The key difference is that the existing A hook would be different: def pytest_rampart_sinks(config):
return [JsonFileReportSink(output_dir=Path(".report"))]That would be a hook implementation, not another fixture. RAMPART would register the hook spec from its pytest plugin, and users would implement the hook from a root The one thing to be careful about is having two sink configuration surfaces. If both I think we need to put more throughts into whether to use The size cap has a similar issue. If one worker payload goes over One maintainability, there are now two serializers for roughly the same data. Two small notes
I like the direction. The two things I would most like to see tightened are the full-nodeid ordering and replacing the controller-side sink fixture scanning with an explicit hook or some similar explicit controller-safe sink registration path. |
spencrr
left a comment
There was a problem hiding this comment.
Hey Nina! I like:
- one (de)serialization spot in
_xdist.pywith sanitization and schme versioning. - using
trial_specsover worker channel so that--dist-loadworks correctly - workers disable
_emit_sinksand controller is the single source of truth
Other small AI Finds
- Worker-side
register_trial_specis dead work. Workers runpytest_collection_modifyitems, register specs, ship them across, and never aggregate or evaluate. That's fine for correctness, but it means the spec is computed twice (once on the worker, once on the controller's own collection pass, which the merge then drops viasetdefault). Worth a one-line comment inpytest_collection_modifyitemsclarifying why we register on every process even though only the controller acts on it - otherwise future readers will be tempted to "optimize" the worker path and break the fallback. xdist_groupmarker is added unconditionally to every trial clone. Correct behavior (xdist sees it; pytest without xdist treats it as an unknown marker). If the repo ever turns on--strict-markersyou'll need a conditionaladdinivalue_line("markers", "xdist_group: ...")somewhere; not a problem today._session.pyand_xdist.py"documented deviations" notes. The architecture doc was actually updated in this PR to describe_session.py/_xdist.pyas first-class members of the plugin package - so theNote: ... documented deviation from the architectureblocks at the top of those modules are now stale and can come out. Minor.- Trial-spec aggregation runs on every controller
pytest_sessionfinishcall, including when only one worker reported. That's fine for correctness; just observing that worker-result completeness isn't a precondition for trial aggregation, which is the right call - missing-worker should mark incomplete, not block the report. _emit_sinksbackground-task path. pre-existing - when an event loop is already running at session-finish (e.g., some pytest-asyncio configurations) the emission becomes fire-and-forget with no awaiter. Not changed by this PR, but the new xdist controller path is the first place I'd actually worry about this in production, because the controller's sink emission carries cross-worker data that didn't exist before. Worth at least filing as a follow-up.
Integrate Bashir's and spencrr's review feedback on PR microsoft#73: - Deterministic ordering: sort the report by (nodeid, result_index, source_worker); set nodeid/result_index authoritatively on absorb and on worker-payload deserialize, with source_worker as the final tie-breaker so the same nodeid from multiple workers stays totally ordered. Also sort trial-group terminal lines. - Sink discovery: add an explicit pytest_rampart_sinks hook resolved on the controller; demote the rampart_sinks fixture to a single-process fallback (hook authoritative, no double-register), and drop the private fixture-reflection path. - Harden terminal-injection defense: one shared strip_ansi (rampart/common/text.py) covering CSI/OSC/DCS/NF + C1 controls, used by the xdist transport, terminal summary, and incomplete-run warning. - Fix is_xdist_controller for -d/--tx runs (detect via dist mode plus endpoints, not numprocesses alone). - Surface incomplete runs with a terminal warning emitted before the has_results guard. - Document the workeroutput durability and size-cap limitations in docs/usage/xdist.md (durable per-worker transport is a stacked PR). - Keep the two Result serializers separate, with doc-comments explaining why they must not be naively merged. - Remove now-stale "documented deviation" module notes for _session.py and _xdist.py, and clarify why trial specs register on every process. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Restore the documented session-fixture fallback under pytest-xdist. Controller-side sink discovery rejected a @pytest.fixture-wrapped rampart_sinks (a FixtureFunctionDefinition, for which inspect.isfunction is False), so no sink was registered and no aggregated report was written under -n. Unwrap the fixture to its underlying function via _get_wrapped_function() (with _fixture_function/__wrapped__ fallbacks) and call it directly; fixtures that depend on other fixtures still warn and point at the pytest_rampart_sinks hook. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Use plain inline code instead of an mkdocs-autorefs link target, since no intersphinx inventory is configured for pytest. Resolves the strict-build warning: Could not find cross-reference target 'pytest.Config'. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Both modes produce an identical, correct aggregated report; the choice is about execution, not correctness. Default --dist=load spreads trial clones across all workers and is usually fastest; --dist=loadgroup pins a group to one worker and matters only when trials share per-group worker state. Adds a 'Choosing loadgroup vs load' guide (with an illustrative benchmark) in xdist.md and aligns the ci-integration and pytest-integration references. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Hoist _aggregate_trial_results/_evaluate_gates above the controller vs single-process branch so the shared steps run once and only the xdist-specific metadata/sink-discovery is guarded by is_xdist_controller (addresses review nit). Drop the now-vestigial session parameter from _aggregate_trial_results: trial aggregation reads entirely from the merged RampartSession (trial_specs), not session.items. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
(In response to #73 (comment) above) Thanks for the thorough read, @bashirpartovi — this shaped the last round of changes. Status on each point: Ordering (full nodeid): Fixed. Sink discovery → explicit hook: Done. Added a workeroutput durability & size-cap-drops-whole-payload: Called out explicitly in a new "Durability limitations" section — a worker killed before Two serializers: I looked at extracting a shared module and decided to hold off for now. The two projections deliberately differ — the xdist one is a full-fidelity onedrive pyright suppression: Kept intentionally — it's a file-level pyright directive silencing the Surfacing incomplete runs: Added a terminal warning at session end listing |
|
(In response to #73 (review) above) Thanks @spencrr — addressed the small stuff:
(Replied inline on the individual threads with specifics.) |
…sion diagnostic Address PR review follow-ups: - Move test_xdist_aggregation.py from tests/integration/ to tests/unit/pytest_plugin/ and mark it `slow`. These are pytester subprocess tests with no live external dependency, so the integration tier (reserved for live-LLM tests) was the wrong home; they now run in CI/coverage, which only collect tests/unit. - Remove the worker `package_version` stamp and the controller-side version-skew warning: a log-only, untested diagnostic for mixed-version remote workers. SCHEMA_VERSION already rejects incompatible payloads, making it redundant. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Description
Summary
Adds first-class
pytest-xdistsupport to RAMPART so attack/probe sessions can shard across worker processes and still produce a single coherent report. Worker processes serialize their results through xdist'sworkeroutputchannel; the controller deserializes, merges, and writes the final report.What's included
New:
_xdist.pyrampart.xdist.v1) for worker → controller payloads_sanitizeJSON-safe coercion at the trust boundary with depth limit, byte-size cap, and ANSI-escape stripping on deserialize to defend against terminal injection from worker outputSizeLimitErrorraised byfinalize_workerwhen the serialized payload exceeds the configured cap; controller logs and continues--rampart-xdist-max-bytes/ ini optionrampart_xdist_max_bytes(default 64 MiB), validated positiveKeyboardInterrupt/SystemExit, catchesExceptionTrial-group aggregation across workers
RampartSessionat collection time and shipped through therampart.xdist.v1payload (trial_specs), so the controller aggregates trial groups from merged worker data instead ofsession.items— which is not reliably populated with trial clones on the controller at session finish. Without this, trial-group FAIL/PASS verdicts silently disappeared under xdist even though per-clone results were present.trial_specsis back-compatible: payloads without trials emit an empty list; malformed entries are skipped and non-finite thresholds are clamped on deserialize.Deterministic result ordering & sink resolution (review feedback)
(nodeid, result_index, source_worker)so aggregated reports are reproducible regardless of worker completion order. nodeid/result_index are assigned authoritatively (absorb-time enumerate; deserialize-time outer-key + enumerate); source_worker is tagged onhandle_testnodedown.pytest_rampart_sinkshook (_hooks.py) is the authoritative sink source — keyed on hook-impl existence, not truthiness — and short-circuits the fixture path so projects defining both don't double-register. Therampart_sinksfixture remains the single-process fallback.@pytest.fixture-wrapped parameterlessrampart_sinksto its underlying function (_get_wrapped_function()with_fixture_function/__wrapped__fallbacks) and calls it directly, restoring the documented session-fixture fallback under-n. Fixtures that depend on other fixtures still warn and point at the hook.strip_ansiextracted torampart/common/text.py(CSI/OSC/DCS/NF + C0/C1 stripping) and reused by both the plugin and the xdist boundary.Updated plugin hooks (
plugin.py,_session.py)pytest_addoptionregisters the new CLI/ini option;pytest_addhooksregisterspytest_rampart_sinksfinalize_workerto surface size-limit truncation as a warningReporting
JsonFileReportSinknow projectsreport.metadata, so xdist run-mode info (worker_count, dist_mode, incomplete reasons) lands in the emitted JSON fileTests
test_xdist.py(serialization, sanitization, size limits, ANSI stripping, controller detection, sink resolution/unwrap, option parsing, ordering determinism, trial-spec round-trip/merge)test_text.pyfor sharedstrip_ansitest_plugin.pyextended (57 tests) for sink-hook resolution, OSC sanitizing, and the incomplete-warning pathtest_json_file.pyextended (14 tests) for metadata projectiontest_xdist_aggregation.py(13 tests) exercising end-to-end aggregation across workers, including trial-group verdicts under both--dist=loadgroupand--dist=loadDocs
xdist.mduser guide, incl. thepytest_rampart_sinkshook, sink precedence, and durability limitationsIncidental
onedrive.py: file-level pyright directive suppressing theUnknown*cascade caused bymsgraph-sdkshipping without type stubs. No behavior change.Security notes
The xdist boundary treats worker output as untrusted:
_sanitize→ JSON-safe primitives onlySizeLimitErrorMAX_METADATA_DEPTH = 6) prevents pathological nestingValidation
uv run ruff check rampart tests→ all checks pass;ruff format --checkcleanuv run pyright rampart→ 0 errors, 0 warningsuv run pytest tests/unit→ 594 passeduv run pytest tests/integration/test_xdist_aggregation.py→ 13 passed (trial-group verdicts verified under--dist=loadgroupand--dist=load)uv run --group docs mkdocs build --strict→ no cross-reference warnings (remaining output is Windows-only symlink noise from the font-download plugin; clean on CI/Linux)pytest -n 4 --dist=loadgroupagainst the HelpDesk Bot example inrampart-examples: 1 report generated. Running with xdist took ~99.66s versus ~230.73s serial (yay for speed!)Breaking changes
None
Checklist
pre-commit run --all-filespasses