diff --git a/.changeset/smooth-cobras-check.md b/.changeset/smooth-cobras-check.md new file mode 100644 index 000000000..db65daa72 --- /dev/null +++ b/.changeset/smooth-cobras-check.md @@ -0,0 +1,6 @@ +--- +'@livekit/components-react': patch +'@livekit/components-core': patch +--- + +Order transcriptions by data stream initial open time, NOT by first packet arrival time diff --git a/packages/core/src/components/textStream.test.ts b/packages/core/src/components/textStream.test.ts new file mode 100644 index 000000000..697cb6b61 --- /dev/null +++ b/packages/core/src/components/textStream.test.ts @@ -0,0 +1,195 @@ +import type { Room, TextStreamInfo, TextStreamReader } from 'livekit-client'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { ParticipantAgentAttributes } from '../helper'; +import { setupTextStream, type TextStreamData } from './textStream'; + +const SEGMENT_ATTR = ParticipantAgentAttributes.TranscriptionSegmentId; +const TOPIC = 'lk.transcription'; + +type StreamHandler = ( + reader: TextStreamReader, + participantInfo: { identity: string }, +) => void | Promise; + +/** + * An async-iterable stand-in for a `TextStreamReader` whose text only flows once `release()` + * is called. This lets a test open several streams (capturing their first-received time) and + * then deliver their text in a different order than they were opened. + */ +class ControllableStream { + info: TextStreamInfo; + private chunks: string[]; + private gate: Promise; + private openGate!: () => void; + + constructor(info: TextStreamInfo, chunks: string[]) { + this.info = info; + this.chunks = chunks; + this.gate = new Promise((resolve) => { + this.openGate = resolve; + }); + } + + /** Let this stream's text start flowing. */ + release() { + this.openGate(); + } + + async *[Symbol.asyncIterator]() { + await this.gate; + for (const chunk of this.chunks) { + yield chunk; + } + } +} + +function transcriptionInfo(id: string, segmentId: string, timestamp: number): TextStreamInfo { + return { + id, + topic: TOPIC, + timestamp, + mimeType: 'text/plain', + attributes: { [SEGMENT_ATTR]: segmentId }, + } as unknown as TextStreamInfo; +} + +function createMockRoom() { + let handler: StreamHandler | undefined; + const room = { + registerTextStreamHandler: (_topic: string, cb: StreamHandler) => { + handler = cb; + }, + unregisterTextStreamHandler: () => { + handler = undefined; + }, + on: () => room, + off: () => room, + }; + return { + room: room as unknown as Room, + open(stream: ControllableStream, identity: string) { + if (!handler) throw new Error('no text stream handler registered yet'); + return handler(stream as unknown as TextStreamReader, { identity }); + }, + }; +} + +/** Let queued microtasks (async-iterator pulls) settle. */ +const flush = () => new Promise((resolve) => setTimeout(resolve, 0)); + +describe('setupTextStream ordering', () => { + let now = 0; + + beforeEach(() => { + now = 1_000; + vi.spyOn(Date, 'now').mockImplementation(() => now); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('orders by first receipt of the segment, not by when its text finishes streaming (issue #1280)', async () => { + const { room, open } = createMockRoom(); + const emissions: Array> = []; + const sub = setupTextStream(room, TOPIC).subscribe((streams) => emissions.push(streams)); + + // The user speaks first, so their transcription stream is opened first... + now = 1_000; + const user = new ControllableStream(transcriptionInfo('user-1', 'seg-user', 1_000), ['Hello']); + open(user, 'user'); + + // ...then the agent replies, opening its stream a moment later. + now = 2_000; + const agent = new ControllableStream(transcriptionInfo('agent-1', 'seg-agent', 2_000), [ + 'Hi there!', + ]); + open(agent, 'agent'); + + // But the agent's text streams in immediately while the user's transcript lags behind. + agent.release(); + await flush(); + user.release(); + await flush(); + + const final = emissions.at(-1)!; + expect(final.map((s) => s.participantInfo.identity)).toEqual(['user', 'agent']); + expect(final.map((s) => s.text)).toEqual(['Hello', 'Hi there!']); + + sub.unsubscribe(); + }); + + it('orders by local first-received time, not the sender-stamped timestamp (DL clock drift)', async () => { + const { room, open } = createMockRoom(); + const emissions: Array> = []; + const sub = setupTextStream(room, TOPIC).subscribe((streams) => emissions.push(streams)); + + // First message opened locally, but its sender's clock is running far ahead (timestamp 9000). + now = 1_000; + const first = new ControllableStream(transcriptionInfo('first-1', 'seg-first', 9_000), [ + 'first', + ]); + open(first, 'participant-a'); + + // Second message opened locally later, but carries an earlier sender timestamp (1000). + now = 2_000; + const second = new ControllableStream(transcriptionInfo('second-1', 'seg-second', 1_000), [ + 'second', + ]); + open(second, 'participant-b'); + + // Deliver text in reverse order of opening. Insertion order would give [second, first]; + // sorting by the sender timestamp would also give [second, first]. Only first-received + // ordering yields the chronological [first, second]. + second.release(); + await flush(); + first.release(); + await flush(); + + const final = emissions.at(-1)!; + expect(final.map((s) => s.text)).toEqual(['first', 'second']); + + sub.unsubscribe(); + }); + + it('keeps a transcription in place (and updates its text) when a later segment update arrives', async () => { + const { room, open } = createMockRoom(); + const emissions: Array> = []; + const sub = setupTextStream(room, TOPIC).subscribe((streams) => emissions.push(streams)); + + // A transcription segment is created first. + now = 1_000; + const create = new ControllableStream(transcriptionInfo('seg-a-create', 'seg-a', 1_000), [ + 'partial', + ]); + open(create, 'speaker'); + create.release(); + await flush(); + + // A second, unrelated transcription opens afterwards. + now = 2_000; + const other = new ControllableStream(transcriptionInfo('other-1', 'seg-other', 2_000), [ + 'other', + ]); + open(other, 'speaker-2'); + other.release(); + await flush(); + + // Now an update for the first segment arrives much later, as its own stream. + now = 5_000; + const update = new ControllableStream(transcriptionInfo('seg-a-update', 'seg-a', 5_000), [ + 'partial then final', + ]); + open(update, 'speaker'); + update.release(); + await flush(); + + const final = emissions.at(-1)!; + // The updated segment must keep its original (earlier) position rather than jumping to the end. + expect(final.map((s) => s.participantInfo.identity)).toEqual(['speaker', 'speaker-2']); + expect(final[0].text).toBe('partial then final'); + expect(final[0].streamInfo.attributes?.[SEGMENT_ATTR]).toBe('seg-a'); + + sub.unsubscribe(); + }); +}); diff --git a/packages/core/src/components/textStream.ts b/packages/core/src/components/textStream.ts index ea3a0db03..d9447b118 100644 --- a/packages/core/src/components/textStream.ts +++ b/packages/core/src/components/textStream.ts @@ -7,6 +7,14 @@ export interface TextStreamData { text: string; participantInfo: { identity: string }; // Replace with the correct type from livekit-client streamInfo: TextStreamInfo; + /** + * Client-side timestamp (ms since epoch) captured the moment this stream was first opened, + * before any text arrived. Used for chronological ordering: unlike `streamInfo.timestamp` + * (stamped by the sender's clock) this is sampled from a single local clock, so it stays + * comparable across streams from different senders, and unlike insertion order it is immune + * to text that streams in late. + */ + firstReceivedTime: number; } // Singleton getters for lazy initialization @@ -56,9 +64,25 @@ export function setupTextStream(room: Room, topic: string): Observable(); let textStreams: TextStreamData[] = []; + // The first time we become aware of a stream (its header arrives), keyed by the same + // identity used for de-duplication below — the transcription segment id when present, + // otherwise the stream id. Captured once and never overwritten, so create/update streams + // for the same segment share the earliest time. + let firstReceivedTimes = new Map(); const segmentAttribute = ParticipantAgentAttributes.TranscriptionSegmentId; + // Emit a snapshot ordered chronologically by when each stream was first opened rather than + // by the order in which the first chunk happened to arrive. Streams whose text is delayed + // would otherwise sort after streams that streamed immediately, producing out-of-order + // transcriptions (e.g. an agent reply appearing before the user utterance it answered). + // Array#sort is stable, so streams sharing a timestamp keep their insertion order. + const emit = () => { + textStreamsSubject.next( + textStreams.slice().sort((a, b) => a.firstReceivedTime - b.firstReceivedTime), + ); + }; + // Create shared observable and store in cache const sharedObservable = textStreamsSubject.pipe( tap({ @@ -73,6 +97,16 @@ export function setupTextStream(room: Room, topic: string): Observable { // Find and update the stream in our array @@ -90,17 +124,18 @@ export function setupTextStream(room: Room, topic: string): Observable { getObservableCache().delete(cacheKey); textStreams = []; + firstReceivedTimes = new Map(); textStreamsSubject.next([]); }); diff --git a/packages/react/src/hooks/useSessionMessages.ts b/packages/react/src/hooks/useSessionMessages.ts index 9e8617b9b..8bac65954 100644 --- a/packages/react/src/hooks/useSessionMessages.ts +++ b/packages/react/src/hooks/useSessionMessages.ts @@ -112,27 +112,45 @@ export function useSessionMessages(session?: UseSessionReturn): UseSessionMessag return merged; }, [transcriptionMessages, chat.chatMessages]); - const messageFirstReceivedTimeMapRef = React.useRef(new Map()); + // The time each transcription stream was first opened, captured in core before any text + // streamed in (keyed by the transcription message id, which is the stream id). + const transcriptionFirstReceivedTimes = React.useMemo(() => { + const map = new Map(); + for (const transcription of transcriptions) { + map.set(transcription.streamInfo.id, transcription.firstReceivedTime); + } + return map; + }, [transcriptions]); + + // Sort by a single client clock rather than the embedded `timestamp` fields, which come from + // different clocks for transcriptions (sender) and chat (local) and can drift relative to one + // another. For transcriptions we use the stream-open time captured in core, so a transcript + // whose text arrives late still orders by when speech began rather than when its text showed + // up; chat messages have no such signal and fall back to their first-seen time. + const messageFirstReceivedTimeMapRef = React.useRef(new Map()); const sortedReceivedMessages = React.useMemo(() => { - const now = new Date(); + const now = Date.now(); for (const message of receivedMessages) { if (messageFirstReceivedTimeMapRef.current.has(message.id)) { continue; } - messageFirstReceivedTimeMapRef.current.set(message.id, now); + messageFirstReceivedTimeMapRef.current.set( + message.id, + transcriptionFirstReceivedTimes.get(message.id) ?? now, + ); } - return receivedMessages.sort((a, b) => { + return receivedMessages.slice().sort((a, b) => { const aFirstReceivedAt = messageFirstReceivedTimeMapRef.current.get(a.id); const bFirstReceivedAt = messageFirstReceivedTimeMapRef.current.get(b.id); if (typeof aFirstReceivedAt === 'undefined' || typeof bFirstReceivedAt === 'undefined') { return 0; } - return aFirstReceivedAt.getTime() - bFirstReceivedAt.getTime(); + return aFirstReceivedAt - bFirstReceivedAt; }); - }, [receivedMessages]); + }, [receivedMessages, transcriptionFirstReceivedTimes]); const previouslyReceivedMessageIdsRef = React.useRef(new Set()); React.useEffect(() => {