diff --git a/apps/web/src/app/api/openrouter/[...path]/route.ts b/apps/web/src/app/api/openrouter/[...path]/route.ts index 42fb585890..f56fb68592 100644 --- a/apps/web/src/app/api/openrouter/[...path]/route.ts +++ b/apps/web/src/app/api/openrouter/[...path]/route.ts @@ -97,12 +97,6 @@ import { hasMiddleOutTransform, } from '@/lib/ai-gateway/providers/openrouter/request-helpers'; import { scheduleAutoRoutingMirror } from '@/lib/ai-gateway/auto-routing-mirror'; -import { - createStreamLifecycleTracker, - observeEventStream, - shouldObserveEventStream, - STREAM_ATTEMPT_HEADER, -} from '@/lib/ai-gateway/o11y/stream-lifecycle.server'; export const maxDuration = 800; @@ -720,8 +714,6 @@ export async function POST(request: NextRequest): Promise= 400) { @@ -786,29 +776,7 @@ export async function POST(request: NextRequest): Promise streamTracker.observe('provider', outcome), owner), - owner - ); - } + const clonedReponse = response.clone(); // reading from body is side-effectful if (!shouldBlockOnClassify) { classifyResult = await awaitClassifyAbuse(classifyPromise); @@ -845,7 +813,6 @@ export async function POST(request: NextRequest): Promise streamTracker.observe('final', outcome)), - finalResponse - ); + return wrapInSafeNextResponse(response); } diff --git a/apps/web/src/lib/ai-gateway/o11y/stream-lifecycle.server.test.ts b/apps/web/src/lib/ai-gateway/o11y/stream-lifecycle.server.test.ts deleted file mode 100644 index cd79eedc80..0000000000 --- a/apps/web/src/lib/ai-gateway/o11y/stream-lifecycle.server.test.ts +++ /dev/null @@ -1,409 +0,0 @@ -import { createHash } from 'node:crypto'; -import { describe, expect, it, jest } from '@jest/globals'; -import { - createStreamLifecycleTracker, - isEventStreamContentType, - observeEventStream, - shouldObserveEventStream, - type StreamOutcome, -} from './stream-lifecycle.server'; - -const encoder = new TextEncoder(); - -function stream(chunks: string[]) { - return new ReadableStream({ - start(controller) { - for (const chunk of 
chunks) controller.enqueue(encoder.encode(chunk)); - controller.close(); - }, - }); -} - -async function observe(chunks: string[]) { - const outcomes: StreamOutcome[] = []; - const body = observeEventStream(stream(chunks), outcome => outcomes.push(outcome)); - const bytes = new Uint8Array(await new Response(body).arrayBuffer()); - return { text: new TextDecoder().decode(bytes), outcome: outcomes[0] }; -} - -function outcome(overrides: Partial = {}): StreamOutcome { - return { - bytes: 20, - chunks: 2, - sha256: 'abc', - events: 2, - malformed_events: 0, - last_event_type: '[DONE]', - terminal_event: true, - unterminated_final_block: false, - disposition: 'eof', - ...overrides, - }; -} - -describe('event stream lifecycle observation', () => { - it('does not lock the source until the observed stream is first pulled', async () => { - const source = stream(['data: [DONE]\n\n']); - const body = observeEventStream(source, jest.fn()); - - expect(source.locked).toBe(false); - - const reader = body.getReader(); - const first = await reader.read(); - expect(first.done).toBe(false); - expect(source.locked).toBe(true); - - await reader.read(); - expect(source.locked).toBe(false); - }); - - it('preserves exact bytes and detects split terminal events with common delimiters', async () => { - const input = [ - 'event: response.completed\r', - '\ndata: {"type":"response.completed"}\r\n\r', - '\ndata: {"choices":[{"finish_reason":"stop"}]}\n\ndata: [DO', - 'NE]\r\r', - ]; - const result = await observe(input); - - expect(result.text).toBe(input.join('')); - expect(result.outcome).toMatchObject({ - bytes: encoder.encode(input.join('')).byteLength, - chunks: input.length, - events: 3, - terminal_event: true, - last_event_type: '[DONE]', - unterminated_final_block: false, - disposition: 'eof', - }); - expect(result.outcome?.sha256).toBe( - createHash('sha256') - .update(encoder.encode(input.join(''))) - .digest('hex') - ); - }); - - it.each([ - ['response incomplete', 'data: 
{"type":"response.incomplete"}\n\n'], - ['response failed', 'data: {"type":"response.failed"}\n\n'], - ['message stop', 'data: {"type":"message_stop"}\n\n'], - ['chat finish reason', 'data: {"choices":[{"finish_reason":"length"}]}\n\n'], - ])('recognizes %s terminal events', async (_name, input) => { - const result = await observe([input]); - expect(result.outcome?.terminal_event).toBe(true); - }); - - it('records malformed events and an unterminated final block without retaining the payload', async () => { - const result = await observe(['data: not-json\n\ndata: {"type":"partial"}']); - - expect(result.outcome).toMatchObject({ - events: 1, - malformed_events: 1, - terminal_event: false, - unterminated_final_block: true, - disposition: 'eof', - }); - }); - - it('bounds oversized events and resumes scanning after a split CRLF delimiter', async () => { - const input = [`data: ${'x'.repeat(70_000)}\r`, '\n\r', '\ndata: [DONE]\r\n\r\n']; - const result = await observe(input); - - expect(result.text).toBe(input.join('')); - expect(result.outcome).toMatchObject({ - events: 2, - malformed_events: 1, - last_event_type: '[DONE]', - terminal_event: true, - unterminated_final_block: false, - }); - }); - - it.each([ - ['LF then CRLF', ['data: {"type":"partial"}\n\r', '\ndata: [DONE]\n\n']], - ['CRLF then LF', ['data: {"type":"partial"}\r\n', '\ndata: [DONE]\n\n']], - ['CR then CRLF', ['data: {"type":"partial"}\r', '\r\ndata: [DONE]\n\n']], - ['LF then CR', ['data: {"type":"partial"}\n', '\rdata: [DONE]\n\n']], - ])('parses mixed %s blank-line delimiters across chunks', async (_name, input) => { - const result = await observe(input); - expect(result.outcome).toMatchObject({ - events: 2, - last_event_type: '[DONE]', - terminal_event: true, - unterminated_final_block: false, - }); - }); - - it('records cancellation and propagates it to the source', async () => { - const cancel = jest.fn<(reason?: unknown) => void>(); - const outcomes: StreamOutcome[] = []; - const source = new 
ReadableStream({ cancel }); - const reader = observeEventStream(source, value => outcomes.push(value)).getReader(); - - await reader.cancel('stop'); - - expect(cancel).toHaveBeenCalledWith('stop'); - expect(outcomes[0]?.disposition).toBe('cancel'); - }); - - it('records source errors and preserves the error', async () => { - const failure = new Error('stream failed'); - const outcomes: StreamOutcome[] = []; - const source = new ReadableStream({ - pull(controller) { - controller.error(failure); - }, - }); - const reader = observeEventStream(source, value => outcomes.push(value)).getReader(); - - await expect(reader.read()).rejects.toBe(failure); - expect(outcomes[0]?.disposition).toBe('error'); - }); - - it('reports a throwing EOF callback without altering byte delivery or cleanup', async () => { - const report = jest.spyOn(console, 'error').mockImplementation(() => undefined); - const source = stream(['data: [DONE]\n\n']); - const body = observeEventStream(source, () => { - throw new Error('callback failed'); - }); - - await expect(new Response(body).text()).resolves.toBe('data: [DONE]\n\n'); - expect(source.locked).toBe(false); - expect(report).toHaveBeenCalledWith( - JSON.stringify({ - event: 'ai_stream_lifecycle_observer_failure', - error_type: 'Error', - }) - ); - report.mockRestore(); - }); - - it('reports a throwing cancellation callback without blocking source cancellation', async () => { - const report = jest.spyOn(console, 'error').mockImplementation(() => undefined); - const cancel = jest.fn<(reason?: unknown) => void>(); - const source = new ReadableStream({ cancel }); - const reader = observeEventStream(source, () => { - throw new Error('callback failed'); - }).getReader(); - - await expect(reader.cancel('stop')).resolves.toBeUndefined(); - expect(cancel).toHaveBeenCalledWith('stop'); - expect(source.locked).toBe(false); - expect(report).toHaveBeenCalledWith( - JSON.stringify({ - event: 'ai_stream_lifecycle_observer_failure', - error_type: 'Error', - }) 
- ); - report.mockRestore(); - }); - - it('does not close an already cancelled stream when a pending pull settles', async () => { - const cancel = jest.fn<(reason?: unknown) => void>(); - const outcomes: StreamOutcome[] = []; - const source = new ReadableStream({ cancel }); - const reader = observeEventStream(source, outcome => outcomes.push(outcome)).getReader(); - const pending = reader.read(); - - await Promise.resolve(); - await reader.cancel('stop'); - - await expect(pending).resolves.toEqual({ done: true, value: undefined }); - expect(cancel).toHaveBeenCalledWith('stop'); - expect(source.locked).toBe(false); - expect(outcomes).toEqual([expect.objectContaining({ disposition: 'cancel' })]); - }); -}); - -describe('event stream content type', () => { - it.each([ - 'text/event-stream', - 'Text/Event-Stream', - 'TEXT/EVENT-STREAM; charset=utf-8', - ' text/event-stream ; charset=UTF-8', - ])('accepts %s', contentType => { - expect(isEventStreamContentType(contentType)).toBe(true); - }); - - it.each([null, '', 'application/json', 'application/text/event-stream'])( - 'rejects %s', - contentType => { - expect(isEventStreamContentType(contentType)).toBe(false); - } - ); -}); - -describe('event stream observation scope', () => { - it('observes successful direct-provider event streams', () => { - expect( - shouldObserveEventStream({ - provider_id: 'custom', - status: 200, - has_body: true, - content_type: 'Text/Event-Stream; charset=utf-8', - }) - ).toBe(true); - }); - - it.each([ - ['non-custom provider', { provider_id: 'openrouter' }], - ['unsuccessful response', { status: 500 }], - ['bodyless response', { has_body: false }], - ['non-event response', { content_type: 'application/json' }], - ])('does not observe %s', (_name, override) => { - expect( - shouldObserveEventStream({ - provider_id: 'custom', - status: 200, - has_body: true, - content_type: 'text/event-stream', - ...override, - }) - ).toBe(false); - }); -}); - -describe('stream lifecycle correlation logging', 
() => { - const context = { - attempt_id: 'attempt', - provider_id: 'provider', - api_kind: 'responses', - }; - - it('emits machine-readable structural records by default', () => { - const warn = jest.spyOn(console, 'warn').mockImplementation(() => undefined); - const tracker = createStreamLifecycleTracker(context); - - tracker.observe('provider', outcome()); - tracker.observe('final', outcome({ terminal_event: false })); - - expect(warn).toHaveBeenCalledTimes(1); - expect(JSON.parse(String(warn.mock.calls[0]?.[0]))).toEqual( - expect.objectContaining({ - event: 'ai_stream_lifecycle', - attempt_id: 'attempt', - provider_id: 'provider', - api_kind: 'responses', - classification: 'divergence', - }) - ); - warn.mockRestore(); - }); - - it('always logs confirmed divergence with structural outcomes', () => { - const log = jest.fn(); - const tracker = createStreamLifecycleTracker(context, { random: () => 1, log }); - - tracker.observe('provider', outcome()); - tracker.observe('final', outcome({ terminal_event: false })); - - expect(log).toHaveBeenCalledWith( - 'AI stream lifecycle anomaly', - expect.objectContaining({ - attempt_id: 'attempt', - classification: 'divergence', - provider: expect.objectContaining({ disposition: 'eof' }), - final: expect.objectContaining({ terminal_event: false }), - }) - ); - }); - - it.each([ - ['missing terminal event', { terminal_event: false }], - ['unterminated final block', { unterminated_final_block: true }], - ['malformed framing', { malformed_events: 1 }], - ])('always logs source incomplete for provider EOF with %s', (_name, overrides) => { - const log = jest.fn(); - const tracker = createStreamLifecycleTracker(context, { random: () => 1, log }); - const incomplete = outcome(overrides); - - tracker.observe('provider', incomplete); - tracker.observe('final', incomplete); - - expect(log).toHaveBeenCalledWith( - 'AI stream lifecycle anomaly', - expect.objectContaining({ classification: 'source_incomplete' }) - ); - }); - - it('always 
logs branch errors', () => { - const log = jest.fn(); - const tracker = createStreamLifecycleTracker(context, { random: () => 1, log }); - - tracker.observe('provider', outcome({ disposition: 'error' })); - tracker.observe('final', outcome()); - - expect(log).toHaveBeenCalledWith( - 'AI stream lifecycle anomaly', - expect.objectContaining({ classification: 'error' }) - ); - }); - - it('always logs final cancellation after a complete provider stream', () => { - const log = jest.fn(); - const tracker = createStreamLifecycleTracker(context, { random: () => 1, log }); - - tracker.observe('provider', outcome()); - tracker.observe('final', outcome({ disposition: 'cancel' })); - - expect(log).toHaveBeenCalledWith( - 'AI stream lifecycle anomaly', - expect.objectContaining({ classification: 'final_cancelled' }) - ); - }); - - it('keeps provider cancellation sampled as inconclusive', () => { - const log = jest.fn(); - const tracker = createStreamLifecycleTracker(context, { random: () => 1, log }); - - tracker.observe('provider', outcome({ disposition: 'cancel' })); - tracker.observe('final', outcome({ disposition: 'cancel' })); - - expect(log).not.toHaveBeenCalled(); - }); - - it('samples provider cancellation and other inconclusive pairs at 0.01%', () => { - const sampled = jest.fn(); - const skipped = jest.fn(); - const included = createStreamLifecycleTracker(context, { random: () => 0.00009, log: sampled }); - const excluded = createStreamLifecycleTracker(context, { random: () => 0.0001, log: skipped }); - - for (const tracker of [included, excluded]) { - tracker.observe('provider', outcome({ disposition: 'cancel' })); - tracker.observe('final', outcome({ disposition: 'cancel' })); - } - - expect(sampled).toHaveBeenCalledWith( - 'AI stream lifecycle inconclusive', - expect.objectContaining({ classification: 'inconclusive' }) - ); - expect(skipped).not.toHaveBeenCalled(); - }); - - it('samples complete matching pairs as controls', () => { - const sampled = jest.fn(); - 
const skipped = jest.fn(); - const included = createStreamLifecycleTracker(context, { random: () => 0.0009, log: sampled }); - const excluded = createStreamLifecycleTracker(context, { random: () => 0.001, log: skipped }); - - for (const tracker of [included, excluded]) { - tracker.observe('provider', outcome()); - tracker.observe('final', outcome()); - } - - expect(sampled).toHaveBeenCalledWith('AI stream lifecycle control', expect.any(Object)); - expect(skipped).not.toHaveBeenCalled(); - }); - - it('emits at most one paired record', () => { - const log = jest.fn(); - const tracker = createStreamLifecycleTracker(context, { random: () => 0, log }); - - tracker.observe('provider', outcome()); - tracker.observe('final', outcome()); - tracker.observe('final', outcome({ sha256: 'different' })); - tracker.observe('provider', outcome({ disposition: 'error' })); - - expect(log).toHaveBeenCalledTimes(1); - }); -}); diff --git a/apps/web/src/lib/ai-gateway/o11y/stream-lifecycle.server.ts b/apps/web/src/lib/ai-gateway/o11y/stream-lifecycle.server.ts deleted file mode 100644 index 8c14b66a1a..0000000000 --- a/apps/web/src/lib/ai-gateway/o11y/stream-lifecycle.server.ts +++ /dev/null @@ -1,316 +0,0 @@ -import { createHash } from 'node:crypto'; - -const MAX_EVENT_TEXT = 64 * 1024; -const NORMAL_SAMPLE_RATE = 0.001; -// Cancellation and other inconclusive pairs are sampled at 0.01% to provide controls without noise. 
const INCONCLUSIVE_SAMPLE_RATE = 0.0001;

/**
 * Blank-line delimiter between event-stream blocks: two consecutive line endings,
 * each of which may be CRLF, a bare CR, or LF. Hoisted so it is compiled once
 * instead of on every scan iteration.
 */
const BLANK_LINE = /(?:\r\n|\r(?!\n)|\n)(?:\r\n|\r(?!\n)|\n)/;

/**
 * Header name exported for callers. Its use is outside this module
 * (NOTE(review): presumably carries `attempt_id` -- confirm against the route).
 */
export const STREAM_ATTEMPT_HEADER = 'x-kilo-attempt-id';

type Disposition = 'eof' | 'error' | 'cancel';
type Side = 'provider' | 'final';

/** Structural summary of one observed stream: counters, a digest and framing flags. */
export type StreamOutcome = {
  bytes: number;
  chunks: number;
  sha256: string;
  events: number;
  malformed_events: number;
  last_event_type: string | null;
  terminal_event: boolean;
  unterminated_final_block: boolean;
  disposition: Disposition;
};

type Context = {
  attempt_id: string;
  provider_id: string;
  api_kind: string;
};

type Classification =
  | 'control'
  | 'divergence'
  | 'error'
  | 'final_cancelled'
  | 'source_incomplete'
  | 'inconclusive';

type TrackerOptions = {
  random?: () => number;
  log?: (message: string, data: Record<string, unknown>) => void;
};

/**
 * Creates a tracker that pairs the outcome of the `provider` side of a stream with
 * the outcome of the `final` side and emits at most one record for the pair.
 *
 * Records classified `control` or `inconclusive` are sampled (at `NORMAL_SAMPLE_RATE`
 * and `INCONCLUSIVE_SAMPLE_RATE`); every other classification is always emitted.
 *
 * @param context Correlation fields copied into the emitted record.
 * @param options `random` replaces `Math.random` for sampling; `log`, when given,
 *   receives the record instead of it being written to the console.
 * @returns An object whose `observe(side, outcome)` records one side's outcome.
 */
export function createStreamLifecycleTracker(context: Context, options: TrackerOptions = {}) {
  const outcomes: Partial<Record<Side, StreamOutcome>> = {};
  let paired = false;

  return {
    observe(side: Side, outcome: StreamOutcome) {
      outcomes[side] = outcome;
      const provider = outcomes.provider;
      const final = outcomes.final;
      // Wait until both sides have reported, and never emit a second record.
      if (!provider || !final || paired) return;
      paired = true;

      const classification = classify(provider, final);
      const rate =
        classification === 'control'
          ? NORMAL_SAMPLE_RATE
          : classification === 'inconclusive'
            ? INCONCLUSIVE_SAMPLE_RATE
            : 1;
      if (rate < 1 && (options.random ?? Math.random)() >= rate) return;

      const message =
        classification === 'control'
          ? 'AI stream lifecycle control'
          : classification === 'inconclusive'
            ? 'AI stream lifecycle inconclusive'
            : 'AI stream lifecycle anomaly';
      const data = {
        ...context,
        classification,
        vercel_deployment: process.env.VERCEL_DEPLOYMENT_ID ?? null,
        vercel_region: process.env.VERCEL_REGION ?? process.env.VERCEL_FUNCTION_REGION ?? null,
        provider,
        final,
      };
      if (options.log) {
        options.log(message, data);
        return;
      }

      // Default sink: one JSON line per record; sampled classes at log level, the rest as warnings.
      const record = JSON.stringify({ event: 'ai_stream_lifecycle', ...data });
      if (classification === 'control' || classification === 'inconclusive') {
        console.log(record);
        return;
      }
      console.warn(record);
    },
  };
}

/**
 * Wraps `source` in a pass-through stream that forwards every chunk unchanged while
 * counting bytes and chunks, hashing the bytes with SHA-256 and scanning the
 * event-stream framing.
 *
 * `done` is invoked exactly once, when the stream reaches EOF, errors, or is
 * cancelled. An exception thrown by `done` is reported through `console.error`
 * and does not affect byte delivery.
 *
 * @param source Stream to observe. Its reader is only acquired on the first pull.
 * @param done Receives the final {@link StreamOutcome}.
 * @param owner Optional object kept referenced by this stream until the outcome is
 *   settled (NOTE(review): why callers need that is not visible from this module).
 * @returns A stream yielding exactly the chunks of `source`.
 */
export function observeEventStream(
  source: ReadableStream<Uint8Array>,
  done: (outcome: StreamOutcome) => void,
  owner?: object
): ReadableStream<Uint8Array> {
  const hash = createHash('sha256');
  const scanner = createScanner();
  let retained = owner;
  let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
  let bytes = 0;
  let chunks = 0;
  let settled = false;
  let cancelled = false;

  const releaseOwner = () => {
    if (retained === undefined) return;
    retained = undefined;
  };
  const report = (error: unknown) => {
    console.error(
      JSON.stringify({
        event: 'ai_stream_lifecycle_observer_failure',
        error_type: error instanceof Error ? error.name : typeof error,
      })
    );
  };
  const settle = (disposition: Disposition) => {
    if (settled) return;
    settled = true;
    try {
      const ending = scanner.finish();
      done({
        bytes,
        chunks,
        sha256: hash.digest('hex'),
        ...ending,
        disposition,
      });
    } catch (error) {
      report(error);
    } finally {
      releaseOwner();
    }
  };
  const release = () => {
    const active = reader;
    reader = null;
    active?.releaseLock();
  };

  return new ReadableStream<Uint8Array>(
    {
      async pull(controller) {
        try {
          // Acquired inside the try so that a failure to lock the source is also
          // reported through `done` instead of erroring the stream silently.
          const active = (reader ??= source.getReader());
          const result = await active.read();
          // A read that settles after cancellation must not touch the controller.
          if (cancelled) return;
          if (result.done) {
            settle('eof');
            controller.close();
            release();
            return;
          }
          bytes += result.value.byteLength;
          chunks += 1;
          hash.update(result.value);
          scanner.push(result.value);
          controller.enqueue(result.value);
        } catch (error) {
          if (cancelled) return;
          settle('error');
          release();
          controller.error(error);
        }
      },
      async cancel(reason) {
        cancelled = true;
        settle('cancel');
        try {
          // Propagate through the reader when one holds the lock, else directly.
          if (reader) await reader.cancel(reason);
          else await source.cancel(reason);
        } finally {
          release();
        }
      },
    },
    // No internal queue: the source is only read when the consumer pulls.
    { highWaterMark: 0 }
  );
}

/** Classifies a provider/final outcome pair; the checks are ordered by precedence. */
function classify(provider: StreamOutcome, final: StreamOutcome): Classification {
  if (provider.disposition === 'error' || final.disposition === 'error') return 'error';
  const providerClean = isClean(provider);
  if (provider.disposition === 'eof' && !providerClean) return 'source_incomplete';
  if (providerClean && final.disposition === 'cancel') return 'final_cancelled';

  const finalClean = isClean(final);
  if (providerClean && final.disposition === 'eof') {
    if (!finalClean || provider.sha256 !== final.sha256 || provider.bytes !== final.bytes) {
      return 'divergence';
    }
    return 'control';
  }
  return 'inconclusive';
}

/** True when a stream ended at EOF with a terminal event and no framing problems. */
function isClean(outcome: StreamOutcome) {
  return (
    outcome.disposition === 'eof' &&
    outcome.terminal_event &&
    outcome.malformed_events === 0 &&
    !outcome.unterminated_final_block
  );
}

/**
 * Returns true when the media type of `contentType` (parameters ignored,
 * case-insensitive, surrounding whitespace trimmed) is `text/event-stream`.
 */
export function isEventStreamContentType(contentType: string | null): boolean {
  if (!contentType) return false;
  return contentType.split(';', 1)[0]?.trim().toLowerCase() === 'text/event-stream';
}

/**
 * Decides whether a response should be observed: only 2xx responses with a body
 * and an event-stream content type from the `custom` provider qualify.
 */
export function shouldObserveEventStream(input: {
  provider_id: string;
  status: number;
  has_body: boolean;
  content_type: string | null;
}): boolean {
  return (
    input.provider_id === 'custom' &&
    input.status >= 200 &&
    input.status < 300 &&
    input.has_body &&
    isEventStreamContentType(input.content_type)
  );
}

/**
 * Creates an incremental scanner for event-stream framing. It decodes chunks as
 * UTF-8, splits the text on blank lines and keeps only counters and the last
 * event type; at most `MAX_EVENT_TEXT` characters are buffered between delimiters.
 */
function createScanner() {
  const decoder = new TextDecoder();
  let text = '';
  let overflow = false;
  // Set when the last delimiter ended with a CR that was the final buffered
  // character: a following LF then belongs to that same CRLF line ending.
  let skipLineFeed = false;
  let events = 0;
  let malformed = 0;
  let last: string | null = null;
  let terminal = false;

  const consume = (block: string, truncated: boolean) => {
    // An oversized block is counted but never parsed.
    if (truncated || block.length > MAX_EVENT_TEXT) {
      events += 1;
      malformed += 1;
      return;
    }
    const lines = block.split(/\r\n|\n|\r/);
    const named = lines
      .find(line => line.startsWith('event:'))
      ?.slice(6)
      .trim();
    const data = lines
      .filter(line => line.startsWith('data:'))
      .map(line => line.slice(5).trimStart())
      .join('\n');

    // Blocks without an event name or data (comments, stray blank lines) are not events.
    if (!named && !data) return;
    events += 1;
    if (named) last = named;
    if (data.trim() === '[DONE]') {
      last = named || '[DONE]';
      terminal = true;
      return;
    }
    if (!data) return;

    try {
      // JSON `null` makes the property access below throw, so it is counted as malformed too.
      const value = JSON.parse(data) as Record<string, unknown>;
      if (typeof value.type === 'string') last = value.type;
      const choices = Array.isArray(value.choices) ? value.choices : [];
      const finished = choices.some(choice => {
        if (!choice || typeof choice !== 'object') return false;
        return (choice as Record<string, unknown>).finish_reason != null;
      });
      terminal ||=
        value.type === 'response.completed' ||
        value.type === 'response.incomplete' ||
        value.type === 'response.failed' ||
        value.type === 'message_stop' ||
        finished;
    } catch {
      malformed += 1;
    }
  };

  const append = (decoded: string) => {
    if (skipLineFeed && decoded.length > 0) {
      skipLineFeed = false;
      // Second half of a CRLF whose CR already closed the previous block.
      text += decoded.startsWith('\n') ? decoded.slice(1) : decoded;
      return;
    }
    text += decoded;
  };

  const scan = () => {
    while (true) {
      const match = BLANK_LINE.exec(text);
      if (!match) break;
      const end = match.index + match[0].length;
      consume(text.slice(0, match.index), overflow);
      // A bare CR is only known to be bare once the next character is visible.
      skipLineFeed = end === text.length && match[0].endsWith('\r');
      text = text.slice(end);
      overflow = false;
    }
    if (text.length <= MAX_EVENT_TEXT) return;

    // Preserve only enough trailing text to detect a delimiter split across chunks.
    text = text.slice(-3);
    overflow = true;
  };

  return {
    push(chunk: Uint8Array) {
      append(decoder.decode(chunk, { stream: true }));
      scan();
    },
    finish() {
      append(decoder.decode());
      scan();
      return {
        events,
        malformed_events: malformed,
        last_event_type: last,
        terminal_event: terminal,
        unterminated_final_block: overflow || text.length > 0,
      };
    },
  };
}