Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/smooth-cobras-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@livekit/components-react': patch
'@livekit/components-core': patch
---

Order transcriptions by data stream initial open time, NOT by first packet arrival time
195 changes: 195 additions & 0 deletions packages/core/src/components/textStream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import type { Room, TextStreamInfo, TextStreamReader } from 'livekit-client';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { ParticipantAgentAttributes } from '../helper';
import { setupTextStream, type TextStreamData } from './textStream';

const SEGMENT_ATTR = ParticipantAgentAttributes.TranscriptionSegmentId;
const TOPIC = 'lk.transcription';

type StreamHandler = (
reader: TextStreamReader,
participantInfo: { identity: string },
) => void | Promise<void>;

/**
* An async-iterable stand-in for a `TextStreamReader` whose text only flows once `release()`
* is called. This lets a test open several streams (capturing their first-received time) and
* then deliver their text in a different order than they were opened.
*/
class ControllableStream {
info: TextStreamInfo;
private chunks: string[];
private gate: Promise<void>;
private openGate!: () => void;

constructor(info: TextStreamInfo, chunks: string[]) {
this.info = info;
this.chunks = chunks;
this.gate = new Promise<void>((resolve) => {
this.openGate = resolve;
});
}

/** Let this stream's text start flowing. */
release() {
this.openGate();
}

async *[Symbol.asyncIterator]() {
await this.gate;
for (const chunk of this.chunks) {
yield chunk;
}
}
}

function transcriptionInfo(id: string, segmentId: string, timestamp: number): TextStreamInfo {
return {
id,
topic: TOPIC,
timestamp,
mimeType: 'text/plain',
attributes: { [SEGMENT_ATTR]: segmentId },
} as unknown as TextStreamInfo;
}

function createMockRoom() {
let handler: StreamHandler | undefined;
const room = {
registerTextStreamHandler: (_topic: string, cb: StreamHandler) => {
handler = cb;
},
unregisterTextStreamHandler: () => {
handler = undefined;
},
on: () => room,
off: () => room,
};
return {
room: room as unknown as Room,
open(stream: ControllableStream, identity: string) {
if (!handler) throw new Error('no text stream handler registered yet');
return handler(stream as unknown as TextStreamReader, { identity });
},
};
}

/** Let queued microtasks (async-iterator pulls) settle. */
const flush = () => new Promise((resolve) => setTimeout(resolve, 0));

describe('setupTextStream ordering', () => {
let now = 0;

beforeEach(() => {
now = 1_000;
vi.spyOn(Date, 'now').mockImplementation(() => now);
});

afterEach(() => {
vi.restoreAllMocks();
});

it('orders by first receipt of the segment, not by when its text finishes streaming (issue #1280)', async () => {
const { room, open } = createMockRoom();
const emissions: Array<Array<TextStreamData>> = [];
const sub = setupTextStream(room, TOPIC).subscribe((streams) => emissions.push(streams));

// The user speaks first, so their transcription stream is opened first...
now = 1_000;
const user = new ControllableStream(transcriptionInfo('user-1', 'seg-user', 1_000), ['Hello']);
open(user, 'user');

// ...then the agent replies, opening its stream a moment later.
now = 2_000;
const agent = new ControllableStream(transcriptionInfo('agent-1', 'seg-agent', 2_000), [
'Hi there!',
]);
open(agent, 'agent');

// But the agent's text streams in immediately while the user's transcript lags behind.
agent.release();
await flush();
user.release();
await flush();

const final = emissions.at(-1)!;
expect(final.map((s) => s.participantInfo.identity)).toEqual(['user', 'agent']);
expect(final.map((s) => s.text)).toEqual(['Hello', 'Hi there!']);

sub.unsubscribe();
});

it('orders by local first-received time, not the sender-stamped timestamp (DL clock drift)', async () => {
const { room, open } = createMockRoom();
const emissions: Array<Array<TextStreamData>> = [];
const sub = setupTextStream(room, TOPIC).subscribe((streams) => emissions.push(streams));

// First message opened locally, but its sender's clock is running far ahead (timestamp 9000).
now = 1_000;
const first = new ControllableStream(transcriptionInfo('first-1', 'seg-first', 9_000), [
'first',
]);
open(first, 'participant-a');

// Second message opened locally later, but carries an earlier sender timestamp (1000).
now = 2_000;
const second = new ControllableStream(transcriptionInfo('second-1', 'seg-second', 1_000), [
'second',
]);
open(second, 'participant-b');

// Deliver text in reverse order of opening. Insertion order would give [second, first];
// sorting by the sender timestamp would also give [second, first]. Only first-received
// ordering yields the chronological [first, second].
second.release();
await flush();
first.release();
await flush();

const final = emissions.at(-1)!;
expect(final.map((s) => s.text)).toEqual(['first', 'second']);

sub.unsubscribe();
});

it('keeps a transcription in place (and updates its text) when a later segment update arrives', async () => {
const { room, open } = createMockRoom();
const emissions: Array<Array<TextStreamData>> = [];
const sub = setupTextStream(room, TOPIC).subscribe((streams) => emissions.push(streams));

// A transcription segment is created first.
now = 1_000;
const create = new ControllableStream(transcriptionInfo('seg-a-create', 'seg-a', 1_000), [
'partial',
]);
open(create, 'speaker');
create.release();
await flush();

// A second, unrelated transcription opens afterwards.
now = 2_000;
const other = new ControllableStream(transcriptionInfo('other-1', 'seg-other', 2_000), [
'other',
]);
open(other, 'speaker-2');
other.release();
await flush();

// Now an update for the first segment arrives much later, as its own stream.
now = 5_000;
const update = new ControllableStream(transcriptionInfo('seg-a-update', 'seg-a', 5_000), [
'partial then final',
]);
open(update, 'speaker');
update.release();
await flush();

const final = emissions.at(-1)!;
// The updated segment must keep its original (earlier) position rather than jumping to the end.
expect(final.map((s) => s.participantInfo.identity)).toEqual(['speaker', 'speaker-2']);
expect(final[0].text).toBe('partial then final');
expect(final[0].streamInfo.attributes?.[SEGMENT_ATTR]).toBe('seg-a');

sub.unsubscribe();
});
});
40 changes: 38 additions & 2 deletions packages/core/src/components/textStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ export interface TextStreamData {
text: string;
participantInfo: { identity: string }; // Replace with the correct type from livekit-client
streamInfo: TextStreamInfo;
/**
* Client-side timestamp (ms since epoch) captured the moment this stream was first opened,
* before any text arrived. Used for chronological ordering: unlike `streamInfo.timestamp`
* (stamped by the sender's clock) this is sampled from a single local clock, so it stays
* comparable across streams from different senders, and unlike insertion order it is immune
* to text that streams in late.
*/
firstReceivedTime: number;
}

// Singleton getters for lazy initialization
Expand Down Expand Up @@ -56,9 +64,25 @@ export function setupTextStream(room: Room, topic: string): Observable<TextStrea

const textStreamsSubject = new Subject<TextStreamData[]>();
let textStreams: TextStreamData[] = [];
// The first time we become aware of a stream (its header arrives), keyed by the same
// identity used for de-duplication below — the transcription segment id when present,
// otherwise the stream id. Captured once and never overwritten, so create/update streams
// for the same segment share the earliest time.
let firstReceivedTimes = new Map<string, number>();

const segmentAttribute = ParticipantAgentAttributes.TranscriptionSegmentId;

// Emit a snapshot ordered chronologically by when each stream was first opened rather than
// by the order in which the first chunk happened to arrive. Streams whose text is delayed
// would otherwise sort after streams that streamed immediately, producing out-of-order
// transcriptions (e.g. an agent reply appearing before the user utterance it answered).
// Array#sort is stable, so streams sharing a timestamp keep their insertion order.
const emit = () => {
textStreamsSubject.next(
textStreams.slice().sort((a, b) => a.firstReceivedTime - b.firstReceivedTime),
);
};

// Create shared observable and store in cache
const sharedObservable = textStreamsSubject.pipe(
tap({
Expand All @@ -73,6 +97,16 @@ export function setupTextStream(room: Room, topic: string): Observable<TextStrea

const isTranscription = !!reader.info.attributes?.[segmentAttribute];

// Capture when this stream was first opened — now, before any text has streamed in —
// so ordering is unaffected by how quickly each stream's text arrives.
const streamKey =
(isTranscription ? reader.info.attributes?.[segmentAttribute] : undefined) ??
reader.info.id;
if (!firstReceivedTimes.has(streamKey)) {
firstReceivedTimes.set(streamKey, Date.now());
}
const firstReceivedTime = firstReceivedTimes.get(streamKey)!;

// Subscribe to the stream and update our array when new chunks arrive
streamObservable.subscribe((accumulatedText) => {
// Find and update the stream in our array
Expand All @@ -90,17 +124,18 @@ export function setupTextStream(room: Room, topic: string): Observable<TextStrea
};

// Emit the updated array
textStreamsSubject.next([...textStreams]);
emit();
} else {
// Handle case where stream ID wasn't found (new stream)
textStreams.push({
text: accumulatedText,
participantInfo,
streamInfo: reader.info,
firstReceivedTime,
});

// Emit the updated array with the new stream
textStreamsSubject.next([...textStreams]);
emit();
}
});
});
Expand All @@ -118,6 +153,7 @@ export function setupTextStream(room: Room, topic: string): Observable<TextStrea
room.on(RoomEvent.Disconnected, () => {
getObservableCache().delete(cacheKey);
textStreams = [];
firstReceivedTimes = new Map();
textStreamsSubject.next([]);
});

Expand Down
30 changes: 24 additions & 6 deletions packages/react/src/hooks/useSessionMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,45 @@ export function useSessionMessages(session?: UseSessionReturn): UseSessionMessag
return merged;
}, [transcriptionMessages, chat.chatMessages]);

const messageFirstReceivedTimeMapRef = React.useRef(new Map<ReceivedMessage['id'], Date>());
// The time each transcription stream was first opened, captured in core before any text
// streamed in (keyed by the transcription message id, which is the stream id).
const transcriptionFirstReceivedTimes = React.useMemo(() => {
const map = new Map<string, number>();
for (const transcription of transcriptions) {
map.set(transcription.streamInfo.id, transcription.firstReceivedTime);
}
return map;
}, [transcriptions]);

// Sort by a single client clock rather than the embedded `timestamp` fields, which come from
// different clocks for transcriptions (sender) and chat (local) and can drift relative to one
// another. For transcriptions we use the stream-open time captured in core, so a transcript
// whose text arrives late still orders by when speech began rather than when its text showed
// up; chat messages have no such signal and fall back to their first-seen time.
const messageFirstReceivedTimeMapRef = React.useRef(new Map<ReceivedMessage['id'], number>());
const sortedReceivedMessages = React.useMemo(() => {
const now = new Date();
const now = Date.now();
for (const message of receivedMessages) {
if (messageFirstReceivedTimeMapRef.current.has(message.id)) {
continue;
}

messageFirstReceivedTimeMapRef.current.set(message.id, now);
messageFirstReceivedTimeMapRef.current.set(
message.id,
transcriptionFirstReceivedTimes.get(message.id) ?? now,
);
}

return receivedMessages.sort((a, b) => {
return receivedMessages.slice().sort((a, b) => {
const aFirstReceivedAt = messageFirstReceivedTimeMapRef.current.get(a.id);
const bFirstReceivedAt = messageFirstReceivedTimeMapRef.current.get(b.id);
if (typeof aFirstReceivedAt === 'undefined' || typeof bFirstReceivedAt === 'undefined') {
return 0;
}

return aFirstReceivedAt.getTime() - bFirstReceivedAt.getTime();
return aFirstReceivedAt - bFirstReceivedAt;
});
}, [receivedMessages]);
}, [receivedMessages, transcriptionFirstReceivedTimes]);

const previouslyReceivedMessageIdsRef = React.useRef(new Set());
React.useEffect(() => {
Expand Down
Loading