Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions apps/desktop/src/main/services/chat/agentChatService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11390,6 +11390,64 @@ describe("createAgentChatService", () => {
)).toEqual(["event-2", "event-3", "event-4"]);
});

it("byte-caps a snapshot whose merged events exceed the response budget", async () => {
// Regression: individual chat events can carry multi-MB tool outputs.
// Event-count caps alone let a snapshot serialize past the desktop RPC
// client's 16 MiB per-message limit, which used to fail every in-flight
// call on the shared runtime socket.
const { service } = createService();
const session = await service.createSession({
laneId: "lane-1",
provider: "codex",
model: "gpt-5.4",
});

const envelopes: AgentChatEventEnvelope[] = Array.from({ length: 4 }, (_, index) => ({
sessionId: session.id,
timestamp: `2026-04-23T10:0${index}:00.000Z`,
event: { type: "text", text: `event-${index}-${"x".repeat(3_000_000)}` },
sequence: index + 1,
}));
const transcriptFile = path.join(tmpRoot, "transcripts", `${session.id}.chat.jsonl`);
fs.writeFileSync(transcriptFile, "ignored\n", "utf8");
vi.mocked(parseAgentChatTranscript).mockReturnValue(envelopes);

const history = service.getChatEventHistory(session.id);

// 4 × ~3 MB events exceed the 8 MB response budget: only the newest
// events that fit are returned, and the trim is reported as window
// truncation so clients know to page for the rest.
expect(history.events.length).toBeLessThan(envelopes.length);
expect(history.events.length).toBeGreaterThan(0);
expect(history.windowTruncated).toBe(true);
expect(history.truncated).toBe(true);
expect(JSON.stringify(history.events).length).toBeLessThanOrEqual(8_000_000);
const lastEvent = history.events.at(-1)?.event;
expect(lastEvent?.type === "text" ? lastEvent.text.startsWith("event-3-") : false).toBe(true);
});

it("always returns at least the newest event even when it alone exceeds the byte budget", async () => {
const { service } = createService();
const session = await service.createSession({
laneId: "lane-1",
provider: "codex",
model: "gpt-5.4",
});

const giant: AgentChatEventEnvelope = {
sessionId: session.id,
timestamp: "2026-04-23T10:00:00.000Z",
event: { type: "text", text: "giant-".concat("y".repeat(9_000_000)) },
sequence: 1,
};
const transcriptFile = path.join(tmpRoot, "transcripts", `${session.id}.chat.jsonl`);
fs.writeFileSync(transcriptFile, "ignored\n", "utf8");
vi.mocked(parseAgentChatTranscript).mockReturnValue([giant]);

const history = service.getChatEventHistory(session.id);
expect(history.events).toHaveLength(1);
});

it("marks window truncation when the service response cap removes events", async () => {
const { service } = createService();
const session = await service.createSession({
Expand Down
115 changes: 105 additions & 10 deletions apps/desktop/src/main/services/chat/agentChatService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5297,7 +5297,70 @@ export function createAgentChatService(args: {
const CHAT_EVENT_HISTORY_RESPONSE_MAX_PER_SESSION = 20_000;
const CHAT_EVENT_HISTORY_TRANSCRIPT_MAX_BYTES = 2_000_000;
const CHAT_EVENT_HISTORY_TRANSCRIPT_CACHE_MAX_SESSIONS = 32;
// Byte budgets alongside the event-count caps above. Individual events are
// unbounded (multi-MB tool outputs exist in real transcripts), so count caps
// alone cannot keep a history snapshot under the desktop RPC client's
// 16 MiB per-message limit; one over-limit response used to fail every
// in-flight call on the shared runtime socket. The ring budget is the
// working bound (ring 4 MB + 2 MB transcript tail ≈ 6 MB merged); the
// response budget is a backstop that should not trigger in practice.
const CHAT_EVENT_HISTORY_BUFFER_MAX_CHARS = 4_000_000;
const CHAT_EVENT_HISTORY_RESPONSE_MAX_CHARS = 8_000_000;
const eventHistoryBySession = new Map<string, AgentChatEventEnvelope[]>();

const safeJsonChars = (value: unknown): number => {
try {
return JSON.stringify(value).length;
} catch {
return 2_048;
}
};

// Keep the newest items whose cumulative serialized size fits the budget
// (always at least the newest one, even when it alone exceeds it).
const keepNewestWithinCharBudget = <T>(
items: T[],
maxChars: number,
sizeOf: (item: T) => number,
): T[] => {
let total = 0;
let start = items.length;
while (start > 0) {
const next = total + sizeOf(items[start - 1]!);
if (start < items.length && next > maxChars) break;
total = next;
start -= 1;
}
return start > 0 ? items.slice(start) : items;
};

// Envelopes are immutable once recorded; cache their serialized size so
// byte-budget trims do not re-stringify multi-MB events on every snapshot.
const envelopeSizeCache = new WeakMap<AgentChatEventEnvelope, number>();

const estimateEnvelopeChars = (envelope: AgentChatEventEnvelope): number => {
let size = envelopeSizeCache.get(envelope);
if (size == null) {
size = safeJsonChars(envelope);
envelopeSizeCache.set(envelope, size);
}
return size;
};

const trimEnvelopesToByteBudget = (
envelopes: AgentChatEventEnvelope[],
maxChars: number,
): AgentChatEventEnvelope[] => keepNewestWithinCharBudget(envelopes, maxChars, estimateEnvelopeChars);

// The single policy for what bounds the in-memory event ring: an event-count
// cap, then a byte budget. Applied wherever the ring is (re)written.
const boundRingEnvelopes = (envelopes: AgentChatEventEnvelope[]): AgentChatEventEnvelope[] =>
trimEnvelopesToByteBudget(
envelopes.length > CHAT_EVENT_HISTORY_BUFFER_MAX_PER_SESSION
? envelopes.slice(-CHAT_EVENT_HISTORY_BUFFER_MAX_PER_SESSION)
: envelopes,
CHAT_EVENT_HISTORY_BUFFER_MAX_CHARS,
);
const transcriptHistoryCacheBySession = new Map<string, {
transcriptPath: string;
size: number;
Expand All @@ -5311,10 +5374,7 @@ export function createAgentChatService(args: {
const recordChatEventInHistory = (envelope: AgentChatEventEnvelope): void => {
const current = eventHistoryBySession.get(envelope.sessionId) ?? [];
current.push(envelope);
if (current.length > CHAT_EVENT_HISTORY_BUFFER_MAX_PER_SESSION) {
current.splice(0, current.length - CHAT_EVENT_HISTORY_BUFFER_MAX_PER_SESSION);
}
eventHistoryBySession.set(envelope.sessionId, current);
eventHistoryBySession.set(envelope.sessionId, boundRingEnvelopes(current));
};

const rememberTranscriptHistoryCache = (
Expand Down Expand Up @@ -6790,18 +6850,26 @@ export function createAgentChatService(args: {
if (merged.length > CHAT_EVENT_HISTORY_RESPONSE_MAX_PER_SESSION) {
merged = merged.slice(-CHAT_EVENT_HISTORY_RESPONSE_MAX_PER_SESSION);
}
eventHistoryBySession.set(trimmedId, merged.slice(-CHAT_EVENT_HISTORY_BUFFER_MAX_PER_SESSION));
eventHistoryBySession.set(trimmedId, boundRingEnvelopes(merged.slice()));

const parentVisibleMerged = merged.filter((entry) => !isCodexSubagentTranscriptEnvelope(entry));
const parentVisibleLength = parentVisibleMerged.length;
const transcriptTruncated = transcriptHistory.truncated;
const countWindowed = parentVisibleLength > maxEvents
? parentVisibleMerged.slice(-maxEvents)
: parentVisibleMerged;
// Backstop byte budget so the serialized snapshot always fits one RPC
// message. The ring and transcript-tail budgets keep snapshots well under
// it, so it only trims when a single envelope dwarfs both (>~6 MB); such
// trimmed events sit AFTER tailStartOffset and are not reachable through
// getChatEventHistoryPage (which pages strictly older) — an accepted
// seam, the alternative being a response the client must discard.
const windowed = trimEnvelopesToByteBudget(countWindowed, CHAT_EVENT_HISTORY_RESPONSE_MAX_CHARS);
const windowTruncated =
mergedLengthBeforeResponseCap > CHAT_EVENT_HISTORY_RESPONSE_MAX_PER_SESSION
|| parentVisibleLength > maxEvents;
|| parentVisibleLength > maxEvents
|| windowed.length < countWindowed.length;
const truncated = transcriptTruncated || windowTruncated;
const windowed = parentVisibleLength > maxEvents
? parentVisibleMerged.slice(-maxEvents)
: parentVisibleMerged;
return {
sessionId: trimmedId,
events: windowed,
Expand Down Expand Up @@ -25369,6 +25437,20 @@ export function createAgentChatService(args: {
return options.limit !== undefined ? sliced.slice(0, options.limit) : sliced;
}

// Subagent transcripts merge unbounded sources (full persisted transcript,
// live `thread/turns/list?itemsView=full` pulls) and individual messages can
// carry multi-MB tool outputs. Keep the newest messages that fit one RPC
// response (always at least one) so a single fetch can never exceed the
// desktop client's per-message limit. Note for `offset`/`limit` callers: the
// bound keeps the newest suffix of the requested window, so a response may
// start later than `offset` and a short page does not imply end-of-data.
const SUBAGENT_TRANSCRIPT_RESPONSE_MAX_CHARS = 4_000_000;

const boundSubagentTranscriptResponse = (
messages: AgentChatSubagentTranscriptMessage[],
): AgentChatSubagentTranscriptMessage[] =>
keepNewestWithinCharBudget(messages, SUBAGENT_TRANSCRIPT_RESPONSE_MAX_CHARS, safeJsonChars);

function mergeSubagentTranscriptMessages(
left: AgentChatSubagentTranscriptMessage[],
right: AgentChatSubagentTranscriptMessage[],
Expand Down Expand Up @@ -25452,8 +25534,11 @@ export function createAgentChatService(args: {
* - **Cursor**: SDK `task` events tag every lifecycle envelope with the
* subagent's `agentId`; we filter the parent stream by that value.
* - **Everything else (droid, lmstudio, …)**: `null`.
*
* Exposed via getSubagentTranscript, which byte-bounds whatever this
* returns — new runtime branches here need no size handling of their own.
*/
const getSubagentTranscript = async ({
const collectSubagentTranscript = async ({
sessionId,
agentId,
laneId,
Expand Down Expand Up @@ -25638,6 +25723,16 @@ export function createAgentChatService(args: {
});
};

// The byte bound is a response-boundary invariant: enforce it once here so
// every runtime branch in collectSubagentTranscript stays bounded by
// construction rather than by remembering to wrap each return.
const getSubagentTranscript = async (
args: AgentChatSubagentTranscriptArgs,
): Promise<AgentChatSubagentTranscriptMessage[] | null> => {
const messages = await collectSubagentTranscript(args);
return messages ? boundSubagentTranscriptResponse(messages) : messages;
};

const normalizeClaudeContextUsage = (
usage: SDKControlGetContextUsageResponse,
): AgentChatContextUsage => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,76 @@ describe("RuntimeRpcClient", () => {
expect(transport.writes).toEqual([]);
});

it("rejects only the request answered by an oversized response and keeps the connection alive", async () => {
const transport = new MockTransport();
const client = new RuntimeRpcClient(transport);

const oversized = client.call("chat.getChatEventHistory", {});
const unrelated = client.call("projects.list", {});
const oversizedId = requestId(transport.writes[0]!);
const unrelatedId = requestId(transport.writes[1]!);

// Stream a single >16 MiB response line in chunks, as a socket would.
const head = `{"jsonrpc":"2.0","id":${oversizedId},"result":{"events":["`;
transport.emitData(head);
const filler = "x".repeat(1024 * 1024);
for (let i = 0; i < 17; i++) transport.emitData(filler);
transport.emitData(`"]}}\n`);

await expect(oversized).rejects.toThrow(
/response for method chat\.getChatEventHistory exceeded 16 MiB .* and was discarded/,
);
expect(client.isClosed()).toBe(false);

// The unrelated in-flight call and brand-new calls still complete.
transport.emitData({ jsonrpc: "2.0", id: unrelatedId, result: ["project"] });
await expect(unrelated).resolves.toEqual(["project"]);
const after = client.call("projects.list", {});
transport.emitData({ jsonrpc: "2.0", id: requestId(transport.writes[2]!), result: ["still-alive"] });
await expect(after).resolves.toEqual(["still-alive"]);
});

it("parses lines that arrive in the same chunk after an oversized line ends", async () => {
const transport = new MockTransport();
const client = new RuntimeRpcClient(transport);

const oversized = client.call("chat.getChatEventHistory", {});
const follower = client.call("projects.list", {});
const oversizedId = requestId(transport.writes[0]!);
const followerId = requestId(transport.writes[1]!);

transport.emitData(`{"jsonrpc":"2.0","id":${oversizedId},"result":"`);
transport.emitData("y".repeat(17 * 1024 * 1024));
// The oversized line terminator and the follower response share a chunk.
transport.emitData(`"}\n{"jsonrpc":"2.0","id":${followerId},"result":"ok"}\n`);

await expect(oversized).rejects.toThrow(/exceeded 16 MiB/);
await expect(follower).resolves.toBe("ok");
expect(client.isClosed()).toBe(false);
});

it("discards an oversized notification without rejecting pending calls", async () => {
const transport = new MockTransport();
const client = new RuntimeRpcClient(transport);
const warn = vi.spyOn(console, "warn").mockImplementation(() => {});
try {
const pending = client.call("projects.list", {});
const pendingId = requestId(transport.writes[0]!);

// A notification whose params embed an "id" field matching the pending
// call must not be mistaken for that call's response.
transport.emitData(`{"jsonrpc":"2.0","method":"runtime/event","params":{"id":${pendingId},"blob":"`);
transport.emitData("z".repeat(17 * 1024 * 1024));
transport.emitData(`"}}\n`);

expect(client.isClosed()).toBe(false);
transport.emitData({ jsonrpc: "2.0", id: pendingId, result: ["project"] });
await expect(pending).resolves.toEqual(["project"]);
} finally {
warn.mockRestore();
}
});

it("dispatches JSON-RPC notifications without resolving pending calls", async () => {
const transport = new MockTransport();
const client = new RuntimeRpcClient(transport);
Expand Down
Loading
Loading