Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 5.7.2
- Fixed
- Suppressed spurious `OperationCanceledException` / `TaskCanceledException` APM error entries during pod graceful shutdown. When the Service Bus receive loop is cancelled with a requested cancellation token, the exception is now logged at `Warning` level instead of `Error`.

## 5.7.1
- Changed
- Updated IMessageMetadataAccessor interface to allow to be decorated
Expand Down
12 changes: 10 additions & 2 deletions src/Ev.ServiceBus/Management/Wrappers/ReceiverWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class ReceiverWrapper
private readonly ComposedReceiverOptions _composedOptions;
private readonly ITransactionManager _transactionManager;

private Func<ProcessErrorEventArgs, Task>? _onExceptionReceivedHandler;
private Func<ProcessErrorEventArgs, Task> _onExceptionReceivedHandler = _ => Task.CompletedTask;

public ReceiverWrapper(ServiceBusClient? client,
ComposedReceiverOptions options,
Expand Down Expand Up @@ -132,6 +132,14 @@ private void TrySetReceptionRegistrationOnContext(MessageContext context, IServi
/// <returns></returns>
protected async Task OnExceptionOccured(ProcessErrorEventArgs exceptionEvent)
{
if (exceptionEvent.Exception is OperationCanceledException oce && oce.CancellationToken.IsCancellationRequested)
{
_messageProcessingLogger.LogWarning(
"[Ev.ServiceBus] Receive loop cancelled for {ClientType} '{ResourceId}' during shutdown.",
_composedOptions.ClientType, _composedOptions.ResourceId);
return;
}

var processException = exceptionEvent.Exception as FailedToProcessMessageException;
using (_messageProcessingLogger.ProcessingInProgress(
clientType: processException?.ClientType ?? _composedOptions.ClientType.ToString(),
Expand All @@ -149,7 +157,7 @@ protected async Task OnExceptionOccured(ProcessErrorEventArgs exceptionEvent)
exceptionEvent.EntityPath,
processExceptionInnerException);

await _onExceptionReceivedHandler!(exceptionEvent);
await _onExceptionReceivedHandler(exceptionEvent);
}
}

Expand Down
97 changes: 97 additions & 0 deletions tests/Ev.ServiceBus.UnitTests/ReceiverWrapperTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Ev.ServiceBus.Abstractions;
using Ev.ServiceBus.Abstractions.Listeners;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;

namespace Ev.ServiceBus.UnitTests;

public sealed class ReceiverWrapperTests
{
private static TestableReceiverWrapper CreateWrapper(ILogger<LoggingExtensions.MessageProcessing>? messageLogger = null)
{
var mockServices = new Mock<IServiceCollection>();
var composedOptions = new ComposedReceiverOptions([new QueueOptions(mockServices.Object, "test-queue")]);
var parentOptions = new ServiceBusOptions();

var mockProvider = new Mock<IServiceProvider>();
mockProvider.Setup(p => p.GetService(typeof(ITransactionManager)))
.Returns(Mock.Of<ITransactionManager>());
mockProvider.Setup(p => p.GetService(typeof(ILogger<LoggingExtensions.ServiceBusClientManagement>)))
.Returns(Mock.Of<ILogger<LoggingExtensions.ServiceBusClientManagement>>());
mockProvider.Setup(p => p.GetService(typeof(ILogger<LoggingExtensions.MessageProcessing>)))
.Returns(messageLogger ?? Mock.Of<ILogger<LoggingExtensions.MessageProcessing>>());

return new TestableReceiverWrapper(composedOptions, parentOptions, mockProvider.Object);
}

[Fact]
public async Task OnExceptionOccured_WithCancelledToken_DoesNotLogError()
{
var mockLogger = new Mock<ILogger<LoggingExtensions.MessageProcessing>>();
var wrapper = CreateWrapper(mockLogger.Object);

using var cts = new CancellationTokenSource();
cts.Cancel();

var args = new ProcessErrorEventArgs(
new OperationCanceledException("shutdown", cts.Token),
ServiceBusErrorSource.Receive,
"test-namespace",
"test-queue",
cts.Token);

await wrapper.InvokeOnExceptionOccuredAsync(args);

mockLogger.Verify(
x => x.Log(
LogLevel.Error,
It.IsAny<EventId>(),
It.Is<It.IsAnyType>((v, t) => true),
It.IsAny<Exception?>(),
It.Is<Func<It.IsAnyType, Exception?, string>>((v, t) => true)),
Times.Never());
}

[Fact]
public async Task OnExceptionOccured_WithNonCancelledToken_LogsError()
{
var mockLogger = new Mock<ILogger<LoggingExtensions.MessageProcessing>>();
mockLogger.Setup(x => x.IsEnabled(LogLevel.Error)).Returns(true);
var wrapper = CreateWrapper(mockLogger.Object);

var args = new ProcessErrorEventArgs(
new InvalidOperationException("connection lost"),
ServiceBusErrorSource.Receive,
"test-namespace",
"test-queue",
CancellationToken.None);

await wrapper.InvokeOnExceptionOccuredAsync(args);

mockLogger.Verify(
x => x.Log(
LogLevel.Error,
It.IsAny<EventId>(),
It.Is<It.IsAnyType>((v, t) => true),
It.IsAny<Exception?>(),
It.Is<Func<It.IsAnyType, Exception?, string>>((v, t) => true)),
Times.Once());
}

private sealed class TestableReceiverWrapper : ReceiverWrapper
{
public TestableReceiverWrapper(
ComposedReceiverOptions options,
ServiceBusOptions parentOptions,
IServiceProvider provider)
: base(null, options, parentOptions, provider) { }

public Task InvokeOnExceptionOccuredAsync(ProcessErrorEventArgs args) => OnExceptionOccured(args);
}
}
Loading