diff --git a/examples/local_video/README.md b/examples/local_video/README.md index 86feee7c7..7f421ee63 100644 --- a/examples/local_video/README.md +++ b/examples/local_video/README.md @@ -8,7 +8,7 @@ For smoother local rendering, especially above 720p, run the publisher/subscribe - list_devices: enumerate available cameras and their capabilities - publisher: capture from a selected camera and publish a video track -- subscriber: connect to a room, subscribe to video tracks, and display in a window +- subscriber: connect to a room, subscribe to video tracks, and display each track in its own window LiveKit connection can be provided via flags or environment variables: - `--url` or `LIVEKIT_URL` @@ -137,6 +137,8 @@ Subscriber flags (in addition to the common connection flags above): - `--e2ee-key `: Enable end-to-end decryption with the given shared key. Must match the key used by the publisher. Notes: -- If the active video track is unsubscribed or unpublished, the app clears its state and will automatically attach to the next matching video track when it appears. +- The subscriber opens a separate window for every video track it is subscribed to. A small status panel in the main window shows the room, identity, filter, and the list of currently-displayed tracks. +- Closing a track's window unsubscribes from that publication. The window reappears automatically if the publisher republishes the track (or publishes a new one) and it still matches the optional `--participant` filter. +- If the active video track is unsubscribed or unpublished, its window is closed automatically. - For E2EE to work, both publisher and subscriber must specify the same `--e2ee-key` value. If the keys don't match, the subscriber will not be able to decode the video. - The timestamp overlay updates at ~2 Hz so the latency value is readable rather than flickering every frame. diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index f7ebe6d99..abb1529c2 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -464,6 +464,56 @@ impl Default for SimulcastState { } } +/// Read-only descriptor of a subscribed video track. +#[derive(Clone)] +struct TrackInfo { + sid: TrackSid, + track_name: String, + participant_identity: String, + viewport_id: egui::ViewportId, +} + +/// Per-track UI-only state that lives on the main thread. +struct TrackUiState { + viewport: AspectConstrainedViewport, + latency_display: LatencyDisplay, + /// Set to `true` when the user clicks the close button on the track's window; + /// the main `update` removes the entry and unsubscribes on the next pass. + user_closed: bool, +} + +impl TrackUiState { + fn new() -> Self { + Self { + viewport: AspectConstrainedViewport::new(None), + latency_display: LatencyDisplay::default(), + user_closed: false, + } + } +} + +/// All state needed to display, drive, and control a single subscribed video track. +#[derive(Clone)] +struct TrackEntry { + info: Arc, + shared: Arc>, + simulcast: Arc>, + ui: Arc>, + publication: RemoteTrackPublication, +} + +type TracksMap = HashMap; + +/// Deterministic [`egui::ViewportId`] derived from a track SID so windows survive repaints. +fn viewport_id_for_track(sid: &TrackSid) -> egui::ViewportId { + egui::ViewportId::from_hash_of(("livekit-track-window", sid.as_str())) +} + +/// Stable [`egui::Id`] for a per-track overlay area. +fn track_overlay_id(sid: &TrackSid, suffix: &str) -> egui::Id { + egui::Id::new((suffix, sid.as_str())) +} + fn codec_label(mime: &str) -> String { let base = mime.split(';').next().unwrap_or(mime).trim(); let last = base.rsplit('/').next().unwrap_or(base).trim(); @@ -705,13 +755,10 @@ async fn handle_track_subscribed( publication: RemoteTrackPublication, participant: RemoteParticipant, allowed_identity: &Option, - shared: &Arc>, - active_sid: &Arc>>, + tracks: &Arc>, ctrl_c_received: &Arc, - simulcast: &Arc>, repaint_ctx: &Arc>>, ) { - // If a participant filter is set, skip others if let Some(ref allow) = allowed_identity { if participant.identity().as_str() != allow { debug!("Skipping track from '{}' (filter set to '{}')", participant.identity(), allow); @@ -725,60 +772,78 @@ async fn handle_track_subscribed( let sid = publication.sid().clone(); let codec = codec_label(&publication.mime_type()); - // Only handle if we don't already have an active video track + let simulcasted = publication.simulcasted(); + let livekit::track::TrackDimension(dim_w, dim_h) = publication.dimension(); + + let info = Arc::new(TrackInfo { + sid: sid.clone(), + track_name: publication.name(), + participant_identity: participant.identity().0.clone(), + viewport_id: viewport_id_for_track(&sid), + }); + + let shared = Arc::new(Mutex::new(SharedYuv { + width: 0, + height: 0, + frame: None, + codec, + fps: 0.0, + dirty: false, + received_at_us: None, + frame_metadata: None, + gpu_done: None, + })); + + let simulcast = Arc::new(Mutex::new(SimulcastState { + available: simulcasted, + publication: Some(publication.clone()), + requested_quality: None, + active_quality: None, + full_dims: Some((dim_w, dim_h)), + })); + + let ui = Arc::new(Mutex::new(TrackUiState::new())); + + let entry = TrackEntry { + info: info.clone(), + shared: shared.clone(), + simulcast: simulcast.clone(), + ui, + publication: publication.clone(), + }; + { - let mut active = active_sid.lock(); - if active.as_ref() == Some(&sid) { - debug!("Track {} already active, ignoring duplicate subscribe", sid); - return; - } - if active.is_some() { - debug!( - "A video track is already active ({}), ignoring new subscribe {}", - active.as_ref().unwrap(), - sid - ); + let mut map = tracks.lock(); + if map.contains_key(&sid) { + debug!("Track {} already tracked, ignoring duplicate subscribe", sid); return; } - *active = Some(sid.clone()); + map.insert(sid.clone(), entry); } - // Update HUD codec label and feature flags early (before first frame arrives) - { - let mut s = shared.lock(); - s.codec = codec; + if let Some(ctx) = repaint_ctx.lock().as_ref() { + ctx.request_repaint(); } info!( "Subscribed to video track: {} (sid {}) from {} - codec: {}, simulcast: {}, dimension: {}x{}, packet_trailer_features: {:?}", - publication.name(), + info.track_name, publication.sid(), participant.identity(), publication.mime_type(), - publication.simulcasted(), - publication.dimension().0, - publication.dimension().1, + simulcasted, + dim_w, + dim_h, publication.packet_trailer_features(), ); let rtc_track = video_track.rtc_track(); - - // Start background sink task immediately so stats lookup cannot delay first-frame handling. - let shared2 = shared.clone(); - let active_sid2 = active_sid.clone(); - let my_sid = sid.clone(); + let shared_sink = shared.clone(); + let tracks_sink = tracks.clone(); + let my_sid_sink = sid.clone(); + let viewport_id = info.viewport_id; let ctrl_c_sink = ctrl_c_received.clone(); let repaint_ctx_sink = repaint_ctx.clone(); - // Initialize simulcast state for this publication - { - let mut sc = simulcast.lock(); - sc.available = publication.simulcasted(); - let dim = publication.dimension(); - sc.full_dims = Some((dim.0, dim.1)); - sc.requested_quality = None; - sc.active_quality = None; - sc.publication = Some(publication.clone()); - } tokio::spawn(async move { let mut sink = NativeVideoStream::new(rtc_track); let mut frames: u64 = 0; @@ -791,6 +856,9 @@ async fn handle_track_subscribed( if ctrl_c_sink.load(Ordering::Acquire) { break; } + if !tracks_sink.lock().contains_key(&my_sid_sink) { + break; + } let next = tokio::select! { _ = wait_for_shutdown(ctrl_c_sink.clone()) => None, frame = sink.next() => frame, @@ -801,13 +869,18 @@ async fn handle_track_subscribed( let h = frame.buffer.height(); if !logged_first { - debug!("First frame: {}x{}, type {:?}", w, h, frame.buffer.buffer_type()); + debug!( + "First frame on {}: {}x{}, type {:?}", + my_sid_sink, + w, + h, + frame.buffer.buffer_type() + ); logged_first = true; } - let mut s = shared2.lock(); + let mut s = shared_sink.lock(); - // Update smoothed FPS (~500ms window) fps_window_frames += 1; let win_elapsed = fps_window_start.elapsed(); if win_elapsed >= Duration::from_millis(500) { @@ -815,7 +888,6 @@ async fn handle_track_subscribed( fps_smoothed = if fps_smoothed <= 0.0 { inst_fps } else { - // light EMA smoothing to reduce jitter (fps_smoothed * 0.7) + (inst_fps * 0.3) }; s.fps = fps_smoothed; @@ -832,28 +904,23 @@ async fn handle_track_subscribed( drop(s); if let Some(ctx) = repaint_ctx_sink.lock().as_ref() { - ctx.request_repaint(); + ctx.request_repaint_of(viewport_id); } frames += 1; let elapsed = last_log.elapsed(); if elapsed >= Duration::from_secs(2) { let fps = frames as f64 / elapsed.as_secs_f64(); - info!("Receiving video: {}x{}, ~{:.1} fps", w, h, fps); + info!("Receiving video on {}: {}x{}, ~{:.1} fps", my_sid_sink, w, h, fps); frames = 0; last_log = Instant::now(); } } - info!("Video stream ended for {}", my_sid); - // Clear active sid if still ours - let mut active = active_sid2.lock(); - if active.as_ref() == Some(&my_sid) { - *active = None; - } + info!("Video stream ended for {}", my_sid_sink); }); let ctrl_c_stats = ctrl_c_received.clone(); - let active_sid_stats = active_sid.clone(); + let tracks_stats = tracks.clone(); let my_sid_stats = sid.clone(); let simulcast_stats = simulcast.clone(); tokio::spawn(async move { @@ -868,7 +935,7 @@ async fn handle_track_subscribed( if ctrl_c_stats.load(Ordering::Acquire) { break; } - if active_sid_stats.lock().as_ref() != Some(&my_sid_stats) { + if !tracks_stats.lock().contains_key(&my_sid_stats) { break; } @@ -885,7 +952,7 @@ async fn handle_track_subscribed( update_simulcast_quality_from_stats(&stats, &simulcast_stats); } Err(e) if !logged_initial => { - debug!("Failed to get stats for video track: {:?}", e); + debug!("Failed to get stats for video track {}: {:?}", my_sid_stats, e); logged_initial = true; } Err(_) => {} @@ -896,21 +963,6 @@ async fn handle_track_subscribed( }); } -fn clear_hud_and_simulcast(shared: &Arc>, simulcast: &Arc>) { - { - let mut s = shared.lock(); - s.codec.clear(); - s.fps = 0.0; - s.frame = None; - s.dirty = false; - s.received_at_us = None; - s.frame_metadata = None; - s.gpu_done = None; - } - let mut sc = simulcast.lock(); - *sc = SimulcastState::default(); -} - fn timing_overlay_lines( shared: &Arc>, latency_display: &mut LatencyDisplay, @@ -942,8 +994,13 @@ fn timing_overlay_lines( )) } -fn paint_timing_overlay(ctx: &egui::Context, video_rect: egui::Rect, lines: &[String]) { - egui::Area::new("timing_overlay".into()) +fn paint_timing_overlay( + ctx: &egui::Context, + id: egui::Id, + video_rect: egui::Rect, + lines: &[String], +) { + egui::Area::new(id) .fixed_pos(video_rect.left_top() + egui::vec2(10.0, 10.0)) .interactable(false) .show(ctx, |ui| { @@ -965,44 +1022,25 @@ fn paint_timing_overlay(ctx: &egui::Context, video_rect: egui::Rect, lines: &[St }); } -fn handle_track_unsubscribed( +fn handle_track_removed( publication: RemoteTrackPublication, - shared: &Arc>, - active_sid: &Arc>>, - simulcast: &Arc>, + tracks: &Arc>, + reason: &str, ) { - let sid = publication.sid().clone(); - let mut active = active_sid.lock(); - if active.as_ref() == Some(&sid) { - info!("Video track unsubscribed ({}), clearing active sink", sid); - *active = None; + let sid = publication.sid(); + if tracks.lock().remove(&sid).is_some() { + info!("Video track {} ({}); closing window", reason, sid); } - clear_hud_and_simulcast(shared, simulcast); -} - -fn handle_track_unpublished( - publication: RemoteTrackPublication, - shared: &Arc>, - active_sid: &Arc>>, - simulcast: &Arc>, -) { - let sid = publication.sid().clone(); - let mut active = active_sid.lock(); - if active.as_ref() == Some(&sid) { - info!("Video track unpublished ({}), clearing active sink", sid); - *active = None; - } - clear_hud_and_simulcast(shared, simulcast); } struct VideoApp { - shared: Arc>, - simulcast: Arc>, + tracks: Arc>, repaint_ctx: Arc>>, ctrl_c_received: Arc, - viewport: AspectConstrainedViewport, display_timestamp: bool, - latency_display: LatencyDisplay, + room_name: String, + identity: String, + participant_filter: Option, } impl eframe::App for VideoApp { @@ -1013,116 +1051,231 @@ impl eframe::App for VideoApp { return; } - let mut aspect_just_changed = false; - if let Some((width, height)) = video_size(&self.shared) { - aspect_just_changed = self.viewport.set_video_size(ctx, width, height); - } - self.viewport.constrain(ctx, aspect_just_changed); - - let timing_lines = self - .display_timestamp - .then(|| timing_overlay_lines(&self.shared, &mut self.latency_display)) - .flatten(); - - egui::CentralPanel::default().frame(egui::Frame::NONE).show(ctx, |ui| { - // Ensure we keep repainting for smooth playback. - ui.ctx().request_repaint(); - - // Render into a centered rect that matches the source aspect ratio. This keeps resize - // smooth (no feedback loop) and avoids stretching/distortion while dragging. - let available = ui.available_size(); - let size = if let Some(aspect) = self.viewport.aspect() { - let mut w = available.x.max(1.0); - let mut h = (w / aspect).max(1.0); - if h > available.y.max(1.0) { - h = available.y.max(1.0); - w = (h * aspect).max(1.0); + // Prune entries the user closed on a previous pass, unsubscribing from each. + // Snapshot the remaining entries so deferred viewport callbacks can own their state. + let entries: Vec = { + let mut map = self.tracks.lock(); + let to_remove: Vec = map + .iter() + .filter_map(|(sid, entry)| { + let closed = entry.ui.lock().user_closed; + closed.then(|| sid.clone()) + }) + .collect(); + for sid in &to_remove { + if let Some(entry) = map.remove(sid) { + info!("User closed window for {}; unsubscribing", sid); + entry.publication.set_subscribed(false); } - egui::vec2(w, h) - } else { - egui::vec2(available.x.max(1.0), available.y.max(1.0)) - }; + } + map.values().cloned().collect() + }; - ui.with_layout( - egui::Layout::centered_and_justified(egui::Direction::LeftToRight), - |ui| { - let (rect, _) = ui.allocate_exact_size(size, egui::Sense::hover()); - let cb = egui_wgpu_backend::Callback::new_paint_callback( - rect, - YuvPaintCallback { shared: self.shared.clone() }, - ); - ui.painter().add(cb); - if let Some(lines) = timing_lines.as_ref() { - paint_timing_overlay(ui.ctx(), rect, lines); - } + render_status_panel( + ctx, + &self.room_name, + &self.identity, + self.participant_filter.as_deref(), + &entries, + ); + + let display_timestamp = self.display_timestamp; + for entry in entries { + let viewport_id = entry.info.viewport_id; + let title = + format!("{} • {}", entry.info.track_name, entry.info.participant_identity); + let entry_for_callback = entry.clone(); + ctx.show_viewport_deferred( + viewport_id, + egui::ViewportBuilder::default() + .with_title(title) + .with_inner_size([960.0, 540.0]) + .with_min_inner_size([320.0, 180.0]), + move |ctx, _class| { + render_track_window(ctx, &entry_for_callback, display_timestamp); }, ); - }); + } - // Non-timing video stats stay in egui so they don't become part of the frame timing record. - egui::Area::new("video_hud".into()) - .anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 10.0)) - .interactable(false) - .show(ctx, |ui| { - let s = self.shared.lock(); - if s.width == 0 || s.height == 0 || s.fps <= 0.0 || s.codec.is_empty() { - return; - } - let mut text = format!("{} {}x{} {:.1}fps", s.codec, s.width, s.height, s.fps); - drop(s); + ctx.request_repaint_after(Duration::from_millis(250)); + } +} - let sc = self.simulcast.lock(); - if sc.available { - let layer = sc.active_quality.map(video_quality_label).unwrap_or("NA"); - text.push_str(&format!("\nSimulcast: {}", layer)); +fn render_status_panel( + ctx: &egui::Context, + room_name: &str, + identity: &str, + participant_filter: Option<&str>, + entries: &[TrackEntry], +) { + egui::CentralPanel::default().show(ctx, |ui| { + ui.heading("LiveKit Video Subscriber"); + ui.add_space(8.0); + egui::Grid::new("status_grid").num_columns(2).spacing([12.0, 4.0]).show(ui, |ui| { + ui.label("Room"); + ui.label(room_name); + ui.end_row(); + ui.label("Identity"); + ui.label(identity); + ui.end_row(); + ui.label("Filter"); + ui.label(participant_filter.unwrap_or("(all participants)")); + ui.end_row(); + }); + ui.add_space(12.0); + ui.label(format!("Subscribed video tracks: {}", entries.len())); + ui.add_space(4.0); + if entries.is_empty() { + ui.label("Waiting for video tracks..."); + } else { + for entry in entries { + let (w, h, fps) = { + let s = entry.shared.lock(); + (s.width, s.height, s.fps) + }; + let dims = if w == 0 || h == 0 { + "negotiating".to_string() } else { - text.push_str("\nSimulcast: off"); - } - drop(sc); - - egui::Frame::NONE - .fill(egui::Color32::from_black_alpha(140)) - .corner_radius(egui::CornerRadius::same(4)) - .inner_margin(egui::Margin::same(6)) - .show(ui, |ui| { - ui.add( - egui::Label::new(egui::RichText::new(text).color(egui::Color32::WHITE)) - .extend(), - ); - }); - }); + format!("{}x{} ~{:.1}fps", w, h, fps) + }; + ui.label(format!( + "• {} from {} [{}] ({})", + entry.info.track_name, entry.info.participant_identity, entry.info.sid, dims, + )); + } + } + }); +} + +fn render_track_window(ctx: &egui::Context, entry: &TrackEntry, display_timestamp: bool) { + // Detect the user clicking the close button; signal the parent to drop the entry next pass. + if ctx.input(|i| i.viewport().close_requested()) { + entry.ui.lock().user_closed = true; + // Wake the parent so it removes the entry and stops rendering this viewport. + ctx.request_repaint_of(egui::ViewportId::ROOT); + return; + } - // Simulcast layer controls: bottom-left overlay - egui::Area::new("simulcast_controls".into()) - .anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0)) - .interactable(true) - .show(ctx, |ui| { - let mut sc = self.simulcast.lock(); - if !sc.available { - return; + let aspect_just_changed = if let Some((width, height)) = video_size(&entry.shared) { + let mut ui_state = entry.ui.lock(); + ui_state.viewport.set_video_size(ctx, width, height) + } else { + false + }; + { + let mut ui_state = entry.ui.lock(); + ui_state.viewport.constrain(ctx, aspect_just_changed); + } + + let timing_lines = display_timestamp + .then(|| { + let mut ui_state = entry.ui.lock(); + timing_overlay_lines(&entry.shared, &mut ui_state.latency_display) + }) + .flatten(); + + let aspect = entry.ui.lock().viewport.aspect(); + egui::CentralPanel::default().frame(egui::Frame::NONE).show(ctx, |ui| { + ui.ctx().request_repaint(); + + let available = ui.available_size(); + let size = if let Some(aspect) = aspect { + let mut w = available.x.max(1.0); + let mut h = (w / aspect).max(1.0); + if h > available.y.max(1.0) { + h = available.y.max(1.0); + w = (h * aspect).max(1.0); + } + egui::vec2(w, h) + } else { + egui::vec2(available.x.max(1.0), available.y.max(1.0)) + }; + + ui.with_layout( + egui::Layout::centered_and_justified(egui::Direction::LeftToRight), + |ui| { + let (rect, _) = ui.allocate_exact_size(size, egui::Sense::hover()); + let cb = egui_wgpu_backend::Callback::new_paint_callback( + rect, + YuvPaintCallback { + sid: entry.info.sid.clone(), + shared: entry.shared.clone(), + }, + ); + ui.painter().add(cb); + if let Some(lines) = timing_lines.as_ref() { + paint_timing_overlay( + ui.ctx(), + track_overlay_id(&entry.info.sid, "timing_overlay"), + rect, + lines, + ); } - let selected = sc.requested_quality.or(sc.active_quality); - ui.horizontal(|ui| { - let choices = [ - (livekit::track::VideoQuality::Low, "Low"), - (livekit::track::VideoQuality::Medium, "Med"), - (livekit::track::VideoQuality::High, "High"), - ]; - for (q, label) in choices { - let is_selected = selected.is_some_and(|s| s == q); - let resp = ui.selectable_label(is_selected, label); - if resp.clicked() { - if let Some(ref pub_remote) = sc.publication { - pub_remote.set_video_quality(q); - sc.requested_quality = Some(q); - } + }, + ); + }); + + egui::Area::new(track_overlay_id(&entry.info.sid, "video_hud")) + .anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 10.0)) + .interactable(false) + .show(ctx, |ui| { + let s = entry.shared.lock(); + if s.width == 0 || s.height == 0 || s.fps <= 0.0 || s.codec.is_empty() { + return; + } + let mut text = format!("{} {}x{} {:.1}fps", s.codec, s.width, s.height, s.fps); + drop(s); + + let sc = entry.simulcast.lock(); + if sc.available { + let layer = sc.active_quality.map(video_quality_label).unwrap_or("NA"); + text.push_str(&format!("\nSimulcast: {}", layer)); + } else { + text.push_str("\nSimulcast: off"); + } + drop(sc); + + egui::Frame::NONE + .fill(egui::Color32::from_black_alpha(140)) + .corner_radius(egui::CornerRadius::same(4)) + .inner_margin(egui::Margin::same(6)) + .show(ui, |ui| { + ui.add( + egui::Label::new(egui::RichText::new(text).color(egui::Color32::WHITE)) + .extend(), + ); + }); + }); + + egui::Area::new(track_overlay_id(&entry.info.sid, "simulcast_controls")) + .anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0)) + .interactable(true) + .show(ctx, |ui| { + let mut sc = entry.simulcast.lock(); + if !sc.available { + return; + } + let selected = sc.requested_quality.or(sc.active_quality); + ui.horizontal(|ui| { + let choices = [ + (livekit::track::VideoQuality::Low, "Low"), + (livekit::track::VideoQuality::Medium, "Med"), + (livekit::track::VideoQuality::High, "High"), + ]; + for (q, label) in choices { + let is_selected = selected.is_some_and(|s| s == q); + let resp = ui.selectable_label(is_selected, label); + if resp.clicked() { + if let Some(ref pub_remote) = sc.publication { + pub_remote.set_video_quality(q); + sc.requested_quality = Some(q); } } - }); + } }); + }); - ctx.request_repaint_after(Duration::from_millis(16)); - } + ctx.request_repaint_after(Duration::from_millis(16)); } #[tokio::main] @@ -1195,33 +1348,15 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { info!("End-to-end encryption activated"); } - // Shared YUV buffer for UI/GPU - let shared = Arc::new(Mutex::new(SharedYuv { - width: 0, - height: 0, - frame: None, - codec: String::new(), - fps: 0.0, - dirty: false, - received_at_us: None, - frame_metadata: None, - gpu_done: None, - })); + // One entry per subscribed video track; UI threads and background tasks share this map. + let tracks = Arc::new(Mutex::new(TracksMap::new())); + let repaint_ctx = Arc::new(Mutex::new(None::)); - // Subscribe to room events: on first video track, start sink task let allowed_identity = args.participant.clone(); - let shared_clone = shared.clone(); - // Track currently active video track SID to handle unpublish/unsubscribe - let active_sid = Arc::new(Mutex::new(None::)); - // Shared simulcast UI/control state - let simulcast = Arc::new(Mutex::new(SimulcastState::default())); - let repaint_ctx = Arc::new(Mutex::new(None::)); - let simulcast_events = simulcast.clone(); + let tracks_events = tracks.clone(); let repaint_ctx_events = repaint_ctx.clone(); let ctrl_c_events = ctrl_c_received.clone(); tokio::spawn(async move { - let active_sid = active_sid.clone(); - let simulcast = simulcast_events; let mut events = room.subscribe(); info!("Subscribed to room events"); while let Some(evt) = events.recv().await { @@ -1233,36 +1368,41 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { publication, participant, &allowed_identity, - &shared_clone, - &active_sid, + &tracks_events, &ctrl_c_events, - &simulcast, &repaint_ctx_events, ) .await; } RoomEvent::TrackUnsubscribed { publication, .. } => { - handle_track_unsubscribed(publication, &shared_clone, &active_sid, &simulcast); + handle_track_removed(publication, &tracks_events, "unsubscribed"); } RoomEvent::TrackUnpublished { publication, .. } => { - handle_track_unpublished(publication, &shared_clone, &active_sid, &simulcast); + handle_track_removed(publication, &tracks_events, "unpublished"); } _ => {} } } }); - // Start UI let app = VideoApp { - shared, - simulcast, + tracks, repaint_ctx, ctrl_c_received: ctrl_c_received.clone(), - viewport: AspectConstrainedViewport::new(None), display_timestamp: args.display_timestamp, - latency_display: LatencyDisplay::default(), + room_name: args.room_name.clone(), + identity: args.identity.clone(), + participant_filter: args.participant.clone(), + }; + let native_options = eframe::NativeOptions { + viewport: egui::ViewportBuilder::default() + .with_title("LiveKit Video Subscriber") + .with_inner_size([520.0, 360.0]) + .with_min_inner_size([360.0, 240.0]), + persist_window: false, + vsync: false, + ..Default::default() }; - let native_options = viewport_aspect::native_options(None); eframe::run_native( "LiveKit Video Subscriber", native_options, @@ -1278,9 +1418,21 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { // ===== WGPU I420 renderer ===== struct YuvPaintCallback { + /// Which track's [`YuvGpuState`] entry to render. + sid: TrackSid, shared: Arc>, } +/// Per-track GPU state map, stored once in egui-wgpu's [`CallbackResources`] for the whole app. +/// +/// eframe's wgpu backend shares a single renderer (and therefore a single +/// [`CallbackResources`] map) across every viewport, so each [`YuvPaintCallback`] +/// looks up its state by SID rather than relying on type-keyed storage. +#[derive(Default)] +struct YuvGpuRegistry { + states: HashMap, +} + struct YuvGpuState { pipeline: wgpu::RenderPipeline, sampler: wgpu::Sampler, @@ -1402,9 +1554,11 @@ impl CallbackTrait for YuvPaintCallback { return Vec::new(); } - // Fetch or create our GPU state - if resources.get::().is_none() { - // Build pipeline and initial small textures; will be recreated on first upload + if resources.get::().is_none() { + resources.insert(YuvGpuRegistry::default()); + } + let registry = resources.get_mut::().unwrap(); + if !registry.states.contains_key(&self.sid) { let shader_src = include_str!("yuv_shader.wgsl"); let shader = device.create_shader_module(wgpu::ShaderModuleDescriptor { label: Some("yuv_shader"), @@ -1591,9 +1745,9 @@ impl CallbackTrait for YuvPaintCallback { #[cfg(target_os = "macos")] native_import_failed_logged: false, }; - resources.insert(new_state); + registry.states.insert(self.sid.clone(), new_state); } - let state = resources.get_mut::().unwrap(); + let state = registry.states.get_mut(&self.sid).unwrap(); let dims = (shared.width, shared.height); let frame_for_upload = if shared.dirty { @@ -1839,7 +1993,10 @@ impl CallbackTrait for YuvPaintCallback { render_pass: &mut wgpu::RenderPass<'static>, resources: &egui_wgpu_backend::CallbackResources, ) { - let Some(state) = resources.get::() else { + let Some(registry) = resources.get::() else { + return; + }; + let Some(state) = registry.states.get(&self.sid) else { return; }; if state.dims == (0, 0) { diff --git a/examples/local_video/src/viewport_aspect.rs b/examples/local_video/src/viewport_aspect.rs index ce33ae2e5..c148a4052 100644 --- a/examples/local_video/src/viewport_aspect.rs +++ b/examples/local_video/src/viewport_aspect.rs @@ -63,6 +63,7 @@ impl AspectConstrainedViewport { } } +#[allow(dead_code)] // used by other binaries in this crate (e.g. `publisher`) pub(crate) fn native_options(initial_aspect: Option) -> eframe::NativeOptions { let aspect = initial_aspect.filter(|aspect| valid_aspect(*aspect)).unwrap_or(DEFAULT_ASPECT); eframe::NativeOptions { @@ -94,6 +95,7 @@ fn aspect_size(long_edge: f32, aspect: f32) -> egui::Vec2 { } } +#[allow(dead_code)] // used via `native_options` from other binaries in this crate fn initial_window_size(aspect: Option) -> egui::Vec2 { aspect_size(INITIAL_LONG_EDGE, aspect.filter(|a| valid_aspect(*a)).unwrap_or(DEFAULT_ASPECT)) }