Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ import io.netty.handler.codec.http.HttpServerCodec;

import io.suboptimal.connectjava.codec.protobuf.ConnectProtobufCodecs;
import io.suboptimal.connectjava.model.*;
import io.suboptimal.connectjava.protocol.*;
import io.suboptimal.connectjava.protocol.server.ConnectCorsParameters;
import io.suboptimal.connectjava.protocol.server.ConnectServerProtocol;
import io.suboptimal.connectjava.protocol.server.ConnectServerProtocolConfig;
import io.suboptimal.connectjava.protocol.server.ConnectServerProtocolParameters;

import java.util.Map;

Expand All @@ -115,18 +118,18 @@ ConnectServiceDefinition greeter = new ConnectServiceDefinition(
/* idempotent — also reachable via Unary-GET */ true)),
/* optional descriptor for introspection */ null);

ConnectProtocolConfig config = ConnectProtocolConfig
ConnectServerProtocolConfig config = ConnectServerProtocolConfig
.builder(
Map.of(greeter.serviceName(), greeter),
GreeterCallHandler::new, // ConnectCallHandlerFactory
new ConnectProtocolParameters(
new ConnectServerProtocolParameters(
/* maxRequestBytes */ 4 * 1024 * 1024,
/* maxFrameBytes */ 1 * 1024 * 1024,
ConnectCorsParameters.disabled()),
ConnectProtobufCodecs.defaults()) // proto + proto-json codecs
.build();

ConnectProtocol protocol = new ConnectProtocol(config);
ConnectServerProtocol protocol = new ConnectServerProtocol(config);

ChannelInitializer<Channel> http1Initializer = new ChannelInitializer<>() {
@Override
Expand Down Expand Up @@ -347,13 +350,13 @@ serves Connect alongside any other HTTP/1.1 or HTTP/2 protocol you implement,
with ALPN, H2C prior-knowledge, and H2C upgrade negotiation done for you:

```java
import io.suboptimal.connectjava.protocol.ConnectProtocol;
import io.suboptimal.connectjava.protocol.server.ConnectServerProtocol;
import io.suboptimal.nettymultiprotocol.AppChannelConfigurer;
import io.suboptimal.nettymultiprotocol.AppProtocol;
import io.suboptimal.nettymultiprotocol.AppProtocolRegistry;
import io.suboptimal.nettymultiprotocol.NettyMultiprotocol;

ConnectProtocol connect = new ConnectProtocol(connectConfig);
io.suboptimal.connectjava.protocol.server.ConnectServerProtocol connect = new ConnectServerProtocol(connectConfig);

AppProtocol connectAsApp = new AppProtocol() {
@Override public AppChannelConfigurer http1() { return connect.http1()::configure; }
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<maven-compiler-plugin.version>3.15.0</maven-compiler-plugin.version>
<maven-surefire-plugin.version>3.2.5</maven-surefire-plugin.version>
<protobuf-maven-plugin.version>3.1.2</protobuf-maven-plugin.version>
<jetbrains-annotations.version>24.1.0</jetbrains-annotations.version>

<central-publishing-maven-plugin.version>0.7.0</central-publishing-maven-plugin.version>
<flatten-maven-plugin.version>1.6.0</flatten-maven-plugin.version>
Expand Down Expand Up @@ -88,6 +89,12 @@
<version>${jspecify.version}</version>
</dependency>

<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>${jetbrains-annotations.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import io.suboptimal.connectjava.model.ConnectMethodDefinition;
import io.suboptimal.connectjava.model.ConnectServiceDefinition;
import io.suboptimal.connectjava.protocol.server.ConnectServerInterceptor;

/**
* Immutable view of a Connect RPC call passed to each {@link io.suboptimal.connectjava.protocol.ConnectInterceptor}.
* Immutable view of a Connect RPC call passed to each {@link ConnectServerInterceptor}.
*
* <p>{@link #responseHeadersBuilder()} accepts mutations until the first response payload (or the
* terminal response for unary calls) is written; after that point any mutation throws
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.suboptimal.connectjava.api;

import io.suboptimal.connectjava.model.ConnectMethodDefinition;
import io.suboptimal.connectjava.model.ConnectServiceDefinition;

public record ConnectClientResponseStart (
ConnectServiceDefinition serviceDefinition,
ConnectMethodDefinition methodDefinition,
ConnectResponseMeta responseMeta
) implements ConnectMessage {}
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
package io.suboptimal.connectjava.api;

import org.jspecify.annotations.Nullable;

import java.util.List;
import java.util.Map;

/**
* Terminal signal indicating the successful end of one side of an RPC payload stream.
* Terminal signal indicating the end of one side of an RPC payload stream.
*
* <p>For streaming calls the {@code trailers} field carries trailing metadata from the
* end-stream envelope, and {@code error} is non-null when the end-stream envelope carried
* an error. A consumer must treat this message as terminal in both cases: a non-null
* {@code error} means the call failed, and the trailers are still available.
* For unary calls use {@link #INSTANCE} (no trailers, no error).
*/
public record ConnectEndOfStream() implements ConnectMessage {
public static final ConnectEndOfStream INSTANCE = new ConnectEndOfStream();
public record ConnectEndOfStream(Map<String, List<String>> trailers, @Nullable ConnectError error)
implements ConnectMessage {
public static final ConnectEndOfStream INSTANCE = new ConnectEndOfStream(Map.of(), null);

public ConnectEndOfStream(Map<String, List<String>> trailers) {
this(trailers, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public sealed interface ConnectMessage permits
ConnectCallExchange,
ConnectClientResponseStart,
ConnectPayload,
ConnectEndOfStream,
ConnectError
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.suboptimal.connectjava.api;

import io.suboptimal.connectjava.protocol.ConnectCallObserver;
import io.suboptimal.connectjava.protocol.server.ConnectServerCallObserver;

/**
* Mutable Connect response headers collected by {@link ConnectCallObserver}s.
* Mutable Connect response headers collected by {@link ConnectServerCallObserver}s.
*
* <p>Mutations are applied to the wire response after all header observers run. Operation
* order is preserved, including the difference between {@link #set(CharSequence, CharSequence)}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.suboptimal.connectjava.api;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Metadata from a Connect RPC response.
*
* @param statusCode HTTP status code
* @param headers leading metadata (HTTP response headers without {@code trailer-} prefix)
*/
public record ConnectResponseMeta(
int statusCode,
Map<String, List<String>> headers
) {
public ConnectResponseMeta {
headers = copyLower(headers);
}

private static Map<String, List<String>> copyLower(Map<String, List<String>> source) {
return source
.entrySet()
.stream()
.collect(Collectors.toUnmodifiableMap(e -> e.getKey().toLowerCase(Locale.ROOT),
Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.suboptimal.connectjava.api;

import io.suboptimal.connectjava.protocol.ConnectCallObserver;
import io.suboptimal.connectjava.protocol.server.ConnectServerCallObserver;

/**
* Mutable Connect response trailers collected by {@link ConnectCallObserver}s.
* Mutable Connect response trailers collected by {@link ConnectServerCallObserver}s.
*
* <p>For unary RPCs, trailers are serialized as {@code Trailer-*} response headers. For
* streaming RPCs, trailers are serialized as the {@code metadata} object in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,33 @@
import io.suboptimal.connectjava.compression.ConnectCompression;
import io.suboptimal.connectjava.compression.ConnectCompressionRegistry;
import io.suboptimal.connectjava.compression.ConnectIdentityCompression;
import org.jetbrains.annotations.ApiStatus;
import org.jspecify.annotations.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class ConnectCompressionNegotiation {
@ApiStatus.Internal
public final class ConnectCompressionNegotiation {

private ConnectCompressionNegotiation() {
}

static @Nullable String compressionNameFor(@Nullable CharSequence encoding) {
public static @Nullable String compressionNameFor(@Nullable CharSequence encoding) {
if (encoding == null) {
return null;
}
String name = encoding.toString().trim().toLowerCase(Locale.ROOT);
return name.isEmpty() ? null : name;
}

static String formatSupportedEncodings(ConnectCompressionRegistry registry) {
public static String formatSupportedEncodings(ConnectCompressionRegistry registry) {
return String.join(",", registry.supportedNames());
}

static ConnectCompression selectResponseEncoding(
public static ConnectCompression selectResponseEncoding(
ConnectCompression requestEncoding, @Nullable String responseEncoding, ConnectCompressionRegistry registry)
{
if (!requestEncoding.isIdentity()) {
Expand Down Expand Up @@ -100,7 +102,7 @@ private static double qFor(List<Coding> codings, String name, double fallback) {
return null;
}

static ByteBuf decompressMessage(ByteBufAllocator alloc, ByteBuf body, ConnectCompression compression)
public static ByteBuf decompressMessage(ByteBufAllocator alloc, ByteBuf body, ConnectCompression compression)
throws IOException
{
if (body.readableBytes() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,52 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.jetbrains.annotations.ApiStatus;
import org.jspecify.annotations.Nullable;

/**
* Connect streaming envelope framing: {@code 1 byte flags | 4 byte big-endian length | payload bytes}.
*
* <p>Encoding is stateless; decoding is stateful (maintains a buffer accumulator).
*/
final class ConnectEnvelope {
static final byte FLAG_COMPRESSED = 0x01;
static final byte FLAG_END_STREAM = 0x02;
@ApiStatus.Internal
public final class ConnectEnvelope {
public static final byte FLAG_COMPRESSED = 0x01;
public static final byte FLAG_END_STREAM = 0x02;
static final int HEADER_SIZE = 5;

private ConnectEnvelope() {}

record DecodedFrame(byte flags, ByteBuf payload) {}
public record DecodedFrame(byte flags, ByteBuf payload) {}

static final class FrameTooLargeException extends RuntimeException {
public static final class FrameTooLargeException extends RuntimeException {
FrameTooLargeException(String message) {
super(message, null, false, false);
}
}

static ByteBuf encode(ByteBufAllocator alloc, byte flags, byte[] payload) {
public static ByteBuf encode(ByteBufAllocator alloc, byte flags, byte[] payload) {
ByteBuf buf = alloc.buffer(HEADER_SIZE + payload.length);
buf.writeByte(flags);
buf.writeInt(payload.length);
buf.writeBytes(payload);
return buf;
}

static ByteBuf encode(ByteBufAllocator alloc, byte flags, ByteBuf payload) {
public static ByteBuf encode(ByteBufAllocator alloc, byte flags, ByteBuf payload) {
ByteBuf buf = alloc.buffer(HEADER_SIZE + payload.readableBytes());
buf.writeByte(flags);
buf.writeInt(payload.readableBytes());
buf.writeBytes(payload, payload.readerIndex(), payload.readableBytes());
return buf;
}

static final class Decoder {
public static final class Decoder {
private final ByteBuf accumulator;
private final int maxFrameBytes;
private boolean closed;

Decoder(ByteBufAllocator alloc, int maxFrameBytes) {
public Decoder(ByteBufAllocator alloc, int maxFrameBytes) {
this.accumulator = alloc.buffer();
this.maxFrameBytes = maxFrameBytes;
}
Expand All @@ -54,7 +56,7 @@ static final class Decoder {
* Appends incoming bytes to the internal accumulator. No-op if the decoder is closed
* or {@code buf} has no readable bytes.
*/
void append(ByteBuf buf) {
public void append(ByteBuf buf) {
if (closed) {
return;
}
Expand All @@ -70,7 +72,8 @@ void append(ByteBuf buf) {
* the declared payload length exceeds the configured maximum. Returns {@code null} if
* the decoder is closed.
*/
@Nullable DecodedFrame pollFrame() {
@Nullable
public DecodedFrame pollFrame() {
if (closed) {
return null;
}
Expand All @@ -94,11 +97,11 @@ void append(ByteBuf buf) {
return new DecodedFrame(flags, payload);
}

int readableBytes() {
public int readableBytes() {
return closed ? 0 : accumulator.readableBytes();
}

void close() {
public void close() {
if (!closed) {
closed = true;
accumulator.release();
Expand Down
Loading