Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/agent-core/src/agent/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ export class Agent {
provider,
systemPrompt: this.config.systemPrompt,
capability: this.config.modelCapabilities,
telemetry: this.telemetry,
generate: this.generate,
completionBudgetConfig,
});
Expand Down
170 changes: 93 additions & 77 deletions packages/agent-core/src/agent/turn/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,9 @@ import { createHash } from 'node:crypto';

import { createControlledPromise, type ControlledPromise } from '@antfu/utils';
import {
APIConnectionError,
APIContextOverflowError,
APIEmptyResponseError,
APIStatusError,
APITimeoutError,
grandTotal,
inputTotal,
isContextOverflowStatusError,
type ContentPart,
type TokenUsage,
} from '@moonshot-ai/kosong';
Expand Down Expand Up @@ -39,6 +34,7 @@ import { abortable, isUserCancellation, userCancellationReason } from '../../uti
import { USER_PROMPT_ORIGIN, type PromptOrigin } from '../context';
import { renderUserPromptHookBlockResult, renderUserPromptHookResult } from '../../session/hooks';
import { canonicalTelemetryArgs, isPlainRecord } from './canonical-args';
import { classifyApiError, type TelemetryMode } from './telemetry';
import { ToolCallDeduplicator } from './tool-dedup';

interface ActiveTurn {
Expand All @@ -64,6 +60,18 @@ interface PromptHookEndResult {
readonly blocked: boolean;
}

interface TrackedToolCall {
readonly name: string;
readonly turnId: number;
readonly step: number;
readonly mode: TelemetryMode;
readonly permissionMode: string;
readonly startedAt: number;
}

type TurnTelemetryOutcome = 'success' | 'error' | 'cancelled' | 'blocked';
type TurnBlockedReason = 'user_prompt_hook' | 'goal_budget';

const LLM_NOT_SET_MESSAGE = 'LLM not set, send "/login" to login';

/** Origin tag for the synthetic "continue" prompt that drives each goal turn. */
Expand Down Expand Up @@ -101,10 +109,10 @@ export class TurnFlow {
private steerBuffer: BufferedSteer[] = [];
private turnId = -1;
private activeTurn: 'resuming' | ActiveTurn | null = null;
private readonly toolCallStartedAt = new Map<string, { name: string; startedAt: number }>();
private readonly toolCallStartedAt = new Map<string, TrackedToolCall>();
private readonly toolCallDupType = new Map<string, 'normal' | 'cross_step'>();
private readonly stepToolCallKeys = new Map<number, Set<string>>();
private readonly telemetryModeByTurn = new Map<number, 'agent' | 'plan'>();
private readonly telemetryModeByTurn = new Map<number, TelemetryMode>();
private readonly currentStepByTurn = new Map<number, number>();
private readonly interruptedTelemetryTurnIds = new Set<number>();
private readonly stepFailureByTurn = new Map<number, LoopTurnInterruptedEvent>();
Expand Down Expand Up @@ -412,6 +420,7 @@ export class TurnFlow {
input: readonly ContentPart[],
origin: PromptOrigin,
): Promise<TurnEndedEvent> {
this.beginTelemetryTurn(turnId, origin);
this.agent.usage.beginTurn();
const startedAt = Date.now();
this.agent.emitEvent({ type: 'turn.started', turnId, origin });
Expand All @@ -424,6 +433,8 @@ export class TurnFlow {
};
this.agent.usage.endTurn();
this.agent.emitEvent(ended);
this.trackTurnEnded(turnId, ended, Date.now() - startedAt, 'goal_budget');
this.endTelemetryTurn(turnId);
return ended;
}

Expand All @@ -443,10 +454,7 @@ export class TurnFlow {
this.currentStep = 0;
this.stepToolCallKeys.clear();
this.toolCallDupType.clear();
const telemetryMode = this.telemetryMode();
this.telemetryModeByTurn.set(turnId, telemetryMode);
this.currentStepByTurn.set(turnId, 0);
this.agent.telemetry.track('turn_started', { mode: telemetryMode });
this.beginTelemetryTurn(turnId, origin);
this.agent.fullCompaction.resetForTurn();
this.agent.usage.beginTurn();
this.agent.emitEvent({ type: 'turn.started', turnId, origin });
Expand Down Expand Up @@ -536,10 +544,13 @@ export class TurnFlow {
if (ended.reason !== 'completed') {
this.trackTurnInterrupted(turnId, this.currentStepByTurn.get(turnId) ?? this.currentStep);
}
this.telemetryModeByTurn.delete(turnId);
this.currentStepByTurn.delete(turnId);
this.interruptedTelemetryTurnIds.delete(turnId);
this.stepFailureByTurn.delete(turnId);
this.trackTurnEnded(
turnId,
ended,
Date.now() - startedAt,
blockedByUserPromptHook ? 'user_prompt_hook' : undefined,
);
this.endTelemetryTurn(turnId);
return { event: ended, stopReason: completedStopReason, blockedByUserPromptHook };
}

Expand Down Expand Up @@ -600,7 +611,11 @@ export class TurnFlow {
private async runStepLoop(turnId: number, signal: AbortSignal): Promise<LoopTurnStopReason> {
let stopHookContinuationUsed = false;
let goalOutcomeMessageContinuationUsed = false;
const deduper = new ToolCallDeduplicator({ telemetry: this.agent.telemetry });
const telemetryMode = this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode();
const deduper = new ToolCallDeduplicator({
telemetry: this.agent.telemetry,
telemetryMode,
});
await this.agent.mcp?.waitForInitialLoad(signal);
// Surface the active goal at the start of the turn (append-only; no-op when
// there is no active goal). Each goal continuation is its own turn, so this
Expand All @@ -620,6 +635,7 @@ export class TurnFlow {
dispatchEvent: this.buildDispatchEvent(turnId),
tools: this.agent.tools.loopTools,
log: this.agent.log,
telemetryMode: () => this.telemetryMode(),
maxSteps: loopControl?.maxStepsPerTurn,
maxRetryAttempts: loopControl?.maxRetriesPerStep,
recordStepUsage: async (usage) => {
Expand Down Expand Up @@ -818,6 +834,10 @@ export class TurnFlow {
);
this.toolCallStartedAt.set(event.toolCallId, {
name: event.name,
turnId,
step: event.step,
mode: this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(),
permissionMode: this.agent.permission.mode,
startedAt: Date.now(),
});
return;
Expand All @@ -830,6 +850,11 @@ export class TurnFlow {
this.toolCallDupType.delete(event.toolCallId);
const outcome = telemetryToolOutcome(event.result);
const properties: Record<string, TelemetryPropertyValue> = {
turn_id: started.turnId,
step_no: started.step,
tool_call_id: event.toolCallId,
mode: started.mode,
permission_mode: started.permissionMode,
tool_name: started.name,
outcome,
duration_ms: Date.now() - started.startedAt,
Expand Down Expand Up @@ -867,6 +892,7 @@ export class TurnFlow {
this.agent.telemetry.track('tool_call_dedup_detected', {
turn_id: turnId,
step_no: step,
mode: this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(),
tool_name: toolName,
dup_type: dupType,
args_hash: createHash('sha256').update(argsText).digest('hex').slice(0, 8),
Expand All @@ -885,12 +911,47 @@ export class TurnFlow {
if (this.interruptedTelemetryTurnIds.has(turnId)) return;
this.interruptedTelemetryTurnIds.add(turnId);
this.agent.telemetry.track('turn_interrupted', {
turn_id: turnId,
mode: this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(),
at_step: atStep,
});
}

private telemetryMode(): 'agent' | 'plan' {
private beginTelemetryTurn(turnId: number, origin: PromptOrigin): TelemetryMode {
const mode = this.telemetryMode();
this.telemetryModeByTurn.set(turnId, mode);
this.currentStepByTurn.set(turnId, 0);
if (origin.kind === 'user') {
this.agent.telemetry.track('input_submitted', { turn_id: turnId, mode });
}
this.agent.telemetry.track('turn_started', { turn_id: turnId, mode });
return mode;
}

private trackTurnEnded(
turnId: number,
ended: TurnEndedEvent,
durationMs: number,
blockedReason?: TurnBlockedReason,
): void {
const properties: Record<string, TelemetryPropertyValue> = {
turn_id: turnId,
mode: this.telemetryModeByTurn.get(turnId) ?? this.telemetryMode(),
outcome: telemetryTurnOutcome(ended, blockedReason),
duration_ms: durationMs,
blocked_reason: blockedReason,
};
this.agent.telemetry.track('turn_ended', properties);
}

private endTelemetryTurn(turnId: number): void {
this.telemetryModeByTurn.delete(turnId);
this.currentStepByTurn.delete(turnId);
this.interruptedTelemetryTurnIds.delete(turnId);
this.stepFailureByTurn.delete(turnId);
}

private telemetryMode(): TelemetryMode {
return this.agent.planMode.isActive ? 'plan' : 'agent';
}

Expand Down Expand Up @@ -1063,71 +1124,26 @@ function interruptedStep(event: LoopTurnInterruptedEvent): number {
return event.activeStep ?? event.attemptedSteps;
}

interface ApiErrorClassification {
readonly errorType: string;
readonly statusCode?: number;
}

function classifyApiError(error: unknown, summary: KimiErrorPayload): ApiErrorClassification {
const statusCode = apiStatusCode(error) ?? summaryStatusCode(summary);
if (statusCode !== undefined) {
if (statusCode === 429) return { errorType: 'rate_limit', statusCode };
if (statusCode === 401 || statusCode === 403) return { errorType: 'auth', statusCode };
if (statusCode >= 500) return { errorType: '5xx_server', statusCode };
if (isContextOverflowStatusError(statusCode, summary.message)) {
return { errorType: 'context_overflow', statusCode };
}
if (statusCode >= 400) return { errorType: '4xx_client', statusCode };
return { errorType: 'api', statusCode };
}

if (summary.code === ErrorCodes.PROVIDER_RATE_LIMIT) return { errorType: 'rate_limit' };
if (summary.code === ErrorCodes.PROVIDER_AUTH_ERROR) return { errorType: 'auth' };
if (summary.code === ErrorCodes.CONTEXT_OVERFLOW) return { errorType: 'context_overflow' };
if (isApiConnectionError(error, summary)) return { errorType: 'network' };
if (isApiTimeoutError(error, summary)) return { errorType: 'timeout' };
if (isApiEmptyResponseError(error, summary)) return { errorType: 'empty_response' };
return { errorType: 'other' };
}

function apiStatusCode(error: unknown): number | undefined {
if (error instanceof APIStatusError) {
const statusCode = (error as { readonly statusCode?: unknown }).statusCode;
return typeof statusCode === 'number' ? statusCode : undefined;
}
if (typeof error !== 'object' || error === null) return undefined;
const statusCode = (error as { readonly statusCode?: unknown }).statusCode;
if (typeof statusCode === 'number') return statusCode;
const status = (error as { readonly status?: unknown }).status;
return typeof status === 'number' ? status : undefined;
}

function summaryStatusCode(summary: KimiErrorPayload): number | undefined {
const statusCode = summary.details?.['statusCode'];
return typeof statusCode === 'number' ? statusCode : undefined;
}

function isApiConnectionError(error: unknown, summary: KimiErrorPayload): boolean {
return error instanceof APIConnectionError || summary.name === 'APIConnectionError';
}

function isApiTimeoutError(error: unknown, summary: KimiErrorPayload): boolean {
return (
error instanceof APITimeoutError ||
summary.name === 'APITimeoutError' ||
summary.name === 'TimeoutError'
);
}

function isApiEmptyResponseError(error: unknown, summary: KimiErrorPayload): boolean {
return error instanceof APIEmptyResponseError || summary.name === 'APIEmptyResponseError';
}

function currentTurnInputTokens(usage: TokenUsage | undefined): number | undefined {
if (usage === undefined) return undefined;
return inputTotal(usage);
}

function telemetryTurnOutcome(
event: TurnEndedEvent,
blockedReason?: TurnBlockedReason,
): TurnTelemetryOutcome {
if (blockedReason !== undefined) return 'blocked';
switch (event.reason) {
case 'completed':
return 'success';
case 'failed':
return 'error';
case 'cancelled':
return 'cancelled';
}
}

type ToolTelemetryResult = Extract<LoopEvent, { type: 'tool.result' }>['result'];

function telemetryToolOutcome(result: ToolTelemetryResult): 'success' | 'error' | 'cancelled' {
Expand Down
Loading
Loading