-
Notifications
You must be signed in to change notification settings - Fork 613
Description
Describe the bug
I discovered another issue with ObjectDisposedException related to the #1802 issue. It occurred due to the massive load on my RabbitMQ. It's the same error as before, but with a slightly different root cause. So, ObjectDisposedException when the channel is closing and disposing on MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync when publisher confirmation is enabled.
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text='Cannot access a disposed object.
Object name: 'System.Threading.SemaphoreSlim'.', classId=0, methodId=0, exception=System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Threading.SemaphoreSlim'.
at System.Threading.SemaphoreSlim.WaitAsync(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at RabbitMQ.Client.Impl.Channel.MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(ShutdownEventArgs reason)
at RabbitMQ.Client.Impl.Channel.OnChannelShutdownAsync(ShutdownEventArgs reason)
at RabbitMQ.Client.Impl.Channel.OnSessionShutdownAsync(Object sender, ShutdownEventArgs reason)
at RabbitMQ.Client.Impl.AsyncEventingWrapper`1.InternalInvoke(Delegate[] handlers, Object sender, TEvent event)
at RabbitMQ.Client.Impl.AsyncEventingWrapper`1.InternalInvoke(Delegate[] handlers, Object sender, TEvent event)
at RabbitMQ.Client.Impl.Channel.FinishCloseAsync(CancellationToken cancellationToken)
at RabbitMQ.Client.Impl.Channel.HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)
at RabbitMQ.Client.Impl.Channel.HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
at RabbitMQ.Client.Framing.Connection.ProcessFrameAsync(InboundFrame frame, CancellationToken cancellationToken)
at RabbitMQ.Client.Framing.Connection.ReceiveLoopAsync(CancellationToken mainLoopCancellationToken)
at RabbitMQ.Client.Framing.Connection.MainLoop()
---> System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Threading.SemaphoreSlim'.
at System.Threading.SemaphoreSlim.WaitAsync(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at RabbitMQ.Client.Impl.Channel.MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(ShutdownEventArgs reason)
at RabbitMQ.Client.Impl.Channel.OnChannelShutdownAsync(ShutdownEventArgs reason)
at RabbitMQ.Client.Impl.Channel.OnSessionShutdownAsync(Object sender, ShutdownEventArgs reason)
at RabbitMQ.Client.Impl.AsyncEventingWrapper`1.InternalInvoke(Delegate[] handlers, Object sender, TEvent event)
at RabbitMQ.Client.Impl.AsyncEventingWrapper`1.InternalInvoke(Delegate[] handlers, Object sender, TEvent event)
at RabbitMQ.Client.Impl.Channel.FinishCloseAsync(CancellationToken cancellationToken)
at RabbitMQ.Client.Impl.Channel.HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)
at RabbitMQ.Client.Impl.Channel.HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
at RabbitMQ.Client.Framing.Connection.ProcessFrameAsync(InboundFrame frame, CancellationToken cancellationToken)
at RabbitMQ.Client.Framing.Connection.ReceiveLoopAsync(CancellationToken mainLoopCancellationToken)
at RabbitMQ.Client.Framing.Connection.MainLoop()
--- End of inner exception stack trace ---
at RabbitMQ.Client.Impl.Channel.PublisherConfirmationInfo.MaybeWaitForConfirmationAsync(CancellationToken cancellationToken)
at RabbitMQ.Client.Impl.Channel.MaybeEndPublisherConfirmationTracking(PublisherConfirmationInfo publisherConfirmationInfo, CancellationToken cancellationToken)
at RabbitMQ.Client.Impl.Channel.BasicPublishAsync[TProperties](String exchange, String routingKey, Boolean mandatory, TProperties basicProperties, ReadOnlyMemory`1 body, CancellationToken cancellationToken)
In the CloseAsync
method of the Channel
class, there's the ContinuationTimeout
passed to the ChannelCloseAsyncRpcContinuation`, which cancels it after a defined time without waiting for the channel to be closed.
var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
...
AssertResultIsTrue(await k);
await ConsumerDispatcher.WaitForShutdownAsync()
.ConfigureAwait(false);
When a close is invoked just before disposal (e.g. AbortAsync
in the dispose method) semaphore is disposed before ChannelCloseOkAsync (or ChannelClose) is received from the RabbitMQ. Receiving it invokes OnSessionShutdownAsync
-> OnChannelShutdownAsync
-> MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync
, and it waits on a disposed semaphore, causing an error. The error causes the connection and all its channels to be closed.
Reproduction steps
To reproduce the issue, I modified the CreateChannel program. I set ContinuationTimeout to a low value like 500-1000 milliseconds, to let the channel be open, but to cancel while closing and reproducing the issue. Then I run simultaneously around 20 tasks that open a channel and just dispose it. Then I do 2-3 repeats of the loop. Maybe it can also be reproduced by doing operations sequentially, but it has to invoke a race condition.
Expected behavior
No ObjectDisposedException from MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync. Ideally, for any case, a simple try-catch statement would be helpful if the root cause of a race bug is not simple to fix.
Additional context
Questions
I assume fixing the bug is more about correcting cancellation in the Channel
CloseAsync
method. However, this is one of the causes of the entire connection closing, which is the real problem. So I have a question about it:
- Why MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync is not enclosed in the try catch statement? It would at least handle all these Semaphore ObjectDisposedException errors happening frequently.
- Why is any exception closing a connection with all channels running on it instead of a single channel? Is a channel, being unresponsive or in an error state, a reason that makes the connection unstable or unable to be used?