From daedcf6d06dc8fc10597c17dae6390f93ad6fc89 Mon Sep 17 00:00:00 2001 From: Greg Magolan Date: Wed, 3 Jun 2026 07:39:22 -0700 Subject: [PATCH] feat(bazel): capture & forward Bazel stderr (phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spawn Bazel with a runtime-owned stderr instead of Stdio::inherit() when a task opts in, so its output can be pre-processed before reaching the terminal. Phase 1 forwards verbatim; the pipeline seam, capture modes, ordering, flushing, and memory bounds are the foundation for later output cleanup (dedup + count), pattern-matched hooks, and hung-server detection. - stream/output.rs: OutputStream reader thread — \n/\r record splitting, per-read flush (keeps the curses progress UI live), LineProcessor pipeline seam (pass-through only), EIO-as-EOF, 1 MiB carry cap, last_activity atomic (stall-watchdog substrate). Single producer->consumer with a blocking write so back-pressure is automatic (deliberately not the unbounded Broadcaster). - capture.rs: conditional capture fd — plain pipe (non-TTY) or PTY (interactive, via nix::pty::openpty) with slave-drop / CLOEXEC / winsize discipline; pipe fallback off-Unix. - build.rs: OutputProcessor type + CaptureMode; `output` param on Build::spawn overrides the stderr Stdio and starts the OutputStream post-spawn; output_stream joined in wait() after the child is reaped. - mod.rs: `output` arg on ctx.bazel.build/.test; bazel.output.processor() constructor + type. - axl-types: Writable::to_boxed_write() for the forwarder thread. - bazel.axl: capture_output opt-in on BazelTrait (default off). - bazel_runner.axl: build/pass the processor; decouple --isatty from stdout (--isatty=1 for PTY, --isatty=0 --curses=no for the pipe path). Off by default: with no opt-in, stderr stays Stdio::inherit() and behavior is unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 1 + .../aspect-cli/src/builtins/aspect/bazel.axl | 11 + .../src/builtins/aspect/lib/bazel_runner.axl | 24 +- crates/axl-runtime/Cargo.toml | 3 +- crates/axl-runtime/src/engine/bazel/build.rs | 111 +++++- .../axl-runtime/src/engine/bazel/capture.rs | 237 +++++++++++ crates/axl-runtime/src/engine/bazel/mod.rs | 59 +++ .../src/engine/bazel/stream/mod.rs | 2 + .../src/engine/bazel/stream/output.rs | 369 ++++++++++++++++++ crates/axl-types/src/stream.rs | 85 ++++ 10 files changed, 899 insertions(+), 3 deletions(-) create mode 100644 crates/axl-runtime/src/engine/bazel/capture.rs create mode 100644 crates/axl-runtime/src/engine/bazel/stream/output.rs diff --git a/Cargo.lock b/Cargo.lock index 9e425d1ed..3f85f3795 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -361,6 +361,7 @@ dependencies = [ "hyper-util", "hyperlocal", "include_dir", + "libc", "liquid", "liquid-core", "md-5", diff --git a/crates/aspect-cli/src/builtins/aspect/bazel.axl b/crates/aspect-cli/src/builtins/aspect/bazel.axl index 3ea6fc483..3d00f151e 100644 --- a/crates/aspect-cli/src/builtins/aspect/bazel.axl +++ b/crates/aspect-cli/src/builtins/aspect/bazel.axl @@ -77,6 +77,17 @@ BazelTrait = trait( # each task's impl for lint/delivery. task_flags = attr(list[typing.Callable[[TaskContext], list[str]]], default = [], description = "Hooks returning additional flags; called with TaskContext at invocation time"), + # Captured-output opt-in. When True, the Bazel child's stderr is captured + # by the runtime, run through a processing pipeline, and forwarded to the + # real stderr instead of being inherited directly. Phase 1 forwards + # verbatim (a plain pipe in non-TTY contexts, a PTY interactively so the + # live progress UI is preserved). This is the foundation for later + # output cleanup (dedup repeated lines + count) and pattern-matched + # hooks / hung-server detection — those will add a reserved `output_match` + # hook list and dedup config here, and act on detection state from the + # existing `bazel_attempt_end` hook. Default off → behavior unchanged. + capture_output = attr(bool, default = False, description = "Capture the Bazel child's stderr, run it through the output processing pipeline, and forward it to the real stderr (instead of inheriting it directly)"), + # Lifecycle hooks — lists of callables; callers close over their own state build_start = attr(list[typing.Callable[[TaskContext], None]], default = [], description = "Hooks called once before the Bazel invocation starts"), build_event = attr(list[typing.Callable[[TaskContext, dict], None]], default = [], description = "Hooks called for each Build Event Protocol event received during the build"), diff --git a/crates/aspect-cli/src/builtins/aspect/lib/bazel_runner.axl b/crates/aspect-cli/src/builtins/aspect/lib/bazel_runner.axl index a03f8c541..a811426c2 100644 --- a/crates/aspect-cli/src/builtins/aspect/lib/bazel_runner.axl +++ b/crates/aspect-cli/src/builtins/aspect/lib/bazel_runner.axl @@ -279,7 +279,27 @@ def run_bazel_task(ctx: TaskContext, command: str, targets = None) -> TaskConclu # forwarded as a flag rather than expanded to argv (the flag exists # to bypass OS command-line length limits). These are defaults the # user can still override via --bazel-flag=... - base_flags = ["--isatty=" + str(int(ctx.std.io.stdout.is_tty))] + # + # When output capture is on, the child's stderr no longer goes to the real + # terminal — it goes to a runtime-owned pipe/PTY (see `output_processor` + # below). `--isatty` must match the captured fd, not the parent's stdout: + # - destination stderr is a TTY → a PTY is allocated → --isatty=1 so + # Bazel renders its curses UI into the PTY (forwarded near-verbatim). + # - otherwise → a plain pipe → --isatty=0 --curses=no so Bazel emits + # clean newline-terminated lines (the tractable substrate for the + # deferred dedup/match work). + # When capture is off, keep the historical behavior: derive --isatty from + # the parent's stdout (the child inherits the terminal directly). + output_processor = None + if bazel_trait.capture_output: + capture_is_tty = ctx.std.io.stderr.is_tty + output_processor = bazel.output.processor(tty = capture_is_tty) + if capture_is_tty: + base_flags = ["--isatty=1"] + else: + base_flags = ["--isatty=0", "--curses=no"] + else: + base_flags = ["--isatty=" + str(int(ctx.std.io.stdout.is_tty))] if targets == None: # --target-pattern-file is only honored when the task declares it. @@ -385,6 +405,7 @@ def run_bazel_task(ctx: TaskContext, command: str, targets = None) -> TaskConclu invocation = ctx.bazel.test( build_events = build_events, execution_log = bazel_trait.execution_log_sinks or False, + output = output_processor, announce_version = announce_version, announce_command = announce_command, *targets @@ -393,6 +414,7 @@ def run_bazel_task(ctx: TaskContext, command: str, targets = None) -> TaskConclu invocation = ctx.bazel.build( build_events = build_events, execution_log = bazel_trait.execution_log_sinks or False, + output = output_processor, announce_version = announce_version, announce_command = announce_command, *targets diff --git a/crates/axl-runtime/Cargo.toml b/crates/axl-runtime/Cargo.toml index 40ff5c090..509e58ba3 100644 --- a/crates/axl-runtime/Cargo.toml +++ b/crates/axl-runtime/Cargo.toml @@ -36,7 +36,8 @@ http-body-util = "0.1.3" url = "2.5.4" zstd = "0.13.3" -nix = { version = "0.30.1", features = ["fs", "signal"] } +nix = { version = "0.30.1", features = ["fs", "signal", "term"] } +libc = "0.2" wasmi = "0.51.0" wasmi_wasi = "0.51.0" diff --git a/crates/axl-runtime/src/engine/bazel/build.rs b/crates/axl-runtime/src/engine/bazel/build.rs index 6529accc1..d5f27c8b4 100644 --- a/crates/axl-runtime/src/engine/bazel/build.rs +++ b/crates/axl-runtime/src/engine/bazel/build.rs @@ -579,6 +579,64 @@ pub(crate) fn build_event_iter_methods(registry: &mut MethodsBuilder) { } } +/// How the captured stderr fd is allocated for a build. +/// +/// Pipe mode is the plain-`Stdio::piped()` substrate used in non-TTY contexts +/// (CI, redirected output): Bazel emits clean newline-terminated lines. +/// Pty mode allocates a pseudo-terminal so Bazel keeps its live curses UI; the +/// runtime forwards the master bytes near-verbatim. The mode is decided when +/// the processor is constructed, from whether the parent's stderr is a TTY +/// (see `bazel.output.processor`). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CaptureMode { + Pipe, + Pty, +} + +/// Starlark handle passed as `ctx.bazel.build(output = ...)` to enable stderr +/// capture + forwarding. Created via `bazel.output.processor(...)`, single-use +/// per build (mirrors `BuildEventIter`). +/// +/// Phase 1 carries only the capture mode; it is the seam where the deferred +/// dedup config and pattern matchers will be added. `Build::spawn` reads the +/// mode to decide how to allocate the child's stderr and starts an +/// `OutputStream` over the read end. +#[derive(Clone, Debug, ProvidesStaticType, Display, Trace, NoSerialize, Allocative)] +#[display("")] +pub struct OutputProcessor { + #[allocative(skip)] + mode: CaptureMode, +} + +impl OutputProcessor { + pub fn new(mode: CaptureMode) -> Self { + Self { mode } + } + + pub fn mode(&self) -> CaptureMode { + self.mode + } +} + +impl<'v> AllocValue<'v> for OutputProcessor { + fn alloc_value(self, heap: Heap<'v>) -> Value<'v> { + heap.alloc_complex_no_freeze(self) + } +} + +impl<'v> UnpackValue<'v> for OutputProcessor { + type Error = anyhow::Error; + fn unpack_value_impl(value: Value<'v>) -> Result, Self::Error> { + let v = value + .downcast_ref_err::() + .into_anyhow_result()?; + Ok(Some(v.clone())) + } +} + +#[starlark_value(type = "bazel.output.OutputProcessor")] +impl<'v> values::StarlarkValue<'v> for OutputProcessor {} + fn matches_kinds(event: &BuildEvent, kinds: Option<&Arc>>) -> bool { let Some(kinds) = kinds else { return true; @@ -716,6 +774,13 @@ pub struct Build { #[allocative(skip)] execlog_stream: RefCell>, + /// Captured-stderr forwarder, present only when the build was spawned with + /// `output = bazel.output.processor(...)`. Joined in `wait()` after the + /// child is reaped so all forwarded stderr is flushed before the task + /// prints its terminal summary. + #[allocative(skip)] + output_stream: RefCell>, + /// Shared UUID every gRPC sink indexes this invocation under. Minted /// before bazel emits `build_started` so forwarders can start /// immediately; distinct from Bazel's `build_started.uuid`. @@ -755,6 +820,7 @@ impl Build { startup_flags: Vec, stdout: Stdio, stderr: Stdio, + output: Option, current_dir: Option, announce: AnnounceSpawn, rt: AsyncRuntime, @@ -856,13 +922,43 @@ impl Build { announce_spawn(announce, version.as_ref(), &cmd); cmd.stdout(stdout); - cmd.stderr(stderr); + // When capturing, the child's stderr goes to a runtime-owned pipe/PTY + // instead of the resolved `stderr` Stdio; the `OutputStream` started + // after spawn reads, processes, and forwards it to the real stderr. + let mut capture = match &output { + Some(p) => Some(super::capture::Capture::open(p.mode())?), + None => None, + }; + match &mut capture { + Some(c) => { + cmd.stderr(std::mem::replace(&mut c.child_stderr, Stdio::null())); + } + None => { + cmd.stderr(stderr); + } + } cmd.stdin(Stdio::null()); let child = cmd .spawn() .map_err(|e| io::Error::other(format!("failed to spawn bazel: {e}")))?; + // Start forwarding captured stderr now that the child holds the write + // end. Drop the parent's PTY-slave copy first (release_after_spawn) so + // the master read can observe EOF when the child exits — otherwise the + // forwarder thread would hang forever in `wait()`. + let output_stream = match capture { + Some(mut c) => { + c.release_after_spawn(); + Some(super::stream::OutputStream::spawn( + c.reader, + Box::new(std::io::stderr()), + vec![], + )) + } + None => None, + }; + // Register the bazel client with the live-subprocess registry so // aspect-cli's OS-signal handler can forward SIGINT to it on // CI cancellation. The guard is stored on `Self` and unregisters @@ -952,6 +1048,7 @@ impl Build { build_event_stream: RefCell::new(build_event_stream), workspace_event_stream: RefCell::new(workspace_event_stream), execlog_stream: RefCell::new(execlog_stream), + output_stream: RefCell::new(output_stream), sink_invocation_id: RefCell::new(sink_invocation_id), live_guard: RefCell::new(Some(live_guard)), span: RefCell::new(span), @@ -1087,6 +1184,18 @@ pub(crate) fn build_methods(registry: &mut MethodsBuilder) { } }; + // Drain the captured-stderr forwarder. The child has exited (reaped + // above), so its stderr write end is closed and the reader reaches + // EOF; joining here guarantees all forwarded stderr is flushed before + // the caller (`_emit_terminal`) prints the task's terminal summary. + let output_stream = build.output_stream.take(); + if let Some(mut output_stream) = output_stream { + match output_stream.join() { + Ok(_) => {} + Err(err) => anyhow::bail!("output stream thread error: {}", err), + } + }; + // Drop the span to end the trace drop(build.span.replace(tracing::Span::none())); diff --git a/crates/axl-runtime/src/engine/bazel/capture.rs b/crates/axl-runtime/src/engine/bazel/capture.rs new file mode 100644 index 000000000..fa78880ad --- /dev/null +++ b/crates/axl-runtime/src/engine/bazel/capture.rs @@ -0,0 +1,237 @@ +//! Allocate the captured-stderr fd for a Bazel invocation. +//! +//! When a build is spawned with `output = bazel.output.processor(...)`, the +//! child's stderr must go to a fd the runtime controls instead of being +//! inherited, so [`super::stream::OutputStream`] can read, process, and forward +//! it. Two substrates: +//! +//! - [`CaptureMode::Pipe`] — a plain anonymous pipe (`Stdio::piped()` shape). +//! Used in non-TTY contexts; Bazel emits clean newline-terminated lines. +//! - [`CaptureMode::Pty`] — a pseudo-terminal. The slave becomes the child's +//! stderr; we read the master. Bazel keeps its live curses UI. Unix-only; +//! callers fall back to `Pipe` on platforms without `openpty`. +//! +//! # The slave-drop discipline (PTY) +//! +//! The parent MUST drop its copy of the PTY slave fd right after spawning the +//! child, or the master read never sees EOF and the forwarder thread hangs +//! forever. [`Capture`] holds the slave in `parent_slave` precisely so the +//! caller can drop it (via [`Capture::release_after_spawn`]) at the right +//! moment — after `cmd.spawn()` has dup'd it into the child. + +use std::io::Read; +use std::process::Stdio; + +use super::build::CaptureMode; + +/// The fds produced for one captured invocation: the `Stdio` handed to the +/// child's stderr, the read end the forwarder consumes, and (PTY only) the +/// parent's slave fd that must be dropped after spawn. +pub struct Capture { + /// Handed to `cmd.stderr(...)`. + pub child_stderr: Stdio, + /// The read end the `OutputStream` reads from. + pub reader: Box, + /// PTY slave fd retained by the parent; dropped after spawn so the master + /// can observe EOF. `None` for the pipe path. + parent_slave: Option, +} + +impl Capture { + /// Drop the parent's retained slave fd (no-op for the pipe path). Call + /// immediately after `cmd.spawn()`. + pub fn release_after_spawn(&mut self) { + self.parent_slave = None; + } + + /// Allocate the capture fds for `mode`. Falls back to a pipe if PTY + /// allocation isn't available on this platform or fails. + pub fn open(mode: CaptureMode) -> std::io::Result { + match mode { + CaptureMode::Pipe => open_pipe(), + CaptureMode::Pty => match open_pty() { + Ok(c) => Ok(c), + Err(e) => { + crate::trace!("PTY capture unavailable ({e}); falling back to pipe"); + open_pipe() + } + }, + } + } +} + +/// Anonymous pipe: child writes the write end (its stderr), we read the read +/// end. Implemented with `os_pipe` semantics via the standard library. +fn open_pipe() -> std::io::Result { + let (reader, writer) = os_pipe()?; + Ok(Capture { + child_stderr: Stdio::from(writer), + reader: Box::new(reader), + parent_slave: None, + }) +} + +#[cfg(unix)] +fn os_pipe() -> std::io::Result<(std::fs::File, std::fs::File)> { + use std::os::fd::FromRawFd; + let mut fds = [0_i32; 2]; + // SAFETY: `pipe(2)` writes two valid fds into `fds` on success; we wrap + // each in an owning `File` exactly once. + let rc = unsafe { libc::pipe(fds.as_mut_ptr()) }; + if rc != 0 { + return Err(std::io::Error::last_os_error()); + } + let reader = unsafe { std::fs::File::from_raw_fd(fds[0]) }; + let writer = unsafe { std::fs::File::from_raw_fd(fds[1]) }; + Ok((reader, writer)) +} + +#[cfg(not(unix))] +fn os_pipe() -> std::io::Result<(std::fs::File, std::fs::File)> { + Err(std::io::Error::other( + "output capture is only supported on Unix", + )) +} + +/// Allocate a PTY, returning the master as the reader and the slave as the +/// child's stderr. The slave is dup'd: one copy goes to the child (via the +/// `Stdio`), one is retained in `parent_slave` to be dropped after spawn. +#[cfg(unix)] +fn open_pty() -> std::io::Result { + use nix::pty::openpty; + use std::os::fd::{FromRawFd, IntoRawFd, OwnedFd}; + + // Seed the slave winsize from the real terminal so Bazel wraps at the + // right width; best-effort (ignored if stderr isn't a terminal). + let winsize = current_winsize(); + let pty = openpty(winsize.as_ref(), None).map_err(std::io::Error::from)?; + let master: OwnedFd = pty.master; + let slave: OwnedFd = pty.slave; + + // CLOEXEC on the master so the child doesn't inherit it. + set_cloexec(&master)?; + + // Two owning handles to the slave: one becomes the child's stderr Stdio, + // one is retained so the parent can hold the slave open until just after + // spawn (then drop it so the master observes EOF on child exit). + let slave_for_child = slave.try_clone()?; + let child_stderr = unsafe { Stdio::from_raw_fd(slave_for_child.into_raw_fd()) }; + let parent_slave = unsafe { std::fs::File::from_raw_fd(slave.into_raw_fd()) }; + let reader = unsafe { std::fs::File::from_raw_fd(master.into_raw_fd()) }; + + Ok(Capture { + child_stderr, + reader: Box::new(reader), + parent_slave: Some(parent_slave), + }) +} + +#[cfg(not(unix))] +fn open_pty() -> std::io::Result { + Err(std::io::Error::other("PTY capture is only supported on Unix")) +} + +#[cfg(unix)] +fn set_cloexec(fd: &F) -> std::io::Result<()> { + use nix::fcntl::{FcntlArg, FdFlag, fcntl}; + let raw = fcntl(fd, FcntlArg::F_GETFD).map_err(std::io::Error::from)?; + let mut flags = FdFlag::from_bits_truncate(raw); + flags.insert(FdFlag::FD_CLOEXEC); + fcntl(fd, FcntlArg::F_SETFD(flags)).map_err(std::io::Error::from)?; + Ok(()) +} + +/// The current terminal's window size, read from the real stderr. `None` when +/// stderr isn't a terminal (the PTY then uses the kernel default). +#[cfg(unix)] +fn current_winsize() -> Option { + use std::io::IsTerminal; + use std::os::fd::AsRawFd; + let stderr = std::io::stderr(); + if !stderr.is_terminal() { + return None; + } + let mut ws: libc::winsize = unsafe { std::mem::zeroed() }; + // SAFETY: TIOCGWINSZ writes a `winsize` through the pointer; we pass a + // valid stack slot and check the return code. + let rc = unsafe { libc::ioctl(stderr.as_raw_fd(), libc::TIOCGWINSZ, &mut ws) }; + if rc != 0 { + return None; + } + Some(nix::pty::Winsize { + ws_row: ws.ws_row, + ws_col: ws.ws_col, + ws_xpixel: ws.ws_xpixel, + ws_ypixel: ws.ws_ypixel, + }) +} + +#[cfg(all(test, unix))] +mod tests { + use super::*; + use crate::engine::bazel::stream::OutputStream; + use std::io::Write; + use std::process::Command; + use std::sync::{Arc, Mutex}; + + #[derive(Clone)] + struct SharedSink(Arc>>); + + impl Write for SharedSink { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.lock().unwrap().extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + /// End-to-end through the real capture machinery + `OutputStream`: open a + /// capture, spawn a child whose stderr is the captured fd, forward to a + /// sink, and assert the forwarder drains everything and terminates (no + /// hang). This exercises the slave-drop / EOF / EIO discipline that the + /// `Cursor`-based `OutputStream` unit tests can't. + fn round_trip(mode: CaptureMode) { + let mut capture = Capture::open(mode).expect("open capture"); + let child_stderr = std::mem::replace(&mut capture.child_stderr, Stdio::null()); + + let mut child = Command::new("sh") + .arg("-c") + .arg("printf 'err line 1\\nerr line 2\\n' 1>&2") + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(child_stderr) + .spawn() + .expect("spawn child"); + + // The #1 PTY mistake: the parent must drop its slave copy so the + // master read observes EOF. (No-op for the pipe path.) + capture.release_after_spawn(); + + let sink = Arc::new(Mutex::new(Vec::new())); + let mut stream = OutputStream::spawn( + capture.reader, + Box::new(SharedSink(sink.clone())), + vec![], + ); + + child.wait().expect("child wait"); + // join() must return — if the slave/EOF handling were wrong this hangs. + stream.join().expect("stream join"); + + let out = String::from_utf8_lossy(&sink.lock().unwrap()).into_owned(); + assert!(out.contains("err line 1"), "missing line 1 in {out:?}"); + assert!(out.contains("err line 2"), "missing line 2 in {out:?}"); + } + + #[test] + fn pipe_round_trip() { + round_trip(CaptureMode::Pipe); + } + + #[test] + fn pty_round_trip() { + round_trip(CaptureMode::Pty); + } +} diff --git a/crates/axl-runtime/src/engine/bazel/mod.rs b/crates/axl-runtime/src/engine/bazel/mod.rs index d47a0ec26..c13040317 100644 --- a/crates/axl-runtime/src/engine/bazel/mod.rs +++ b/crates/axl-runtime/src/engine/bazel/mod.rs @@ -35,6 +35,7 @@ use axl_types::stream::Writable; mod build; mod cancel; +mod capture; mod health_check; mod info; mod iter; @@ -378,6 +379,11 @@ pub(crate) fn bazel_methods(registry: &mut MethodsBuilder) { /// * `stdio` - Shorthand: set both `stdout` and `stderr` from a single /// `Stdio` bundle (typically `ctx.std.io`). Cannot be combined with /// `stdout`/`stderr`. + /// * `output` - An `OutputProcessor` from `bazel.output.processor(...)`. + /// When set, the child's stderr is captured by the runtime, run through + /// the output processing pipeline, and forwarded to the real stderr + /// (overriding the resolved `stderr` slot). Omit (the default) to leave + /// stderr handling to `stderr`/`stdio`/inherit. /// * `current_dir` - Working directory for the Bazel invocation. /// * `announce_version` - Print an `INFO: Bazel ` line before /// spawning. Resolved from the `--announce-bazel-version` task flag. @@ -432,6 +438,7 @@ pub(crate) fn bazel_methods(registry: &mut MethodsBuilder) { #[starlark(require = named)] stdout: Option>, #[starlark(require = named)] stderr: Option>, #[starlark(require = named, default = NoneOr::None)] stdio: NoneOr, + #[starlark(require = named, default = NoneOr::None)] output: NoneOr, #[starlark(require = named, default = NoneOr::None)] current_dir: NoneOr, #[starlark(require = named, default = false)] announce_version: bool, #[starlark(require = named, default = false)] announce_command: bool, @@ -456,6 +463,7 @@ pub(crate) fn bazel_methods(registry: &mut MethodsBuilder) { resolved_startup_flags, stdout, stderr, + output.into_option(), current_dir.into_option(), build::AnnounceSpawn { version: announce_version, @@ -492,6 +500,11 @@ pub(crate) fn bazel_methods(registry: &mut MethodsBuilder) { /// * `stdio` - Shorthand: set both `stdout` and `stderr` from a single /// `Stdio` bundle (typically `ctx.std.io`). Cannot be combined with /// `stdout`/`stderr`. + /// * `output` - An `OutputProcessor` from `bazel.output.processor(...)`. + /// When set, the child's stderr is captured by the runtime, run through + /// the output processing pipeline, and forwarded to the real stderr + /// (overriding the resolved `stderr` slot). Omit (the default) to leave + /// stderr handling to `stderr`/`stdio`/inherit. /// * `current_dir` - Working directory for the Bazel invocation. /// * `announce_version` - Print an `INFO: Bazel ` line before /// spawning. Resolved from the `--announce-bazel-version` task flag. @@ -544,6 +557,7 @@ pub(crate) fn bazel_methods(registry: &mut MethodsBuilder) { #[starlark(require = named)] stdout: Option>, #[starlark(require = named)] stderr: Option>, #[starlark(require = named, default = NoneOr::None)] stdio: NoneOr, + #[starlark(require = named, default = NoneOr::None)] output: NoneOr, #[starlark(require = named, default = NoneOr::None)] current_dir: NoneOr, #[starlark(require = named, default = false)] announce_version: bool, #[starlark(require = named, default = false)] announce_command: bool, @@ -568,6 +582,7 @@ pub(crate) fn bazel_methods(registry: &mut MethodsBuilder) { resolved_startup_flags, stdout, stderr, + output.into_option(), current_dir.into_option(), build::AnnounceSpawn { version: announce_version, @@ -1030,6 +1045,45 @@ fn parse_event_kind<'v>(value: values::Value<'v>) -> anyhow::Result { ) } +#[starlark_module] +fn register_output(globals: &mut GlobalsBuilder) { + /// Create a captured-output processor. Pass it as + /// `ctx.bazel.build(output = bazel.output.processor(...))` to capture the + /// child's stderr, run it through the processing pipeline, and forward it + /// to the real stderr instead of letting Bazel write the terminal directly. + /// + /// # Arguments + /// * `tty` - Whether the destination stderr is an interactive terminal. + /// `True` → allocate a PTY so Bazel keeps its live curses progress UI + /// (bytes forwarded near-verbatim). `False` → a plain pipe, so Bazel + /// emits clean newline-terminated lines (set `--curses=no --isatty=0` on + /// the invocation to match). Defaults to auto-detecting the real stderr. + /// + /// The caller is responsible for setting `--isatty` / `--curses` on the + /// Bazel invocation to match the chosen mode (see `bazel_runner.axl`). + #[starlark(as_type = build::OutputProcessor)] + fn processor( + #[starlark(require = named, default = NoneOr::None)] tty: NoneOr, + ) -> anyhow::Result { + use std::io::IsTerminal; + let is_tty = match tty { + NoneOr::Other(b) => b, + NoneOr::None => std::io::stderr().is_terminal(), + }; + let mode = if is_tty { + build::CaptureMode::Pty + } else { + build::CaptureMode::Pipe + }; + Ok(build::OutputProcessor::new(mode)) + } +} + +#[starlark_module] +fn register_output_types(globals: &mut GlobalsBuilder) { + const OutputProcessor: StarlarkValueAsType = StarlarkValueAsType::new(); +} + #[starlark_module] fn register_execlog_sinks(globals: &mut GlobalsBuilder) { #[starlark(as_type = sink::execlog::ExecLogSink)] @@ -1098,6 +1152,11 @@ pub fn register_globals(globals: &mut GlobalsBuilder) { register_build_events(globals); }); + globals.namespace("output", |globals| { + register_output_types(globals); + register_output(globals); + }); + globals.namespace("execution_log", |globals| { register_execlog_types(globals); register_execlog_sinks(globals); diff --git a/crates/axl-runtime/src/engine/bazel/stream/mod.rs b/crates/axl-runtime/src/engine/bazel/stream/mod.rs index aa3da0ae6..84bdf274f 100644 --- a/crates/axl-runtime/src/engine/bazel/stream/mod.rs +++ b/crates/axl-runtime/src/engine/bazel/stream/mod.rs @@ -1,6 +1,7 @@ pub mod broadcaster; pub mod build_event; pub mod execlog; +pub mod output; pub mod redaction; mod util; pub mod workspace_event; @@ -8,4 +9,5 @@ pub mod workspace_event; pub use broadcaster::Subscriber; pub use build_event::BuildEventStream; pub use execlog::ExecLogStream; +pub use output::OutputStream; pub use workspace_event::WorkspaceEventStream; diff --git a/crates/axl-runtime/src/engine/bazel/stream/output.rs b/crates/axl-runtime/src/engine/bazel/stream/output.rs new file mode 100644 index 000000000..da84df180 --- /dev/null +++ b/crates/axl-runtime/src/engine/bazel/stream/output.rs @@ -0,0 +1,369 @@ +//! Captured-output stream: read the Bazel child's stderr, run each record +//! through an extensible processing pipeline, and forward the surviving bytes +//! to the parent's real stderr. +//! +//! # Why this exists +//! +//! By default the Bazel child inherits the parent's stderr fd +//! (`Stdio::inherit()`), so its output reaches the terminal untouched and we +//! can't pre-process it. When a task opts into output capture, `Build::spawn` +//! hands the child a captured fd (a pipe in non-TTY contexts, a PTY master's +//! slave in interactive ones) and starts an `OutputStream` over the read end. +//! +//! # Design +//! +//! This mirrors [`super::build_event::BuildEventStream`]: a single dedicated +//! reader thread, owned by the `Build`, joined inside `Build::wait`. The +//! deliberate difference from the BES path is that we do **not** route raw +//! bytes through the unbounded [`super::broadcaster::Broadcaster`] — that +//! fan-out clones per subscriber into unbounded buffers, which is a memory +//! hazard for a high-volume byte stream. Instead the reader thread forwards +//! straight to the parent stderr with a blocking `write`, so a slow terminal +//! back-pressures the kernel pipe/PTY buffer and ultimately Bazel itself, +//! bounding memory for free. Any future best-effort side-channel to AXL (match +//! hits, dedup counts) must be bounded + drop-and-count, never the unbounded +//! broadcaster — forwarding to the terminal must never be gated on consumer +//! latency. +//! +//! # Record splitting and snappiness +//! +//! Bazel's curses progress UI rewrites in place with bare `\r` (carriage +//! return, no newline). Splitting only on `\n` would buffer the whole progress +//! region until a real newline arrived, freezing the live UI. So the loop +//! treats both `\r` and `\n` as record boundaries, and flushes the forward +//! writer once per `read()` syscall (not per newline — `LineWriter` would +//! reintroduce the freeze). The kernel already batches bytes into one read, so +//! a per-read flush is cheap and keeps output snappy. +//! +//! # Phase 1 scope +//! +//! The processing pipeline is a [`LineProcessor`] trait list, but phase 1 ships +//! only the pass-through behavior (an empty list). This is the seam where the +//! deferred line-dedup (clean up + count repeats) and pattern matchers +//! (hook-driving + hung-server detection) plug in later, additively, without +//! reshaping the loop. The `last_activity` atomic is maintained now so the +//! deferred stall/hung-detection follow-up has its substrate ready. + +use std::io::ErrorKind; +use std::io::Read; +use std::io::Write; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::thread::{self, JoinHandle}; +use std::time::Instant; + +use thiserror::Error; + +/// Read buffer size per `read()` syscall. One read's worth of bytes is the +/// natural flush boundary (see module docs). +const READ_BUF_SIZE: usize = 8 * 1024; + +/// Hard cap on the in-flight incomplete record. A pathological producer that +/// never emits `\r`/`\n` would otherwise grow `carry` without bound; past this +/// we force-flush the partial bytes so memory stays bounded. +const MAX_CARRY_BYTES: usize = 1024 * 1024; + +#[derive(Error, Debug)] +pub enum OutputStreamError { + #[error("io error: {0}")] + IO(#[from] std::io::Error), +} + +/// A single processing stage applied to each output record before it is +/// forwarded. Returning `None` drops the record from the forwarded stream +/// (e.g. a dedup stage that collapsed a repeat); returning `Some(bytes)` +/// forwards those bytes (which may differ from the input, e.g. a `(×N)` +/// annotation appended on a count flush). +/// +/// Phase 1 ships no implementors — the list is always empty and every record +/// is forwarded verbatim. The trait exists so dedup/matcher stages can be +/// added without touching the reader loop. +/// +/// A `record` is the bytes between two boundaries, *excluding* the boundary +/// byte itself; the boundary (`\r` or `\n`) is forwarded separately so +/// processors operate on logical-line content, not delimiters. +pub trait LineProcessor: Send { + fn process(&mut self, record: &[u8]) -> Option>; +} + +#[derive(Debug)] +pub struct OutputStream { + /// Reader thread handle, in an `Option` so `join()` can `take()` it + /// without consuming `self`. + handle: Option>>, + + /// Millis-since-start of the last successful read. The reader bumps this on + /// every read that returns bytes; the deferred stall watchdog will read it + /// to detect a hung server (no output for N seconds while the child is + /// alive). Maintained now so that follow-up has its substrate ready. + #[allow(dead_code)] + last_activity_ms: Arc, +} + +impl OutputStream { + /// Spawn the reader/forwarder thread. + /// + /// `reader` is the read end of the captured stderr (a pipe `ChildStderr` + /// boxed as `Read + Send`, or a PTY master). `forward` is a `Write + Send` + /// view over the parent's real stderr (see + /// `axl_types::stream::Writable::to_boxed_write`). `processors` is the + /// per-record pipeline (empty in phase 1). + /// + /// The thread runs until the reader reaches EOF — on Unix a pipe returns a + /// 0-length read when the write end closes, while a PTY master returns + /// `EIO` once the child closes the slave; both are treated as clean + /// end-of-stream. The parent must drop its copy of the captured fd / + /// PTY slave after spawning the child, or this read never terminates. + pub fn spawn( + mut reader: Box, + mut forward: Box, + mut processors: Vec>, + ) -> OutputStream { + let last_activity_ms = Arc::new(AtomicU64::new(0)); + let thread_activity = last_activity_ms.clone(); + + let handle = thread::spawn(move || -> Result<(), OutputStreamError> { + let start = Instant::now(); + let mut buf = [0u8; READ_BUF_SIZE]; + let mut carry: Vec = Vec::with_capacity(READ_BUF_SIZE); + + loop { + let n = match reader.read(&mut buf) { + Ok(0) => break, // clean EOF (pipe write end closed) + Ok(n) => n, + // PTY master read after the child closes the slave returns + // EIO on Linux — that is end-of-stream, not an error. + Err(e) if is_pty_eof(&e) => break, + Err(e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => { + // Flush whatever we have, then surface the error. + let _ = flush_carry(&mut carry, &mut processors, &mut forward); + let _ = forward.flush(); + return Err(OutputStreamError::IO(e)); + } + }; + + thread_activity.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); + + carry.extend_from_slice(&buf[..n]); + process_carry(&mut carry, &mut processors, &mut forward)?; + + // Bound the incomplete-record buffer: a producer that never + // emits a boundary byte must not grow `carry` unboundedly. + if carry.len() > MAX_CARRY_BYTES { + forward.write_all(&carry)?; + carry.clear(); + } + + // Flush once per read so the `\r`-driven progress UI stays live. + forward.flush()?; + } + + // Drain the trailing partial record (output with no final newline). + flush_carry(&mut carry, &mut processors, &mut forward)?; + forward.flush()?; + Ok(()) + }); + + OutputStream { + handle: Some(handle), + last_activity_ms, + } + } + + /// Wait for the reader thread to finish (after the child's captured fd has + /// reached EOF). Called from `Build::wait` after `child.wait()`. + pub fn join(&mut self) -> Result<(), OutputStreamError> { + if let Some(handle) = self.handle.take() { + match handle.join() { + Ok(result) => result, + Err(_) => Err(OutputStreamError::IO(std::io::Error::other( + "output stream thread panicked", + ))), + } + } else { + Ok(()) + } + } + + /// Millis since stream start of the last read that returned bytes (0 if no + /// bytes have arrived yet). Substrate for the deferred stall watchdog. + #[allow(dead_code)] + pub fn last_activity_ms(&self) -> u64 { + self.last_activity_ms.load(Ordering::Relaxed) + } +} + +/// Split complete records (terminated by `\r` or `\n`) out of `carry`, run each +/// through the pipeline, and forward the survivors plus their boundary byte. +/// Leaves the trailing incomplete record in `carry`. +fn process_carry( + carry: &mut Vec, + processors: &mut [Box], + forward: &mut Box, +) -> Result<(), OutputStreamError> { + let mut start = 0; + let mut i = 0; + while i < carry.len() { + let b = carry[i]; + if b == b'\n' || b == b'\r' { + let record = &carry[start..i]; + emit_record(record, processors, forward)?; + forward.write_all(&[b])?; + i += 1; + start = i; + } else { + i += 1; + } + } + if start > 0 { + carry.drain(..start); + } + Ok(()) +} + +/// Forward the trailing incomplete record (no boundary byte) through the +/// pipeline. Used at EOF and on the carry-cap force-flush. +fn flush_carry( + carry: &mut Vec, + processors: &mut [Box], + forward: &mut Box, +) -> Result<(), OutputStreamError> { + if !carry.is_empty() { + emit_record(carry, processors, forward)?; + carry.clear(); + } + Ok(()) +} + +/// Run one record through the processor chain and forward the result. With an +/// empty chain (phase 1) this forwards the record verbatim. +fn emit_record( + record: &[u8], + processors: &mut [Box], + forward: &mut Box, +) -> Result<(), OutputStreamError> { + if processors.is_empty() { + forward.write_all(record)?; + return Ok(()); + } + let mut current = record.to_vec(); + for p in processors.iter_mut() { + match p.process(¤t) { + Some(out) => current = out, + None => return Ok(()), // dropped by a stage (e.g. dedup collapse) + } + } + forward.write_all(¤t)?; + Ok(()) +} + +/// Whether an error reading a captured fd means end-of-stream rather than a +/// real failure. A PTY master read after the child closes the slave returns +/// `EIO` (raw os error 5) on Linux; macOS typically returns a 0-length read +/// instead, handled by the `Ok(0)` arm. +fn is_pty_eof(e: &std::io::Error) -> bool { + e.raw_os_error() == Some(5) // EIO +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + + /// A `Write` sink that records everything written, shared with the test + /// thread via `Arc>`. + #[derive(Clone)] + struct SharedSink(Arc>>); + + impl Write for SharedSink { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.lock().unwrap().extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + fn run(input: &[u8]) -> Vec { + let sink = Arc::new(Mutex::new(Vec::new())); + let reader = std::io::Cursor::new(input.to_vec()); + let mut stream = + OutputStream::spawn(Box::new(reader), Box::new(SharedSink(sink.clone())), vec![]); + stream.join().unwrap(); + let out = sink.lock().unwrap().clone(); + out + } + + #[test] + fn passthrough_newline_delimited() { + let input = b"line one\nline two\nline three\n"; + assert_eq!(run(input), input); + } + + #[test] + fn passthrough_preserves_carriage_returns() { + // The curses-style progress stream: bare \r, no newline. + let input = b"[1 / 9] Building\r[2 / 9] Building\r[3 / 9] Building\r"; + assert_eq!(run(input), input); + } + + #[test] + fn flushes_trailing_partial_record() { + // No trailing newline — must still be forwarded. + let input = b"no trailing newline"; + assert_eq!(run(input), input); + } + + #[test] + fn mixed_cr_lf() { + let input = b"progress\rprogress\rdone\n"; + assert_eq!(run(input), input); + } + + #[test] + fn empty_input_eof() { + assert_eq!(run(b""), b""); + } + + #[test] + fn carry_cap_force_flushes_giant_line() { + // A record larger than MAX_CARRY_BYTES with no boundary must still be + // forwarded in full (in chunks), never dropped or grown unbounded. + let big = vec![b'x'; MAX_CARRY_BYTES + 4096]; + assert_eq!(run(&big), big); + } + + /// A processor that drops records equal to the previous one and annotates + /// a count — exercises the pipeline seam the deferred dedup will use. + struct ConsecutiveDedup { + prev: Option>, + } + + impl LineProcessor for ConsecutiveDedup { + fn process(&mut self, record: &[u8]) -> Option> { + if self.prev.as_deref() == Some(record) { + return None; + } + self.prev = Some(record.to_vec()); + Some(record.to_vec()) + } + } + + #[test] + fn processor_can_drop_records() { + let sink = Arc::new(Mutex::new(Vec::new())); + let reader = std::io::Cursor::new(b"a\na\nb\n".to_vec()); + let mut stream = OutputStream::spawn( + Box::new(reader), + Box::new(SharedSink(sink.clone())), + vec![Box::new(ConsecutiveDedup { prev: None })], + ); + stream.join().unwrap(); + // Second "a" record dropped; its newline boundary is still forwarded + // (boundaries are forwarded independently of record content). + let out = sink.lock().unwrap().clone(); + assert_eq!(out, b"a\n\nb\n"); + } +} diff --git a/crates/axl-types/src/stream.rs b/crates/axl-types/src/stream.rs index 4cfe7b705..2daf1e68a 100644 --- a/crates/axl-types/src/stream.rs +++ b/crates/axl-types/src/stream.rs @@ -176,6 +176,91 @@ impl Readable { } } +// --- Writer wrapper for to_boxed_write --- + +/// A `Write + Send` view over a `Writable`, used by background threads (e.g. +/// the captured-output forwarder in `axl-runtime`) that need to write to the +/// parent's real stdout/stderr off the Starlark eval thread. +/// +/// Each `write`/`flush` re-acquires the lock and re-checks the `Option`, so a +/// concurrent `close()` from Starlark turns subsequent writes into a +/// `BrokenPipe` error rather than a panic. Writing to a closed handle is a +/// soft failure: the forwarder is expected to stop, not crash. +pub struct WritableWriter(Writable); + +impl Write for WritableWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let closed = || std::io::Error::new(std::io::ErrorKind::BrokenPipe, "stream is closed"); + match &self.0 { + Writable::ChildStdin(arc) => { + let guard = arc.lock().unwrap(); + let mut borrowed = guard.borrow_mut(); + borrowed.as_mut().ok_or_else(closed)?.write(buf) + } + Writable::Stdout(arc) => { + let guard = arc.lock().unwrap(); + let mut borrowed = guard.borrow_mut(); + borrowed.as_mut().ok_or_else(closed)?.lock().write(buf) + } + Writable::Stderr(arc) => { + let guard = arc.lock().unwrap(); + let mut borrowed = guard.borrow_mut(); + borrowed.as_mut().ok_or_else(closed)?.lock().write(buf) + } + Writable::File(arc) => { + let mut guard = arc.lock().unwrap(); + guard.as_mut().ok_or_else(closed)?.write(buf) + } + } + } + + fn flush(&mut self) -> std::io::Result<()> { + match &self.0 { + Writable::ChildStdin(arc) => { + let guard = arc.lock().unwrap(); + let mut borrowed = guard.borrow_mut(); + if let Some(inner) = borrowed.as_mut() { + inner.flush()?; + } + } + Writable::Stdout(arc) => { + let guard = arc.lock().unwrap(); + let mut borrowed = guard.borrow_mut(); + if let Some(inner) = borrowed.as_mut() { + inner.lock().flush()?; + } + } + Writable::Stderr(arc) => { + let guard = arc.lock().unwrap(); + let mut borrowed = guard.borrow_mut(); + if let Some(inner) = borrowed.as_mut() { + inner.lock().flush()?; + } + } + Writable::File(arc) => { + if let Some(inner) = arc.lock().unwrap().as_mut() { + inner.flush()?; + } + } + } + Ok(()) + } +} + +// The Arc> interiors are Send; the RefCell is only ever touched +// under the Mutex, matching the existing `ArcMutex*Reader` Send asserts above. +unsafe impl Send for WritableWriter {} + +impl Writable { + /// Produce a boxed `Write + Send` from this `Writable`, for use by + /// background threads that forward captured child output to the parent's + /// real stdout/stderr. Shares the underlying handle (Arc-backed), so a + /// Starlark-side `close()` is observed as a `BrokenPipe` on the next write. + pub fn to_boxed_write(&self) -> Box { + Box::new(WritableWriter(self.dupe())) + } +} + #[starlark_value(type = "std.io.Readable")] impl<'v> values::StarlarkValue<'v> for Readable { fn get_methods() -> Option<&'static Methods> {