feat(ai-client): support TanStack Start server functions in stream() adapter#508
feat(ai-client): support TanStack Start server functions in stream() adapter#508tombeckenham wants to merge 11 commits into
Conversation
…adapter
The stream() factory now accepts Promise<AsyncIterable<StreamChunk>> and
Promise<Response> in addition to the existing AsyncIterable shape, so a
TanStack Start server function (which is just an async API endpoint) can
be wired directly into useChat without a route handler:
useChat({
connection: stream((messages) => chatFn({ data: { messages } })),
})
When the factory returns a Response (e.g. via toServerSentEventsResponse),
the adapter parses the SSE body into chunks. rpcStream() likewise accepts
a Promise-returning RPC call.
Adds unit tests for both new shapes and a docs section in
chat/connection-adapters.md.
Adds a working /server-fn-chat route that wires useChat to a TanStack
Start server function via the stream() connection adapter:
useChat({
connection: stream((messages) =>
chatFn({ data: { messages: messages as UIMessage[] } }),
),
})
The new chatFn handler in lib/server-fns.ts returns
toServerSentEventsResponse(chat({ ... })) — the stream() adapter awaits
the server function and parses the SSE response into chunks.
Sits alongside the existing index.tsx pattern (fetchServerSentEvents
to a route handler) so users can compare the two invocation styles.
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Lead with what stream() does (typed RPC into useChat), instead of calling a server function "just a fancy/async API endpoint." Same edits applied to the changeset and the stream() JSDoc. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…server-functions-pBsj5 # Conflicts: # examples/ts-react-chat/src/components/Header.tsx # examples/ts-react-chat/src/lib/server-fns.ts # packages/typescript/ai-client/src/connection-adapters.ts
|
View your CI Pipeline Execution ↗ for commit cb39b84
☁️ Nx Cloud last updated this comment at |
The previous merge commit accidentally included stale references in @tanstack/ai-fal that this PR shouldn't have touched. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@tanstack/ai
@tanstack/ai-anthropic
@tanstack/ai-client
@tanstack/ai-code-mode
@tanstack/ai-code-mode-skills
@tanstack/ai-devtools-core
@tanstack/ai-elevenlabs
@tanstack/ai-event-client
@tanstack/ai-fal
@tanstack/ai-gemini
@tanstack/ai-grok
@tanstack/ai-groq
@tanstack/ai-isolate-cloudflare
@tanstack/ai-isolate-node
@tanstack/ai-isolate-quickjs
@tanstack/ai-ollama
@tanstack/ai-openai
@tanstack/ai-openrouter
@tanstack/ai-preact
@tanstack/ai-react
@tanstack/ai-react-ui
@tanstack/ai-solid
@tanstack/ai-solid-ui
@tanstack/ai-svelte
@tanstack/ai-vue
@tanstack/ai-vue-ui
@tanstack/preact-ai-devtools
@tanstack/react-ai-devtools
@tanstack/solid-ai-devtools
commit: |
Type-check was failing on CI because the new server-function and RPC async-iterable test fixtures yielded raw string literals for chunk type, which don't satisfy the EventType enum required by StreamChunk. Switch to the enum and add the required threadId to RUN_FINISHED. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…abortSignal Address review findings on the server-function stream() adapter PR: - Synthesized RUN_FINISHED/RUN_ERROR events in normalizeConnectionAdapter no longer use `as unknown as StreamChunk`. Track threadId/runId from upstream chunks during iteration and reuse them; fall back to synthesized IDs only when no upstream chunk carried them. Use the EventType enum and typed RunFinishedEvent/RunErrorEvent so missing required fields are caught by the compiler instead of papered over. - Stop swallowing JSON.parse failures in parseSSEChunks and fetchHttpStream. A malformed mid-stream chunk is a protocol error; let it throw so it surfaces as RUN_ERROR via the connect-wrapper's catch path instead of silently dropping data behind a console.warn the user never sees. - Widen stream() and rpcStream() factory signatures with an optional abortSignal third arg and pass it through. Backwards-compatible — callers that ignore the third parameter are unaffected. Lets long-running server functions cancel in-flight work when useChat aborts. Tests updated to assert SyntaxError propagation rather than silent dropping on malformed JSON, and to expect the new third call argument on factory mocks. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…railing buffer Refine the SSE parser so the throw-on-parse-failure behavior doesn't regress on legitimate SSE traffic: - parseSSEChunks now skips SSE comment lines (`:`) and non-data fields (`event:`, `id:`, `retry:`) which proxies and CDNs commonly inject as keepalives. Previously these would have flowed into JSON.parse and thrown, killing otherwise-healthy streams behind any infrastructure that injects SSE control lines. - readStreamLines no longer yields the unterminated trailing buffer at stream end. A non-empty buffer means the connection was cut mid-line, so the content is partial by definition — yielding it would feed truncated JSON to the parser and surface a misleading RUN_ERROR for what is really a transport-layer issue. Warn and discard instead. Bare-line JSON (legacy/raw mode) is still accepted to preserve the existing public behavior covered by the `should handle SSE format without data: prefix` test. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…t UIMessage[] in chat() Drops the `as any` / `as Array<UIMessage>` casts previously needed when wiring useChat through a TanStack Start server function into chat(). The stream() factory now declares Array<UIMessage> (with a runtime assert matching the ChatClient invariant), and chat()'s messages option accepts UIMessage[] directly alongside ConstrainedModelMessage[] — the runtime already normalised both via convertMessagesToModelMessages. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pulls the @tanstack/ai changes back out — the chat() messages-option widening to accept UIMessage[] is a separate concern from the stream() server-function feature this PR is about. Restores the example's `as any` cast with a comment, drops the @tanstack/ai minor bump from the changeset, and reverts chat/index.ts to its pre-PR state. Also bumps the example adapter to gpt-5.2. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ranch The fetcher path uses the same SSE parsing and connect-wrapper plumbing as the stream() path on TanStack#508, so the polish that landed during TanStack#508's review applies directly here. Carry it over so this branch has the same robustness. - Skip SSE control lines (`:` comments, `event:` / `id:` / `retry:`) in responseToSSEChunks. Proxies and CDNs inject these as keepalives; letting them through would feed JSON.parse a non-payload line. - Drop unterminated trailing buffer in readStreamLines. A non-empty buffer at stream end means the connection was cut mid-line, so the data is partial — yielding it would surface a misleading RUN_ERROR for what is really a transport-layer issue. - Surface JSON.parse failures in responseToSSEChunks and fetchHttpStream. Stop swallowing them behind console.warn; let SyntaxError propagate so the connect-wrapper turns it into a visible RUN_ERROR. - Drop unsafe `as unknown as StreamChunk` casts in normalizeConnectionAdapter's synthesized RUN_FINISHED / RUN_ERROR events. Use EventType + RunFinishedEvent / RunErrorEvent so missing required fields are caught by the compiler. Track upstream threadId/runId from chunks and reuse them in the synthesis instead of fabricating both ids unconditionally. - Forward optional abortSignal third arg through stream() and rpcStream() factory signatures. Backwards-compatible for existing callers; lets long-running factories cancel when useChat aborts. Mirrors what fetcherToConnectionAdapter already does. Tests: - Update the two `should handle malformed JSON gracefully` tests to assert SyntaxError throws instead of silent drop. - Update stream() / rpcStream() factory mock assertions to expect the new third arg. - Add chat-fetcher test asserting a fetcher returning a malformed-SSE Response surfaces as a RUN_ERROR via onError. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Not going down this track. There's another PR that uses a fetcher see #512 |
…512) * feat(ai-client,ai-react): add `fetcher` option to ChatClient/useChat Mirrors the `fetcher` option on the multimedia hooks (useGenerateSpeech / useSummarize / useTranscription / useGenerateImage). Pass either `connection` (a ConnectionAdapter) or `fetcher` (a direct async function — typically a TanStack Start server function) — runtime XOR validation. The fetcher may return either a Response (parsed as SSE) or an AsyncIterable<StreamChunk> (yielded directly). Internally, fetcher is wrapped via `fetcherToConnectionAdapter` and reuses the same subscribe/ send queue plumbing as every other connection adapter — no new code paths in ChatClient itself. Purely additive: stream(), rpcStream(), fetchServerSentEvents(), and fetchHttpStream() are unchanged. Other framework wrappers (ai-solid, ai-vue, ai-svelte) untouched in this branch — same shape can be added to each in a follow-up if this design is preferred. Sketch alternative to #508 (the stream() connection-adapter approach) for design comparison. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * ci: apply automated fixes * fix(ai-client): port #508 robustness fixes onto fetcher-alt branch The fetcher path uses the same SSE parsing and connect-wrapper plumbing as the stream() path on #508, so the polish that landed during #508's review applies directly here. Carry it over so this branch has the same robustness. - Skip SSE control lines (`:` comments, `event:` / `id:` / `retry:`) in responseToSSEChunks. Proxies and CDNs inject these as keepalives; letting them through would feed JSON.parse a non-payload line. - Drop unterminated trailing buffer in readStreamLines. A non-empty buffer at stream end means the connection was cut mid-line, so the data is partial — yielding it would surface a misleading RUN_ERROR for what is really a transport-layer issue. - Surface JSON.parse failures in responseToSSEChunks and fetchHttpStream. Stop swallowing them behind console.warn; let SyntaxError propagate so the connect-wrapper turns it into a visible RUN_ERROR. - Drop unsafe `as unknown as StreamChunk` casts in normalizeConnectionAdapter's synthesized RUN_FINISHED / RUN_ERROR events. Use EventType + RunFinishedEvent / RunErrorEvent so missing required fields are caught by the compiler. Track upstream threadId/runId from chunks and reuse them in the synthesis instead of fabricating both ids unconditionally. - Forward optional abortSignal third arg through stream() and rpcStream() factory signatures. Backwards-compatible for existing callers; lets long-running factories cancel when useChat aborts. Mirrors what fetcherToConnectionAdapter already does. Tests: - Update the two `should handle malformed JSON gracefully` tests to assert SyntaxError throws instead of silent drop. - Update stream() / rpcStream() factory mock assertions to expect the new third arg. - Add chat-fetcher test asserting a fetcher returning a malformed-SSE Response surfaces as a RUN_ERROR via onError. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(ai-client): enforce connection/fetcher XOR via ChatTransport type Promote `ChatClientOptions` to a discriminated union so exactly one of `connection` or `fetcher` is required at the type level, surface stream truncation as a `StreamTruncatedError` instead of a silent warn, synthesize RUN_FINISHED on legacy `[DONE]` sentinels, and abort fetcher-returned async iterables that ignore their signal. Update framework wrappers (react/preact/solid/svelte/vue) and the e2e route to match. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(hooks): branch on connection/fetcher in useChat instead of spreading a partial transport Mirrors the pattern in `useGeneration` (multimedia hooks): build a `baseOptions` literal once, then call `new ChatClient(...)` in two narrow branches with the matching transport. Drops the `optionsRef.current.fetcher!` non-null assertion and the awkward discriminated-union spread, and provides a clear hook-level error when neither `connection` nor `fetcher` is provided. Applied to all five chat hooks: ai-react, ai-preact, ai-solid, ai-vue, ai-svelte. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * chore(release): bump framework hooks to minor for fetcher option Add `@tanstack/ai-preact`, `ai-solid`, `ai-svelte`, and `ai-vue` to the fetcher changeset as minor bumps — they all expose the new `fetcher` option transitively via `ChatClientOptions`. Also simplify the hooks to pick the transport into a single object before constructing `ChatClient`, instead of duplicating the option bag in if/else branches. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * chore(release): downgrade non-react framework hooks to patch `ai-preact`, `ai-solid`, `ai-svelte`, and `ai-vue` don't add any new public exports — they only adjust internal plumbing to handle the new connection/fetcher XOR shape from `ai-client`. `ai-react` stays minor because it genuinely re-exports new symbols (`rpcStream`, `ChatFetcher`, `ChatFetcherInput`, `ChatFetcherOptions`). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Merge branch 'main' into claude/usechat-fetcher-alternative Resolves conflicts in 9 files arising from main's changes since the PR was opened (#511 AG-UI compliance, #545 openai-base refactor, #564 stricter TS, #576 useChat partial/final reset, #577 structured-output). Notable resolutions: - chat-client.ts: Kept PR's `resolveTransport()` for connection/fetcher XOR; combined with main's two-slot `bodyOption`/`forwardedPropsOption` + AG-UI `threadId`. Removed `!` non-null assertions to satisfy main's tightened lint. - connection-adapters.ts: Kept PR's `responseToSSEChunks` / `fetcherToConnectionAdapter` / `StreamTruncatedError` alongside main's new `RunAgentInputContext`. Synthesized RUN_FINISHED/RUN_ERROR now prefer upstream-observed IDs, fall back to runContext, then synthesize. - Framework hooks (react/preact/solid/vue/svelte): Combined PR's `transport` ternary with main's `exactOptionalPropertyTypes` conditional spreads. - chat-fetcher.test.ts: Dropped `conversationId` expectation from `data` — AG-UI now sends `threadId` at the wire top level, not in the body. Verified: - @tanstack/ai-client: 228/228 tests pass, tsc clean - @tanstack/ai-react: 118/118 tests pass, tsc clean - Other framework hooks: tsc clean (svelte pre-existing $state errors unrelated to merge area) * fix(ai-client): expose threadId/runId on ChatFetcherInput After merging main (AG-UI compliance, #511), the chat client stopped injecting `conversationId` into the request body — `threadId` is now sent at the AG-UI wire root via `RunAgentInputContext`. But the fetcher path (PR #512's whole point) only receives `{ messages, data }`, so fetcher-using consumers had no way to access any correlation identifier for the turn. Thread `runContext.threadId` / `runContext.runId` through `fetcherToConnectionAdapter` and surface them as required fields on `ChatFetcherInput`. Users wrapping a `fetch()` call can now forward the ids to their server like the connection adapters already do. Also throws if `runContext` is missing — chat-client always supplies it, so a missing value indicates a bug worth surfacing immediately rather than silently fabricating a date-based id. Restores the test assertion against the new fields (the previous `conversationId` assertion was removed in the merge because the field moved to the wire root — but for fetcher consumers, neither path was left). * fix(ai-client,hooks): tighten error handling and preserve ChatTransport XOR in useChat options Addresses four items from the merge review: 1. Drop the deprecated `error: { message }` nested field from synthesized RUN_ERROR events. The top-level `message` field already satisfies AG-UI's `RunErrorEventSchema`; the nested form is `@deprecated` per the type doc. 2. Replace the `${Date.now()}` fallback in synthesized RUN_FINISHED / RUN_ERROR with an explicit throw. The chat client always supplies `runContext.threadId` / `runId`, so a missing value indicates a wiring bug worth surfacing instead of fabricating a date-based id that breaks `activeRunIds` correlation downstream. 3. Replace the `(chunk as { runId?: string }).runId` structural cast in chat-client with the `'runId' in chunk` narrowing pattern already used elsewhere in connection-adapters. Removes the cast without adding a runtime check. 4. Distribute `Omit` over `ChatClientOptions` in every framework hook's `UseChatOptions` / `CreateChatOptions` (via new `DistributedOmit` helper exported from `@tanstack/ai-client`). Plain `Omit` collapsed the `ChatTransport` discriminated union into a flat shape, forcing a load-bearing `optionsRef.current.fetcher!` non-null assertion in all five hooks. After this change: - `useChat({})` is a compile error - `useChat({ connection, fetcher })` is a compile error - Narrowing on `!opts.connection` types `opts.fetcher` as `ChatFetcher` (non-undefined) — the `!` is removed in react/preact/solid/vue/svelte hooks. Adds a type-only test in ai-react probing the XOR so a future refactor that drops `DistributedOmit` would fail compile. Tests updated: - connection-adapters.test.ts: two synthesis tests now pass `runContext` to `adapter.send()` (previously relied on the date-fallback behavior). * fix(e2e): post AG-UI wire envelope from fetcher-mode chat route The e2e fetcher was posting `{ messages, data, threadId, runId, provider, feature, testId, aimockPort }` — a pre-AG-UI shape that `chatParamsFromRequestBody` rejects (it validates against AG-UI's `RunAgentInputSchema`). The connection adapter avoids this because `fetchServerSentEvents` builds the full envelope internally; the fetcher path bypasses that. Mirror what `fetchServerSentEvents` posts: - Convert messages with `uiMessagesToWire` (parts → string content). - Include `state: {}`, `tools: []`, `context: []` (required by schema). - Forward `input.data` as `forwardedProps` (already contains the merged body fields from `useChat({ body })`). Fixes 7 failing fetcher-mode tests across all chat providers. * refactor(e2e): narrow $feature.tsx route params via type predicates Route.useParams() returns provider/feature as raw strings. Previously the file used an `as { provider: Provider; feature: Feature }` cast on the destructure, which an earlier autofix removed — leaving TS to flag each downstream usage as TS2345. Replace with two small `isProvider` / `isFeature` type predicates over the existing ALL_PROVIDERS / ALL_FEATURES arrays. The existing "return <NotSupported />" guard now narrows the strings while still performing the runtime "is this a known provider/feature?" check. * ci: apply automated fixes * fix: address CodeRabbit review comments on PR #512 Closes 5 review items in one batch: - ChatFetcher return type now accepts Response/AsyncIterable directly (not just Promise<...>), so async generator fetchers work without `as unknown as ChatFetcher` casts. Drops 6 such casts from tests. - Rename DistributedOmit type parameters O/K → TObject/TKeys to satisfy the project's @typescript-eslint/naming-convention rule (2 CI lint errors). - StreamTruncatedError in readStreamLines no longer fires when the consumer aborts mid-line — user-initiated stop() is expected, not a truncation bug. - validateSearch in $feature.tsx whitelists mode against the allowed Mode union instead of casting arbitrary URL strings. - Fix rendered code snippet in server-fn-chat.tsx — the documented shape now matches the actual `chatFn({ data: { messages }, signal })` call. - Add a fetcher-specific e2e assertion: the route sends a sentinel `x-tanstack-ai-transport: fetcher` header, and chat.spec.ts waits for a request carrying it. Without this, a silent fallback to the connection adapter would still pass the response assertion. Verified: - pnpm test:lib: 26/26 - pnpm test:eslint: 26/26 - pnpm --filter @tanstack/ai-e2e test:e2e: 196/196
🎯 Changes
Closes #509.
Wiring
useChatto a TanStack Start server function currently fails to typecheck becausestream()'s factory is typed as() => AsyncIterable<StreamChunk>, but a server function call returnsPromise<AsyncIterable<StreamChunk>>(handler returns the chat stream) orPromise<Response>(handler returnstoServerSentEventsResponse(stream)):This PR widens
stream()(andrpcStream()) to accept all three shapes — awaiting the result and parsing SSE if aResponseis returned — so a server function can be wired directly intouseChatwith end-to-end type safety from the call site to the handler:Reported by @vfshera in Discord.
Files touched
packages/typescript/ai-client/src/connection-adapters.ts— extendstream()andrpcStream(); factor shared SSE-from-Response helper.packages/typescript/ai-client/src/index.ts— export newStreamFactoryResulttype.packages/typescript/ai-client/tests/connection-adapters.test.ts— 4 new tests (Promise, Promise, Response error, rpcStream Promise).docs/chat/connection-adapters.md— new "TanStack Start Server Functions" section.examples/ts-react-chat— new/server-fn-chatroute +chatFnserver function demonstrating the pattern..changeset/stream-adapter-server-functions.md—@tanstack/ai-clientminor.✅ Checklist
pnpm run test:pr(test:lib,test:types,test:eslintall pass; manual/server-fn-chatsmoke-test was not run — sandbox couldn't launch a browser).🚀 Release Impact
@tanstack/ai-clientminor).cc @vfshera