Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler;
import io.modelcontextprotocol.client.transport.customizer.McpHttpClientTransportAuthorizationErrorHandler;
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.json.McpJsonDefaults;
Expand Down Expand Up @@ -120,7 +121,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {

private final boolean openConnectionOnStartup;

private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler;
private final McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler;

private final boolean resumableStreams;

Expand All @@ -139,7 +140,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
McpHttpClientAuthorizationErrorHandler authorizationErrorHandler, List<String> supportedProtocolVersions) {
McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler,
List<String> supportedProtocolVersions) {
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
Expand Down Expand Up @@ -295,9 +297,12 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
int statusCode = responseEvent.responseInfo().statusCode();
if (statusCode == 401 || statusCode == 403) {
logger.debug("Authorization error in reconnect with code {}", statusCode);
var request = requestBuilder.build();
var requestSnapshot = new HttpRequestSnapshot(request.uri(), request.method(),
request.headers());
return Mono.<McpSchema.JSONRPCMessage>error(
new McpHttpClientTransportAuthorizationException(
"Authorization error connecting to SSE stream",
"Authorization error connecting to SSE stream", requestSnapshot,
responseEvent.responseInfo()));
}
else if (statusCode == METHOD_NOT_ALLOWED) {
Expand Down Expand Up @@ -417,7 +422,8 @@ private Retry authorizationErrorRetrySpec() {
return Mono.deferContextual(ctx -> {
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
return Mono
.from(this.authorizationErrorHandler.handle(authException.getResponseInfo(), transportContext))
.from(this.authorizationErrorHandler.handle(authException.getRequestSnapshot(),
authException.getResponseInfo(), transportContext))
.switchIfEmpty(Mono.just(false))
.flatMap(shouldRetry -> shouldRetry ? Mono.just(retrySignal.totalRetries())
: Mono.error(retrySignal.failure()));
Expand Down Expand Up @@ -489,7 +495,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
return Mono
.from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody, transportContext));
}).flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(responseEventSink -> {

// Create the async request with proper body subscriber selection
Mono.fromFuture(this.httpClient
.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink))
Expand All @@ -502,12 +507,14 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
}
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();

})).flatMap(responseEvent -> {
}).flatMap(responseEvent -> {
int statusCode = responseEvent.responseInfo().statusCode();
if (statusCode == 401 || statusCode == 403) {
var request = requestBuilder.build();
var requestSnapshot = new HttpRequestSnapshot(request.uri(), request.method(), request.headers());
logger.debug("Authorization error in sendMessage with code {}", statusCode);
return Mono.<McpSchema.JSONRPCMessage>error(new McpHttpClientTransportAuthorizationException(
"Authorization error when sending message", responseEvent.responseInfo()));
"Authorization error when sending message", requestSnapshot, responseEvent.responseInfo()));
}

if (transportSession.markInitialized(
Expand Down Expand Up @@ -651,13 +658,12 @@ else if (statusCode == BAD_REQUEST) {
if (ref != null) {
transportSession.removeConnection(ref);
}
})
.contextWrite(deliveredSink.contextView())
.subscribe();
})).contextWrite(deliveredSink.contextView()).subscribe();

disposableRef.set(connection);
transportSession.addConnection(connection);
});

}

private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSession) {
Expand Down Expand Up @@ -695,7 +701,7 @@ public static class Builder {
private List<String> supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25);

private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler.NOOP;
private McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientTransportAuthorizationErrorHandler.NOOP;

/**
* Creates a new builder with the specified base URI.
Expand Down Expand Up @@ -828,8 +834,34 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
* when sending a message.
* @param authorizationErrorHandler the handler
* @return this builder
* @deprecated in favor of
* {@link #authorizationErrorHandler(McpHttpClientTransportAuthorizationErrorHandler)}
*/
@Deprecated(forRemoval = true, since = "2.0.0")
public Builder authorizationErrorHandler(McpHttpClientAuthorizationErrorHandler authorizationErrorHandler) {
this.authorizationErrorHandler = new McpHttpClientTransportAuthorizationErrorHandler() {
@Override
public Publisher<Boolean> handle(HttpRequestSnapshot requestSnapshot,
HttpResponse.ResponseInfo responseInfo, McpTransportContext context) {
return authorizationErrorHandler.handle(responseInfo, context);
}

@Override
public int maxRetries() {
return authorizationErrorHandler.maxRetries();
}
};
return this;
}

/**
* Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
* when sending a message.
* @param authorizationErrorHandler the handler
* @return this builder
*/
public Builder authorizationErrorHandler(
McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler) {
this.authorizationErrorHandler = authorizationErrorHandler;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2026-2026 the original author or authors.
*/

package io.modelcontextprotocol.client.transport;

import java.net.URI;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;

/**
* Captures information about an HTTP request. We use this instead of passing the plain
* {@link HttpRequest} object because we want to avoid retaining a reference to the
* request's {@link BodyPublisher}.
*
* @param requestUri the HTTP request URI
* @param method the HTTP method
* @param headers the HTTP request headers
* @author Daniel Garnier-Moiroux
*/
public record HttpRequestSnapshot(URI requestUri, String method, HttpHeaders headers) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@ public class McpHttpClientTransportAuthorizationException extends McpTransportEx

private final HttpResponse.ResponseInfo responseInfo;

public McpHttpClientTransportAuthorizationException(String message, HttpResponse.ResponseInfo responseInfo) {
private final HttpRequestSnapshot requestSnapshot;

public McpHttpClientTransportAuthorizationException(String message, HttpRequestSnapshot requestSnapshot,
HttpResponse.ResponseInfo responseInfo) {
super(message);
this.responseInfo = responseInfo;
this.requestSnapshot = requestSnapshot;
}

public HttpResponse.ResponseInfo getResponseInfo() {
return responseInfo;
}

public HttpRequestSnapshot getRequestSnapshot() {
return requestSnapshot;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.net.http.HttpResponse;

import io.modelcontextprotocol.client.transport.HttpRequestSnapshot;
import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException;
import io.modelcontextprotocol.common.McpTransportContext;
import org.reactivestreams.Publisher;
Expand All @@ -20,7 +21,9 @@
* "https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization">MCP
* Specification: Authorization</a>
* @author Daniel Garnier-Moiroux
* @deprecated in favor of {@link McpHttpClientTransportAuthorizationErrorHandler}
*/
@Deprecated(forRemoval = true, since = "2.0.0")
public interface McpHttpClientAuthorizationErrorHandler {

/**
Expand All @@ -38,7 +41,10 @@ public interface McpHttpClientAuthorizationErrorHandler {
* @param context the MCP client transport context
* @return {@link Publisher} emitting true if the original request should be replayed,
* false otherwise.
* @deprecated in favor of
* {@link McpHttpClientTransportAuthorizationErrorHandler#handle(HttpRequestSnapshot, HttpResponse.ResponseInfo, McpTransportContext)}
*/
@Deprecated(forRemoval = true, since = "2.0.0")
Publisher<Boolean> handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);

/**
Expand Down Expand Up @@ -87,7 +93,10 @@ interface Sync {
* @param responseInfo the HTTP response information
* @param context the MCP client transport context
* @return true if the original request should be replayed, false otherwise.
* @deprecated in favor of
* {@link McpHttpClientTransportAuthorizationErrorHandler.Sync#handle(HttpRequestSnapshot, HttpResponse.ResponseInfo, McpTransportContext)}
*/
@Deprecated(forRemoval = true, since = "2.0.0")
boolean handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2026-2026 the original author or authors.
*/

package io.modelcontextprotocol.client.transport.customizer;

import java.net.http.HttpResponse;

import io.modelcontextprotocol.client.transport.HttpRequestSnapshot;
import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException;
import io.modelcontextprotocol.common.McpTransportContext;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Handle security-related errors in HTTP-client based transports. This class handles MCP
* server responses with status code 401 and 403.
*
* @see <a href=
* "https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization">MCP
* Specification: Authorization</a>
* @author Daniel Garnier-Moiroux
*/
public interface McpHttpClientTransportAuthorizationErrorHandler {

/**
* Handle authorization error (HTTP 401 or 403), and signal whether the HTTP request
* should be retried or not. If the publisher returns true, the original transport
* method (connect, sendMessage) will be replayed with the original arguments.
* Otherwise, the transport will throw an
* {@link McpHttpClientTransportAuthorizationException}, indicating the error status.
* <p>
* If the returned {@link Publisher} errors, the error will be propagated to the
* calling method, to be handled by the caller.
* <p>
* The number of retries is bounded by {@link #maxRetries()}.
* @param requestSnapshot the HTTP request snapshot that failed authorization
* @param responseInfo the HTTP response information
* @param context the MCP client transport context
* @return {@link Publisher} emitting true if the original request should be replayed,
* false otherwise.
*/
Publisher<Boolean> handle(HttpRequestSnapshot requestSnapshot, HttpResponse.ResponseInfo responseInfo,
McpTransportContext context);

/**
* Maximum number of authorization error retries the transport will attempt. When the
* handler signals a retry via {@link #handle}, the transport will replay the original
* request at most this many times. If the authorization error persists after
* exhausting all retries, the transport will propagate the
* {@link McpHttpClientTransportAuthorizationException}.
* <p>
* Defaults to {@code 1}.
* @return the maximum number of retries
*/
default int maxRetries() {
return 1;
}

/**
* A no-op handler, used in the default use-case.
*/
McpHttpClientTransportAuthorizationErrorHandler NOOP = new Noop();

/**
* Create a {@link McpHttpClientTransportAuthorizationErrorHandler} from a synchronous
* handler. Will be subscribed on {@link Schedulers#boundedElastic()}. The handler may
* be blocking.
* @param handler the synchronous handler
* @return an async handler
*/
static McpHttpClientTransportAuthorizationErrorHandler fromSync(Sync handler) {
return (snapshot, info, context) -> Mono.fromCallable(() -> handler.handle(snapshot, info, context))
.subscribeOn(Schedulers.boundedElastic());
}

/**
* Synchronous authorization error handler.
*/
interface Sync {

/**
* Handle authorization error (HTTP 401 or 403), and signal whether the HTTP
* request should be retried or not. If the return value is true, the original
* transport method (connect, sendMessage) will be replayed with the original
* arguments. Otherwise, the transport will throw an
* {@link McpHttpClientTransportAuthorizationException}, indicating the error
* status.
* @param requestSnapshot the HTTP request snapshot that failed authorization
* @param responseInfo the HTTP response information
* @param context the MCP client transport context
* @return true if the original request should be replayed, false otherwise.
*/
boolean handle(HttpRequestSnapshot requestSnapshot, HttpResponse.ResponseInfo responseInfo,
McpTransportContext context);

}

class Noop implements McpHttpClientTransportAuthorizationErrorHandler {

@Override
public Publisher<Boolean> handle(HttpRequestSnapshot requestSnapshot, HttpResponse.ResponseInfo responseInfo,
McpTransportContext context) {
return Mono.just(false);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,31 @@

/**
* @author Daniel Garnier-Moiroux
* @deprecated use {@link McpHttpClientTransportAuthorizationErrorHandlerTest}
*/
@Deprecated
class McpHttpClientAuthorizationErrorHandlerTest {

private final HttpResponse.ResponseInfo responseInfo = mock(HttpResponse.ResponseInfo.class);

private final McpTransportContext context = McpTransportContext.EMPTY;

@Test
void whenTrueThenRetry() {
void returnsTrue() {
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
.fromSync((info, ctx) -> true);
StepVerifier.create(handler.handle(responseInfo, context)).expectNext(true).verifyComplete();
}

@Test
void whenFalseThenError() {
void returnsFalse() {
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
.fromSync((info, ctx) -> false);
StepVerifier.create(handler.handle(responseInfo, context)).expectNext(false).verifyComplete();
}

@Test
void whenExceptionThenPropagate() {
void propragateExceptions() {
McpHttpClientAuthorizationErrorHandler handler = McpHttpClientAuthorizationErrorHandler
.fromSync((info, ctx) -> {
throw new IllegalStateException("sync handler error");
Expand Down
Loading
Loading