diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/pom.xml b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/pom.xml
new file mode 100644
index 0000000000..3d2504b82e
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/pom.xml
@@ -0,0 +1,96 @@
+
+
+ 4.0.0
+
+
+ co.elastic.apm
+ apm-spring-webflux
+ 1.56.1-SNAPSHOT
+
+
+ apm-spring-webflux-plugin-spring7
+ ${project.groupId}:${project.artifactId}
+
+
+
+ ${project.basedir}/../../..
+
+ true
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${version.spring-boot-4}
+ pom
+ import
+
+
+
+
+
+
+ ${project.groupId}
+ apm-spring-webflux-spring5
+ ${project.version}
+
+
+
+
+ ${project.groupId}
+ apm-spring-webflux-testapp-spring7
+ ${project.version}
+ test
+
+
+ org.springframework
+ spring-web
+ provided
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ co.elastic.apm
+ apm-spring-webflux-spring5
+ ${project.version}
+ test
+ test-jar
+
+
+
+
+ co.elastic.apm
+ apm-reactor-plugin
+ ${project.version}
+ test
+
+
+
+ co.elastic.apm
+ apm-reactor-plugin
+ ${project.version}
+ test
+ test-jar
+
+
+
+ org.apache.ivy
+ ivy
+ test
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+
+
+
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7HeaderGetterTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7HeaderGetterTest.java
new file mode 100644
index 0000000000..4327efa964
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7HeaderGetterTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux;
+
+public class Spring7HeaderGetterTest extends Spring7Test {
+
+ public Spring7HeaderGetterTest() {
+ super(Impl.class);
+ }
+
+ public static class Impl extends HeaderGetterTest {
+
+ }
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerAnnotatedInstrumentationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerAnnotatedInstrumentationTest.java
new file mode 100644
index 0000000000..5bf897f97a
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerAnnotatedInstrumentationTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux;
+
+
+public class Spring7ServerAnnotatedInstrumentationTest extends Spring7Test {
+
+ public Spring7ServerAnnotatedInstrumentationTest() {
+ super(Impl.class);
+ }
+
+ public static class Impl extends ServerAnnotatedInstrumentationTest {
+ }
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerFunctionalInstrumentationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerFunctionalInstrumentationTest.java
new file mode 100644
index 0000000000..725857c371
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerFunctionalInstrumentationTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux;
+
+public class Spring7ServerFunctionalInstrumentationTest extends Spring7Test {
+
+ public Spring7ServerFunctionalInstrumentationTest() {
+ super(Impl.class);
+ }
+
+ public static class Impl extends ServerFunctionalInstrumentationTest {
+ }
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServletContainerTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServletContainerTest.java
new file mode 100644
index 0000000000..9c9790cba8
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServletContainerTest.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux;
+
+public class Spring7ServletContainerTest extends ServletContainerTest {
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7Test.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7Test.java
new file mode 100644
index 0000000000..cccfba9cf5
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7Test.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux;
+
+import org.junit.jupiter.api.Test;
+import org.junit.platform.launcher.Launcher;
+import org.junit.platform.launcher.LauncherDiscoveryRequest;
+import org.junit.platform.launcher.core.LauncherDiscoveryRequestBuilder;
+import org.junit.platform.launcher.core.LauncherFactory;
+import org.junit.platform.launcher.listeners.SummaryGeneratingListener;
+import org.junit.platform.launcher.listeners.TestExecutionSummary;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.platform.engine.discovery.DiscoverySelectors.selectClass;
+
+abstract class Spring7Test {
+ private Class> actualTestClass;
+
+ public Spring7Test(Class> testClazz) {
+ this.actualTestClass = testClazz;
+ }
+
+ @Test
+ public void runTests() {
+ LauncherDiscoveryRequest request = LauncherDiscoveryRequestBuilder.request()
+ .selectors(selectClass(actualTestClass))
+ .build();
+ Launcher launcher = LauncherFactory.create();
+ SummaryGeneratingListener listener = new SummaryGeneratingListener();
+ launcher.registerTestExecutionListeners(listener);
+ launcher.execute(request);
+
+ for (TestExecutionSummary.Failure failure : listener.getSummary().getFailures()) {
+ System.out.println(failure);
+ failure.getException().printStackTrace();
+ }
+ assertThat(listener.getSummary().getTestsFailedCount())
+ .describedAs("at least one test failure reported, see stack trace for investigation")
+ .isZero();
+ }
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7WebSocketServerInstrumentationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7WebSocketServerInstrumentationTest.java
new file mode 100644
index 0000000000..c31527231f
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7WebSocketServerInstrumentationTest.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux;
+
+
+public class Spring7WebSocketServerInstrumentationTest extends WebSocketServerInstrumentationTest {
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java
index 3cdcdc3fb1..6274f38d0b 100644
--- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java
@@ -58,7 +58,10 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static co.elastic.apm.agent.tracer.AbstractSpan.PRIORITY_HIGH_LEVEL_FRAMEWORK;
import static co.elastic.apm.agent.tracer.AbstractSpan.PRIORITY_LOW_LEVEL_FRAMEWORK;
@@ -313,7 +316,13 @@ private static void fillResponse(Transaction> transaction, ServerWebExchange e
}
private static void copyHeaders(HttpHeaders source, PotentiallyMultiValuedMap destination) {
- for (Map.Entry> header : source.entrySet()) {
+ // This ensures we can make a MultiValueMap without having access
+ // to the api from spring framework 7 that will work across versions 5, 6 and 7
+ Java7IdentityFunction identityFunction = new Java7IdentityFunction();
+ MultiValueCollectorFunction multiValueCollectorFunction = new MultiValueCollectorFunction(source);
+ Map> multiValueHeaderMap = source.toSingleValueMap().keySet()
+ .stream().collect(Collectors.toMap(identityFunction, multiValueCollectorFunction));
+ for (Map.Entry> header : multiValueHeaderMap.entrySet()) {
for (String value : header.getValue()) {
destination.add(header.getKey(), value);
}
@@ -328,4 +337,27 @@ private static void copyCookies(MultiValueMap source, Potent
}
}
+ // Due to -source 7, lambdas cannot be used for the collector
+ private static class MultiValueCollectorFunction implements Function> {
+ private final HttpHeaders source;
+
+ public MultiValueCollectorFunction(HttpHeaders source) {
+ this.source = source;
+ }
+
+ @Override
+ public List apply(String key) {
+ return Objects.requireNonNull(source.getValuesAsList(key));
+ }
+ }
+
+ // Due to -source 7, static method invocations cannot be used
+ private static class Java7IdentityFunction implements Function {
+ public Java7IdentityFunction() {}
+
+ @Override
+ public String apply(String key) {
+ return key;
+ }
+ }
}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/test/java/co/elastic/apm/agent/springwebflux/AbstractServerInstrumentationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/test/java/co/elastic/apm/agent/springwebflux/AbstractServerInstrumentationTest.java
index 10a46d32e4..15fa1ec200 100644
--- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/test/java/co/elastic/apm/agent/springwebflux/AbstractServerInstrumentationTest.java
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/test/java/co/elastic/apm/agent/springwebflux/AbstractServerInstrumentationTest.java
@@ -39,11 +39,13 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.springframework.core.SpringVersion;
+import org.springframework.http.HttpStatus;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Hooks;
import reactor.test.StepVerifier;
+import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -140,8 +142,8 @@ private void hello(boolean expectHeaders) {
.describedAs("non-standard request headers should be captured")
.isEqualTo("12345");
- assertThat(headers.getFirst("Accept"))
- .isEqualTo("text/plain, application/json");
+ assertThat(headers.getAll("Accept"))
+ .containsAll(List.of("text/plain" , "application/json"));
assertThat(request.getCookies()
.getFirst("cookie"))
@@ -209,6 +211,15 @@ private static int getStatusCode(WebClientResponseException exception) {
} catch (Exception | Error e) {
// silently ignored
}
+ try {
+ // Due to the many breaking changes in the spring framework API in version 7, we have to access the status code through reflection.
+ // This will check if it can retrieve the int code via the HttpStatus.value() method, as getRawStatusCode() has been removed in 7
+ Field statusCode = exception.getClass().getSuperclass().getDeclaredField("statusCode");
+ statusCode.setAccessible(true);
+ return ((HttpStatus)statusCode.get(exception)).value();
+ } catch (Exception | Error e) {
+ // silently ignored
+ }
try {
return exception.getRawStatusCode();
} catch (Exception | Error e) {
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml
new file mode 100644
index 0000000000..a0eec5da6b
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml
@@ -0,0 +1,122 @@
+
+
+ 4.0.0
+
+
+ co.elastic.apm
+ apm-spring-webflux
+ 1.56.1-SNAPSHOT
+
+
+ apm-spring-webflux-testapp-spring7
+ ${project.groupId}:${project.artifactId}
+
+
+
+ ${project.basedir}/../../..
+
+
+ 17
+ 17
+
+
+ true
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${version.spring-boot-4}
+ pom
+ import
+
+
+
+
+
+
+ ${project.groupId}
+ apm-agent-core
+ ${project.version}
+
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+ org.apache.logging.log4j
+ log4j-to-slf4j
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+
+
+
+ org.springframework.boot
+ spring-boot-starter-security
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ org.slf4j
+ slf4j-simple
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+
+
+
+ ${project.artifactId}
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ 4.1.0
+
+ co.elastic.apm.agent.springwebflux.testapp.WebFluxApplication
+
+ standalone
+
+
+
+
+ repackage
+
+
+
+
+
+
+
+
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingAnnotated.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingAnnotated.java
new file mode 100644
index 0000000000..338cd24982
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingAnnotated.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux.testapp;
+
+import co.elastic.apm.agent.impl.ElasticApmTracer;
+import co.elastic.apm.agent.impl.transaction.IdImpl;
+import co.elastic.apm.agent.tracer.AbstractSpan;
+import co.elastic.apm.agent.tracer.GlobalTracer;
+import co.elastic.apm.agent.impl.transaction.TransactionImpl;
+import co.elastic.apm.agent.sdk.logging.Logger;
+import co.elastic.apm.agent.sdk.logging.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PatchMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Provides Webflux annotated endpoint
+ */
+@RestController
+@RequestMapping(value = "/annotated", produces = MediaType.TEXT_PLAIN_VALUE)
+public class GreetingAnnotated {
+
+ private static final Logger log = LoggerFactory.getLogger(GreetingAnnotated.class);
+
+ final GreetingHandler greetingHandler;
+
+ @Autowired
+ public GreetingAnnotated(GreetingHandler greetingHandler) {
+ this.greetingHandler = greetingHandler;
+ }
+
+ @RequestMapping("/hello")
+ public Mono getHello(@RequestParam(value = "name", required = false) @Nullable String name) {
+ return greetingHandler.helloMessage(name);
+ }
+
+ @PreAuthorize("hasAuthority('ROLE_USER')")
+ @RequestMapping("/preauthorized")
+ public Mono getPreauthorized() {
+ return greetingHandler.helloMessage("elastic");
+ }
+
+ @PreAuthorize("hasAuthority('ROLE_USER')")
+ @RequestMapping("/username")
+ public Mono getSecurityContextUsername() {
+ return greetingHandler.getUsernameFromContext();
+ }
+
+ // protected by SecurityWebFilterChain#pathMatchers
+ @RequestMapping("/path-username")
+ public Mono getSecurityContextUsernameByPathSecured() {
+ return greetingHandler.getUsernameFromContext();
+ }
+
+ @RequestMapping("/error-handler")
+ public Mono handlerError() {
+ // using delayed exception here allows to ensure that the exception handler is properly
+ // executed, as its execution is part of the "dispatch" phase and is outside of "handler"
+ //
+ // this does not apply for functional definitions as the exception handler is directly part of the "handler"
+ // which is wrapped into the "dispatcher" (which we instrument).
+ return greetingHandler.delayedException();
+ }
+
+ @RequestMapping("/error-mono")
+ public Mono monoError() {
+ return greetingHandler.monoError();
+ }
+
+ @RequestMapping("/empty-mono")
+ public Mono monoEmpty() {
+ return greetingHandler.monoEmpty();
+ }
+
+ @ExceptionHandler
+ public ResponseEntity handleException(RuntimeException e) {
+ return ResponseEntity.status(500)
+ .body(greetingHandler.exceptionMessage(e));
+ }
+
+ @GetMapping("/hello-mapping")
+ public Mono getMapping() {
+ return greetingHandler.helloMessage("GET");
+ }
+
+ @PostMapping("/hello-mapping")
+ public Mono postMapping() {
+ return greetingHandler.helloMessage("POST");
+ }
+
+ @PutMapping("/hello-mapping")
+ public Mono putMapping() {
+ return greetingHandler.helloMessage("PUT");
+ }
+
+ @DeleteMapping("/hello-mapping")
+ public Mono deleteMapping() {
+ return greetingHandler.helloMessage("DELETE");
+ }
+
+ @PatchMapping("/hello-mapping")
+ public Mono patchMapping() {
+ return greetingHandler.helloMessage("PATCH");
+ }
+
+ @RequestMapping(path = "/hello-mapping", method = {RequestMethod.HEAD, RequestMethod.OPTIONS, RequestMethod.TRACE})
+ public Mono otherMapping(ServerHttpRequest request) {
+ return greetingHandler.helloMessage(request.getMethod().name());
+ }
+
+ @GetMapping("/with-parameters/{id}")
+ public Mono withParameters(@PathVariable("id") String id) {
+ return greetingHandler.helloMessage(id);
+ }
+
+ @GetMapping(path = "/child-flux")
+ public Flux getChildSpans(@RequestParam(value = "count", required = false, defaultValue = "3") int count,
+ @RequestParam(value = "duration", required = false, defaultValue = "5") long durationMillis,
+ @RequestParam(value = "delay", required = false, defaultValue = "5") long delayMillis) {
+
+ return greetingHandler.childSpans(count, delayMillis, durationMillis);
+ }
+
+ @GetMapping(path = "/child-flux/sse")
+ public Flux> getChildSpansSSE(@RequestParam(value = "count", required = false, defaultValue = "3") int count,
+ @RequestParam(value = "duration", required = false, defaultValue = "5") long durationMillis,
+ @RequestParam(value = "delay", required = false, defaultValue = "5") long delayMillis) {
+
+ return greetingHandler.childSpans(count, durationMillis, delayMillis)
+ .map(greetingHandler::toSSE);
+ }
+
+ @GetMapping("/custom-transaction-name")
+ public Mono customTransactionName() {
+ log.debug("enter customTransactionName");
+ try {
+
+ IdImpl transactionId = null;
+ if (!GlobalTracer.isNoop()) {
+ // Transaction should be active, even if we are outside of Mono/Flux execution
+ // In practice, it's called after onSubscribe and before onNext, thus the active context is not provided
+ // by reactor plugin, but only by the webflux plugin that keeps the transaction active.
+ ElasticApmTracer tracer = GlobalTracer.get().require(ElasticApmTracer.class);
+ TransactionImpl transaction = Objects.requireNonNull(tracer.currentTransaction(), "active transaction is required");
+ // This mimics setting the name through the public API. We cannot use the public API if we want to test span recycling
+ transaction.withName("user-provided-name", AbstractSpan.PRIORITY_USER_SUPPLIED);
+ transactionId = transaction.getTraceContext().getId();
+ }
+
+
+ return greetingHandler.helloMessage("transaction=" + transactionId);
+ } finally {
+ log.debug("exit customTransactionName");
+ }
+ }
+
+ @GetMapping("/duration")
+ public Mono duration(@RequestParam("duration") int durationMillis) {
+ return greetingHandler.duration(durationMillis);
+ }
+
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingFunctional.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingFunctional.java
new file mode 100644
index 0000000000..48a0156853
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingFunctional.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux.testapp;
+
+import co.elastic.apm.agent.impl.ElasticApmTracer;
+import co.elastic.apm.agent.tracer.AbstractSpan;
+import co.elastic.apm.agent.tracer.GlobalTracer;
+import co.elastic.apm.agent.impl.transaction.IdImpl;
+import co.elastic.apm.agent.impl.transaction.TransactionImpl;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.RouterFunctions;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.publisher.Mono;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
+import static org.springframework.web.reactive.function.server.RequestPredicates.method;
+import static org.springframework.web.reactive.function.server.RequestPredicates.path;
+
+/**
+ * Provides functional Webflux endpoint
+ */
+@Configuration
+public class GreetingFunctional {
+
+ @Bean
+ public RouterFunction route(GreetingHandler greetingHandler) {
+
+ return RouterFunctions.route()
+ // 'hello' and 'hello2' are identical, but entry point in builder is not
+ .route(path("/functional/hello"),
+ request -> helloGreeting(greetingHandler, request.queryParam("name")))
+ .route(path("/functional/preauthorized"),
+ request -> helloGreeting(greetingHandler, Optional.of("elastic")))
+ .route(path("/functional/username"),
+ request -> getUsernameFromContext(greetingHandler))
+ .route(path("/functional/path-username"),
+ request -> getUsernameFromContext(greetingHandler))
+ //
+ .GET("/functional/hello2", accept(MediaType.TEXT_PLAIN),
+ request -> helloGreeting(greetingHandler, request.queryParam("name")))
+ // nested routes
+ .nest(path("/functional/nested"), builder -> builder
+ .route(method(HttpMethod.GET), request -> nested(greetingHandler, request.method().name()))
+ .route(method(HttpMethod.POST), request -> nested(greetingHandler, request.method().name()))
+ )
+ // path with parameters
+ .route(path("/functional/with-parameters/{id}"),
+ request -> helloGreeting(greetingHandler, Optional.of(request.pathVariable("id"))))
+ // route that supports multiple methods mapping
+ .route(path("/functional/hello-mapping"),
+ request -> helloGreeting(greetingHandler, Optional.of(request.method().name())))
+ // errors and mono corner cases
+ .GET("/functional/error-handler", accept(MediaType.TEXT_PLAIN), request -> greetingHandler.throwException())
+ .GET("/functional/error-mono", accept(MediaType.TEXT_PLAIN), request -> greetingHandler.monoError())
+ .GET("/functional/empty-mono", accept(MediaType.TEXT_PLAIN), request -> greetingHandler.monoEmpty())
+ // with known transaction duration
+ .GET("/functional/duration", accept(MediaType.TEXT_PLAIN), request -> response(greetingHandler.duration(getDuration(request))))
+ // custom transaction name set through API
+ .GET("/functional/custom-transaction-name", accept(MediaType.TEXT_PLAIN), request -> {
+ IdImpl transactionId = null;
+ if (!GlobalTracer.isNoop()) {
+ ElasticApmTracer tracer = GlobalTracer.get().require(ElasticApmTracer.class);
+ TransactionImpl transaction = Objects.requireNonNull(tracer.currentTransaction(), "active transaction is required");
+ // This mimics setting the name through the public API. We cannot use the public API if we want to test span recycling
+ transaction.withName("user-provided-name", AbstractSpan.PRIORITY_USER_SUPPLIED);
+ transactionId = transaction.getTraceContext().getId();
+ }
+ return response(greetingHandler.helloMessage("transaction=" + transactionId));
+ })
+ .GET("/functional/child-flux", accept(MediaType.TEXT_PLAIN), request -> ServerResponse.ok()
+ .body(greetingHandler.childSpans(getCount(request), getDelay(request), getDuration(request)), String.class
+ ))
+ .route(path("/functional/child-flux/sse"),
+ request -> ServerResponse.ok()
+ .body(greetingHandler.childSpans(getCount(request), getDelay(request), getDuration(request))
+ .map(greetingHandler::toSSE), ServerSentEvent.class
+ ))
+ // error handler
+ .onError(
+ e -> true, (e, request) -> ServerResponse
+ .status(request.queryParam("status")
+ .map(Integer::parseInt)
+ .orElse(500))
+ .bodyValue(greetingHandler.exceptionMessage(e))
+ )
+ .build();
+ }
+
+ private Long getDuration(ServerRequest request) {
+ return request.queryParam("duration").map(Long::parseLong).orElse(0L);
+ }
+
+ private int getCount(ServerRequest request) {
+ return request.queryParam("count").map(Integer::parseInt).orElse(1);
+ }
+
+ private Long getDelay(ServerRequest request) {
+ return request.queryParam("delay").map(Long::parseLong).orElse(0L);
+ }
+
+ private Mono nested(GreetingHandler greetingHandler, String methodName) {
+ return helloGreeting(greetingHandler, Optional.of("nested " + methodName));
+ }
+
+ private Mono helloGreeting(GreetingHandler greetingHandler, Optional name) {
+ return response(greetingHandler.helloMessage(name.orElse(null)));
+ }
+
+ private Mono getUsernameFromContext(GreetingHandler greetingHandler) {
+ return response(greetingHandler.getUsernameFromContext());
+ }
+
+ private Mono response(Mono value) {
+ return value.flatMap(s -> ServerResponse.ok()
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue(s));
+ }
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingHandler.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingHandler.java
new file mode 100644
index 0000000000..4222b836fd
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux.testapp;
+
+import co.elastic.apm.agent.impl.ElasticApmTracer;
+import co.elastic.apm.agent.tracer.GlobalTracer;
+import co.elastic.apm.agent.tracer.Span;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.security.core.context.ReactiveSecurityContextHolder;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+import javax.annotation.Nullable;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+@Component
+public class GreetingHandler {
+
+ public static final Scheduler CHILDREN_SCHEDULER = Schedulers.newBoundedElastic(16, 128, "children");
+
+ public Mono helloMessage(@Nullable String name) {
+ return Mono.just(String.format("Hello, %s!", Optional.ofNullable(name).orElse("Spring")));
+ }
+
+ public Mono throwException() {
+ throw new RuntimeException("intentional exception");
+ }
+
+ public Mono delayedException() {
+ return helloMessage(null)
+ .delayElement(Duration.ofMillis(50))
+ .flatMap(s -> {
+ throw new RuntimeException("intentional exception");
+ });
+ }
+
+ public Mono monoError() {
+ return Mono.error(new RuntimeException("intentional error"));
+ }
+
+ public Mono monoEmpty() {
+ return Mono.empty();
+ }
+
+ public String exceptionMessage(Throwable t) {
+ return "error handler: " + t.getMessage();
+ }
+
+ public Flux childSpans(int count, long delayMillis, long durationMillis) {
+ return Flux.range(1, count)
+ .subscribeOn(CHILDREN_SCHEDULER)
+ // initial delay
+ .delayElements(Duration.ofMillis(delayMillis))
+ .map(i -> String.format("child %d", i))
+ .doOnNext(name -> {
+ if (!GlobalTracer.isNoop()) {
+ Span> span = Objects.requireNonNull(GlobalTracer.get().require(ElasticApmTracer.class).currentTransaction()).createSpan();
+ span.withName(String.format("%s id=%s", name, span.getTraceContext().getId()));
+ try {
+ fakeWork(durationMillis);
+ } finally {
+ span.end();
+ }
+ }
+ });
+ }
+
+ public ServerSentEvent toSSE(String s) {
+ // we might be able to inject a comment into SSE event for context propagation
+ return ServerSentEvent.builder().data(s).build();
+ }
+
+ // Emulates a transaction that takes a known amount of time
+ // the whole transaction duration should include the delay
+ public Mono duration(long durationMillis) {
+ return helloMessage(String.format("duration=%d", durationMillis))
+ .doOnNext(m -> fakeWork(durationMillis));
+ }
+
+ public Mono getUsernameFromContext() {
+ return ReactiveSecurityContextHolder.getContext()
+ .flatMap(ctx -> Mono.just(ctx.getAuthentication().getName()));
+ }
+
+ private static void fakeWork(long durationMs) {
+ try {
+ Thread.sleep(durationMs);
+ } catch (InterruptedException e) {
+ // silently ignored
+ }
+ }
+
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingWebClient.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingWebClient.java
new file mode 100644
index 0000000000..ce856efa42
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingWebClient.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux.testapp;
+
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.socket.WebSocketMessage;
+import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
+import org.springframework.web.reactive.socket.client.WebSocketClient;
+import org.springframework.web.util.UriBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.Logger;
+import reactor.util.Loggers;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+public class GreetingWebClient {
+
+ private static final Logger logger = Loggers.getLogger(GreetingWebClient.class);
+
+ private final WebClient client;
+ private final String pathPrefix;
+ private final boolean useFunctionalEndpoint;
+ private final int port;
+ private final HttpHeaders headers;
+ private final MultiValueMap cookies;
+ private final Scheduler clientScheduler;
+ private final WebSocketClient wsClient;
+ private final String wsBaseUri;
+ private final boolean logEnabled;
+
+
+ // this client also applies a few basic checks to ensure that application behaves
+ // as expected within unit tests and in packaged application without duplicating
+ // all the testing logic.
+
+ public GreetingWebClient(String host, int port, boolean useFunctionalEndpoint, boolean logEnabled) {
+ this.pathPrefix = useFunctionalEndpoint ? "/functional" : "/annotated";
+ String baseUri = String.format("http://%s:%d%s", host, port, pathPrefix);
+ this.wsBaseUri = String.format("ws://%s:%d", host, port);
+ this.port = port;
+ this.client = WebClient.builder()
+ .baseUrl(baseUri)
+ .clientConnector(new ReactorClientHttpConnector()) // allows to use either netty/reactor or jetty client
+ .build();
+ this.useFunctionalEndpoint = useFunctionalEndpoint;
+ this.headers = new HttpHeaders();
+ this.cookies = new LinkedMultiValueMap<>();
+ this.clientScheduler = Schedulers.newBoundedElastic(16, 128, "webflux-client");
+ this.wsClient = new ReactorNettyWebSocketClient();
+ this.logEnabled = logEnabled;
+ }
+
+ public Mono getHelloMono() {
+ return requestMono("GET", "/hello", 200);
+ }
+
+ public Mono getPreAuthorized(int expectedStatus) { return requestMono("GET", "/preauthorized", expectedStatus); }
+
+ public Mono getSecurityContextUsername(int expectedStatus) { return requestMono("GET", "/username", expectedStatus); }
+
+ public Mono getSecurityContextUsernameByPathSecured(int expectedStatus) { return requestMono("GET", "/path-username", expectedStatus); }
+
+ public Mono getMappingError404() {
+ return requestMono("GET", "/error-404", 404);
+ }
+
+ public Mono getHandlerError() {
+ return requestMono("GET", "/error-handler", 500);
+ }
+
+ public Mono getMonoError() {
+ return requestMono("GET", "/error-mono", 500);
+ }
+
+ public Mono getMonoEmpty() {
+ return requestMono("GET", "/empty-mono", 200);
+ }
+
+ public Mono methodMapping(String method) {
+ return requestMono(method, "/hello-mapping", 200);
+ }
+
+ public Mono withPathParameter(String param) {
+ return requestMono("GET", uriBuilder -> uriBuilder.path("/with-parameters/{id}").build(param), 200);
+ }
+
+ // nested routes, only relevant for functional routing
+ public Mono nested(String method) {
+ return requestMono(method, "/nested", 200);
+ }
+
+ public Mono duration(long durationMillis) {
+ return requestMono("GET", uriBuilder -> uriBuilder.path("/duration").queryParam("duration", durationMillis).build(), 200);
+ }
+
+ // returned as flux, but will only produce one element
+ public Flux childSpans(int count, long durationMillis, long delay) {
+ return requestFlux(uriBuilder -> uriBuilder.path("/child-flux").queryParam("duration", durationMillis).queryParam("count", count).queryParam("delay", delay).build());
+ }
+
+ public List webSocketPingPong(int count) {
+
+ // taken from https://github.com/spring-projects/spring-framework/blob/main/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java
+ Flux input = Flux.range(1, count).map(i -> "ping-" + i);
+
+ AtomicReference> actualRef = new AtomicReference<>();
+ this.wsClient.execute(URI.create(wsBaseUri + "/ping"), session ->
+ session.send(input.map(session::textMessage))
+ .thenMany(session.receive()
+ .take(count)
+ .map(WebSocketMessage::getPayloadAsText))
+ .collectList()
+ .doOnNext(actualRef::set)
+ .then())
+ .block(Duration.ofMillis(1000));
+
+ Objects.requireNonNull(actualRef.get());
+ return actualRef.get();
+ }
+
+ // returned as a stream of elements
+ public Flux> childSpansSSE(int count, long durationMillis, long delay) {
+ return requestFluxSSE(uriBuilder -> uriBuilder.path("/child-flux/sse").queryParam("duration", durationMillis).queryParam("count",count).queryParam("delay", delay).build());
+ }
+
+ // only relevant for annotated controller
+ public Mono customTransactionName() {
+ return requestMono("GET", "/custom-transaction-name", 200);
+ }
+
+ @Deprecated
+ public Mono requestMono(String method, String path, int expectedStatus) {
+ return requestMono(method, uriBuilder -> uriBuilder.path(path).build(), expectedStatus);
+ }
+
+ public Mono requestMono(String method, Function uriFunction, int expectedStatus) {
+ Mono request = request(method, uriFunction, expectedStatus)
+ .bodyToMono(String.class)
+ .doOnError((throwable) -> logger.error("Exception occurred during requesting with exception class [{}]", throwable.getClass().getCanonicalName()))
+ .publishOn(clientScheduler);
+ return logEnabled ? request.log(logger) : request;
+ }
+
+ public void sampleRequests() {
+
+ Duration timeout = Duration.ofMillis(1000);
+
+ getHelloMono().block(timeout);
+ getMappingError404().onErrorResume(e -> Mono.empty()).block(timeout);
+ getHandlerError().onErrorResume(e -> Mono.empty()).block(timeout);
+ getMonoError().onErrorResume(e -> Mono.empty()).block(timeout);
+
+ Stream.of("GET", "POST", "PUT", "DELETE").forEach(method -> methodMapping(method).block(timeout));
+
+ withPathParameter("12345").block(timeout);
+
+ childSpans(5, 3, 1)
+ .blockLast(timeout);
+
+ childSpansSSE(5, 3, 1)
+ .blockLast(timeout);
+
+ webSocketPingPong(5);
+ }
+
+ private Flux requestFlux(Function uriFunction) {
+ Flux request = request("GET", uriFunction, 200)
+ .bodyToFlux(String.class)
+ .publishOn(clientScheduler);
+ return logEnabled ? request.log(logger) : request;
+ }
+
+ private Flux> requestFluxSSE(Function uriFunction) {
+
+ // required to get proper return generic type
+ ParameterizedTypeReference> type = new ParameterizedTypeReference<>() {
+ };
+
+ Flux> request = request("GET", uriFunction, 200)
+ .bodyToFlux(type)
+ .publishOn(clientScheduler);
+
+ return logEnabled ? request.log(logger) : request;
+ }
+
+ private WebClient.ResponseSpec request(String method, Function uriFunction, int expectedStatus) {
+ return client.method(HttpMethod.valueOf(method))
+ .uri(uriFunction)
+ .accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON)
+ .headers(httpHeaders -> httpHeaders.addAll(headers))
+ .cookies(httpCookies -> httpCookies.addAll(cookies))
+ .retrieve()
+ .onRawStatus(status -> status != expectedStatus, r -> Mono.error(new IllegalStateException("unexpected response status")));
+ }
+
+ @Override
+ public String toString() {
+ return String.format("GreetingWebClient [%s]", pathPrefix);
+ }
+
+ public String getPathPrefix() {
+ return pathPrefix;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public boolean useFunctionalEndpoint() {
+ return useFunctionalEndpoint;
+ }
+
+ public void setHeader(String name, String value) {
+ headers.add(name, value);
+ }
+
+ public void setCookie(String name, String value) {
+ cookies.add(name, value);
+ }
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxApplication.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxApplication.java
new file mode 100644
index 0000000000..662b20835c
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxApplication.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux.testapp;
+
+import co.elastic.apm.agent.sdk.logging.Logger;
+import co.elastic.apm.agent.sdk.logging.LoggerFactory;
+import org.springframework.boot.Banner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+
+import javax.net.ServerSocketFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@SpringBootApplication
+public class WebFluxApplication {
+
+ private static final Logger logger = LoggerFactory.getLogger(WebFluxApplication.class);
+
+ private static final int DEFAULT_PORT = 8080;
+
+ public static void main(String[] args) {
+ List arguments = Arrays.asList(args);
+ int port = Integer.parseInt(parseOption(arguments, "--port", Integer.toString(DEFAULT_PORT)));
+ String server = parseOption(arguments, "--server", "netty");
+ int count = Integer.parseInt(parseOption(arguments, "--count", "0"));
+
+ boolean isClient = arguments.contains("--client");
+ boolean logEnabled = arguments.contains("--log");
+
+ boolean isBench = arguments.contains("--benchmark");
+ boolean waitForKey = arguments.contains("--wait");
+
+ if (isBench) {
+ // start the whole server & client
+ App app = run(port, server, logEnabled);
+
+ count = Math.max(count, 100); // any smaller benchmark is not meaningful
+ try {
+ if (waitForKey) {
+ waitForKey();
+ }
+
+ // warmup has same duration as benchmark to help having consistent results
+ logger.info("warmup requests ({})", count);
+ doSampleRequests(app::getClient, count);
+ logger.info("warmup complete");
+
+ if (waitForKey) {
+ waitForKey();
+ }
+
+ logger.info("start benchmark ({})", count);
+ doSampleRequests(app::getClient, count);
+ logger.info("benchmark complete");
+
+ if (waitForKey) {
+ waitForKey();
+ }
+
+
+ } finally {
+ app.close();
+ }
+ } else if (isClient) {
+ count = Math.max(count, 1);
+ doSampleRequests(useFunc -> new GreetingWebClient("localhost", port, useFunc, logEnabled), count);
+ } else {
+
+ try (App app = run(port, server, logEnabled)) {
+ if (count > 0) {
+ doSampleRequests(useFunc -> new GreetingWebClient("localhost", port, useFunc, logEnabled), count);
+ }
+
+ // leave server running
+ waitForKey();
+ }
+ }
+
+ }
+
+ private static void doSampleRequests(Function clientProvider, int count) {
+ List clients = Stream.of(true, false)
+ .map(clientProvider)
+ .collect(Collectors.toList());
+
+ long start = System.currentTimeMillis();
+ int statusFrequency = count <= 10 ? 1 : count / 10;
+
+ long timeLastUpdate = start;
+ int countLastUpdate = 0;
+
+ for (int i = 1; i <= count; i++) {
+ clients.forEach(GreetingWebClient::sampleRequests);
+
+ if (i % statusFrequency == 0 || i == count) {
+ long now = System.currentTimeMillis();
+ long timeSpent = now - timeLastUpdate;
+ int countSinceLastUpdate = i - countLastUpdate;
+ System.out.printf("progress = %1$6.02f %% (%2$d), count = %3$d in %4$d ms, average = %5$.02f ms%n",
+ i * 100d / count,
+ i,
+ countSinceLastUpdate,
+ timeSpent,
+ timeSpent * 1d / countSinceLastUpdate
+ );
+ timeLastUpdate = now;
+ countLastUpdate = i;
+
+ if (i == count) {
+ long totalTime = System.currentTimeMillis() - start;
+ System.out.printf("total count = %d in %d ms, average = %.02f%n", count, totalTime, 1.0D * totalTime / count);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Stores application state, using inner-class to avoid interfering with Spring boot application
+ */
+ public static class App implements Closeable {
+ private final int port;
+ private final ConfigurableApplicationContext context;
+ private final boolean logEnabled;
+
+ private App(int port, ConfigurableApplicationContext context, boolean logEnabled) {
+ this.port = port;
+ this.context = context;
+ this.logEnabled = logEnabled;
+ }
+
+ public GreetingWebClient getClient(boolean useFunctional) {
+ return new GreetingWebClient("localhost", port, useFunctional, logEnabled);
+ }
+
+ @Override
+ public void close() {
+ context.close();
+ }
+ }
+
+ /**
+ * Starts application on provided port
+ *
+ * @param port port to use
+ * @param server server implementation to use
+ * @param logEnabled true to enable client and server logging (very verbose, better for debugging)
+ * @return application context
+ */
+ public static App run(int port, String server, boolean logEnabled) {
+ if (port < 0) {
+ port = getAvailableRandomPort();
+ }
+
+ SpringApplication app = new SpringApplication(WebFluxApplication.class);
+ Map appProperties = new HashMap<>();
+ app.setBannerMode(Banner.Mode.OFF);
+ appProperties.put("server.port", port);
+ appProperties.put("server", server);
+ appProperties.put("logging.level.org.springframework", logEnabled ? "ERROR" : "OFF");
+ app.setDefaultProperties(appProperties);
+
+ return new App(port, app.run(), logEnabled);
+ }
+
+ private static String parseOption(List arguments, String option, String defaultValue) {
+ int index = arguments.indexOf(option);
+ if (index < 0 || arguments.size() <= index) {
+ return defaultValue;
+ }
+ return arguments.get(index + 1);
+ }
+
+ private static int getAvailableRandomPort() {
+ int port;
+ try (ServerSocket socket = ServerSocketFactory.getDefault().createServerSocket(0, 1, InetAddress.getByName("localhost"))) {
+ port = socket.getLocalPort();
+ } catch (IOException e) {
+ port = DEFAULT_PORT;
+ }
+ return port;
+ }
+
+ private static void waitForKey() {
+ System.out.println("hit any key to continue");
+ try {
+ System.in.read();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxConfig.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxConfig.java
new file mode 100644
index 0000000000..db039b37d4
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxConfig.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux.testapp;
+
+
+
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.reactor.netty.NettyReactiveWebServerFactory;
+import org.springframework.boot.reactor.netty.NettyRouteProvider;
+import org.springframework.boot.reactor.netty.NettyServerCustomizer;
+import org.springframework.boot.tomcat.TomcatConnectorCustomizer;
+import org.springframework.boot.tomcat.TomcatContextCustomizer;
+import org.springframework.boot.tomcat.TomcatProtocolHandlerCustomizer;
+import org.springframework.boot.tomcat.reactive.TomcatReactiveWebServerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.client.ReactorResourceFactory;
+import org.springframework.security.config.Customizer;
+import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
+import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
+import org.springframework.security.config.web.server.ServerHttpSecurity;
+import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.web.server.SecurityWebFilterChain;
+import org.springframework.web.reactive.HandlerMapping;
+import org.springframework.web.reactive.config.EnableWebFlux;
+import org.springframework.web.reactive.config.WebFluxConfigurer;
+import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
+import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.reactive.socket.WebSocketMessage;
+import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
+import org.springframework.web.reactive.socket.server.WebSocketService;
+import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
+import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
+import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
+import org.springframework.web.reactive.socket.server.upgrade.StandardWebSocketUpgradeStrategy;
+import reactor.core.publisher.Flux;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Configuration
+@EnableWebFlux
+@EnableWebFluxSecurity
+@EnableReactiveMethodSecurity
+public class WebFluxConfig implements WebFluxConfigurer {
+
+ // those server methods are just plain copies of ReactiveWebServerFactoryConfiguration inner classes
+ // with different annotations to allow selecting server implementation through properties and not only
+ // classpath availability
+
+ // Tomcat
+
+ @Bean
+ @ConditionalOnProperty(name = "server", havingValue = "tomcat")
+ TomcatReactiveWebServerFactory tomcatReactiveWebServerFactory(
+ ObjectProvider connectorCustomizers,
+ ObjectProvider contextCustomizers,
+ ObjectProvider> protocolHandlerCustomizers) {
+ TomcatReactiveWebServerFactory factory = new TomcatReactiveWebServerFactory();
+ factory.getConnectorCustomizers()
+ .addAll(connectorCustomizers.orderedStream().collect(Collectors.toList()));
+ factory.getContextCustomizers()
+ .addAll(contextCustomizers.orderedStream().collect(Collectors.toList()));
+ factory.getProtocolHandlerCustomizers()
+ .addAll(protocolHandlerCustomizers.orderedStream().collect(Collectors.toList()));
+ return factory;
+ }
+
+ // Tomcat Websocket support
+
+ @Bean
+ @Qualifier("requestUpdateStrategy")
+ @ConditionalOnProperty(name = "server", havingValue = "tomcat")
+ RequestUpgradeStrategy tomcatRequestUpgradeStrategy() {
+ return new StandardWebSocketUpgradeStrategy();
+ }
+
+ // Netty
+
+ @Bean
+ @ConditionalOnMissingBean
+ ReactorResourceFactory reactorServerResourceFactory() {
+ return new ReactorResourceFactory();
+ }
+
+ @Bean
+ @ConditionalOnProperty(name = "server", havingValue = "netty", matchIfMissing = true)
+ NettyReactiveWebServerFactory nettyReactiveWebServerFactory(@Qualifier("reactorServerResourceFactory") ReactorResourceFactory resourceFactory,
+ ObjectProvider routes, ObjectProvider serverCustomizers) {
+ NettyReactiveWebServerFactory serverFactory = new NettyReactiveWebServerFactory();
+ serverFactory.setResourceFactory(resourceFactory);
+ routes.orderedStream().forEach(serverFactory::addRouteProviders);
+ serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList()));
+ return serverFactory;
+ }
+
+ // Netty Websocket support
+
+ @Bean
+ @Qualifier("requestUpdateStrategy")
+ @ConditionalOnProperty(name = "server", havingValue = "netty")
+ RequestUpgradeStrategy nettyRequestUpgradeStrategy() {
+ return new ReactorNettyRequestUpgradeStrategy();
+ }
+
+
+ // Generic Websocket support
+
+ @Bean
+ public HandlerMapping webSocketHandlerMapping() {
+ SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
+ handlerMapping.setOrder(1);
+ handlerMapping.setUrlMap(Map.of("/ping", pingPongHandler()));
+ return handlerMapping;
+ }
+
+ private static WebSocketHandler pingPongHandler() {
+ return session -> {
+ Flux output = session.receive()
+ .map(msg -> session.textMessage(msg.getPayloadAsText().replaceFirst("ping", "pong")));
+ return session.send(output);
+ };
+ }
+
+ @Bean
+ public WebSocketHandlerAdapter handlerAdapter(WebSocketService webSocketService) {
+ return new WebSocketHandlerAdapter(webSocketService);
+ }
+
+ @Bean
+ public WebSocketService webSocketService(@Qualifier("requestUpdateStrategy") RequestUpgradeStrategy upgradeStrategy) {
+ return new HandshakeWebSocketService(upgradeStrategy);
+ }
+
+ @Bean
+ public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
+ return http
+ .csrf(ServerHttpSecurity.CsrfSpec::disable)
+ .authorizeExchange(e ->
+ e.pathMatchers("/annotated/path-username").hasAnyAuthority("ROLE_USER")
+ .pathMatchers("/functional/path-username", "/functional/username", "/functional/preauthorized").hasAnyAuthority("ROLE_USER")
+ .pathMatchers("/**").permitAll())
+ .httpBasic(Customizer.withDefaults())
+ .build();
+ }
+
+ @Bean
+ public MapReactiveUserDetailsService userDetailsService() {
+ return new MapReactiveUserDetailsService(
+ User.withDefaultPasswordEncoder()
+ .username("elastic")
+ .password("changeme")
+ .roles("USER")
+ .build());
+ }
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/package-info.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/package-info.java
new file mode 100644
index 0000000000..e0c856b687
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@NonnullApi
+package co.elastic.apm.agent.springwebflux.testapp;
+
+import co.elastic.apm.agent.sdk.NonnullApi;
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/AnnotatedEndpointTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/AnnotatedEndpointTest.java
new file mode 100644
index 0000000000..a060bf409a
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/AnnotatedEndpointTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux.testapp;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+public class AnnotatedEndpointTest extends ApplicationTest {
+
+ @LocalServerPort
+ private int serverPort;
+
+ @Override
+ protected GreetingWebClient createClient() {
+
+ return new GreetingWebClient("localhost", serverPort, false, true);
+ }
+
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/ApplicationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/ApplicationTest.java
new file mode 100644
index 0000000000..3da2354516
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/ApplicationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux.testapp;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.test.StepVerifier;
+
+import java.util.function.Predicate;
+
+public abstract class ApplicationTest {
+
+ protected GreetingWebClient client;
+
+ protected abstract GreetingWebClient createClient();
+
+ @BeforeEach
+ void beforeEach() {
+ // test with functional endpoints only, testing both functional and annotated controller should be properly
+ // covered by instrumentation tests.
+ client = createClient();
+ }
+
+ @Test
+ void helloMono() {
+ StepVerifier.create(client.getHelloMono())
+ .expectNext("Hello, Spring!")
+ .verifyComplete();
+ }
+
+ @Test
+ void mappingError() {
+ StepVerifier.create(client.getMappingError404())
+ .expectErrorMatches(expectClientError(404))
+ .verify();
+ }
+
+ @Test
+ void handlerException() {
+ StepVerifier.create(client.getHandlerError())
+ .expectErrorMatches(expectClientError(500))
+ .verify();
+ }
+
+ @Test
+ void handlerMonoError() {
+ StepVerifier.create(client.getMonoError())
+ .expectErrorMatches(expectClientError(500))
+ .verify();
+ }
+
+ @Test
+ void handlerMonoEmpty() {
+ StepVerifier.create(client.getMonoEmpty())
+ .verifyComplete();
+ }
+
+ @ParameterizedTest
+ @CsvSource({"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS", "TRACE"})
+ void methodMapping(String method) {
+ var verifier = StepVerifier.create(client.methodMapping(method));
+ if ("HEAD".equals(method)) {
+ verifier.verifyComplete();
+ } else {
+ verifier.expectNext(String.format("Hello, %s!", method))
+ .verifyComplete();
+ }
+ }
+
+ @Test
+ void withPathParameter() {
+ StepVerifier.create(client.withPathParameter("42"))
+ .expectNext("Hello, 42!")
+ .verifyComplete();
+ }
+
+ @Test
+ void withChildrenSpans() {
+ StepVerifier.create(client.childSpans(3, 50, 10))
+ .expectNext("child 1child 2child 3")
+ .verifyComplete();
+ }
+
+ @Test
+ void withChildrenSpansSSE() {
+ StepVerifier.create(client.childSpansSSE(3, 50, 10))
+ .expectNextMatches(checkSSE(1))
+ .expectNextMatches(checkSSE(2))
+ .expectNextMatches(checkSSE(3))
+ .verifyComplete();
+ }
+
+ @Test
+ void customTransactionName() {
+ StepVerifier.create(client.customTransactionName())
+ .expectNext("Hello, transaction=null!")
+ .verifyComplete();
+ }
+
+ @Test
+ void duration() {
+ StepVerifier.create(client.duration(42))
+ .expectNext("Hello, duration=42!")
+ .verifyComplete();
+ }
+
+ private static Predicate> checkSSE(final int index) {
+ return sse -> {
+ String data = sse.data();
+ if (data == null) {
+ return false;
+ }
+ return data.equals(String.format("child %d", index));
+ };
+ }
+
+ private Predicate expectClientError(int expectedStatus) {
+ return error -> (error instanceof WebClientResponseException)
+ && ((WebClientResponseException) error).getStatusCode().value() == expectedStatus;
+ }
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/FunctionalEndpointTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/FunctionalEndpointTest.java
new file mode 100644
index 0000000000..e77c2c8eee
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/FunctionalEndpointTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package co.elastic.apm.agent.springwebflux.testapp;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import reactor.test.StepVerifier;
+
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+public class FunctionalEndpointTest extends ApplicationTest {
+
+ @LocalServerPort
+ private int serverPort;
+
+ @Override
+ protected GreetingWebClient createClient() {
+ return new GreetingWebClient("localhost", serverPort, true, true);
+ }
+
+ // nested routes are only available on functional endpoint
+
+ @ParameterizedTest
+ @CsvSource({"GET", "POST"})
+ void nestedRoutes(String method) {
+ StepVerifier.create(client.nested(method))
+ .expectNext(String.format("Hello, nested %s!", method))
+ .verifyComplete();
+
+ }
+}
diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/resources/application.properties b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/resources/application.properties
new file mode 100644
index 0000000000..be4a7623f7
--- /dev/null
+++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/resources/application.properties
@@ -0,0 +1,6 @@
+# set to DEBUG for easier test application debugging
+logging.level.root=DEBUG
+#
+
+# allows to set current server implementation: 'tomcat' or 'netty'
+server=netty
diff --git a/apm-agent-plugins/apm-spring-webflux/pom.xml b/apm-agent-plugins/apm-spring-webflux/pom.xml
index 368843c2e8..8e65d012de 100644
--- a/apm-agent-plugins/apm-spring-webflux/pom.xml
+++ b/apm-agent-plugins/apm-spring-webflux/pom.xml
@@ -19,15 +19,18 @@
2.7.16
3.5.7
+ 4.1.0
apm-spring-webflux-plugin
apm-spring-webclient-plugin
apm-spring-webflux-testapp
+ apm-spring-webflux-testapp-spring7
apm-spring-webflux-spring5
apm-spring-webflux-common
apm-spring-webflux-common-spring5
+ apm-spring-webflux-plugin-spring7