Skip to content

Commit 4c61c95

Browse files
tclemCopilot
andcommitted
Go SDK: emit JSON-RPC error on pending-buffer overflow + guard drop
Carries the Rust SDK PR #1394 follow-up review fixes into the Go port: 1. Cap the per-session parked-waiter list at 128. When exceeded, reject the oldest waiter with errPendingSessionBufferOverflow ('pending session buffer overflow'). The handler returns a *jsonrpc2.Error with code -32603 via the new pendingRoutingRPCError helper, so the runtime receives a proper error response instead of hanging on the request id until its own timeout. Mirrors Rust commit 491b442 and TS commit c167bc3. 2. When the last pending-routing guard drops without RegisterSession (e.g. session.create failed mid-RPC), signal all parked waiters with errPendingSessionRoutingEnded ('pending session routing ended before session was registered'). Distinct phrasing from the overflow path so debugging can tell the two cases apart. Mirrors Rust commit e0ff254 and TS commit c167bc3. Adds pendingRoutingRPCError helper that routes sentinel errors to -32603 while unknown-session errors keep -32602. Adds two tests: - TestPendingRouting_OverflowEmitsError: 129 parked waiters, oldest gets -32603 overflow error, remaining 128 resolve normally after registration. - TestPendingRouting_GuardDropDistinctMessage: parks a request, drops the guard without registration, verifies exact routing-ended message and -32603 code. Updates TestPendingRouting_RejectsWaitersOnDispose to assert the new exact message and code instead of the old 'dropped' substring check. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent e7734f8 commit 4c61c95

2 files changed

Lines changed: 156 additions & 10 deletions

File tree

go/client.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ func validateSessionFsConfig(config *SessionFsConfig) error {
8888
// }
8989
// defer client.Stop()
9090
//
91+
// Sentinel errors for the two pending-routing rejection paths. Using distinct
92+
// values lets callers (and debugging) tell overflow eviction from guard-drop.
93+
var (
94+
errPendingSessionBufferOverflow = errors.New("pending session buffer overflow")
95+
errPendingSessionRoutingEnded = errors.New("pending session routing ended before session was registered")
96+
)
97+
9198
// pendingResult carries the outcome of a parked inbound-request session lookup.
9299
type pendingResult struct {
93100
session *Session
@@ -1021,7 +1028,7 @@ func (c *Client) beginPendingSessionRouting() func() {
10211028

10221029
for _, chs := range waiters {
10231030
for _, ch := range chs {
1024-
ch <- pendingResult{err: fmt.Errorf("request dropped: cloud session.create completed without registering this session id")}
1031+
ch <- pendingResult{err: errPendingSessionRoutingEnded}
10251032
}
10261033
}
10271034
})
@@ -1076,10 +1083,12 @@ func (c *Client) waitForSession(sessionID string) (*Session, error) {
10761083
ch := make(chan pendingResult, 1)
10771084
waiters := c.pending.waiters[sessionID]
10781085
if len(waiters) >= pendingSessionBufferLimit {
1079-
// Reject the oldest waiter to keep the queue bounded.
1086+
// Reject the oldest waiter to keep the queue bounded. Send a JSON-RPC
1087+
// error response via the handler return so the runtime doesn't hang on
1088+
// the request id waiting for a reply that would never come.
10801089
oldest := waiters[0]
10811090
waiters = waiters[1:]
1082-
oldest <- pendingResult{err: fmt.Errorf("request dropped: pending session waiter buffer full for %s", sessionID)}
1091+
oldest <- pendingResult{err: errPendingSessionBufferOverflow}
10831092
}
10841093
c.pending.waiters[sessionID] = append(waiters, ch)
10851094
c.pending.mu.Unlock()
@@ -2126,6 +2135,18 @@ func (c *Client) handleSessionEvent(req sessionEventRequest) {
21262135
c.pending.mu.Unlock()
21272136
}
21282137

2138+
// pendingRoutingRPCError maps an error from waitForSession to the appropriate
2139+
// JSON-RPC error. Overflow and guard-drop rejections use -32603 (internal
2140+
// error) so the runtime gets a proper error response instead of hanging on the
2141+
// request id. All other waitForSession errors (e.g. unknown session) keep the
2142+
// existing -32602 (invalid params) code.
2143+
func pendingRoutingRPCError(err error) *jsonrpc2.Error {
2144+
if errors.Is(err, errPendingSessionBufferOverflow) || errors.Is(err, errPendingSessionRoutingEnded) {
2145+
return &jsonrpc2.Error{Code: -32603, Message: err.Error()}
2146+
}
2147+
return &jsonrpc2.Error{Code: -32602, Message: err.Error()}
2148+
}
2149+
21292150
// handleUserInputRequest handles a user input request from the CLI server.
21302151
func (c *Client) handleUserInputRequest(req userInputRequest) (*userInputResponse, *jsonrpc2.Error) {
21312152
if req.SessionID == "" || req.Question == "" {
@@ -2134,7 +2155,7 @@ func (c *Client) handleUserInputRequest(req userInputRequest) (*userInputRespons
21342155

21352156
session, err := c.waitForSession(req.SessionID)
21362157
if err != nil {
2137-
return nil, &jsonrpc2.Error{Code: -32602, Message: err.Error()}
2158+
return nil, pendingRoutingRPCError(err)
21382159
}
21392160

21402161
response, err := session.handleUserInputRequest(UserInputRequest{
@@ -2161,7 +2182,7 @@ func (c *Client) handleExitPlanModeRequest(req exitPlanModeRequest) (*ExitPlanMo
21612182

21622183
session, err := c.waitForSession(req.SessionID)
21632184
if err != nil {
2164-
return nil, &jsonrpc2.Error{Code: -32602, Message: err.Error()}
2185+
return nil, pendingRoutingRPCError(err)
21652186
}
21662187

21672188
response, err := session.handleExitPlanModeRequest(ExitPlanModeRequest{
@@ -2185,7 +2206,7 @@ func (c *Client) handleAutoModeSwitchRequest(req autoModeSwitchRequest) (*autoMo
21852206

21862207
session, err := c.waitForSession(req.SessionID)
21872208
if err != nil {
2188-
return nil, &jsonrpc2.Error{Code: -32602, Message: err.Error()}
2209+
return nil, pendingRoutingRPCError(err)
21892210
}
21902211

21912212
response, err := session.handleAutoModeSwitchRequest(AutoModeSwitchRequest{
@@ -2207,7 +2228,7 @@ func (c *Client) handleHooksInvoke(req hooksInvokeRequest) (map[string]any, *jso
22072228

22082229
session, err := c.waitForSession(req.SessionID)
22092230
if err != nil {
2210-
return nil, &jsonrpc2.Error{Code: -32602, Message: err.Error()}
2231+
return nil, pendingRoutingRPCError(err)
22112232
}
22122233

22132234
output, err := session.handleHooksInvoke(req.Type, req.Input)
@@ -2230,7 +2251,7 @@ func (c *Client) handleSystemMessageTransform(req systemMessageTransformRequest)
22302251

22312252
session, err := c.waitForSession(req.SessionID)
22322253
if err != nil {
2233-
return systemMessageTransformResponse{}, &jsonrpc2.Error{Code: -32602, Message: err.Error()}
2254+
return systemMessageTransformResponse{}, pendingRoutingRPCError(err)
22342255
}
22352256

22362257
resp, err := session.handleSystemMessageTransform(req.Sections)

go/cloud_session_test.go

Lines changed: 127 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,133 @@ func TestPendingRouting_RejectsWaitersOnDispose(t *testing.T) {
317317
if rpcErr == nil {
318318
t.Fatal("expected an rpc error after dispose without registration")
319319
}
320-
if !strings.Contains(rpcErr.Message, "dropped") {
321-
t.Errorf("expected 'dropped' in error message, got: %s", rpcErr.Message)
320+
if !strings.Contains(rpcErr.Message, "routing ended before session was registered") {
321+
t.Errorf("expected routing-ended message, got: %s", rpcErr.Message)
322+
}
323+
if rpcErr.Code != -32603 {
324+
t.Errorf("expected code -32603, got: %d", rpcErr.Code)
325+
}
326+
case <-time.After(2 * time.Second):
327+
t.Fatal("timed out waiting for rejected waiter")
328+
}
329+
}
330+
331+
// TestPendingRouting_OverflowEmitsError verifies that when the parked-waiter
332+
// buffer reaches its cap, the oldest waiter receives the overflow error response
333+
// and the remaining 128 waiters resolve normally after registration.
334+
func TestPendingRouting_OverflowEmitsError(t *testing.T) {
335+
client := newCloudTestClient()
336+
dispose := client.beginPendingSessionRouting()
337+
338+
const pendingID = "overflow-request-session"
339+
const total = pendingSessionBufferLimit + 1 // 129
340+
341+
type result struct {
342+
resp *userInputResponse
343+
err *jsonrpcError
344+
}
345+
346+
// Register a user-input handler so the session resolves successfully.
347+
session, cleanup := newTestSession()
348+
defer cleanup()
349+
session.SessionID = pendingID
350+
session.registerUserInputHandler(func(req UserInputRequest, _ UserInputInvocation) (UserInputResponse, error) {
351+
return UserInputResponse{Answer: "yes"}, nil
352+
})
353+
354+
results := make([]chan result, total)
355+
for i := range total {
356+
results[i] = make(chan result, 1)
357+
go func(ch chan result) {
358+
resp, rpcErr := client.handleUserInputRequest(userInputRequest{
359+
SessionID: pendingID,
360+
Question: "Proceed?",
361+
})
362+
ch <- result{resp, rpcErr}
363+
}(results[i])
364+
}
365+
366+
// Give goroutines time to park.
367+
time.Sleep(50 * time.Millisecond)
368+
369+
// Register the session and flush — this resolves the 128 remaining waiters.
370+
client.sessionsMux.Lock()
371+
client.sessions[pendingID] = session
372+
client.sessionsMux.Unlock()
373+
client.flushPendingForSession(pendingID, session)
374+
dispose()
375+
376+
// Collect all results with a timeout.
377+
var gotOverflow int
378+
var gotSuccess int
379+
deadline := time.After(2 * time.Second)
380+
for _, ch := range results {
381+
select {
382+
case r := <-ch:
383+
if r.err != nil {
384+
if !strings.Contains(r.err.Message, "pending session buffer overflow") {
385+
t.Errorf("unexpected error message: %s", r.err.Message)
386+
}
387+
if r.err.Code != -32603 {
388+
t.Errorf("expected code -32603 for overflow, got: %d", r.err.Code)
389+
}
390+
gotOverflow++
391+
} else {
392+
gotSuccess++
393+
}
394+
case <-deadline:
395+
t.Fatalf("timed out: overflow=%d success=%d", gotOverflow, gotSuccess)
396+
}
397+
}
398+
399+
if gotOverflow != 1 {
400+
t.Errorf("expected exactly 1 overflow rejection, got %d", gotOverflow)
401+
}
402+
if gotSuccess != pendingSessionBufferLimit {
403+
t.Errorf("expected %d successful resolutions, got %d", pendingSessionBufferLimit, gotSuccess)
404+
}
405+
}
406+
407+
// TestPendingRouting_GuardDropDistinctMessage verifies that when the last
408+
// pending-routing guard drops without registration, parked waiters receive the
409+
// distinct routing-ended error (not the overflow message) so the two paths are
410+
// distinguishable in logs and debugging.
411+
func TestPendingRouting_GuardDropDistinctMessage(t *testing.T) {
412+
client := newCloudTestClient()
413+
dispose := client.beginPendingSessionRouting()
414+
415+
const pendingID = "guard-drop-session"
416+
417+
resultCh := make(chan *jsonrpcError, 1)
418+
go func() {
419+
_, rpcErr := client.handleUserInputRequest(userInputRequest{
420+
SessionID: pendingID,
421+
Question: "Proceed?",
422+
})
423+
resultCh <- rpcErr
424+
}()
425+
426+
// Give the goroutine time to park.
427+
time.Sleep(20 * time.Millisecond)
428+
429+
// Drop the guard without registering — simulates session.create failing.
430+
dispose()
431+
432+
select {
433+
case rpcErr := <-resultCh:
434+
if rpcErr == nil {
435+
t.Fatal("expected an rpc error after guard drop without registration")
436+
}
437+
const want = "pending session routing ended before session was registered"
438+
if rpcErr.Message != want {
439+
t.Errorf("expected exact message %q, got %q", want, rpcErr.Message)
440+
}
441+
if rpcErr.Code != -32603 {
442+
t.Errorf("expected code -32603, got: %d", rpcErr.Code)
443+
}
444+
// Must NOT contain the overflow message.
445+
if strings.Contains(rpcErr.Message, "buffer overflow") {
446+
t.Errorf("guard-drop path must not use overflow message, got: %s", rpcErr.Message)
322447
}
323448
case <-time.After(2 * time.Second):
324449
t.Fatal("timed out waiting for rejected waiter")

0 commit comments

Comments
 (0)