From 58a76703898d59989b248120738ab79475e8988b Mon Sep 17 00:00:00 2001 From: jihad540 Date: Wed, 17 Jun 2026 10:19:33 +0200 Subject: [PATCH 1/4] fix(webflux): fix NoSuchMethodError when using spring framework 7 with webflux --- .../co/elastic/apm/agent/springwebflux/WebfluxHelper.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java index 3cdcdc3fb1..2d86281026 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java @@ -58,7 +58,9 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.BiFunction; +import java.util.stream.Collectors; import static co.elastic.apm.agent.tracer.AbstractSpan.PRIORITY_HIGH_LEVEL_FRAMEWORK; import static co.elastic.apm.agent.tracer.AbstractSpan.PRIORITY_LOW_LEVEL_FRAMEWORK; @@ -313,7 +315,11 @@ private static void fillResponse(Transaction transaction, ServerWebExchange e } private static void copyHeaders(HttpHeaders source, PotentiallyMultiValuedMap destination) { - for (Map.Entry> header : source.entrySet()) { + // This is the only way I could find to make a multiValueMap without having access + // to the api from spring framework 7 that will work across versions 5, 6 and 7 + Map> multiValueHeaderMap = source.toSingleValueMap().keySet() + .parallelStream().collect(Collectors.toMap(e -> e, key -> Objects.requireNonNull(source.get(key)))); + for (Map.Entry> header : multiValueHeaderMap.entrySet()) { for (String value : header.getValue()) { destination.add(header.getKey(), value); } From 2219b0e3e82a869e07196bc87872da40b5ee2b44 Mon Sep 17 00:00:00 2001 From: jihad540 Date: Wed, 17 Jun 2026 10:51:59 +0200 Subject: [PATCH 2/4] fix(webflux): fix compilation --- .../co/elastic/apm/agent/springwebflux/WebfluxHelper.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java index 2d86281026..e01ccb31f0 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java @@ -60,6 +60,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import static co.elastic.apm.agent.tracer.AbstractSpan.PRIORITY_HIGH_LEVEL_FRAMEWORK; @@ -318,7 +319,12 @@ private static void copyHeaders(HttpHeaders source, PotentiallyMultiValuedMap de // This is the only way I could find to make a multiValueMap without having access // to the api from spring framework 7 that will work across versions 5, 6 and 7 Map> multiValueHeaderMap = source.toSingleValueMap().keySet() - .parallelStream().collect(Collectors.toMap(e -> e, key -> Objects.requireNonNull(source.get(key)))); + .parallelStream().collect(Collectors.toMap(Function.identity(), new Function<>() { + @Override + public List apply(String key) { + return Objects.requireNonNull(source.get(key)); + } + })); for (Map.Entry> header : multiValueHeaderMap.entrySet()) { for (String value : header.getValue()) { destination.add(header.getKey(), value); From b83d4f17801d6040e18083dec859393536e75bfd Mon Sep 17 00:00:00 2001 From: jihad540 Date: Fri, 19 Jun 2026 14:21:47 +0200 Subject: [PATCH 3/4] fix(webflux): fix compilation errors + provide spring 7 testing module --- .../apm-spring-webflux-plugin-spring7/pom.xml | 96 +++++++ .../Spring7HeaderGetterTest.java | 30 +++ ...ng7ServerAnnotatedInstrumentationTest.java | 30 +++ ...g7ServerFunctionalInstrumentationTest.java | 29 ++ .../Spring7ServletContainerTest.java | 22 ++ .../apm/agent/springwebflux/Spring7Test.java | 57 ++++ ...ng7WebSocketServerInstrumentationTest.java | 23 ++ .../agent/springwebflux/WebfluxHelper.java | 34 ++- .../AbstractServerInstrumentationTest.java | 15 +- .../pom.xml | 122 +++++++++ .../testapp/GreetingAnnotated.java | 197 ++++++++++++++ .../testapp/GreetingFunctional.java | 143 ++++++++++ .../testapp/GreetingHandler.java | 114 ++++++++ .../testapp/GreetingWebClient.java | 253 ++++++++++++++++++ .../testapp/WebFluxApplication.java | 220 +++++++++++++++ .../springwebflux/testapp/WebFluxConfig.java | 176 ++++++++++++ .../springwebflux/testapp/package-info.java | 22 ++ .../testapp/AnnotatedEndpointTest.java | 39 +++ .../testapp/ApplicationTest.java | 141 ++++++++++ .../testapp/FunctionalEndpointTest.java | 51 ++++ .../src/test/resources/application.properties | 6 + apm-agent-plugins/apm-spring-webflux/pom.xml | 3 + 22 files changed, 1814 insertions(+), 9 deletions(-) create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/pom.xml create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7HeaderGetterTest.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerAnnotatedInstrumentationTest.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerFunctionalInstrumentationTest.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServletContainerTest.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7Test.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7WebSocketServerInstrumentationTest.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingAnnotated.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingFunctional.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingHandler.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingWebClient.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxApplication.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxConfig.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/package-info.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/AnnotatedEndpointTest.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/ApplicationTest.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/FunctionalEndpointTest.java create mode 100644 apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/resources/application.properties diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/pom.xml b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/pom.xml new file mode 100644 index 0000000000..3d2504b82e --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/pom.xml @@ -0,0 +1,96 @@ + + + 4.0.0 + + + co.elastic.apm + apm-spring-webflux + 1.56.1-SNAPSHOT + + + apm-spring-webflux-plugin-spring7 + ${project.groupId}:${project.artifactId} + + + + ${project.basedir}/../../.. + + true + + + + + + org.springframework.boot + spring-boot-dependencies + ${version.spring-boot-4} + pom + import + + + + + + + ${project.groupId} + apm-spring-webflux-spring5 + ${project.version} + + + + + ${project.groupId} + apm-spring-webflux-testapp-spring7 + ${project.version} + test + + + org.springframework + spring-web + provided + + + io.projectreactor + reactor-test + test + + + co.elastic.apm + apm-spring-webflux-spring5 + ${project.version} + test + test-jar + + + + + co.elastic.apm + apm-reactor-plugin + ${project.version} + test + + + + co.elastic.apm + apm-reactor-plugin + ${project.version} + test + test-jar + + + + org.apache.ivy + ivy + test + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + + diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7HeaderGetterTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7HeaderGetterTest.java new file mode 100644 index 0000000000..4327efa964 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7HeaderGetterTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux; + +public class Spring7HeaderGetterTest extends Spring7Test { + + public Spring7HeaderGetterTest() { + super(Impl.class); + } + + public static class Impl extends HeaderGetterTest { + + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerAnnotatedInstrumentationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerAnnotatedInstrumentationTest.java new file mode 100644 index 0000000000..5bf897f97a --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerAnnotatedInstrumentationTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux; + + +public class Spring7ServerAnnotatedInstrumentationTest extends Spring7Test { + + public Spring7ServerAnnotatedInstrumentationTest() { + super(Impl.class); + } + + public static class Impl extends ServerAnnotatedInstrumentationTest { + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerFunctionalInstrumentationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerFunctionalInstrumentationTest.java new file mode 100644 index 0000000000..725857c371 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServerFunctionalInstrumentationTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux; + +public class Spring7ServerFunctionalInstrumentationTest extends Spring7Test { + + public Spring7ServerFunctionalInstrumentationTest() { + super(Impl.class); + } + + public static class Impl extends ServerFunctionalInstrumentationTest { + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServletContainerTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServletContainerTest.java new file mode 100644 index 0000000000..9c9790cba8 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7ServletContainerTest.java @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux; + +public class Spring7ServletContainerTest extends ServletContainerTest { +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7Test.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7Test.java new file mode 100644 index 0000000000..cccfba9cf5 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7Test.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux; + +import org.junit.jupiter.api.Test; +import org.junit.platform.launcher.Launcher; +import org.junit.platform.launcher.LauncherDiscoveryRequest; +import org.junit.platform.launcher.core.LauncherDiscoveryRequestBuilder; +import org.junit.platform.launcher.core.LauncherFactory; +import org.junit.platform.launcher.listeners.SummaryGeneratingListener; +import org.junit.platform.launcher.listeners.TestExecutionSummary; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.platform.engine.discovery.DiscoverySelectors.selectClass; + +abstract class Spring7Test { + private Class actualTestClass; + + public Spring7Test(Class testClazz) { + this.actualTestClass = testClazz; + } + + @Test + public void runTests() { + LauncherDiscoveryRequest request = LauncherDiscoveryRequestBuilder.request() + .selectors(selectClass(actualTestClass)) + .build(); + Launcher launcher = LauncherFactory.create(); + SummaryGeneratingListener listener = new SummaryGeneratingListener(); + launcher.registerTestExecutionListeners(listener); + launcher.execute(request); + + for (TestExecutionSummary.Failure failure : listener.getSummary().getFailures()) { + System.out.println(failure); + failure.getException().printStackTrace(); + } + assertThat(listener.getSummary().getTestsFailedCount()) + .describedAs("at least one test failure reported, see stack trace for investigation") + .isZero(); + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7WebSocketServerInstrumentationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7WebSocketServerInstrumentationTest.java new file mode 100644 index 0000000000..c31527231f --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-plugin-spring7/src/test/java/co/elastic/apm/agent/springwebflux/Spring7WebSocketServerInstrumentationTest.java @@ -0,0 +1,23 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux; + + +public class Spring7WebSocketServerInstrumentationTest extends WebSocketServerInstrumentationTest { +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java index e01ccb31f0..6274f38d0b 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/WebfluxHelper.java @@ -316,15 +316,12 @@ private static void fillResponse(Transaction transaction, ServerWebExchange e } private static void copyHeaders(HttpHeaders source, PotentiallyMultiValuedMap destination) { - // This is the only way I could find to make a multiValueMap without having access + // This ensures we can make a MultiValueMap without having access // to the api from spring framework 7 that will work across versions 5, 6 and 7 + Java7IdentityFunction identityFunction = new Java7IdentityFunction(); + MultiValueCollectorFunction multiValueCollectorFunction = new MultiValueCollectorFunction(source); Map> multiValueHeaderMap = source.toSingleValueMap().keySet() - .parallelStream().collect(Collectors.toMap(Function.identity(), new Function<>() { - @Override - public List apply(String key) { - return Objects.requireNonNull(source.get(key)); - } - })); + .stream().collect(Collectors.toMap(identityFunction, multiValueCollectorFunction)); for (Map.Entry> header : multiValueHeaderMap.entrySet()) { for (String value : header.getValue()) { destination.add(header.getKey(), value); @@ -340,4 +337,27 @@ private static void copyCookies(MultiValueMap source, Potent } } + // Due to -source 7, lambdas cannot be used for the collector + private static class MultiValueCollectorFunction implements Function> { + private final HttpHeaders source; + + public MultiValueCollectorFunction(HttpHeaders source) { + this.source = source; + } + + @Override + public List apply(String key) { + return Objects.requireNonNull(source.getValuesAsList(key)); + } + } + + // Due to -source 7, static method invocations cannot be used + private static class Java7IdentityFunction implements Function { + public Java7IdentityFunction() {} + + @Override + public String apply(String key) { + return key; + } + } } diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/test/java/co/elastic/apm/agent/springwebflux/AbstractServerInstrumentationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/test/java/co/elastic/apm/agent/springwebflux/AbstractServerInstrumentationTest.java index 10a46d32e4..15fa1ec200 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/test/java/co/elastic/apm/agent/springwebflux/AbstractServerInstrumentationTest.java +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/test/java/co/elastic/apm/agent/springwebflux/AbstractServerInstrumentationTest.java @@ -39,11 +39,13 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.springframework.core.SpringVersion; +import org.springframework.http.HttpStatus; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Hooks; import reactor.test.StepVerifier; +import java.lang.reflect.Field; import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -140,8 +142,8 @@ private void hello(boolean expectHeaders) { .describedAs("non-standard request headers should be captured") .isEqualTo("12345"); - assertThat(headers.getFirst("Accept")) - .isEqualTo("text/plain, application/json"); + assertThat(headers.getAll("Accept")) + .containsAll(List.of("text/plain" , "application/json")); assertThat(request.getCookies() .getFirst("cookie")) @@ -209,6 +211,15 @@ private static int getStatusCode(WebClientResponseException exception) { } catch (Exception | Error e) { // silently ignored } + try { + // Due to the many breaking changes in the spring framework API in version 7, we have to access the status code through reflection. + // This will check if it can retrieve the int code via the HttpStatus.value() method, as getRawStatusCode() has been removed in 7 + Field statusCode = exception.getClass().getSuperclass().getDeclaredField("statusCode"); + statusCode.setAccessible(true); + return ((HttpStatus)statusCode.get(exception)).value(); + } catch (Exception | Error e) { + // silently ignored + } try { return exception.getRawStatusCode(); } catch (Exception | Error e) { diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml new file mode 100644 index 0000000000..9adb95c8bd --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml @@ -0,0 +1,122 @@ + + + 4.0.0 + + + co.elastic.apm + apm-spring-webflux + 1.56.1-SNAPSHOT + + + apm-spring-webflux-testapp-spring7 + ${project.groupId}:${project.artifactId} + + + + ${project.basedir}/../../.. + + + 17 + 17 + + + true + + + + + + + org.springframework.boot + spring-boot-dependencies + ${version.spring-boot-4} + pom + import + + + + + + + ${project.groupId} + apm-agent-core + ${project.version} + + + + org.springframework.boot + spring-boot-starter-webflux + + + + ch.qos.logback + logback-classic + + + + org.apache.logging.log4j + log4j-to-slf4j + + + + + + + org.springframework.boot + spring-boot-starter-tomcat + + + + org.springframework.boot + spring-boot-starter-security + + + + + com.fasterxml.jackson.core + jackson-databind + + + org.slf4j + slf4j-simple + + + + org.springframework.boot + spring-boot-starter-test + test + + + + io.projectreactor + reactor-test + test + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + 4.1.0 + + co.elastic.apm.agent.springwebflux.testapp.WebFluxApplication + + standalone + + + + + repackage + + + + + + + + diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingAnnotated.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingAnnotated.java new file mode 100644 index 0000000000..338cd24982 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingAnnotated.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux.testapp; + +import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.impl.transaction.IdImpl; +import co.elastic.apm.agent.tracer.AbstractSpan; +import co.elastic.apm.agent.tracer.GlobalTracer; +import co.elastic.apm.agent.impl.transaction.TransactionImpl; +import co.elastic.apm.agent.sdk.logging.Logger; +import co.elastic.apm.agent.sdk.logging.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PatchMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Provides Webflux annotated endpoint + */ +@RestController +@RequestMapping(value = "/annotated", produces = MediaType.TEXT_PLAIN_VALUE) +public class GreetingAnnotated { + + private static final Logger log = LoggerFactory.getLogger(GreetingAnnotated.class); + + final GreetingHandler greetingHandler; + + @Autowired + public GreetingAnnotated(GreetingHandler greetingHandler) { + this.greetingHandler = greetingHandler; + } + + @RequestMapping("/hello") + public Mono getHello(@RequestParam(value = "name", required = false) @Nullable String name) { + return greetingHandler.helloMessage(name); + } + + @PreAuthorize("hasAuthority('ROLE_USER')") + @RequestMapping("/preauthorized") + public Mono getPreauthorized() { + return greetingHandler.helloMessage("elastic"); + } + + @PreAuthorize("hasAuthority('ROLE_USER')") + @RequestMapping("/username") + public Mono getSecurityContextUsername() { + return greetingHandler.getUsernameFromContext(); + } + + // protected by SecurityWebFilterChain#pathMatchers + @RequestMapping("/path-username") + public Mono getSecurityContextUsernameByPathSecured() { + return greetingHandler.getUsernameFromContext(); + } + + @RequestMapping("/error-handler") + public Mono handlerError() { + // using delayed exception here allows to ensure that the exception handler is properly + // executed, as its execution is part of the "dispatch" phase and is outside of "handler" + // + // this does not apply for functional definitions as the exception handler is directly part of the "handler" + // which is wrapped into the "dispatcher" (which we instrument). + return greetingHandler.delayedException(); + } + + @RequestMapping("/error-mono") + public Mono monoError() { + return greetingHandler.monoError(); + } + + @RequestMapping("/empty-mono") + public Mono monoEmpty() { + return greetingHandler.monoEmpty(); + } + + @ExceptionHandler + public ResponseEntity handleException(RuntimeException e) { + return ResponseEntity.status(500) + .body(greetingHandler.exceptionMessage(e)); + } + + @GetMapping("/hello-mapping") + public Mono getMapping() { + return greetingHandler.helloMessage("GET"); + } + + @PostMapping("/hello-mapping") + public Mono postMapping() { + return greetingHandler.helloMessage("POST"); + } + + @PutMapping("/hello-mapping") + public Mono putMapping() { + return greetingHandler.helloMessage("PUT"); + } + + @DeleteMapping("/hello-mapping") + public Mono deleteMapping() { + return greetingHandler.helloMessage("DELETE"); + } + + @PatchMapping("/hello-mapping") + public Mono patchMapping() { + return greetingHandler.helloMessage("PATCH"); + } + + @RequestMapping(path = "/hello-mapping", method = {RequestMethod.HEAD, RequestMethod.OPTIONS, RequestMethod.TRACE}) + public Mono otherMapping(ServerHttpRequest request) { + return greetingHandler.helloMessage(request.getMethod().name()); + } + + @GetMapping("/with-parameters/{id}") + public Mono withParameters(@PathVariable("id") String id) { + return greetingHandler.helloMessage(id); + } + + @GetMapping(path = "/child-flux") + public Flux getChildSpans(@RequestParam(value = "count", required = false, defaultValue = "3") int count, + @RequestParam(value = "duration", required = false, defaultValue = "5") long durationMillis, + @RequestParam(value = "delay", required = false, defaultValue = "5") long delayMillis) { + + return greetingHandler.childSpans(count, delayMillis, durationMillis); + } + + @GetMapping(path = "/child-flux/sse") + public Flux> getChildSpansSSE(@RequestParam(value = "count", required = false, defaultValue = "3") int count, + @RequestParam(value = "duration", required = false, defaultValue = "5") long durationMillis, + @RequestParam(value = "delay", required = false, defaultValue = "5") long delayMillis) { + + return greetingHandler.childSpans(count, durationMillis, delayMillis) + .map(greetingHandler::toSSE); + } + + @GetMapping("/custom-transaction-name") + public Mono customTransactionName() { + log.debug("enter customTransactionName"); + try { + + IdImpl transactionId = null; + if (!GlobalTracer.isNoop()) { + // Transaction should be active, even if we are outside of Mono/Flux execution + // In practice, it's called after onSubscribe and before onNext, thus the active context is not provided + // by reactor plugin, but only by the webflux plugin that keeps the transaction active. + ElasticApmTracer tracer = GlobalTracer.get().require(ElasticApmTracer.class); + TransactionImpl transaction = Objects.requireNonNull(tracer.currentTransaction(), "active transaction is required"); + // This mimics setting the name through the public API. We cannot use the public API if we want to test span recycling + transaction.withName("user-provided-name", AbstractSpan.PRIORITY_USER_SUPPLIED); + transactionId = transaction.getTraceContext().getId(); + } + + + return greetingHandler.helloMessage("transaction=" + transactionId); + } finally { + log.debug("exit customTransactionName"); + } + } + + @GetMapping("/duration") + public Mono duration(@RequestParam("duration") int durationMillis) { + return greetingHandler.duration(durationMillis); + } + +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingFunctional.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingFunctional.java new file mode 100644 index 0000000000..48a0156853 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingFunctional.java @@ -0,0 +1,143 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux.testapp; + +import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.tracer.AbstractSpan; +import co.elastic.apm.agent.tracer.GlobalTracer; +import co.elastic.apm.agent.impl.transaction.IdImpl; +import co.elastic.apm.agent.impl.transaction.TransactionImpl; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; + +import java.util.Objects; +import java.util.Optional; + +import static org.springframework.web.reactive.function.server.RequestPredicates.accept; +import static org.springframework.web.reactive.function.server.RequestPredicates.method; +import static org.springframework.web.reactive.function.server.RequestPredicates.path; + +/** + * Provides functional Webflux endpoint + */ +@Configuration +public class GreetingFunctional { + + @Bean + public RouterFunction route(GreetingHandler greetingHandler) { + + return RouterFunctions.route() + // 'hello' and 'hello2' are identical, but entry point in builder is not + .route(path("/functional/hello"), + request -> helloGreeting(greetingHandler, request.queryParam("name"))) + .route(path("/functional/preauthorized"), + request -> helloGreeting(greetingHandler, Optional.of("elastic"))) + .route(path("/functional/username"), + request -> getUsernameFromContext(greetingHandler)) + .route(path("/functional/path-username"), + request -> getUsernameFromContext(greetingHandler)) + // + .GET("/functional/hello2", accept(MediaType.TEXT_PLAIN), + request -> helloGreeting(greetingHandler, request.queryParam("name"))) + // nested routes + .nest(path("/functional/nested"), builder -> builder + .route(method(HttpMethod.GET), request -> nested(greetingHandler, request.method().name())) + .route(method(HttpMethod.POST), request -> nested(greetingHandler, request.method().name())) + ) + // path with parameters + .route(path("/functional/with-parameters/{id}"), + request -> helloGreeting(greetingHandler, Optional.of(request.pathVariable("id")))) + // route that supports multiple methods mapping + .route(path("/functional/hello-mapping"), + request -> helloGreeting(greetingHandler, Optional.of(request.method().name()))) + // errors and mono corner cases + .GET("/functional/error-handler", accept(MediaType.TEXT_PLAIN), request -> greetingHandler.throwException()) + .GET("/functional/error-mono", accept(MediaType.TEXT_PLAIN), request -> greetingHandler.monoError()) + .GET("/functional/empty-mono", accept(MediaType.TEXT_PLAIN), request -> greetingHandler.monoEmpty()) + // with known transaction duration + .GET("/functional/duration", accept(MediaType.TEXT_PLAIN), request -> response(greetingHandler.duration(getDuration(request)))) + // custom transaction name set through API + .GET("/functional/custom-transaction-name", accept(MediaType.TEXT_PLAIN), request -> { + IdImpl transactionId = null; + if (!GlobalTracer.isNoop()) { + ElasticApmTracer tracer = GlobalTracer.get().require(ElasticApmTracer.class); + TransactionImpl transaction = Objects.requireNonNull(tracer.currentTransaction(), "active transaction is required"); + // This mimics setting the name through the public API. We cannot use the public API if we want to test span recycling + transaction.withName("user-provided-name", AbstractSpan.PRIORITY_USER_SUPPLIED); + transactionId = transaction.getTraceContext().getId(); + } + return response(greetingHandler.helloMessage("transaction=" + transactionId)); + }) + .GET("/functional/child-flux", accept(MediaType.TEXT_PLAIN), request -> ServerResponse.ok() + .body(greetingHandler.childSpans(getCount(request), getDelay(request), getDuration(request)), String.class + )) + .route(path("/functional/child-flux/sse"), + request -> ServerResponse.ok() + .body(greetingHandler.childSpans(getCount(request), getDelay(request), getDuration(request)) + .map(greetingHandler::toSSE), ServerSentEvent.class + )) + // error handler + .onError( + e -> true, (e, request) -> ServerResponse + .status(request.queryParam("status") + .map(Integer::parseInt) + .orElse(500)) + .bodyValue(greetingHandler.exceptionMessage(e)) + ) + .build(); + } + + private Long getDuration(ServerRequest request) { + return request.queryParam("duration").map(Long::parseLong).orElse(0L); + } + + private int getCount(ServerRequest request) { + return request.queryParam("count").map(Integer::parseInt).orElse(1); + } + + private Long getDelay(ServerRequest request) { + return request.queryParam("delay").map(Long::parseLong).orElse(0L); + } + + private Mono nested(GreetingHandler greetingHandler, String methodName) { + return helloGreeting(greetingHandler, Optional.of("nested " + methodName)); + } + + private Mono helloGreeting(GreetingHandler greetingHandler, Optional name) { + return response(greetingHandler.helloMessage(name.orElse(null))); + } + + private Mono getUsernameFromContext(GreetingHandler greetingHandler) { + return response(greetingHandler.getUsernameFromContext()); + } + + private Mono response(Mono value) { + return value.flatMap(s -> ServerResponse.ok() + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(s)); + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingHandler.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingHandler.java new file mode 100644 index 0000000000..4222b836fd --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingHandler.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux.testapp; + +import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.tracer.GlobalTracer; +import co.elastic.apm.agent.tracer.Span; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.security.core.context.ReactiveSecurityContextHolder; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; + +@Component +public class GreetingHandler { + + public static final Scheduler CHILDREN_SCHEDULER = Schedulers.newBoundedElastic(16, 128, "children"); + + public Mono helloMessage(@Nullable String name) { + return Mono.just(String.format("Hello, %s!", Optional.ofNullable(name).orElse("Spring"))); + } + + public Mono throwException() { + throw new RuntimeException("intentional exception"); + } + + public Mono delayedException() { + return helloMessage(null) + .delayElement(Duration.ofMillis(50)) + .flatMap(s -> { + throw new RuntimeException("intentional exception"); + }); + } + + public Mono monoError() { + return Mono.error(new RuntimeException("intentional error")); + } + + public Mono monoEmpty() { + return Mono.empty(); + } + + public String exceptionMessage(Throwable t) { + return "error handler: " + t.getMessage(); + } + + public Flux childSpans(int count, long delayMillis, long durationMillis) { + return Flux.range(1, count) + .subscribeOn(CHILDREN_SCHEDULER) + // initial delay + .delayElements(Duration.ofMillis(delayMillis)) + .map(i -> String.format("child %d", i)) + .doOnNext(name -> { + if (!GlobalTracer.isNoop()) { + Span span = Objects.requireNonNull(GlobalTracer.get().require(ElasticApmTracer.class).currentTransaction()).createSpan(); + span.withName(String.format("%s id=%s", name, span.getTraceContext().getId())); + try { + fakeWork(durationMillis); + } finally { + span.end(); + } + } + }); + } + + public ServerSentEvent toSSE(String s) { + // we might be able to inject a comment into SSE event for context propagation + return ServerSentEvent.builder().data(s).build(); + } + + // Emulates a transaction that takes a known amount of time + // the whole transaction duration should include the delay + public Mono duration(long durationMillis) { + return helloMessage(String.format("duration=%d", durationMillis)) + .doOnNext(m -> fakeWork(durationMillis)); + } + + public Mono getUsernameFromContext() { + return ReactiveSecurityContextHolder.getContext() + .flatMap(ctx -> Mono.just(ctx.getAuthentication().getName())); + } + + private static void fakeWork(long durationMs) { + try { + Thread.sleep(durationMs); + } catch (InterruptedException e) { + // silently ignored + } + } + +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingWebClient.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingWebClient.java new file mode 100644 index 0000000000..ce856efa42 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/GreetingWebClient.java @@ -0,0 +1,253 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux.testapp; + +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; +import org.springframework.web.reactive.socket.client.WebSocketClient; +import org.springframework.web.util.UriBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.Logger; +import reactor.util.Loggers; + +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Stream; + +public class GreetingWebClient { + + private static final Logger logger = Loggers.getLogger(GreetingWebClient.class); + + private final WebClient client; + private final String pathPrefix; + private final boolean useFunctionalEndpoint; + private final int port; + private final HttpHeaders headers; + private final MultiValueMap cookies; + private final Scheduler clientScheduler; + private final WebSocketClient wsClient; + private final String wsBaseUri; + private final boolean logEnabled; + + + // this client also applies a few basic checks to ensure that application behaves + // as expected within unit tests and in packaged application without duplicating + // all the testing logic. + + public GreetingWebClient(String host, int port, boolean useFunctionalEndpoint, boolean logEnabled) { + this.pathPrefix = useFunctionalEndpoint ? "/functional" : "/annotated"; + String baseUri = String.format("http://%s:%d%s", host, port, pathPrefix); + this.wsBaseUri = String.format("ws://%s:%d", host, port); + this.port = port; + this.client = WebClient.builder() + .baseUrl(baseUri) + .clientConnector(new ReactorClientHttpConnector()) // allows to use either netty/reactor or jetty client + .build(); + this.useFunctionalEndpoint = useFunctionalEndpoint; + this.headers = new HttpHeaders(); + this.cookies = new LinkedMultiValueMap<>(); + this.clientScheduler = Schedulers.newBoundedElastic(16, 128, "webflux-client"); + this.wsClient = new ReactorNettyWebSocketClient(); + this.logEnabled = logEnabled; + } + + public Mono getHelloMono() { + return requestMono("GET", "/hello", 200); + } + + public Mono getPreAuthorized(int expectedStatus) { return requestMono("GET", "/preauthorized", expectedStatus); } + + public Mono getSecurityContextUsername(int expectedStatus) { return requestMono("GET", "/username", expectedStatus); } + + public Mono getSecurityContextUsernameByPathSecured(int expectedStatus) { return requestMono("GET", "/path-username", expectedStatus); } + + public Mono getMappingError404() { + return requestMono("GET", "/error-404", 404); + } + + public Mono getHandlerError() { + return requestMono("GET", "/error-handler", 500); + } + + public Mono getMonoError() { + return requestMono("GET", "/error-mono", 500); + } + + public Mono getMonoEmpty() { + return requestMono("GET", "/empty-mono", 200); + } + + public Mono methodMapping(String method) { + return requestMono(method, "/hello-mapping", 200); + } + + public Mono withPathParameter(String param) { + return requestMono("GET", uriBuilder -> uriBuilder.path("/with-parameters/{id}").build(param), 200); + } + + // nested routes, only relevant for functional routing + public Mono nested(String method) { + return requestMono(method, "/nested", 200); + } + + public Mono duration(long durationMillis) { + return requestMono("GET", uriBuilder -> uriBuilder.path("/duration").queryParam("duration", durationMillis).build(), 200); + } + + // returned as flux, but will only produce one element + public Flux childSpans(int count, long durationMillis, long delay) { + return requestFlux(uriBuilder -> uriBuilder.path("/child-flux").queryParam("duration", durationMillis).queryParam("count", count).queryParam("delay", delay).build()); + } + + public List webSocketPingPong(int count) { + + // taken from https://github.com/spring-projects/spring-framework/blob/main/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java + Flux input = Flux.range(1, count).map(i -> "ping-" + i); + + AtomicReference> actualRef = new AtomicReference<>(); + this.wsClient.execute(URI.create(wsBaseUri + "/ping"), session -> + session.send(input.map(session::textMessage)) + .thenMany(session.receive() + .take(count) + .map(WebSocketMessage::getPayloadAsText)) + .collectList() + .doOnNext(actualRef::set) + .then()) + .block(Duration.ofMillis(1000)); + + Objects.requireNonNull(actualRef.get()); + return actualRef.get(); + } + + // returned as a stream of elements + public Flux> childSpansSSE(int count, long durationMillis, long delay) { + return requestFluxSSE(uriBuilder -> uriBuilder.path("/child-flux/sse").queryParam("duration", durationMillis).queryParam("count",count).queryParam("delay", delay).build()); + } + + // only relevant for annotated controller + public Mono customTransactionName() { + return requestMono("GET", "/custom-transaction-name", 200); + } + + @Deprecated + public Mono requestMono(String method, String path, int expectedStatus) { + return requestMono(method, uriBuilder -> uriBuilder.path(path).build(), expectedStatus); + } + + public Mono requestMono(String method, Function uriFunction, int expectedStatus) { + Mono request = request(method, uriFunction, expectedStatus) + .bodyToMono(String.class) + .doOnError((throwable) -> logger.error("Exception occurred during requesting with exception class [{}]", throwable.getClass().getCanonicalName())) + .publishOn(clientScheduler); + return logEnabled ? request.log(logger) : request; + } + + public void sampleRequests() { + + Duration timeout = Duration.ofMillis(1000); + + getHelloMono().block(timeout); + getMappingError404().onErrorResume(e -> Mono.empty()).block(timeout); + getHandlerError().onErrorResume(e -> Mono.empty()).block(timeout); + getMonoError().onErrorResume(e -> Mono.empty()).block(timeout); + + Stream.of("GET", "POST", "PUT", "DELETE").forEach(method -> methodMapping(method).block(timeout)); + + withPathParameter("12345").block(timeout); + + childSpans(5, 3, 1) + .blockLast(timeout); + + childSpansSSE(5, 3, 1) + .blockLast(timeout); + + webSocketPingPong(5); + } + + private Flux requestFlux(Function uriFunction) { + Flux request = request("GET", uriFunction, 200) + .bodyToFlux(String.class) + .publishOn(clientScheduler); + return logEnabled ? request.log(logger) : request; + } + + private Flux> requestFluxSSE(Function uriFunction) { + + // required to get proper return generic type + ParameterizedTypeReference> type = new ParameterizedTypeReference<>() { + }; + + Flux> request = request("GET", uriFunction, 200) + .bodyToFlux(type) + .publishOn(clientScheduler); + + return logEnabled ? request.log(logger) : request; + } + + private WebClient.ResponseSpec request(String method, Function uriFunction, int expectedStatus) { + return client.method(HttpMethod.valueOf(method)) + .uri(uriFunction) + .accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON) + .headers(httpHeaders -> httpHeaders.addAll(headers)) + .cookies(httpCookies -> httpCookies.addAll(cookies)) + .retrieve() + .onRawStatus(status -> status != expectedStatus, r -> Mono.error(new IllegalStateException("unexpected response status"))); + } + + @Override + public String toString() { + return String.format("GreetingWebClient [%s]", pathPrefix); + } + + public String getPathPrefix() { + return pathPrefix; + } + + public int getPort() { + return port; + } + + public boolean useFunctionalEndpoint() { + return useFunctionalEndpoint; + } + + public void setHeader(String name, String value) { + headers.add(name, value); + } + + public void setCookie(String name, String value) { + cookies.add(name, value); + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxApplication.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxApplication.java new file mode 100644 index 0000000000..662b20835c --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxApplication.java @@ -0,0 +1,220 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux.testapp; + +import co.elastic.apm.agent.sdk.logging.Logger; +import co.elastic.apm.agent.sdk.logging.LoggerFactory; +import org.springframework.boot.Banner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; + +import javax.net.ServerSocketFactory; +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@SpringBootApplication +public class WebFluxApplication { + + private static final Logger logger = LoggerFactory.getLogger(WebFluxApplication.class); + + private static final int DEFAULT_PORT = 8080; + + public static void main(String[] args) { + List arguments = Arrays.asList(args); + int port = Integer.parseInt(parseOption(arguments, "--port", Integer.toString(DEFAULT_PORT))); + String server = parseOption(arguments, "--server", "netty"); + int count = Integer.parseInt(parseOption(arguments, "--count", "0")); + + boolean isClient = arguments.contains("--client"); + boolean logEnabled = arguments.contains("--log"); + + boolean isBench = arguments.contains("--benchmark"); + boolean waitForKey = arguments.contains("--wait"); + + if (isBench) { + // start the whole server & client + App app = run(port, server, logEnabled); + + count = Math.max(count, 100); // any smaller benchmark is not meaningful + try { + if (waitForKey) { + waitForKey(); + } + + // warmup has same duration as benchmark to help having consistent results + logger.info("warmup requests ({})", count); + doSampleRequests(app::getClient, count); + logger.info("warmup complete"); + + if (waitForKey) { + waitForKey(); + } + + logger.info("start benchmark ({})", count); + doSampleRequests(app::getClient, count); + logger.info("benchmark complete"); + + if (waitForKey) { + waitForKey(); + } + + + } finally { + app.close(); + } + } else if (isClient) { + count = Math.max(count, 1); + doSampleRequests(useFunc -> new GreetingWebClient("localhost", port, useFunc, logEnabled), count); + } else { + + try (App app = run(port, server, logEnabled)) { + if (count > 0) { + doSampleRequests(useFunc -> new GreetingWebClient("localhost", port, useFunc, logEnabled), count); + } + + // leave server running + waitForKey(); + } + } + + } + + private static void doSampleRequests(Function clientProvider, int count) { + List clients = Stream.of(true, false) + .map(clientProvider) + .collect(Collectors.toList()); + + long start = System.currentTimeMillis(); + int statusFrequency = count <= 10 ? 1 : count / 10; + + long timeLastUpdate = start; + int countLastUpdate = 0; + + for (int i = 1; i <= count; i++) { + clients.forEach(GreetingWebClient::sampleRequests); + + if (i % statusFrequency == 0 || i == count) { + long now = System.currentTimeMillis(); + long timeSpent = now - timeLastUpdate; + int countSinceLastUpdate = i - countLastUpdate; + System.out.printf("progress = %1$6.02f %% (%2$d), count = %3$d in %4$d ms, average = %5$.02f ms%n", + i * 100d / count, + i, + countSinceLastUpdate, + timeSpent, + timeSpent * 1d / countSinceLastUpdate + ); + timeLastUpdate = now; + countLastUpdate = i; + + if (i == count) { + long totalTime = System.currentTimeMillis() - start; + System.out.printf("total count = %d in %d ms, average = %.02f%n", count, totalTime, 1.0D * totalTime / count); + } + } + } + + } + + /** + * Stores application state, using inner-class to avoid interfering with Spring boot application + */ + public static class App implements Closeable { + private final int port; + private final ConfigurableApplicationContext context; + private final boolean logEnabled; + + private App(int port, ConfigurableApplicationContext context, boolean logEnabled) { + this.port = port; + this.context = context; + this.logEnabled = logEnabled; + } + + public GreetingWebClient getClient(boolean useFunctional) { + return new GreetingWebClient("localhost", port, useFunctional, logEnabled); + } + + @Override + public void close() { + context.close(); + } + } + + /** + * Starts application on provided port + * + * @param port port to use + * @param server server implementation to use + * @param logEnabled true to enable client and server logging (very verbose, better for debugging) + * @return application context + */ + public static App run(int port, String server, boolean logEnabled) { + if (port < 0) { + port = getAvailableRandomPort(); + } + + SpringApplication app = new SpringApplication(WebFluxApplication.class); + Map appProperties = new HashMap<>(); + app.setBannerMode(Banner.Mode.OFF); + appProperties.put("server.port", port); + appProperties.put("server", server); + appProperties.put("logging.level.org.springframework", logEnabled ? "ERROR" : "OFF"); + app.setDefaultProperties(appProperties); + + return new App(port, app.run(), logEnabled); + } + + private static String parseOption(List arguments, String option, String defaultValue) { + int index = arguments.indexOf(option); + if (index < 0 || arguments.size() <= index) { + return defaultValue; + } + return arguments.get(index + 1); + } + + private static int getAvailableRandomPort() { + int port; + try (ServerSocket socket = ServerSocketFactory.getDefault().createServerSocket(0, 1, InetAddress.getByName("localhost"))) { + port = socket.getLocalPort(); + } catch (IOException e) { + port = DEFAULT_PORT; + } + return port; + } + + private static void waitForKey() { + System.out.println("hit any key to continue"); + try { + System.in.read(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxConfig.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxConfig.java new file mode 100644 index 0000000000..db039b37d4 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/WebFluxConfig.java @@ -0,0 +1,176 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux.testapp; + + + +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.reactor.netty.NettyReactiveWebServerFactory; +import org.springframework.boot.reactor.netty.NettyRouteProvider; +import org.springframework.boot.reactor.netty.NettyServerCustomizer; +import org.springframework.boot.tomcat.TomcatConnectorCustomizer; +import org.springframework.boot.tomcat.TomcatContextCustomizer; +import org.springframework.boot.tomcat.TomcatProtocolHandlerCustomizer; +import org.springframework.boot.tomcat.reactive.TomcatReactiveWebServerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.ReactorResourceFactory; +import org.springframework.security.config.Customizer; +import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity; +import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity; +import org.springframework.security.config.web.server.ServerHttpSecurity; +import org.springframework.security.core.userdetails.MapReactiveUserDetailsService; +import org.springframework.security.core.userdetails.User; +import org.springframework.security.web.server.SecurityWebFilterChain; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.config.EnableWebFlux; +import org.springframework.web.reactive.config.WebFluxConfigurer; +import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.reactive.socket.server.WebSocketService; +import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; +import org.springframework.web.reactive.socket.server.upgrade.StandardWebSocketUpgradeStrategy; +import reactor.core.publisher.Flux; + +import java.util.Map; +import java.util.stream.Collectors; + +@Configuration +@EnableWebFlux +@EnableWebFluxSecurity +@EnableReactiveMethodSecurity +public class WebFluxConfig implements WebFluxConfigurer { + + // those server methods are just plain copies of ReactiveWebServerFactoryConfiguration inner classes + // with different annotations to allow selecting server implementation through properties and not only + // classpath availability + + // Tomcat + + @Bean + @ConditionalOnProperty(name = "server", havingValue = "tomcat") + TomcatReactiveWebServerFactory tomcatReactiveWebServerFactory( + ObjectProvider connectorCustomizers, + ObjectProvider contextCustomizers, + ObjectProvider> protocolHandlerCustomizers) { + TomcatReactiveWebServerFactory factory = new TomcatReactiveWebServerFactory(); + factory.getConnectorCustomizers() + .addAll(connectorCustomizers.orderedStream().collect(Collectors.toList())); + factory.getContextCustomizers() + .addAll(contextCustomizers.orderedStream().collect(Collectors.toList())); + factory.getProtocolHandlerCustomizers() + .addAll(protocolHandlerCustomizers.orderedStream().collect(Collectors.toList())); + return factory; + } + + // Tomcat Websocket support + + @Bean + @Qualifier("requestUpdateStrategy") + @ConditionalOnProperty(name = "server", havingValue = "tomcat") + RequestUpgradeStrategy tomcatRequestUpgradeStrategy() { + return new StandardWebSocketUpgradeStrategy(); + } + + // Netty + + @Bean + @ConditionalOnMissingBean + ReactorResourceFactory reactorServerResourceFactory() { + return new ReactorResourceFactory(); + } + + @Bean + @ConditionalOnProperty(name = "server", havingValue = "netty", matchIfMissing = true) + NettyReactiveWebServerFactory nettyReactiveWebServerFactory(@Qualifier("reactorServerResourceFactory") ReactorResourceFactory resourceFactory, + ObjectProvider routes, ObjectProvider serverCustomizers) { + NettyReactiveWebServerFactory serverFactory = new NettyReactiveWebServerFactory(); + serverFactory.setResourceFactory(resourceFactory); + routes.orderedStream().forEach(serverFactory::addRouteProviders); + serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList())); + return serverFactory; + } + + // Netty Websocket support + + @Bean + @Qualifier("requestUpdateStrategy") + @ConditionalOnProperty(name = "server", havingValue = "netty") + RequestUpgradeStrategy nettyRequestUpgradeStrategy() { + return new ReactorNettyRequestUpgradeStrategy(); + } + + + // Generic Websocket support + + @Bean + public HandlerMapping webSocketHandlerMapping() { + SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping(); + handlerMapping.setOrder(1); + handlerMapping.setUrlMap(Map.of("/ping", pingPongHandler())); + return handlerMapping; + } + + private static WebSocketHandler pingPongHandler() { + return session -> { + Flux output = session.receive() + .map(msg -> session.textMessage(msg.getPayloadAsText().replaceFirst("ping", "pong"))); + return session.send(output); + }; + } + + @Bean + public WebSocketHandlerAdapter handlerAdapter(WebSocketService webSocketService) { + return new WebSocketHandlerAdapter(webSocketService); + } + + @Bean + public WebSocketService webSocketService(@Qualifier("requestUpdateStrategy") RequestUpgradeStrategy upgradeStrategy) { + return new HandshakeWebSocketService(upgradeStrategy); + } + + @Bean + public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) { + return http + .csrf(ServerHttpSecurity.CsrfSpec::disable) + .authorizeExchange(e -> + e.pathMatchers("/annotated/path-username").hasAnyAuthority("ROLE_USER") + .pathMatchers("/functional/path-username", "/functional/username", "/functional/preauthorized").hasAnyAuthority("ROLE_USER") + .pathMatchers("/**").permitAll()) + .httpBasic(Customizer.withDefaults()) + .build(); + } + + @Bean + public MapReactiveUserDetailsService userDetailsService() { + return new MapReactiveUserDetailsService( + User.withDefaultPasswordEncoder() + .username("elastic") + .password("changeme") + .roles("USER") + .build()); + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/package-info.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/package-info.java new file mode 100644 index 0000000000..e0c856b687 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/main/java/co/elastic/apm/agent/springwebflux/testapp/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +@NonnullApi +package co.elastic.apm.agent.springwebflux.testapp; + +import co.elastic.apm.agent.sdk.NonnullApi; diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/AnnotatedEndpointTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/AnnotatedEndpointTest.java new file mode 100644 index 0000000000..a060bf409a --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/AnnotatedEndpointTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux.testapp; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class AnnotatedEndpointTest extends ApplicationTest { + + @LocalServerPort + private int serverPort; + + @Override + protected GreetingWebClient createClient() { + + return new GreetingWebClient("localhost", serverPort, false, true); + } + +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/ApplicationTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/ApplicationTest.java new file mode 100644 index 0000000000..3da2354516 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/ApplicationTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux.testapp; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.test.StepVerifier; + +import java.util.function.Predicate; + +public abstract class ApplicationTest { + + protected GreetingWebClient client; + + protected abstract GreetingWebClient createClient(); + + @BeforeEach + void beforeEach() { + // test with functional endpoints only, testing both functional and annotated controller should be properly + // covered by instrumentation tests. + client = createClient(); + } + + @Test + void helloMono() { + StepVerifier.create(client.getHelloMono()) + .expectNext("Hello, Spring!") + .verifyComplete(); + } + + @Test + void mappingError() { + StepVerifier.create(client.getMappingError404()) + .expectErrorMatches(expectClientError(404)) + .verify(); + } + + @Test + void handlerException() { + StepVerifier.create(client.getHandlerError()) + .expectErrorMatches(expectClientError(500)) + .verify(); + } + + @Test + void handlerMonoError() { + StepVerifier.create(client.getMonoError()) + .expectErrorMatches(expectClientError(500)) + .verify(); + } + + @Test + void handlerMonoEmpty() { + StepVerifier.create(client.getMonoEmpty()) + .verifyComplete(); + } + + @ParameterizedTest + @CsvSource({"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS", "TRACE"}) + void methodMapping(String method) { + var verifier = StepVerifier.create(client.methodMapping(method)); + if ("HEAD".equals(method)) { + verifier.verifyComplete(); + } else { + verifier.expectNext(String.format("Hello, %s!", method)) + .verifyComplete(); + } + } + + @Test + void withPathParameter() { + StepVerifier.create(client.withPathParameter("42")) + .expectNext("Hello, 42!") + .verifyComplete(); + } + + @Test + void withChildrenSpans() { + StepVerifier.create(client.childSpans(3, 50, 10)) + .expectNext("child 1child 2child 3") + .verifyComplete(); + } + + @Test + void withChildrenSpansSSE() { + StepVerifier.create(client.childSpansSSE(3, 50, 10)) + .expectNextMatches(checkSSE(1)) + .expectNextMatches(checkSSE(2)) + .expectNextMatches(checkSSE(3)) + .verifyComplete(); + } + + @Test + void customTransactionName() { + StepVerifier.create(client.customTransactionName()) + .expectNext("Hello, transaction=null!") + .verifyComplete(); + } + + @Test + void duration() { + StepVerifier.create(client.duration(42)) + .expectNext("Hello, duration=42!") + .verifyComplete(); + } + + private static Predicate> checkSSE(final int index) { + return sse -> { + String data = sse.data(); + if (data == null) { + return false; + } + return data.equals(String.format("child %d", index)); + }; + } + + private Predicate expectClientError(int expectedStatus) { + return error -> (error instanceof WebClientResponseException) + && ((WebClientResponseException) error).getStatusCode().value() == expectedStatus; + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/FunctionalEndpointTest.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/FunctionalEndpointTest.java new file mode 100644 index 0000000000..e77c2c8eee --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/java/co/elastic/apm/agent/springwebflux/testapp/FunctionalEndpointTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package co.elastic.apm.agent.springwebflux.testapp; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import reactor.test.StepVerifier; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class FunctionalEndpointTest extends ApplicationTest { + + @LocalServerPort + private int serverPort; + + @Override + protected GreetingWebClient createClient() { + return new GreetingWebClient("localhost", serverPort, true, true); + } + + // nested routes are only available on functional endpoint + + @ParameterizedTest + @CsvSource({"GET", "POST"}) + void nestedRoutes(String method) { + StepVerifier.create(client.nested(method)) + .expectNext(String.format("Hello, nested %s!", method)) + .verifyComplete(); + + } +} diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/resources/application.properties b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/resources/application.properties new file mode 100644 index 0000000000..be4a7623f7 --- /dev/null +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/src/test/resources/application.properties @@ -0,0 +1,6 @@ +# set to DEBUG for easier test application debugging +logging.level.root=DEBUG +# + +# allows to set current server implementation: 'tomcat' or 'netty' +server=netty diff --git a/apm-agent-plugins/apm-spring-webflux/pom.xml b/apm-agent-plugins/apm-spring-webflux/pom.xml index 368843c2e8..8e65d012de 100644 --- a/apm-agent-plugins/apm-spring-webflux/pom.xml +++ b/apm-agent-plugins/apm-spring-webflux/pom.xml @@ -19,15 +19,18 @@ 2.7.16 3.5.7 + 4.1.0 apm-spring-webflux-plugin apm-spring-webclient-plugin apm-spring-webflux-testapp + apm-spring-webflux-testapp-spring7 apm-spring-webflux-spring5 apm-spring-webflux-common apm-spring-webflux-common-spring5 + apm-spring-webflux-plugin-spring7 From ff14da8902070aa6bc3394be7b8cc48b089f9882 Mon Sep 17 00:00:00 2001 From: jihad540 Date: Fri, 19 Jun 2026 14:38:23 +0200 Subject: [PATCH 4/4] fix(webflux): fix comment --- .../apm-spring-webflux-testapp-spring7/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml index 9adb95c8bd..a0eec5da6b 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-testapp-spring7/pom.xml @@ -15,7 +15,7 @@ ${project.basedir}/../../.. - + 17 17