Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Bindings/RabbitMQAsyncCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ
{
internal class RabbitMQAsyncCollector : IAsyncCollector<byte[]>
internal class RabbitMQAsyncCollector : IAsyncCollector<RabbitMQMessage>
{
private readonly RabbitMQContext _context;
private readonly IBasicPublishBatch _batch;
Expand All @@ -28,9 +28,9 @@ public RabbitMQAsyncCollector(RabbitMQContext context, ILogger logger)
_batch = _context.Service.BasicPublishBatch;
}

public Task AddAsync(byte[] message, CancellationToken cancellationToken = default)
public Task AddAsync(RabbitMQMessage message, CancellationToken cancellationToken = default)
{
_batch.Add(exchange: string.Empty, routingKey: _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message);
_batch.Add(exchange: string.Empty, routingKey: message.RoutingKey ?? _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message.Body);
_logger.LogDebug($"Adding message to batch for publishing...");

return Task.CompletedTask;
Expand Down
2 changes: 1 addition & 1 deletion src/Config/RabbitMQExtensionConfigProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void Initialize(ExtensionConfigContext context)

var rule = context.AddBindingRule<RabbitMQAttribute>();
rule.AddValidator(ValidateBinding);
rule.BindToCollector<byte[]>((attr) =>
rule.BindToCollector((attr) =>
{
return new RabbitMQAsyncCollector(CreateContext(attr), _logger);
});
Expand Down
14 changes: 14 additions & 0 deletions src/RabbitMQMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ
{
public class RabbitMQMessage
{
public RabbitMQMessage(byte[] body)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeffhollan / @fabiocav - Do you recommend introducing a wrapper type 'RabbitMQMessage'

{
Body = body;
}

public byte[] Body { get; set; }

public string RoutingKey { get; set; }
}
}
4 changes: 2 additions & 2 deletions test/WebJobs.Extensions.RabbitMQ.Samples/RabbitMQSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public static void TimerTrigger_PocoOutput(
// So you can add items to the queue while the sample is running, and the trigger will be called until the queue is empty.
public static async Task ProcessMessage_RabbitMQAsyncCollector(
[QueueTrigger(@"samples-rabbitmq-messages")] string message,
[RabbitMQ(QueueName = "queue")] IAsyncCollector<byte[]> messages,
[RabbitMQ(QueueName = "queue")] IAsyncCollector<RabbitMQMessage> messages,
ILogger logger)
{
logger.LogInformation($"Received queue trigger");
byte[] messageInBytes = Encoding.UTF8.GetBytes(message);
await messages.AddAsync(messageInBytes);
await messages.AddAsync(new RabbitMQMessage(messageInBytes) { RoutingKey = "custom-routing-key" });
}

// To run:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task AddAsync_AddsMessagesToQueue()
var collector = new RabbitMQAsyncCollector(context, logger);

byte[] body = Encoding.UTF8.GetBytes("hi");
await collector.AddAsync(body);
await collector.AddAsync(new RabbitMQMessage(body));

mockBatch.Verify(m => m.Add(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<IBasicProperties>(), body), Times.Exactly(1));
}
Expand Down