feat(eot): add audio eot model support #1613
add audio eot model and local inference support, deprecating silero and turn detector plugins
🦋 Changeset detected. Latest commit: 86d8aa7. The changes in this PR will be included in the next version bump.
 *
 * Port of Python `livekit.agents.inference.eot.detector`.
 */
import type { InferenceExecutor } from '../../ipc/inference_executor.js';
I wonder if there will be any overlapping code between the eot under inference vs. the turn detector plugins? Or is this specific to audio-based eot?
Both VAD and the turn detector plugins are getting deprecated, so the overlap will be short-term.
…frame

The AudioFrame emitted on START_OF_SPEECH / END_OF_SPEECH sliced off the prefix-padding samples but still reported `samplesPerChannel = speechBufferIndex`, so the frame's metadata claimed more samples than its data contained, and downstream consumers (STT, transcription) lost the pre-roll context the buffer machinery is designed to preserve.

Slice from 0 instead so data length matches samplesPerChannel and the prefix-padding pre-roll is delivered, matching the Python original.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
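The invariant this commit restores can be sketched as follows. This is a minimal illustration, not the code from the PR: `FrameLike`, `buildSpeechFrame`, and `isConsistent` are hypothetical names, and the sketch assumes mono 16-bit audio.

```typescript
// Hypothetical stand-in for an audio frame; not the real AudioFrame class.
interface FrameLike {
  data: Int16Array;
  sampleRate: number;
  channels: number;
  samplesPerChannel: number;
}

// Build the frame emitted at a speech boundary. Slicing from 0 keeps the
// prefix-padding pre-roll, so the data length equals the reported sample count.
function buildSpeechFrame(
  speechBuffer: Int16Array,
  speechBufferIndex: number,
  sampleRate: number,
): FrameLike {
  return {
    data: speechBuffer.slice(0, speechBufferIndex), // copy, pre-roll included
    sampleRate,
    channels: 1, // assumption: mono
    samplesPerChannel: speechBufferIndex,
  };
}

// The metadata must describe exactly the samples the frame carries.
function isConsistent(frame: FrameLike): boolean {
  return frame.data.length === frame.samplesPerChannel * frame.channels;
}
```

A frame whose data was sliced from a later offset while `samplesPerChannel` stayed at `speechBufferIndex` would fail `isConsistent`, which is the mismatch described above.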
protected override async _run(): Promise<void> {
  while (true) {
    try {
      await this._raceWithSwap(this._transport.run());
      return;
    } catch (err) {
      if (err instanceof SwapAbortError) {
        if (this._closing) return;
        // A swap already happened (e.g. predict timeout → fallback).
        // The new transport is mounted; loop and run it. Routing the
        // swap through `SwapAbortError` (rather than through the
        // cloud/local branch below) is what prevents the "timeout
        // flips backend mid-await" misclassification — the catch
        // exits early before ever consulting `_backend`.
        continue;
      }
      const e = err instanceof Error ? err : new Error(String(err));
      if (this._backend === 'cloud') {
        this._fallBackToLocal(e);
        continue;
      }
      this._onLocalFailure(e);
      return;
    }
  }
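The control flow in this excerpt can be reproduced in isolation. The sketch below is a simplified model, not the detector itself: `Runner`, `Transport`, `mountLocal`, and the `events` log are hypothetical, and only `SwapAbortError` and the cloud/local distinction come from the diff. It shows why the swap check has to come before the backend check: by the time the catch runs after a swap, the backend already reads `'local'`.

```typescript
class SwapAbortError extends Error {}

interface Transport {
  run(): Promise<void>;
}

// Hypothetical, simplified model of the run loop above.
class Runner {
  backend: 'cloud' | 'local' = 'cloud';
  events: string[] = [];
  private rejectSwap: ((e: Error) => void) | null = null;

  constructor(
    private transport: Transport,
    private readonly local: Transport,
  ) {}

  private mountLocal(): void {
    this.backend = 'local';
    this.transport = this.local;
  }

  // Swap mid-await (e.g. after a predict timeout) and interrupt the current run().
  swapToLocal(): void {
    this.mountLocal();
    this.rejectSwap?.(new SwapAbortError('transport swapped'));
  }

  private raceWithSwap(p: Promise<void>): Promise<void> {
    const swapped = new Promise<never>((_, reject) => {
      this.rejectSwap = reject;
    });
    return Promise.race([p, swapped]);
  }

  async run(): Promise<void> {
    while (true) {
      try {
        await this.raceWithSwap(this.transport.run());
        return;
      } catch (err) {
        if (err instanceof SwapAbortError) {
          // Checked first: backend is already 'local' here, so consulting it
          // would misreport the swap as a local failure.
          this.events.push('swap');
          continue;
        }
        if (this.backend === 'cloud') {
          this.events.push('cloud-error');
          this.mountLocal();
          continue;
        }
        this.events.push('local-failure');
        return;
      }
    }
  }
}
```

With a cloud transport that never settles, calling `swapToLocal()` while `run()` is awaiting should record only `'swap'`; a cloud transport that rejects should record only `'cloud-error'`.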
🟡 ReadableStream reader lock conflict causes local transport to crash after cloud→local fallback
After a cloud→local fallback (triggered by either a transport error or predictEndOfTurn timeout), the LocalTransport.run() throws a TypeError because the audio channel's ReadableStream is already locked by the orphaned cloud transport's drain task.
The flow: CloudTransport._runOnce() creates a drainAudioTask that calls stream._drainAudioChannel() (agents/src/inference/eot/base.ts:610-626), which holds a reader lock on this._audioChannel.stream(). When _fallBackToLocal fires (agents/src/inference/eot/detector.ts:231), it calls this._transport.detach() — but CloudTransport.detach() (agents/src/inference/eot/transports.ts:342-345) only closes the send channel and clears _ws; it does NOT cancel or release the drain task's reader. The old drain task's reader.read() remains pending indefinitely.
When the _run loop continues and runs LocalTransport.run() → stream._drainAudioChannel(), it calls this._audioChannel.stream().getReader() on the same ReadableStream instance (since createStreamChannel().stream() always returns the same underlying transform.readable — see agents/src/stream/stream_channel.ts:23). This throws TypeError ("ReadableStream is locked").
Impact and partial self-recovery
The TypeError is caught by the _run loop as a local failure, which logs a misleading "local audio eot mini failed" warning and resolves the in-flight prediction with 1.0. However, the system partially recovers because the orphaned cloud drain task continues calling this._transport.pushFrame(value) (where this._transport is now the local transport) — so the local ring buffer IS still fed with audio. Subsequent warmup() → startInference() calls snapshot the buffer and run inference successfully. Net effect: one spurious warning + one prediction defaulting to 1.0 on the first turn after fallback.
Prompt for agents
The root cause is that _drainAudioChannel() (base.ts:610) acquires a reader lock on the stream's _audioChannel that is never released when the cloud transport is detached during fallback. The local transport then cannot get its own reader.
Possible approaches:
1. Make _drainAudioChannel signal-aware: accept an AbortSignal, and when aborted, break out of the while loop so the finally block releases the reader. The cloud transport's _runOnce drainAudioTask should pass its controller's signal, and detach() should abort it.
2. Alternatively, have _fallBackToLocal close and recreate the _audioChannel, so the local transport gets a fresh stream. This would require draining any buffered-but-unread frames first.
3. Skip calling stream._drainAudioChannel() in LocalTransport.run() when the stream detects that another drain is already active (e.g. a flag). Since the orphaned drain already feeds the local buffer, the local transport doesn't strictly need its own drain loop.
Approach 1 is cleanest and aligns with the existing Task cancellation pattern — the drain task already receives a controller via Task.from but _drainAudioChannel never consults it.
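Approach 1 could look roughly like the sketch below. `drainWithSignal` is a hypothetical stand-in for `_drainAudioChannel`, not the code in base.ts. It assumes a runtime whose `releaseLock()` follows the current Streams specification, where a pending `read()` is rejected on release instead of blocking it.

```typescript
// Hypothetical signal-aware drain: reads frames until the stream ends or the
// signal aborts, and always releases the reader lock on the way out.
async function drainWithSignal<T>(
  stream: ReadableStream<T>,
  onFrame: (frame: T) => void,
  signal: AbortSignal,
): Promise<void> {
  const reader = stream.getReader();

  // Resolves with a "done" marker as soon as the signal aborts, so a pending
  // read() no longer keeps the loop (and the lock) alive.
  const aborted = new Promise<{ done: true; value: undefined }>((resolve) => {
    const finish = () => resolve({ done: true, value: undefined });
    if (signal.aborted) finish();
    else signal.addEventListener('abort', finish, { once: true });
  });

  try {
    while (!signal.aborted) {
      const result = await Promise.race([reader.read(), aborted]);
      if (result.done) break;
      onFrame(result.value);
    }
  } finally {
    // Unlocks the stream so the next transport can call getReader().
    reader.releaseLock();
  }
}
```

In this shape, `detach()` would abort the controller that owns the drain task, and the local transport could then acquire its own reader on the same stream. Frames still queued in the stream are not consumed by the aborted drain and stay available to the next reader.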
…dal-eou

# Conflicts:
#	agents/src/voice/agent_activity.ts
#	agents/src/voice/agent_session.ts
#	agents/src/voice/audio_recognition.ts
#	examples/src/gemini_realtime_agent.ts
#	examples/src/runway_avatar.ts
let speakingWon = false;
try {
  await Promise.race([
    inner(innerController),
    this.userSpeakingEvent.waitOnce(controller.signal).then(() => {
      speakingWon = true;
    }),
  ]);
  if (speakingWon) {
    this.logger.debug(context, 'user spoke during endpointing, cancelling end of turn task');
  }
} finally {
  controller.signal.removeEventListener('abort', onOuterAbort);
  // If the speaking-event branch won (or the outer was aborted), tear
  // down the inner bounce so it doesn't keep awaiting the delay.
  innerController.abort();
}
🟡 Unhandled promise rejection when speaking-guard aborts the inner bounce task
In bounceEOUTaskWithSpeakingGuard, when the userSpeakingEvent wins the Promise.race at line 630, the inner(innerController) promise is orphaned (no one awaits it). The finally block at line 643 then calls innerController.abort(), which causes the inner function's await delay(...) (at the original bounceEOUTask line ~1492) to reject with an AbortError. Because the race already resolved, this rejection is unhandled and will produce UnhandledPromiseRejection warnings in Node.js. The outer Task's .catch() at agents/src/voice/audio_recognition.ts:1576 only wraps the guard function itself, not the orphaned inner promise.
Prompt for agents
In bounceEOUTaskWithSpeakingGuard (audio_recognition.ts around line 610-644), the Promise.race between inner(innerController) and the speaking-event waiter leaves the inner promise unhandled when speaking wins. When innerController.abort() fires in the finally block, the inner function's delay() rejects with AbortError that nobody catches.
Fix: capture the inner promise in a variable before the race, and add a .catch(() => {}) to suppress the expected AbortError after the race completes (similar to how fire-and-forget promises are handled elsewhere in the codebase). For example:
const innerPromise = inner(innerController);
// Prevent unhandled rejection when we abort the inner task after speaking wins.
innerPromise.catch(() => {});
await Promise.race([innerPromise, ...]);
Alternatively, await the innerPromise in a try/catch after the race to ensure the rejection is consumed:
try { await innerPromise; } catch { /* expected abort */ }
The second approach is cleaner because it ensures the inner task has fully torn down before the guard function returns.
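A self-contained sketch of the second approach follows. `raceWithGuard` and `abortableDelay` are hypothetical helpers written for illustration, not functions from audio_recognition.ts. One caveat worth checking against the real code: `Promise.race` attaches handlers to every input promise, so whether an unhandled-rejection warning actually fires depends on how the inner promise is wired. The teardown-ordering benefit of awaiting it holds either way.

```typescript
// Hypothetical delay that rejects when its signal aborts.
function abortableDelay(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('aborted'));
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      reject(new Error('aborted'));
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

// Races an abortable inner task against a guard promise. Returns true when the
// guard won. The inner promise is always awaited before returning, so its
// abort rejection is consumed and its cleanup has finished.
async function raceWithGuard(
  inner: (controller: AbortController) => Promise<void>,
  guard: Promise<void>,
): Promise<boolean> {
  const innerController = new AbortController();
  const innerPromise = inner(innerController);
  let guardWon = false;
  try {
    await Promise.race([
      innerPromise,
      guard.then(() => {
        guardWon = true;
      }),
    ]);
  } finally {
    innerController.abort();
    try {
      await innerPromise;
    } catch {
      /* expected abort; a genuine inner failure that won the race is rethrown by the race itself */
    }
  }
  return guardWon;
}
```

When the guard wins, the function returns only after the inner task has observed the abort and run its own `finally` blocks, which is the property the second approach is recommended for.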
Description
add audio eot model and local inference support, deprecating silero and turn detector plugins
Changes Made
Pre-Review Checklist
Testing
`restaurant_agent.ts` and `realtime_agent.ts` work properly (for major changes)
Additional Notes
Note to reviewers: Please ensure the pre-review checklist is completed before starting your review.