Skip to content

Commit b07eae9

Browse files
authored
Fix message body corruption on republish (#218)
1 parent bbd4a43 commit b07eae9

File tree

1 file changed

+21
-15
lines changed

1 file changed

+21
-15
lines changed

extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,38 +85,44 @@ public Task StartAsync(CancellationToken cancellationToken)
8585
this.rabbitMQModel.BasicQos(0, this.prefetchCount, false); // Non zero prefetchSize doesn't work (tested upto 5.2.0) and will throw NOT_IMPLEMENTED exception
8686
this.consumer = new EventingBasicConsumer(this.rabbitMQModel.Model);
8787

88-
this.consumer.Received += async (model, ea) =>
88+
this.consumer.Received += async (model, args) =>
8989
{
90-
using Activity activity = StartActivity(ea);
91-
92-
var input = new TriggeredFunctionData() { TriggerValue = ea };
90+
// The RabbitMQ client rents an array from the ArrayPool to hold a copy of the message body, and passes it
91+
// to the listener. Once all event handlers are executed, the array is returned back to the pool so that the
92+
// memory can be reused for future messages for that connection. However, since our event handler is async,
93+
// the very first await statement i.e. the call to TryExecuteAsync below causes the event handler invocation
94+
// to complete and lets the RabbitMQ client release the memory. This led to message body corruption when the
95+
// message is republished (see: https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211).
96+
//
97+
// We chose to copy the message body instead of having a new 'args' object as there is only one event
98+
// handler registered for the consumer so there should be no side-effects.
99+
args.Body = args.Body.ToArray();
100+
101+
using Activity activity = StartActivity(args);
102+
103+
var input = new TriggeredFunctionData() { TriggerValue = args };
93104
FunctionResult result = await this.executor.TryExecuteAsync(input, cancellationToken).ConfigureAwait(false);
94105

95106
if (!result.Succeeded)
96107
{
97-
ea.BasicProperties.Headers ??= new Dictionary<string, object>();
98-
ea.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
108+
args.BasicProperties.Headers ??= new Dictionary<string, object>();
109+
args.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
99110
int requeueCount = Convert.ToInt32(headerValue, CultureInfo.InvariantCulture) + 1;
100111

101112
if (requeueCount >= 5)
102113
{
103114
// Add message to dead letter exchange.
104115
this.logger.LogDebug("Requeue count exceeded: rejecting message");
105-
this.rabbitMQModel.BasicReject(ea.DeliveryTag, false);
116+
this.rabbitMQModel.BasicReject(args.DeliveryTag, false);
106117
return;
107118
}
108119

109120
this.logger.LogDebug("Republishing message");
110-
ea.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;
111-
112-
// RabbitMQ client library seems to be reusing the memory pointed by 'ea.Body' for subsequent
113-
// message-received events. This led to https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211.
114-
// Hence, pass a copy of 'ea.Body' to method 'BasicPublish' instead of the object itself to prevent
115-
// sharing of the memory and possibility of memory corruption.
116-
this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, ea.BasicProperties, ea.Body.ToArray());
121+
args.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;
122+
this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, args.BasicProperties, args.Body);
117123
}
118124

119-
this.rabbitMQModel.BasicAck(ea.DeliveryTag, false);
125+
this.rabbitMQModel.BasicAck(args.DeliveryTag, false);
120126
};
121127

122128
this.consumerTag = this.rabbitMQModel.BasicConsume(queue: this.queueName, autoAck: false, consumer: this.consumer);

0 commit comments

Comments
 (0)