Skip to content

Commit 5b3abb5

Browse files
tclemCopilot
andcommitted
Java SDK: emit JSON-RPC error on pending-buffer overflow + guard drop
Carries forward the Rust SDK PR #1394 review feedback into the Java port: 1. Cap the per-session inbound-request parked-waiter list at 128 (BUFFER_LIMIT). When exceeded, evict the oldest waiter via completeExceptionally("pending session buffer overflow"). The RpcHandlerDispatcher thread blocked in waiter.get() wakes up, catches ExecutionException, and resolveSessionForRequest calls rpc.sendErrorResponse(-32603, ...) so the runtime isn't left waiting on a request id that will never get a reply. Mirrors Rust commit 491b442 and TS commit c167bc3 on PR #1395. 2. decrementGuard now completes all stale waiters internally with a distinct message ("pending session routing ended before session was registered") instead of returning them for callers to complete with ad-hoc strings. A single canonical message lets debugging tell the overflow-eviction path from the create-failed path. Mirrors Rust commit e0ff254 and TS commit c167bc3. 3. Fix isDone() fast path in resolveSessionForRequest: the existing catch-all "fall through" swallowed ExecutionException from an overflow-evicted waiter, sending a generic -32602 "Unknown session" error instead of -32603. Now explicitly catches ExecutionException in the isDone() branch and sends the same -32603 error as the blocking path. Adds two new unit tests in CloudSessionTest: - parkedRequestWaiterBuffer_overflow_evictsOldestWithOverflowMessage: parks 129 waiters, verifies oldest completes with "pending session buffer overflow", remaining 128 resolve normally after registration. - parkedRequestWaiter_guardDropMessage_isDistinctFromOverflowMessage: parks a request via the full handler path, drops the guard without registration, verifies the JSON-RPC error response contains "routing ended before session was registered" but not "buffer overflow". Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 1a46990 commit 5b3abb5

4 files changed

Lines changed: 146 additions & 47 deletions

File tree

java/src/main/java/com/github/copilot/sdk/CopilotClient.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -583,9 +583,7 @@ public CompletableFuture<CopilotSession> createCloudSession(SessionConfig config
583583
if (returnedId == null || returnedId.isEmpty()) {
584584
// No id: release the guard and surface the error. Any runtime session
585585
// created on the other side may leak — we have nothing to destroy.
586-
var staleWaiters = pendingRoutingState.decrementGuard();
587-
completeWaitersExceptionally(staleWaiters,
588-
"Cloud session.create completed without registering this sessionId; request dropped");
586+
pendingRoutingState.decrementGuard();
589587
LOG.warning("Cloud session.create response missing sessionId; runtime session may leak");
590588
throw new RuntimeException(
591589
"Cloud session.create response did not include a sessionId; cannot register session.");
@@ -619,45 +617,30 @@ public CompletableFuture<CopilotSession> createCloudSession(SessionConfig config
619617
waiter.complete(session);
620618
}
621619
} catch (Exception e) {
622-
// Roll back: remove session from map, release guard with exceptional completion
620+
// Roll back: remove session from map, release guard.
623621
sessions.remove(returnedId);
624-
var staleWaiters = pendingRoutingState.decrementGuard();
625-
completeWaitersExceptionally(staleWaiters,
626-
"Cloud session post-registration setup failed; request dropped");
622+
pendingRoutingState.decrementGuard();
627623
LoggingHelpers.logTiming(LOG, Level.WARNING, e,
628624
"CopilotClient.createCloudSession post-registration setup failed. Elapsed={Elapsed}",
629625
totalNanos);
630626
throw e instanceof RuntimeException re ? re : new RuntimeException(e);
631627
}
632628

633-
var staleWaiters = pendingRoutingState.decrementGuard();
634-
completeWaitersExceptionally(staleWaiters,
635-
"Cloud session.create completed without registering this sessionId; request dropped");
629+
pendingRoutingState.decrementGuard();
636630

637631
LoggingHelpers.logTiming(LOG, Level.FINE,
638632
"CopilotClient.createCloudSession complete. Elapsed={Elapsed}, SessionId=" + returnedId,
639633
totalNanos);
640634
return session;
641635
}).exceptionally(ex -> {
642-
var staleWaiters = pendingRoutingState.decrementGuard();
643-
completeWaitersExceptionally(staleWaiters, "Cloud session.create failed; request dropped");
636+
pendingRoutingState.decrementGuard();
644637
LoggingHelpers.logTiming(LOG, Level.WARNING, ex,
645638
"CopilotClient.createCloudSession failed. Elapsed={Elapsed}", totalNanos);
646639
throw ex instanceof RuntimeException re ? re : new RuntimeException(ex);
647640
});
648641
});
649642
}
650643

651-
private static void completeWaitersExceptionally(java.util.List<CompletableFuture<CopilotSession>> waiters,
652-
String message) {
653-
if (!waiters.isEmpty()) {
654-
var ex = new RuntimeException(message);
655-
for (var waiter : waiters) {
656-
waiter.completeExceptionally(ex);
657-
}
658-
}
659-
}
660-
661644
/**
662645
* Resumes an existing Copilot session.
663646
* <p>

java/src/main/java/com/github/copilot/sdk/PendingRoutingState.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,32 @@ synchronized void incrementGuard() {
5656

5757
/**
5858
* Decrement the guard count. If the count reaches zero, clears all buffered
59-
* events and completes all parked waiters exceptionally.
60-
*
61-
* @return list of waiters that should be completed exceptionally (caller must
62-
* complete them outside this lock to avoid re-entrancy)
59+
* events and completes all parked request waiters exceptionally with a
60+
* canonical message that is distinct from the overflow-eviction path.
6361
*/
64-
synchronized List<CompletableFuture<CopilotSession>> decrementGuard() {
62+
synchronized void decrementGuard() {
6563
guardCount = Math.max(0, guardCount - 1);
66-
if (guardCount == 0) {
67-
pendingEvents.clear();
68-
var stale = new ArrayList<CompletableFuture<CopilotSession>>();
69-
for (var list : pendingWaiters.values()) {
70-
stale.addAll(list);
64+
if (guardCount != 0) {
65+
return;
66+
}
67+
pendingEvents.clear();
68+
var stale = new ArrayList<CompletableFuture<CopilotSession>>();
69+
for (var list : pendingWaiters.values()) {
70+
stale.addAll(list);
71+
}
72+
pendingWaiters.clear();
73+
if (!stale.isEmpty()) {
74+
// Use a distinct phrasing from the overflow-eviction path so that
75+
// debugging can tell the two failure modes apart. Matches the Rust
76+
// SDK message (PR #1394 commit e0ff254f) and the TS SDK (commit
77+
// c167bc3e).
78+
LOG.warning("Pending session routing ended before session was registered; " + "completing " + stale.size()
79+
+ " parked request waiter(s) exceptionally");
80+
var ex = new RuntimeException("pending session routing ended before session was registered");
81+
for (var waiter : stale) {
82+
waiter.completeExceptionally(ex);
7183
}
72-
pendingWaiters.clear();
73-
return stale;
7484
}
75-
return Collections.emptyList();
7685
}
7786

7887
/**
@@ -137,7 +146,19 @@ synchronized CompletableFuture<CopilotSession> tryParkRequest(String sessionId,
137146
return null; // no pending; caller sends error
138147
}
139148
var future = new CompletableFuture<CopilotSession>();
140-
pendingWaiters.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(future);
149+
var list = pendingWaiters.computeIfAbsent(sessionId, k -> new ArrayList<>());
150+
if (list.size() >= BUFFER_LIMIT) {
151+
// Cap parked waiters per session. When exceeded, evict the oldest
152+
// and complete it with a distinct overflow message so the runtime
153+
// gets an error response rather than hanging on a reply that will
154+
// never arrive. Matches Rust PR #1394 (commit 491b4427) and TS
155+
// (commit c167bc3e).
156+
var oldest = list.remove(0);
157+
LOG.warning("Pending session request waiter buffer full for session " + sessionId + " (limit="
158+
+ BUFFER_LIMIT + "); evicting oldest request");
159+
oldest.completeExceptionally(new RuntimeException("pending session buffer overflow"));
160+
}
161+
list.add(future);
141162
return future;
142163
}
143164

java/src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,9 +570,22 @@ private CopilotSession resolveSessionForRequest(String sessionId, long requestId
570570
CompletableFuture<CopilotSession> waiter = pendingState.tryParkRequest(sessionId, sessions);
571571
if (waiter != null) {
572572
if (waiter.isDone()) {
573-
// Resolved synchronously (session was already registered under the lock)
573+
// Session was already registered under tryParkRequest's lock —
574+
// complete synchronously. If the waiter was completed exceptionally
575+
// (e.g. overflow eviction just beat us to isDone()), send the
576+
// same -32603 error as the blocking path so the runtime isn't
577+
// left waiting for a reply that will never arrive.
574578
try {
575579
return waiter.get();
580+
} catch (ExecutionException e) {
581+
Throwable cause = e.getCause();
582+
try {
583+
rpc.sendErrorResponse(requestId, -32603, "Session " + sessionId + " not registered: "
584+
+ (cause != null ? cause.getMessage() : e.getMessage()));
585+
} catch (IOException ioe) {
586+
LOG.log(Level.SEVERE, "Failed to send synchronous registration-failed error", ioe);
587+
}
588+
return null;
576589
} catch (Exception e) {
577590
// fall through to send error
578591
}

java/src/test/java/com/github/copilot/sdk/CloudSessionTest.java

Lines changed: 92 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,7 @@ void bufferEarlySessionEventNotifications() throws Exception {
306306
}
307307

308308
// Release the guard
309-
var staleWaiters = pendingState.decrementGuard();
310-
assertTrue(staleWaiters.isEmpty(), "No stale waiters expected");
309+
pendingState.decrementGuard();
311310

312311
// The buffered session.event should now have been replayed to the session
313312
Thread.sleep(50);
@@ -413,21 +412,104 @@ void parkedRequestFailsExceptionallyWhenGuardDroppedWithoutRegistration() throws
413412

414413
Thread.sleep(100);
415414

416-
// Drop the guard WITHOUT registering the session → waiters should be completed
417-
// exceptionally
418-
var staleWaiters = pendingState.decrementGuard();
419-
// Complete them exceptionally to simulate what CopilotClient does on failure
420-
for (var w : staleWaiters) {
421-
w.completeExceptionally(new RuntimeException("Cloud session.create failed; request dropped"));
422-
}
415+
// Drop the guard WITHOUT registering the session. decrementGuard now
416+
// completes parked waiters internally with the canonical message.
417+
pendingState.decrementGuard();
423418

424-
// The handler should eventually receive the exceptional completion and send an
419+
// The handler should receive the exceptional completion and send an
425420
// error response
426421
toolCallFuture.get(5, TimeUnit.SECONDS);
427422

428423
JsonNode response = readResponse();
429424
assertNotNull(response, "Should have received an error response");
430425
assertEquals(99, response.get("id").asInt());
431426
assertNotNull(response.get("error"), "Response should be an error (not a result)");
427+
String errorMessage = response.get("error").get("message").asText();
428+
assertTrue(errorMessage.contains("routing ended before session was registered"),
429+
"Error message should contain the canonical guard-drop phrase; got: " + errorMessage);
430+
}
431+
432+
// =========================================================================
433+
// Test 8: overflow path — oldest parked waiter gets the overflow message
434+
// =========================================================================
435+
436+
@Test
437+
void parkedRequestWaiterBuffer_overflow_evictsOldestWithOverflowMessage() throws Exception {
438+
pendingState.incrementGuard();
439+
String sid = "cloud-overflow-requests";
440+
441+
// Park BUFFER_LIMIT + 1 waiters via tryParkRequest. The 129th call must
442+
// evict the very first waiter and complete it with the overflow message.
443+
var waiters = new java.util.ArrayList<CompletableFuture<CopilotSession>>();
444+
for (int i = 0; i < PendingRoutingState.BUFFER_LIMIT + 1; i++) {
445+
waiters.add(pendingState.tryParkRequest(sid, sessions));
446+
}
447+
448+
// The first waiter (oldest) must have been evicted with the overflow message.
449+
CompletableFuture<CopilotSession> oldest = waiters.get(0);
450+
assertTrue(oldest.isCompletedExceptionally(), "Oldest waiter should be completed exceptionally on overflow");
451+
ExecutionException ex = assertThrows(ExecutionException.class, oldest::get);
452+
assertEquals("pending session buffer overflow", ex.getCause().getMessage());
453+
454+
// The remaining BUFFER_LIMIT waiters should still be pending.
455+
for (int i = 1; i <= PendingRoutingState.BUFFER_LIMIT; i++) {
456+
assertFalse(waiters.get(i).isDone(), "Waiter " + i + " should still be pending after overflow eviction");
457+
}
458+
459+
// Registering the session resolves the remaining 128 waiters normally.
460+
var session = new CopilotSession(sid, rpc);
461+
var flush = pendingState.registerAndFlush(sid, session, sessions);
462+
assertEquals(PendingRoutingState.BUFFER_LIMIT, flush.waiters().size(),
463+
"registerAndFlush should return all non-evicted waiters");
464+
for (var waiter : flush.waiters()) {
465+
waiter.complete(session);
466+
}
467+
for (int i = 1; i <= PendingRoutingState.BUFFER_LIMIT; i++) {
468+
assertFalse(waiters.get(i).isCompletedExceptionally(),
469+
"Waiter " + i + " should complete normally, not exceptionally");
470+
assertEquals(session, waiters.get(i).get(1, TimeUnit.SECONDS));
471+
}
472+
473+
pendingState.decrementGuard();
474+
}
475+
476+
// =========================================================================
477+
// Test 9: guard-drop message is distinct from overflow message
478+
// =========================================================================
479+
480+
@Test
481+
void parkedRequestWaiter_guardDropMessage_isDistinctFromOverflowMessage() throws Exception {
482+
String pendingSessionId = "cloud-session-distinct-msg";
483+
484+
pendingState.incrementGuard();
485+
486+
// Park a request in the background via the full handler path so the
487+
// response travels over the socket — this mirrors the real runtime flow.
488+
var toolCallFuture = CompletableFuture.runAsync(() -> {
489+
ObjectNode params = MAPPER.createObjectNode();
490+
params.put("sessionId", pendingSessionId);
491+
params.put("toolCallId", "tc-distinct");
492+
params.put("toolName", "noop");
493+
params.set("arguments", MAPPER.createObjectNode());
494+
invokeHandler("tool.call", "77", params);
495+
});
496+
497+
Thread.sleep(100);
498+
499+
// Drop the guard without registration. decrementGuard completes waiters
500+
// internally with the canonical guard-drop message.
501+
pendingState.decrementGuard();
502+
503+
toolCallFuture.get(5, TimeUnit.SECONDS);
504+
505+
JsonNode response = readResponse();
506+
assertEquals(77, response.get("id").asInt());
507+
assertNotNull(response.get("error"), "Should be an error response");
508+
String msg = response.get("error").get("message").asText();
509+
510+
// Must contain the guard-drop phrase — NOT the overflow phrase.
511+
assertTrue(msg.contains("routing ended before session was registered"),
512+
"Guard-drop error must use the routing-ended phrase; got: " + msg);
513+
assertFalse(msg.contains("buffer overflow"), "Guard-drop error must NOT use the overflow phrase; got: " + msg);
432514
}
433515
}

0 commit comments

Comments
 (0)