diff --git a/src/Bindings/RabbitMQAsyncCollector.cs b/src/Bindings/RabbitMQAsyncCollector.cs index 7ec7282..785ff42 100644 --- a/src/Bindings/RabbitMQAsyncCollector.cs +++ b/src/Bindings/RabbitMQAsyncCollector.cs @@ -9,7 +9,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ { - internal class RabbitMQAsyncCollector : IAsyncCollector + internal class RabbitMQAsyncCollector : IAsyncCollector { private readonly RabbitMQContext _context; private readonly IBasicPublishBatch _batch; @@ -28,9 +28,9 @@ public RabbitMQAsyncCollector(RabbitMQContext context, ILogger logger) _batch = _context.Service.BasicPublishBatch; } - public Task AddAsync(byte[] message, CancellationToken cancellationToken = default) + public Task AddAsync(RabbitMQMessage message, CancellationToken cancellationToken = default) { - _batch.Add(exchange: string.Empty, routingKey: _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message); + _batch.Add(exchange: string.Empty, routingKey: message.RoutingKey ?? _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message.Body); _logger.LogDebug($"Adding message to batch for publishing..."); return Task.CompletedTask; diff --git a/src/Config/RabbitMQExtensionConfigProvider.cs b/src/Config/RabbitMQExtensionConfigProvider.cs index c497470..6bcbfa7 100644 --- a/src/Config/RabbitMQExtensionConfigProvider.cs +++ b/src/Config/RabbitMQExtensionConfigProvider.cs @@ -43,7 +43,7 @@ public void Initialize(ExtensionConfigContext context) var rule = context.AddBindingRule(); rule.AddValidator(ValidateBinding); - rule.BindToCollector((attr) => + rule.BindToCollector((attr) => { return new RabbitMQAsyncCollector(CreateContext(attr), _logger); }); diff --git a/src/RabbitMQMessage.cs b/src/RabbitMQMessage.cs new file mode 100644 index 0000000..b8eb3b7 --- /dev/null +++ b/src/RabbitMQMessage.cs @@ -0,0 +1,14 @@ +namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ +{ + public class RabbitMQMessage + { + public RabbitMQMessage(byte[] body) + { + Body = body; + } + + public byte[] Body { get; set; } + + public string RoutingKey { get; set; } + } +} diff --git a/test/WebJobs.Extensions.RabbitMQ.Samples/RabbitMQSamples.cs b/test/WebJobs.Extensions.RabbitMQ.Samples/RabbitMQSamples.cs index e44e3f1..5914ff4 100644 --- a/test/WebJobs.Extensions.RabbitMQ.Samples/RabbitMQSamples.cs +++ b/test/WebJobs.Extensions.RabbitMQ.Samples/RabbitMQSamples.cs @@ -51,12 +51,12 @@ public static void TimerTrigger_PocoOutput( // So you can add items to the queue while the sample is running, and the trigger will be called until the queue is empty. public static async Task ProcessMessage_RabbitMQAsyncCollector( [QueueTrigger(@"samples-rabbitmq-messages")] string message, - [RabbitMQ(QueueName = "queue")] IAsyncCollector messages, + [RabbitMQ(QueueName = "queue")] IAsyncCollector messages, ILogger logger) { logger.LogInformation($"Received queue trigger"); byte[] messageInBytes = Encoding.UTF8.GetBytes(message); - await messages.AddAsync(messageInBytes); + await messages.AddAsync(new RabbitMQMessage(messageInBytes) { RoutingKey = "custom-routing-key" }); } // To run: diff --git a/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQAsyncCollectorTests.cs b/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQAsyncCollectorTests.cs index 91b0dc3..b32af41 100644 --- a/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQAsyncCollectorTests.cs +++ b/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQAsyncCollectorTests.cs @@ -40,7 +40,7 @@ public async Task AddAsync_AddsMessagesToQueue() var collector = new RabbitMQAsyncCollector(context, logger); byte[] body = Encoding.UTF8.GetBytes("hi"); - await collector.AddAsync(body); + await collector.AddAsync(new RabbitMQMessage(body)); mockBatch.Verify(m => m.Add(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), body), Times.Exactly(1)); }