From 74e71e092cd5a100f7ebea4db2ac7acd3b343c77 Mon Sep 17 00:00:00 2001 From: 7Sageer <7sageer@djwcb.cn> Date: Thu, 18 Jun 2026 14:06:29 +0800 Subject: [PATCH 1/2] feat(agent-core): add turn and llm request telemetry fields Add `llm_request` event per LLM request attempt (turn_id, step_no, client_request_id, model, provider, outcome, duration_ms, mode, attempt_no, max_attempts, retryable, token usage, stream timing, finish_reason, error_type, status_code). Add `input_submitted` and `turn_ended` events, and enrich `turn_started`, `turn_interrupted`, `tool_call`, and `tool_call_dedup_*` with turn_id, mode, step_no, tool_call_id, and permission_mode. Introduce `LLMTelemetryFields` as a dedicated channel from loop to KosongLLM, kept separate from `LLMRequestLogFields` so request logs stay minimal. Extract `classifyApiError` into telemetry.ts. --- packages/agent-core/src/agent/index.ts | 1 + packages/agent-core/src/agent/turn/index.ts | 151 ++++++----- .../agent-core/src/agent/turn/kosong-llm.ts | 104 +++++++- .../agent-core/src/agent/turn/telemetry.ts | 102 ++++++++ .../agent-core/src/agent/turn/tool-dedup.ts | 9 +- packages/agent-core/src/loop/llm.ts | 9 + packages/agent-core/src/loop/retry.ts | 8 + packages/agent-core/src/loop/run-turn.ts | 3 + packages/agent-core/src/loop/turn-step.ts | 3 + packages/agent-core/test/agent/turn.test.ts | 234 +++++++++++++++++- .../test/agent/turn/tool-dedup.test.ts | 3 +- 11 files changed, 537 insertions(+), 90 deletions(-) create mode 100644 packages/agent-core/src/agent/turn/telemetry.ts diff --git a/packages/agent-core/src/agent/index.ts b/packages/agent-core/src/agent/index.ts index ea1cd806f..eaf59211f 100644 --- a/packages/agent-core/src/agent/index.ts +++ b/packages/agent-core/src/agent/index.ts @@ -226,6 +226,7 @@ export class Agent { provider, systemPrompt: this.config.systemPrompt, capability: this.config.modelCapabilities, + telemetry: this.telemetry, generate: this.generate, completionBudgetConfig, }); diff --git a/packages/agent-core/src/agent/turn/index.ts b/packages/agent-core/src/agent/turn/index.ts index 7cd2e7330..6149065ad 100644 --- a/packages/agent-core/src/agent/turn/index.ts +++ b/packages/agent-core/src/agent/turn/index.ts @@ -2,14 +2,9 @@ import { createHash } from 'node:crypto'; import { createControlledPromise, type ControlledPromise } from '@antfu/utils'; import { - APIConnectionError, APIContextOverflowError, - APIEmptyResponseError, - APIStatusError, - APITimeoutError, grandTotal, inputTotal, - isContextOverflowStatusError, type ContentPart, type TokenUsage, } from '@moonshot-ai/kosong'; @@ -39,6 +34,7 @@ import { abortable, isUserCancellation, userCancellationReason } from '../../uti import { USER_PROMPT_ORIGIN, type PromptOrigin } from '../context'; import { renderUserPromptHookBlockResult, renderUserPromptHookResult } from '../../session/hooks'; import { canonicalTelemetryArgs, isPlainRecord } from './canonical-args'; +import { classifyApiError, type TelemetryMode } from './telemetry'; import { ToolCallDeduplicator } from './tool-dedup'; interface ActiveTurn { @@ -64,6 +60,15 @@ interface PromptHookEndResult { readonly blocked: boolean; } +interface TrackedToolCall { + readonly name: string; + readonly turnId: number; + readonly step: number; + readonly mode: TelemetryMode; + readonly permissionMode: string; + readonly startedAt: number; +} + const LLM_NOT_SET_MESSAGE = 'LLM not set, send "/login" to login'; /** Origin tag for the synthetic "continue" prompt that drives each goal turn. */ @@ -101,10 +106,10 @@ export class TurnFlow { private steerBuffer: BufferedSteer[] = []; private turnId = -1; private activeTurn: 'resuming' | ActiveTurn | null = null; - private readonly toolCallStartedAt = new Map(); + private readonly toolCallStartedAt = new Map(); private readonly toolCallDupType = new Map(); private readonly stepToolCallKeys = new Map>(); - private readonly telemetryModeByTurn = new Map(); + private readonly telemetryModeByTurn = new Map(); private readonly currentStepByTurn = new Map(); private readonly interruptedTelemetryTurnIds = new Set(); private readonly stepFailureByTurn = new Map(); @@ -412,6 +417,7 @@ export class TurnFlow { input: readonly ContentPart[], origin: PromptOrigin, ): Promise { + this.beginTelemetryTurn(turnId, origin); this.agent.usage.beginTurn(); const startedAt = Date.now(); this.agent.emitEvent({ type: 'turn.started', turnId, origin }); @@ -424,6 +430,8 @@ export class TurnFlow { }; this.agent.usage.endTurn(); this.agent.emitEvent(ended); + this.trackTurnEnded(turnId, ended, Date.now() - startedAt); + this.endTelemetryTurn(turnId); return ended; } @@ -443,10 +451,7 @@ export class TurnFlow { this.currentStep = 0; this.stepToolCallKeys.clear(); this.toolCallDupType.clear(); - const telemetryMode = this.telemetryMode(); - this.telemetryModeByTurn.set(turnId, telemetryMode); - this.currentStepByTurn.set(turnId, 0); - this.agent.telemetry.track('turn_started', { mode: telemetryMode }); + this.beginTelemetryTurn(turnId, origin); this.agent.fullCompaction.resetForTurn(); this.agent.usage.beginTurn(); this.agent.emitEvent({ type: 'turn.started', turnId, origin }); @@ -536,10 +541,8 @@ export class TurnFlow { if (ended.reason !== 'completed') { this.trackTurnInterrupted(turnId, this.currentStepByTurn.get(turnId) ?? this.currentStep); } - this.telemetryModeByTurn.delete(turnId); - this.currentStepByTurn.delete(turnId); - this.interruptedTelemetryTurnIds.delete(turnId); - this.stepFailureByTurn.delete(turnId); + this.trackTurnEnded(turnId, ended, Date.now() - startedAt); + this.endTelemetryTurn(turnId); return { event: ended, stopReason: completedStopReason, blockedByUserPromptHook }; } @@ -600,7 +603,11 @@ export class TurnFlow { private async runStepLoop(turnId: number, signal: AbortSignal): Promise { let stopHookContinuationUsed = false; let goalOutcomeMessageContinuationUsed = false; - const deduper = new ToolCallDeduplicator({ telemetry: this.agent.telemetry }); + const telemetryMode = this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(); + const deduper = new ToolCallDeduplicator({ + telemetry: this.agent.telemetry, + telemetryMode, + }); await this.agent.mcp?.waitForInitialLoad(signal); // Surface the active goal at the start of the turn (append-only; no-op when // there is no active goal). Each goal continuation is its own turn, so this @@ -620,6 +627,7 @@ export class TurnFlow { dispatchEvent: this.buildDispatchEvent(turnId), tools: this.agent.tools.loopTools, log: this.agent.log, + telemetryMode, maxSteps: loopControl?.maxStepsPerTurn, maxRetryAttempts: loopControl?.maxRetriesPerStep, recordStepUsage: async (usage) => { @@ -818,6 +826,10 @@ export class TurnFlow { ); this.toolCallStartedAt.set(event.toolCallId, { name: event.name, + turnId, + step: event.step, + mode: this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(), + permissionMode: this.agent.permission.mode, startedAt: Date.now(), }); return; @@ -830,6 +842,11 @@ export class TurnFlow { this.toolCallDupType.delete(event.toolCallId); const outcome = telemetryToolOutcome(event.result); const properties: Record = { + turn_id: started.turnId, + step_no: started.step, + tool_call_id: event.toolCallId, + mode: started.mode, + permission_mode: started.permissionMode, tool_name: started.name, outcome, duration_ms: Date.now() - started.startedAt, @@ -867,6 +884,7 @@ export class TurnFlow { this.agent.telemetry.track('tool_call_dedup_detected', { turn_id: turnId, step_no: step, + mode: this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(), tool_name: toolName, dup_type: dupType, args_hash: createHash('sha256').update(argsText).digest('hex').slice(0, 8), @@ -885,12 +903,40 @@ export class TurnFlow { if (this.interruptedTelemetryTurnIds.has(turnId)) return; this.interruptedTelemetryTurnIds.add(turnId); this.agent.telemetry.track('turn_interrupted', { + turn_id: turnId, mode: this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(), at_step: atStep, }); } - private telemetryMode(): 'agent' | 'plan' { + private beginTelemetryTurn(turnId: number, origin: PromptOrigin): TelemetryMode { + const mode = this.telemetryMode(); + this.telemetryModeByTurn.set(turnId, mode); + this.currentStepByTurn.set(turnId, 0); + if (origin.kind === 'user') { + this.agent.telemetry.track('input_submitted', { turn_id: turnId, mode }); + } + this.agent.telemetry.track('turn_started', { turn_id: turnId, mode }); + return mode; + } + + private trackTurnEnded(turnId: number, ended: TurnEndedEvent, durationMs: number): void { + this.agent.telemetry.track('turn_ended', { + turn_id: turnId, + mode: this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(), + outcome: telemetryTurnOutcome(ended), + duration_ms: durationMs, + }); + } + + private endTelemetryTurn(turnId: number): void { + this.telemetryModeByTurn.delete(turnId); + this.currentStepByTurn.delete(turnId); + this.interruptedTelemetryTurnIds.delete(turnId); + this.stepFailureByTurn.delete(turnId); + } + + private telemetryMode(): TelemetryMode { return this.agent.planMode.isActive ? 'plan' : 'agent'; } @@ -1063,71 +1109,22 @@ function interruptedStep(event: LoopTurnInterruptedEvent): number { return event.activeStep ?? event.attemptedSteps; } -interface ApiErrorClassification { - readonly errorType: string; - readonly statusCode?: number; -} - -function classifyApiError(error: unknown, summary: KimiErrorPayload): ApiErrorClassification { - const statusCode = apiStatusCode(error) ?? summaryStatusCode(summary); - if (statusCode !== undefined) { - if (statusCode === 429) return { errorType: 'rate_limit', statusCode }; - if (statusCode === 401 || statusCode === 403) return { errorType: 'auth', statusCode }; - if (statusCode >= 500) return { errorType: '5xx_server', statusCode }; - if (isContextOverflowStatusError(statusCode, summary.message)) { - return { errorType: 'context_overflow', statusCode }; - } - if (statusCode >= 400) return { errorType: '4xx_client', statusCode }; - return { errorType: 'api', statusCode }; - } - - if (summary.code === ErrorCodes.PROVIDER_RATE_LIMIT) return { errorType: 'rate_limit' }; - if (summary.code === ErrorCodes.PROVIDER_AUTH_ERROR) return { errorType: 'auth' }; - if (summary.code === ErrorCodes.CONTEXT_OVERFLOW) return { errorType: 'context_overflow' }; - if (isApiConnectionError(error, summary)) return { errorType: 'network' }; - if (isApiTimeoutError(error, summary)) return { errorType: 'timeout' }; - if (isApiEmptyResponseError(error, summary)) return { errorType: 'empty_response' }; - return { errorType: 'other' }; -} - -function apiStatusCode(error: unknown): number | undefined { - if (error instanceof APIStatusError) { - const statusCode = (error as { readonly statusCode?: unknown }).statusCode; - return typeof statusCode === 'number' ? statusCode : undefined; - } - if (typeof error !== 'object' || error === null) return undefined; - const statusCode = (error as { readonly statusCode?: unknown }).statusCode; - if (typeof statusCode === 'number') return statusCode; - const status = (error as { readonly status?: unknown }).status; - return typeof status === 'number' ? status : undefined; -} - -function summaryStatusCode(summary: KimiErrorPayload): number | undefined { - const statusCode = summary.details?.['statusCode']; - return typeof statusCode === 'number' ? statusCode : undefined; -} - -function isApiConnectionError(error: unknown, summary: KimiErrorPayload): boolean { - return error instanceof APIConnectionError || summary.name === 'APIConnectionError'; -} - -function isApiTimeoutError(error: unknown, summary: KimiErrorPayload): boolean { - return ( - error instanceof APITimeoutError || - summary.name === 'APITimeoutError' || - summary.name === 'TimeoutError' - ); -} - -function isApiEmptyResponseError(error: unknown, summary: KimiErrorPayload): boolean { - return error instanceof APIEmptyResponseError || summary.name === 'APIEmptyResponseError'; -} - function currentTurnInputTokens(usage: TokenUsage | undefined): number | undefined { if (usage === undefined) return undefined; return inputTotal(usage); } +function telemetryTurnOutcome(event: TurnEndedEvent): 'success' | 'error' | 'cancelled' { + switch (event.reason) { + case 'completed': + return 'success'; + case 'failed': + return 'error'; + case 'cancelled': + return 'cancelled'; + } +} + type ToolTelemetryResult = Extract['result']; function telemetryToolOutcome(result: ToolTelemetryResult): 'success' | 'error' | 'cancelled' { diff --git a/packages/agent-core/src/agent/turn/kosong-llm.ts b/packages/agent-core/src/agent/turn/kosong-llm.ts index fb6b34074..d871a143b 100644 --- a/packages/agent-core/src/agent/turn/kosong-llm.ts +++ b/packages/agent-core/src/agent/turn/kosong-llm.ts @@ -15,12 +15,16 @@ * provider's finish-reason spelling. */ +import { randomUUID } from 'node:crypto'; + import { emptyUsage, generate as kosongGenerate, + inputTotal, isRetryableGenerateError, type ChatProvider, type GenerateCallbacks, + type GenerateResult, type Message, type ModelCapability, type StreamedMessagePart, @@ -37,6 +41,9 @@ import { type CompletionBudgetConfig, } from '../../utils/completion-budget'; import type { GenerateOptionsWithRequestLogFields } from '../llm-request-logger'; +import { isAbortError } from '../../loop/errors'; +import type { TelemetryClient, TelemetryPropertyValue } from '../../telemetry'; +import { classifyApiError, telemetryTurnId } from './telemetry'; export type GenerateFn = typeof kosongGenerate; @@ -44,6 +51,7 @@ export interface KosongLLMConfig { readonly provider: ChatProvider; readonly systemPrompt: string; readonly capability?: ModelCapability | undefined; + readonly telemetry?: TelemetryClient | undefined; /** * Optional override for the kosong `generate()` entry point. Lets the * agent host (and its test harness) inject a scripted generator without @@ -65,17 +73,20 @@ export class KosongLLM implements LLM { private readonly provider: ChatProvider; private readonly generate: GenerateFn; private readonly completionBudgetConfig: CompletionBudgetConfig | undefined; + private readonly telemetry: TelemetryClient | undefined; constructor(config: KosongLLMConfig) { this.provider = config.provider; this.modelName = config.provider.modelName; this.systemPrompt = config.systemPrompt; this.capability = config.capability; + this.telemetry = config.telemetry; this.generate = config.generate ?? kosongGenerate; this.completionBudgetConfig = config.completionBudgetConfig; } async chat(params: LLMChatParams): Promise { + const clientRequestId = randomUUID(); let requestStartedAt = Date.now(); let firstChunkAt: number | undefined; let streamEndedAt: number | undefined; @@ -106,14 +117,28 @@ export class KosongLLM implements LLM { requestLogFields: params.requestLogFields, }; - const result = await this.generate( - effectiveProvider, - this.systemPrompt, - [...params.tools], - params.messages, - callbacks, - options, - ); + let result: GenerateResult; + try { + result = await this.generate( + effectiveProvider, + this.systemPrompt, + [...params.tools], + params.messages, + callbacks, + options, + ); + } catch (error) { + this.trackLlmRequest({ + params, + clientRequestId, + requestStartedAt, + firstChunkAt, + streamEndedAt, + outcome: params.signal.aborted || isAbortError(error) ? 'cancelled' : 'error', + error, + }); + throw error; + } // Replay merged content parts onto loop per-block callbacks after the // stream drained. This preserves WAL append order and stops partial @@ -139,12 +164,75 @@ export class KosongLLM implements LLM { : buildStreamTiming(requestStartedAt, firstChunkAt, streamEndedAt), }; + this.trackLlmRequest({ + params, + clientRequestId, + requestStartedAt, + firstChunkAt, + streamEndedAt, + outcome: 'success', + result, + streamTiming: response.streamTiming, + }); + return response; } isRetryableError(error: unknown): boolean { return isRetryableGenerateError(error); } + + private trackLlmRequest(input: { + readonly params: LLMChatParams; + readonly clientRequestId: string; + readonly requestStartedAt: number; + readonly firstChunkAt?: number | undefined; + readonly streamEndedAt?: number | undefined; + readonly outcome: 'success' | 'error' | 'cancelled'; + readonly result?: GenerateResult | undefined; + readonly streamTiming?: LLMStreamTiming | undefined; + readonly error?: unknown; + }): void { + const { params, clientRequestId, requestStartedAt, outcome, result, streamTiming, error } = input; + const fields = params.telemetryFields; + const properties: Record = { + turn_id: telemetryTurnId(fields?.turnId), + step_no: fields?.step, + client_request_id: clientRequestId, + model: this.modelName, + outcome, + duration_ms: Math.max(0, Date.now() - requestStartedAt), + mode: fields?.mode, + provider: this.provider.name, + attempt_no: fields?.attemptNo, + max_attempts: fields?.maxAttempts, + retryable: outcome === 'error' ? isRetryableGenerateError(error) : false, + }; + + const usage = result?.usage; + if (usage !== undefined && usage !== null) { + properties['input_tokens'] = inputTotal(usage); + properties['output_tokens'] = usage.output; + properties['cache_read_tokens'] = usage.inputCacheRead; + properties['cache_creation_tokens'] = usage.inputCacheCreation; + } + if (streamTiming !== undefined) { + properties['first_token_latency_ms'] = streamTiming.firstTokenLatencyMs; + properties['stream_duration_ms'] = streamTiming.streamDurationMs; + } + if (result?.finishReason !== undefined && result.finishReason !== null) { + properties['finish_reason'] = result.finishReason; + } + if (outcome === 'error' && error !== undefined) { + const classification = classifyApiError(error); + properties['error_type'] = classification.errorType; + if (classification.statusCode !== undefined) { + properties['status_code'] = classification.statusCode; + } + } + + this.telemetry?.track('llm_request', properties); + } } function buildStreamTiming( diff --git a/packages/agent-core/src/agent/turn/telemetry.ts b/packages/agent-core/src/agent/turn/telemetry.ts new file mode 100644 index 000000000..8a004969a --- /dev/null +++ b/packages/agent-core/src/agent/turn/telemetry.ts @@ -0,0 +1,102 @@ +import { + APIConnectionError, + APIEmptyResponseError, + APIStatusError, + APITimeoutError, + isContextOverflowStatusError, +} from '@moonshot-ai/kosong'; + +import { ErrorCodes, type KimiErrorPayload } from '#/errors'; + +export type TelemetryMode = 'agent' | 'plan'; + +export interface ApiErrorClassification { + readonly errorType: string; + readonly statusCode?: number; +} + +/** + * Classify a provider/API error into a stable `errorType` for telemetry. + * + * - When `error` is a native kosong `ChatProviderError` subclass (e.g. + * `APIStatusError`, `APIConnectionError`), `summary` may be omitted — the + * status code / `instanceof` checks carry all the information. This is the + * `llm_request` path inside `KosongLLM`. + * - When `error` is a `KimiError` or an already-serialized payload (the + * turn-level `api_error` path, where the error has been summarized), pass + * `summary` so the error code (`PROVIDER_RATE_LIMIT`, `PROVIDER_AUTH_ERROR`, + * `CONTEXT_OVERFLOW`) can be recognized; otherwise classification falls back + * to `'other'`. + */ +export function classifyApiError( + error: unknown, + summary?: KimiErrorPayload | undefined, +): ApiErrorClassification { + const statusCode = apiStatusCode(error) ?? summaryStatusCode(summary); + if (statusCode !== undefined) { + if (statusCode === 429) return { errorType: 'rate_limit', statusCode }; + if (statusCode === 401 || statusCode === 403) return { errorType: 'auth', statusCode }; + if (statusCode >= 500) return { errorType: '5xx_server', statusCode }; + if (isContextOverflowStatusError(statusCode, summary?.message ?? errorMessage(error))) { + return { errorType: 'context_overflow', statusCode }; + } + if (statusCode >= 400) return { errorType: '4xx_client', statusCode }; + return { errorType: 'api', statusCode }; + } + + if (summary?.code === ErrorCodes.PROVIDER_RATE_LIMIT) return { errorType: 'rate_limit' }; + if (summary?.code === ErrorCodes.PROVIDER_AUTH_ERROR) return { errorType: 'auth' }; + if (summary?.code === ErrorCodes.CONTEXT_OVERFLOW) return { errorType: 'context_overflow' }; + if (isApiConnectionError(error, summary)) return { errorType: 'network' }; + if (isApiTimeoutError(error, summary)) return { errorType: 'timeout' }; + if (isApiEmptyResponseError(error, summary)) return { errorType: 'empty_response' }; + return { errorType: 'other' }; +} + +function apiStatusCode(error: unknown): number | undefined { + if (error instanceof APIStatusError) { + const statusCode = (error as { readonly statusCode?: unknown }).statusCode; + return typeof statusCode === 'number' ? statusCode : undefined; + } + if (typeof error !== 'object' || error === null) return undefined; + const statusCode = (error as { readonly statusCode?: unknown }).statusCode; + if (typeof statusCode === 'number') return statusCode; + const status = (error as { readonly status?: unknown }).status; + return typeof status === 'number' ? status : undefined; +} + +function summaryStatusCode(summary: KimiErrorPayload | undefined): number | undefined { + const statusCode = summary?.details?.['statusCode']; + return typeof statusCode === 'number' ? statusCode : undefined; +} + +function isApiConnectionError(error: unknown, summary: KimiErrorPayload | undefined): boolean { + return error instanceof APIConnectionError || summary?.name === 'APIConnectionError'; +} + +function isApiTimeoutError(error: unknown, summary: KimiErrorPayload | undefined): boolean { + return ( + error instanceof APITimeoutError || + summary?.name === 'APITimeoutError' || + summary?.name === 'TimeoutError' + ); +} + +function isApiEmptyResponseError(error: unknown, summary: KimiErrorPayload | undefined): boolean { + return error instanceof APIEmptyResponseError || summary?.name === 'APIEmptyResponseError'; +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +/** + * Coerce a string turn id into a number when it is exactly integer-shaped + * (e.g. `"0"` → `0`); otherwise keep the original string. Lets telemetry emit + * a numeric `turn_id` for the common case while tolerating non-numeric ids. + */ +export function telemetryTurnId(turnId: string | undefined): number | string | undefined { + if (turnId === undefined) return undefined; + const numeric = Number(turnId); + return Number.isInteger(numeric) && String(numeric) === turnId ? numeric : turnId; +} diff --git a/packages/agent-core/src/agent/turn/tool-dedup.ts b/packages/agent-core/src/agent/turn/tool-dedup.ts index 31170f8cf..779e2452b 100644 --- a/packages/agent-core/src/agent/turn/tool-dedup.ts +++ b/packages/agent-core/src/agent/turn/tool-dedup.ts @@ -4,6 +4,7 @@ import type { TelemetryClient } from '../../telemetry'; import type { ExecutableToolResult } from '../../loop/types'; import { canonicalTelemetryArgs } from './canonical-args'; +import type { TelemetryMode } from './telemetry'; const REMINDER_TEXT_1 = '\n\n\n' + @@ -133,9 +134,14 @@ export class ToolCallDeduplicator { private consecutiveKey: string | null = null; private consecutiveCount = 0; private readonly telemetry: TelemetryClient | undefined; + private readonly telemetryMode: TelemetryMode | undefined; - constructor(options?: { readonly telemetry?: TelemetryClient | undefined }) { + constructor(options?: { + readonly telemetry?: TelemetryClient | undefined; + readonly telemetryMode?: TelemetryMode | undefined; + }) { this.telemetry = options?.telemetry; + this.telemetryMode = options?.telemetryMode; } beginStep(): void { @@ -251,6 +257,7 @@ export class ToolCallDeduplicator { tool_name: toolName, repeat_count: streak, action, + mode: this.telemetryMode, }); } diff --git a/packages/agent-core/src/loop/llm.ts b/packages/agent-core/src/loop/llm.ts index 1749796df..1d8d7b062 100644 --- a/packages/agent-core/src/loop/llm.ts +++ b/packages/agent-core/src/loop/llm.ts @@ -28,6 +28,14 @@ export interface LLMRequestLogFields { readonly attempt?: string; } +export interface LLMTelemetryFields { + readonly turnId?: string; + readonly step?: number; + readonly mode?: 'agent' | 'plan'; + readonly attemptNo?: number; + readonly maxAttempts?: number; +} + export interface LLMStreamTiming { readonly firstTokenLatencyMs: number; readonly streamDurationMs: number; @@ -38,6 +46,7 @@ export interface LLMChatParams { tools: readonly Tool[]; signal: AbortSignal; requestLogFields?: LLMRequestLogFields; + telemetryFields?: LLMTelemetryFields; onTextDelta?: ((delta: string) => void) | undefined; onThinkDelta?: ((delta: string) => void) | undefined; onToolCallDelta?: ((delta: ToolCallDelta) => void) | undefined; diff --git a/packages/agent-core/src/loop/retry.ts b/packages/agent-core/src/loop/retry.ts index 199b409e3..e6efe959a 100644 --- a/packages/agent-core/src/loop/retry.ts +++ b/packages/agent-core/src/loop/retry.ts @@ -21,6 +21,7 @@ export interface ChatWithRetryInput { readonly turnId: string; readonly currentStep: number; readonly stepUuid: string; + readonly telemetryMode?: 'agent' | 'plan' | undefined; readonly maxAttempts?: number; readonly log?: Logger | undefined; } @@ -94,6 +95,13 @@ function paramsForAttempt( attempt === 1 ? { turnStep } : { turnStep, attempt: `${String(attempt)}/${String(maxAttempts)}` }, + telemetryFields: { + turnId: input.turnId, + step: input.currentStep, + mode: input.telemetryMode, + attemptNo: attempt, + maxAttempts, + }, }; } diff --git a/packages/agent-core/src/loop/run-turn.ts b/packages/agent-core/src/loop/run-turn.ts index 326dba854..f124fe46a 100644 --- a/packages/agent-core/src/loop/run-turn.ts +++ b/packages/agent-core/src/loop/run-turn.ts @@ -38,6 +38,7 @@ export interface RunTurnInput { readonly tools?: readonly ExecutableTool[] | undefined; readonly hooks?: LoopHooks | undefined; readonly log?: Logger | undefined; + readonly telemetryMode?: 'agent' | 'plan' | undefined; readonly maxSteps?: number | undefined; readonly maxRetryAttempts?: number; readonly recordStepUsage?: @@ -55,6 +56,7 @@ export async function runTurn(input: RunTurnInput): Promise { tools, hooks, log, + telemetryMode, maxSteps, maxRetryAttempts, recordStepUsage: hostRecordStepUsage, @@ -90,6 +92,7 @@ export async function runTurn(input: RunTurnInput): Promise { tools, hooks, log, + telemetryMode, currentStep: steps, maxRetryAttempts, recordUsage: recordStepUsage, diff --git a/packages/agent-core/src/loop/turn-step.ts b/packages/agent-core/src/loop/turn-step.ts index b06cd67df..0999904d5 100644 --- a/packages/agent-core/src/loop/turn-step.ts +++ b/packages/agent-core/src/loop/turn-step.ts @@ -38,6 +38,7 @@ export interface ExecuteLoopStepDeps { readonly tools?: readonly ExecutableTool[] | undefined; readonly hooks?: LoopHooks | undefined; readonly log?: Logger | undefined; + readonly telemetryMode?: 'agent' | 'plan' | undefined; readonly currentStep: number; readonly maxRetryAttempts?: number; readonly recordUsage: (usage: TokenUsage) => RecordStepUsageResult | void | Promise; @@ -56,6 +57,7 @@ export async function executeLoopStep(deps: ExecuteLoopStepDeps): Promise<{ tools, hooks, log, + telemetryMode, currentStep, maxRetryAttempts, recordUsage, @@ -117,6 +119,7 @@ export async function executeLoopStep(deps: ExecuteLoopStepDeps): Promise<{ turnId, currentStep, stepUuid, + telemetryMode, maxAttempts: maxRetryAttempts, log, }); diff --git a/packages/agent-core/test/agent/turn.test.ts b/packages/agent-core/test/agent/turn.test.ts index e39094fb4..23d504a5a 100644 --- a/packages/agent-core/test/agent/turn.test.ts +++ b/packages/agent-core/test/agent/turn.test.ts @@ -62,13 +62,158 @@ describe('Agent turn flow', () => { await ctx.rpc.prompt({ input: [{ type: 'text', text: 'Hello without login' }] }); await ctx.untilTurnEnd(); + expect(records).toContainEqual({ + event: 'input_submitted', + properties: { turn_id: 0, mode: 'agent' }, + }); expect(records).toContainEqual({ event: 'turn_started', - properties: { mode: 'agent' }, + properties: { turn_id: 0, mode: 'agent' }, }); expect(records).toContainEqual({ event: 'turn_interrupted', - properties: { mode: 'agent', at_step: 0 }, + properties: { turn_id: 0, mode: 'agent', at_step: 0 }, + }); + expect(records).toContainEqual({ + event: 'turn_ended', + properties: { + turn_id: 0, + mode: 'agent', + outcome: 'error', + duration_ms: expect.any(Number), + }, + }); + }); + + it('tracks successful turn and llm request telemetry', async () => { + const records: TelemetryRecord[] = []; + const ctx = testAgent({ telemetry: recordingTelemetry(records) }); + ctx.configure(); + records.length = 0; + + ctx.mockNextResponse({ type: 'text', text: 'Hello from telemetry' }); + await ctx.rpc.prompt({ input: [{ type: 'text', text: 'Hello' }] }); + await ctx.untilTurnEnd(); + + expect(records).toContainEqual({ + event: 'input_submitted', + properties: { turn_id: 0, mode: 'agent' }, + }); + expect(records).toContainEqual({ + event: 'turn_started', + properties: { turn_id: 0, mode: 'agent' }, + }); + expect(records).toContainEqual({ + event: 'llm_request', + properties: expect.objectContaining({ + turn_id: 0, + step_no: 1, + mode: 'agent', + attempt_no: 1, + max_attempts: 3, + client_request_id: expect.any(String), + model: 'mock-model', + provider: 'kimi', + outcome: 'success', + retryable: false, + duration_ms: expect.any(Number), + first_token_latency_ms: expect.any(Number), + stream_duration_ms: expect.any(Number), + finish_reason: 'completed', + input_tokens: expect.any(Number), + output_tokens: expect.any(Number), + cache_read_tokens: 0, + cache_creation_tokens: 0, + }), + }); + expect(records).toContainEqual({ + event: 'turn_ended', + properties: { + turn_id: 0, + mode: 'agent', + outcome: 'success', + duration_ms: expect.any(Number), + }, + }); + }); + + it('tracks cancelled turn and llm request telemetry', async () => { + const records: TelemetryRecord[] = []; + const ctx = testAgent({ generate: abortableGenerate, telemetry: recordingTelemetry(records) }); + ctx.configure(); + + const stepStarted = ctx.once('turn.step.started'); + await ctx.rpc.prompt({ input: [{ type: 'text', text: 'Cancel me' }] }); + await stepStarted; + records.length = 0; + + await ctx.rpc.cancel({ turnId: 0 }); + await ctx.untilTurnEnd(); + + expect(records).toContainEqual({ + event: 'llm_request', + properties: expect.objectContaining({ + turn_id: 0, + step_no: 1, + mode: 'agent', + attempt_no: 1, + max_attempts: 3, + client_request_id: expect.any(String), + model: 'mock-model', + provider: 'kimi', + outcome: 'cancelled', + retryable: false, + duration_ms: expect.any(Number), + }), + }); + const llmRequest = records.find((record) => record.event === 'llm_request'); + expect(llmRequest?.properties).not.toHaveProperty('error_type'); + expect(records).toContainEqual({ + event: 'turn_ended', + properties: { + turn_id: 0, + mode: 'agent', + outcome: 'cancelled', + duration_ms: expect.any(Number), + }, + }); + }); + + it('propagates plan mode across turn telemetry', async () => { + const records: TelemetryRecord[] = []; + const ctx = testAgent({ telemetry: recordingTelemetry(records) }); + ctx.configure(); + await ctx.agent.planMode.enter('telemetry-plan'); + records.length = 0; + + ctx.mockNextResponse({ type: 'text', text: 'Here is the plan.' }); + await ctx.rpc.prompt({ input: [{ type: 'text', text: 'Plan something' }] }); + await ctx.untilTurnEnd(); + + expect(records).toContainEqual({ + event: 'input_submitted', + properties: { turn_id: 0, mode: 'plan' }, + }); + expect(records).toContainEqual({ + event: 'turn_started', + properties: { turn_id: 0, mode: 'plan' }, + }); + expect(records).toContainEqual({ + event: 'llm_request', + properties: expect.objectContaining({ + turn_id: 0, + step_no: 1, + mode: 'plan', + outcome: 'success', + }), + }); + expect(records).toContainEqual({ + event: 'turn_ended', + properties: expect.objectContaining({ + turn_id: 0, + mode: 'plan', + outcome: 'success', + }), }); }); @@ -96,6 +241,7 @@ describe('Agent turn flow', () => { properties: { turn_id: 0, step_no: 1, + mode: 'agent', tool_name: 'Bash', dup_type: 'same_step', args_hash: expect.any(String), @@ -134,6 +280,7 @@ describe('Agent turn flow', () => { properties: { turn_id: 0, step_no: 2, + mode: 'agent', tool_name: 'Bash', dup_type: 'cross_step', args_hash: expect.any(String), @@ -142,6 +289,11 @@ describe('Agent turn flow', () => { expect(records).toContainEqual({ event: 'tool_call', properties: expect.objectContaining({ + turn_id: 0, + step_no: 2, + tool_call_id: 'call_dup_2', + mode: 'agent', + permission_mode: 'yolo', tool_name: 'Bash', outcome: 'success', dup_type: 'cross_step', @@ -219,6 +371,11 @@ describe('Agent turn flow', () => { expect(records).toContainEqual({ event: 'tool_call', properties: expect.objectContaining({ + turn_id: 0, + step_no: 1, + tool_call_id: 'call_missing', + mode: 'agent', + permission_mode: 'manual', tool_name: 'MissingTool', outcome: 'error', dup_type: 'normal', @@ -1396,10 +1553,35 @@ describe('Agent turn flow', () => { if (statusCode === undefined) { expect(record?.properties).not.toHaveProperty('status_code'); } + + const llmRequest = records.find((candidate) => candidate.event === 'llm_request'); + expect(llmRequest).toEqual({ + event: 'llm_request', + properties: expect.objectContaining({ + turn_id: 0, + step_no: 1, + mode: 'agent', + attempt_no: 1, + max_attempts: 1, + client_request_id: expect.any(String), + model: 'mock-model', + provider: 'kimi', + outcome: 'error', + retryable: expect.any(Boolean), + duration_ms: expect.any(Number), + error_type: errorType, + }), + }); + if (statusCode !== undefined) { + expect(llmRequest?.properties).toMatchObject({ status_code: statusCode }); + } else { + expect(llmRequest?.properties).not.toHaveProperty('status_code'); + } }); it('keeps transient retry handling with request-scoped OAuth auth', async () => { const { logger, entries } = captureLogs(); + const records: TelemetryRecord[] = []; const authKeys: string[] = []; const oauthOptions = oauthAgentOptions(async () => 'fresh-token'); const generate: GenerateFn = async ( @@ -1419,10 +1601,16 @@ describe('Agent turn flow', () => { options?.onStreamEnd?.(); return textResult('Recovered after retry'); }; - const ctx = testAgent({ ...oauthOptions, generate, log: logger }); + const ctx = testAgent({ + ...oauthOptions, + generate, + log: logger, + telemetry: recordingTelemetry(records), + }); ctx.configure(); await ctx.rpc.setModel({ model: 'kimi-code' }); ctx.newEvents(); + records.length = 0; await ctx.rpc.prompt({ input: [{ type: 'text', text: 'hello' }] }); const events = await ctx.untilTurnEnd(); @@ -1449,6 +1637,46 @@ describe('Agent turn flow', () => { expect(payloads[0]).toMatchObject({ turnStep: '0.1' }); expect(payloads[0]).not.toHaveProperty('attempt'); expect(payloads[1]).toMatchObject({ turnStep: '0.1', attempt: '2/3' }); + + const llmRequests = records.filter((record) => record.event === 'llm_request'); + expect(llmRequests).toHaveLength(2); + expect(llmRequests[0]).toEqual({ + event: 'llm_request', + properties: expect.objectContaining({ + turn_id: 0, + step_no: 1, + mode: 'agent', + attempt_no: 1, + max_attempts: 3, + client_request_id: expect.any(String), + model: 'kimi-for-coding', + provider: 'google_genai', + outcome: 'error', + retryable: true, + error_type: 'network', + duration_ms: expect.any(Number), + }), + }); + expect(llmRequests[1]).toEqual({ + event: 'llm_request', + properties: expect.objectContaining({ + turn_id: 0, + step_no: 1, + mode: 'agent', + attempt_no: 2, + max_attempts: 3, + client_request_id: expect.any(String), + model: 'kimi-for-coding', + provider: 'google_genai', + outcome: 'success', + retryable: false, + duration_ms: expect.any(Number), + finish_reason: 'completed', + }), + }); + expect(llmRequests[0]?.properties?.['client_request_id']).not.toBe( + llmRequests[1]?.properties?.['client_request_id'], + ); }); it('force-refreshes OAuth credentials on video upload 401 and falls back to login_required when replay 401', async () => { diff --git a/packages/agent-core/test/agent/turn/tool-dedup.test.ts b/packages/agent-core/test/agent/turn/tool-dedup.test.ts index cff03b9bb..d50429d16 100644 --- a/packages/agent-core/test/agent/turn/tool-dedup.test.ts +++ b/packages/agent-core/test/agent/turn/tool-dedup.test.ts @@ -466,7 +466,7 @@ describe('ToolCallDeduplicator', () => { describe('repeat telemetry', () => { it('emits tool_call_repeat with the streak count starting at the second occurrence', async () => { const { client, events } = makeRecordingTelemetry(); - const dedup = new ToolCallDeduplicator({ telemetry: client }); + const dedup = new ToolCallDeduplicator({ telemetry: client, telemetryMode: 'agent' }); for (let i = 0; i < 3; i += 1) { dedup.beginStep(); await runOriginal(dedup, `c${String(i)}`, 'Read', { p: 1 }, okResult('R')); @@ -475,6 +475,7 @@ describe('ToolCallDeduplicator', () => { const repeats = events.filter((e) => e.event === 'tool_call_repeat'); expect(repeats.map((e) => e.properties?.['repeat_count'])).toEqual([2, 3]); expect(repeats.every((e) => e.properties?.['tool_name'] === 'Read')).toBe(true); + expect(repeats.every((e) => e.properties?.['mode'] === 'agent')).toBe(true); }); it('does not emit telemetry on the first call', async () => { From 23512322e350dbebba205a254e42b6e91a7e24ff Mon Sep 17 00:00:00 2001 From: 7Sageer <7sageer@djwcb.cn> Date: Thu, 18 Jun 2026 15:57:40 +0800 Subject: [PATCH 2/2] fix(agent-core): correct telemetry request mode and turn outcomes --- packages/agent-core/src/agent/turn/index.ts | 35 +++++++-- .../agent-core/src/agent/turn/telemetry.ts | 36 ++++++--- packages/agent-core/src/loop/retry.ts | 8 +- packages/agent-core/src/loop/run-turn.ts | 3 +- packages/agent-core/src/loop/turn-step.ts | 3 +- packages/agent-core/test/agent/turn.test.ts | 73 ++++++++++++++++++- .../test/harness/goal-session.test.ts | 26 ++++++- 7 files changed, 159 insertions(+), 25 deletions(-) diff --git a/packages/agent-core/src/agent/turn/index.ts b/packages/agent-core/src/agent/turn/index.ts index 6149065ad..7c03a462a 100644 --- a/packages/agent-core/src/agent/turn/index.ts +++ b/packages/agent-core/src/agent/turn/index.ts @@ -69,6 +69,9 @@ interface TrackedToolCall { readonly startedAt: number; } +type TurnTelemetryOutcome = 'success' | 'error' | 'cancelled' | 'blocked'; +type TurnBlockedReason = 'user_prompt_hook' | 'goal_budget'; + const LLM_NOT_SET_MESSAGE = 'LLM not set, send "/login" to login'; /** Origin tag for the synthetic "continue" prompt that drives each goal turn. */ @@ -430,7 +433,7 @@ export class TurnFlow { }; this.agent.usage.endTurn(); this.agent.emitEvent(ended); - this.trackTurnEnded(turnId, ended, Date.now() - startedAt); + this.trackTurnEnded(turnId, ended, Date.now() - startedAt, 'goal_budget'); this.endTelemetryTurn(turnId); return ended; } @@ -541,7 +544,12 @@ export class TurnFlow { if (ended.reason !== 'completed') { this.trackTurnInterrupted(turnId, this.currentStepByTurn.get(turnId) ?? this.currentStep); } - this.trackTurnEnded(turnId, ended, Date.now() - startedAt); + this.trackTurnEnded( + turnId, + ended, + Date.now() - startedAt, + blockedByUserPromptHook ? 'user_prompt_hook' : undefined, + ); this.endTelemetryTurn(turnId); return { event: ended, stopReason: completedStopReason, blockedByUserPromptHook }; } @@ -627,7 +635,7 @@ export class TurnFlow { dispatchEvent: this.buildDispatchEvent(turnId), tools: this.agent.tools.loopTools, log: this.agent.log, - telemetryMode, + telemetryMode: () => this.telemetryMode(), maxSteps: loopControl?.maxStepsPerTurn, maxRetryAttempts: loopControl?.maxRetriesPerStep, recordStepUsage: async (usage) => { @@ -920,13 +928,20 @@ export class TurnFlow { return mode; } - private trackTurnEnded(turnId: number, ended: TurnEndedEvent, durationMs: number): void { - this.agent.telemetry.track('turn_ended', { + private trackTurnEnded( + turnId: number, + ended: TurnEndedEvent, + durationMs: number, + blockedReason?: TurnBlockedReason, + ): void { + const properties: Record = { turn_id: turnId, mode: this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(), - outcome: telemetryTurnOutcome(ended), + outcome: telemetryTurnOutcome(ended, blockedReason), duration_ms: durationMs, - }); + blocked_reason: blockedReason, + }; + this.agent.telemetry.track('turn_ended', properties); } private endTelemetryTurn(turnId: number): void { @@ -1114,7 +1129,11 @@ function currentTurnInputTokens(usage: TokenUsage | undefined): number | undefin return inputTotal(usage); } -function telemetryTurnOutcome(event: TurnEndedEvent): 'success' | 'error' | 'cancelled' { +function telemetryTurnOutcome( + event: TurnEndedEvent, + blockedReason?: TurnBlockedReason, +): TurnTelemetryOutcome { + if (blockedReason !== undefined) return 'blocked'; switch (event.reason) { case 'completed': return 'success'; diff --git a/packages/agent-core/src/agent/turn/telemetry.ts b/packages/agent-core/src/agent/turn/telemetry.ts index 8a004969a..b02094faa 100644 --- a/packages/agent-core/src/agent/turn/telemetry.ts +++ b/packages/agent-core/src/agent/turn/telemetry.ts @@ -6,9 +6,14 @@ import { isContextOverflowStatusError, } from '@moonshot-ai/kosong'; -import { ErrorCodes, type KimiErrorPayload } from '#/errors'; +import { ErrorCodes, isKimiError, type KimiErrorPayload } from '#/errors'; export type TelemetryMode = 'agent' | 'plan'; +export type TelemetryModeResolver = TelemetryMode | (() => TelemetryMode); + +export function resolveTelemetryMode(mode: TelemetryModeResolver | undefined): TelemetryMode | undefined { + return typeof mode === 'function' ? mode() : mode; +} export interface ApiErrorClassification { readonly errorType: string; @@ -22,17 +27,20 @@ export interface ApiErrorClassification { * `APIStatusError`, `APIConnectionError`), `summary` may be omitted — the * status code / `instanceof` checks carry all the information. This is the * `llm_request` path inside `KosongLLM`. - * - When `error` is a `KimiError` or an already-serialized payload (the - * turn-level `api_error` path, where the error has been summarized), pass - * `summary` so the error code (`PROVIDER_RATE_LIMIT`, `PROVIDER_AUTH_ERROR`, - * `CONTEXT_OVERFLOW`) can be recognized; otherwise classification falls back - * to `'other'`. + * - When `error` is a `KimiError`, its `code` and `details.statusCode` are + * recognized directly, so the `llm_request` path can classify wrapped OAuth + * errors without re-summarizing them. + * - When `error` is an already-serialized payload (the turn-level `api_error` + * path, where the error has been summarized), pass `summary` so the error + * code (`PROVIDER_RATE_LIMIT`, `PROVIDER_AUTH_ERROR`, `CONTEXT_OVERFLOW`) + * can be recognized; otherwise classification falls back to `'other'`. */ export function classifyApiError( error: unknown, summary?: KimiErrorPayload | undefined, ): ApiErrorClassification { - const statusCode = apiStatusCode(error) ?? summaryStatusCode(summary); + const errorCode = isKimiError(error) ? error.code : summary?.code; + const statusCode = apiStatusCode(error) ?? kimiErrorStatusCode(error) ?? summaryStatusCode(summary); if (statusCode !== undefined) { if (statusCode === 429) return { errorType: 'rate_limit', statusCode }; if (statusCode === 401 || statusCode === 403) return { errorType: 'auth', statusCode }; @@ -44,9 +52,11 @@ export function classifyApiError( return { errorType: 'api', statusCode }; } - if (summary?.code === ErrorCodes.PROVIDER_RATE_LIMIT) return { errorType: 'rate_limit' }; - if (summary?.code === ErrorCodes.PROVIDER_AUTH_ERROR) return { errorType: 'auth' }; - if (summary?.code === ErrorCodes.CONTEXT_OVERFLOW) return { errorType: 'context_overflow' }; + if (errorCode === ErrorCodes.PROVIDER_RATE_LIMIT) return { errorType: 'rate_limit' }; + if (errorCode === ErrorCodes.PROVIDER_AUTH_ERROR || errorCode === ErrorCodes.AUTH_LOGIN_REQUIRED) { + return { errorType: 'auth' }; + } + if (errorCode === ErrorCodes.CONTEXT_OVERFLOW) return { errorType: 'context_overflow' }; if (isApiConnectionError(error, summary)) return { errorType: 'network' }; if (isApiTimeoutError(error, summary)) return { errorType: 'timeout' }; if (isApiEmptyResponseError(error, summary)) return { errorType: 'empty_response' }; @@ -70,6 +80,12 @@ function summaryStatusCode(summary: KimiErrorPayload | undefined): number | unde return typeof statusCode === 'number' ? statusCode : undefined; } +function kimiErrorStatusCode(error: unknown): number | undefined { + if (!isKimiError(error)) return undefined; + const statusCode = error.details?.['statusCode']; + return typeof statusCode === 'number' ? statusCode : undefined; +} + function isApiConnectionError(error: unknown, summary: KimiErrorPayload | undefined): boolean { return error instanceof APIConnectionError || summary?.name === 'APIConnectionError'; } diff --git a/packages/agent-core/src/loop/retry.ts b/packages/agent-core/src/loop/retry.ts index e6efe959a..f55d9b522 100644 --- a/packages/agent-core/src/loop/retry.ts +++ b/packages/agent-core/src/loop/retry.ts @@ -4,6 +4,10 @@ import * as retry from 'retry'; import type { Logger } from '#/logging/types'; import { abortable } from '../utils/abort'; +import { + resolveTelemetryMode, + type TelemetryModeResolver, +} from '../agent/turn/telemetry'; import type { LoopEventDispatcher } from './events'; import { isAbortError } from './errors'; import type { LLM, LLMChatParams, LLMChatResponse } from './llm'; @@ -21,7 +25,7 @@ export interface ChatWithRetryInput { readonly turnId: string; readonly currentStep: number; readonly stepUuid: string; - readonly telemetryMode?: 'agent' | 'plan' | undefined; + readonly telemetryMode?: TelemetryModeResolver; readonly maxAttempts?: number; readonly log?: Logger | undefined; } @@ -98,7 +102,7 @@ function paramsForAttempt( telemetryFields: { turnId: input.turnId, step: input.currentStep, - mode: input.telemetryMode, + mode: resolveTelemetryMode(input.telemetryMode), attemptNo: attempt, maxAttempts, }, diff --git a/packages/agent-core/src/loop/run-turn.ts b/packages/agent-core/src/loop/run-turn.ts index f124fe46a..2f2f362a2 100644 --- a/packages/agent-core/src/loop/run-turn.ts +++ b/packages/agent-core/src/loop/run-turn.ts @@ -10,6 +10,7 @@ import { addUsage, emptyUsage, type TokenUsage } from '@moonshot-ai/kosong'; import type { Logger } from '#/logging/types'; +import type { TelemetryModeResolver } from '../agent/turn/telemetry'; import { createMaxStepsExceededError, errorMessage, @@ -38,7 +39,7 @@ export interface RunTurnInput { readonly tools?: readonly ExecutableTool[] | undefined; readonly hooks?: LoopHooks | undefined; readonly log?: Logger | undefined; - readonly telemetryMode?: 'agent' | 'plan' | undefined; + readonly telemetryMode?: TelemetryModeResolver; readonly maxSteps?: number | undefined; readonly maxRetryAttempts?: number; readonly recordStepUsage?: diff --git a/packages/agent-core/src/loop/turn-step.ts b/packages/agent-core/src/loop/turn-step.ts index 0999904d5..f3b4f830d 100644 --- a/packages/agent-core/src/loop/turn-step.ts +++ b/packages/agent-core/src/loop/turn-step.ts @@ -13,6 +13,7 @@ import type { TokenUsage } from '@moonshot-ai/kosong'; import type { Logger } from '#/logging/types'; import type { LoopEventDispatcher } from './events'; +import type { TelemetryModeResolver } from '../agent/turn/telemetry'; import type { LLM, LLMChatParams, LLMChatResponse } from './llm'; import { chatWithRetry } from './retry'; import { runToolCallBatch, type ToolCallStepContext } from './tool-call'; @@ -38,7 +39,7 @@ export interface ExecuteLoopStepDeps { readonly tools?: readonly ExecutableTool[] | undefined; readonly hooks?: LoopHooks | undefined; readonly log?: Logger | undefined; - readonly telemetryMode?: 'agent' | 'plan' | undefined; + readonly telemetryMode?: TelemetryModeResolver; readonly currentStep: number; readonly maxRetryAttempts?: number; readonly recordUsage: (usage: TokenUsage) => RecordStepUsageResult | void | Promise; diff --git a/packages/agent-core/test/agent/turn.test.ts b/packages/agent-core/test/agent/turn.test.ts index 23d504a5a..5ae6e80dd 100644 --- a/packages/agent-core/test/agent/turn.test.ts +++ b/packages/agent-core/test/agent/turn.test.ts @@ -217,6 +217,40 @@ describe('Agent turn flow', () => { }); }); + it('tracks request-time plan mode for llm requests after EnterPlanMode', async () => { + const records: TelemetryRecord[] = []; + const ctx = testAgent({ telemetry: recordingTelemetry(records) }); + ctx.configure({ tools: ['EnterPlanMode'] }); + await ctx.rpc.setPermission({ mode: 'yolo' }); + records.length = 0; + + const enterPlanModeCall: ToolCall = { + type: 'function', + id: 'call_enter_plan', + name: 'EnterPlanMode', + arguments: '{}', + }; + ctx.mockNextResponse({ type: 'text', text: 'I will enter plan mode.' }, enterPlanModeCall); + ctx.mockNextResponse({ type: 'text', text: 'Plan mode is active now.' }); + await ctx.rpc.prompt({ input: [{ type: 'text', text: 'Plan first' }] }); + await ctx.untilTurnEnd(); + + const llmRequests = records.filter((record) => record.event === 'llm_request'); + expect(llmRequests).toHaveLength(2); + expect(llmRequests[0]?.properties).toMatchObject({ + turn_id: 0, + step_no: 1, + mode: 'agent', + outcome: 'success', + }); + expect(llmRequests[1]?.properties).toMatchObject({ + turn_id: 0, + step_no: 2, + mode: 'plan', + outcome: 'success', + }); + }); + it('tracks duplicate tool-call detection telemetry', async () => { const records: TelemetryRecord[] = []; const ctx = testAgent({ @@ -737,8 +771,10 @@ describe('Agent turn flow', () => { command: "echo 'no profanity' >&2; exit 2", }, ]); - const ctx = testAgent({ hookEngine }); + const records: TelemetryRecord[] = []; + const ctx = testAgent({ hookEngine, telemetry: recordingTelemetry(records) }); ctx.configure(); + records.length = 0; await ctx.rpc.prompt({ input: [{ type: 'text', text: 'bad words here' }] }); const events = await ctx.untilTurnEnd(); @@ -755,6 +791,16 @@ describe('Agent turn flow', () => { }), }), ); + expect(records).toContainEqual({ + event: 'turn_ended', + properties: { + turn_id: 0, + mode: 'agent', + outcome: 'blocked', + blocked_reason: 'user_prompt_hook', + duration_ms: expect.any(Number), + }, + }); expect(ctx.agent.context.data().history).toEqual([ { role: 'user', @@ -1379,6 +1425,7 @@ describe('Agent turn flow', () => { it('falls back to login_required when force-refresh and replay both 401', async () => { const tokenCalls: Array = []; const authKeys: string[] = []; + const records: TelemetryRecord[] = []; const oauthOptions = oauthAgentOptions( async (options) => { tokenCalls.push(options?.force); @@ -1397,10 +1444,11 @@ describe('Agent turn flow', () => { authKeys.push(options?.auth?.apiKey ?? ''); throw new APIStatusError(401, 'Unauthorized', 'req-401'); }; - const ctx = testAgent({ ...oauthOptions, generate }); + const ctx = testAgent({ ...oauthOptions, generate, telemetry: recordingTelemetry(records) }); ctx.configure(); await ctx.rpc.setModel({ model: 'kimi-code' }); ctx.newEvents(); + records.length = 0; await ctx.rpc.prompt({ input: [{ type: 'text', text: 'hello' }] }); const events = await ctx.untilTurnEnd(); @@ -1423,6 +1471,27 @@ describe('Agent turn flow', () => { }), }), ); + + expect(records).toContainEqual({ + event: 'llm_request', + properties: expect.objectContaining({ + turn_id: 0, + step_no: 1, + mode: 'agent', + attempt_no: 1, + max_attempts: 3, + outcome: 'error', + error_type: 'auth', + status_code: 401, + }), + }); + expect(records).toContainEqual({ + event: 'api_error', + properties: expect.objectContaining({ + error_type: 'auth', + status_code: 401, + }), + }); }); it('keeps non-OAuth provider 401 as provider auth error', async () => { diff --git a/packages/agent-core/test/harness/goal-session.test.ts b/packages/agent-core/test/harness/goal-session.test.ts index 84e0edffe..e8f3742be 100644 --- a/packages/agent-core/test/harness/goal-session.test.ts +++ b/packages/agent-core/test/harness/goal-session.test.ts @@ -14,7 +14,9 @@ import type { ResolvedAgentProfile } from '../../src/profile'; import type { SDKSessionRPC } from '../../src/rpc'; import { Session } from '../../src/session'; import { SessionAPIImpl } from '../../src/session/rpc'; +import type { TelemetryClient } from '../../src/telemetry'; import { createScriptedGenerate } from '../agent/harness/scripted-generate'; +import { recordingTelemetry, type TelemetryRecord } from '../fixtures/telemetry'; import { testKaos } from '../fixtures/test-kaos'; const MOCK_PROVIDER = { type: 'kimi', apiKey: 'test-key', model: 'mock-model' } as const satisfies ProviderConfig; @@ -81,6 +83,7 @@ async function setupSession( generate?: NonNullable, hooks?: readonly HookDef[], config?: KimiConfig, + telemetry?: TelemetryClient, ) { const scripted = createScriptedGenerate(); const session = track( @@ -93,6 +96,7 @@ async function setupSession( providerManager: testProviderManager(), hooks, config, + telemetry, }), ); const { agent } = await session.createAgent( @@ -433,13 +437,23 @@ describe('goal session end-to-end', () => { it('blocks immediately when a resumed goal is already over budget', async () => { const sessionDir = await makeTempDir(); const events: Array> = []; - const { session, agent, scripted } = await setupSession(sessionDir, events, ['GetGoal']); + const records: TelemetryRecord[] = []; + const { session, agent, scripted } = await setupSession( + sessionDir, + events, + ['GetGoal'], + undefined, + undefined, + undefined, + recordingTelemetry(records), + ); const api = new SessionAPIImpl(session); await api.createGoal({ agentId: 'main', objective: 'work' }); await agent.goal.setBudgetLimits({ budgetLimits: { turnBudget: 1 } }, 'model'); await agent.goal.incrementTurn(); await agent.goal.markBlocked({ reason: 'A configured budget was reached' }); await api.resumeGoal({ agentId: 'main' }); + records.length = 0; scripted.mockNextResponse({ type: 'text', text: 'should not run' }); agent.turn.prompt([{ type: 'text', text: 'continue' }]); @@ -449,6 +463,16 @@ describe('goal session end-to-end', () => { expect(scripted.calls).toHaveLength(0); expect(goal?.status).toBe('blocked'); expect(goal?.turnsUsed).toBe(1); + expect(records).toContainEqual({ + event: 'turn_ended', + properties: { + turn_id: 0, + mode: 'agent', + outcome: 'blocked', + blocked_reason: 'goal_budget', + duration_ms: expect.any(Number), + }, + }); }); it('stops before another model step when a token budget is reached mid-turn', async () => {