diff --git a/src/Bindings/RabbitMQAsyncCollector.cs b/src/Bindings/RabbitMQAsyncCollector.cs
index 7ec7282..4acfd95 100644
--- a/src/Bindings/RabbitMQAsyncCollector.cs
+++ b/src/Bindings/RabbitMQAsyncCollector.cs
@@ -30,7 +30,7 @@ public RabbitMQAsyncCollector(RabbitMQContext context, ILogger logger)
public Task AddAsync(byte[] message, CancellationToken cancellationToken = default)
{
- _batch.Add(exchange: string.Empty, routingKey: _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message);
+ _batch.Add(exchange: _context.ResolvedAttribute.ExchangeName, routingKey: _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message);
_logger.LogDebug($"Adding message to batch for publishing...");
return Task.CompletedTask;
diff --git a/src/Config/DefaultRabbitMQServiceFactory.cs b/src/Config/DefaultRabbitMQServiceFactory.cs
index d34e88a..8bfd695 100644
--- a/src/Config/DefaultRabbitMQServiceFactory.cs
+++ b/src/Config/DefaultRabbitMQServiceFactory.cs
@@ -5,9 +5,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ
{
internal class DefaultRabbitMQServiceFactory : IRabbitMQServiceFactory
{
- public IRabbitMQService CreateService(string connectionString, string hostName, string queueName, string userName, string password, int port)
+ public IRabbitMQService CreateService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port)
{
- return new RabbitMQService(connectionString, hostName, queueName, userName, password, port);
+ return new RabbitMQService(connectionString, queueName, exchangeName, hostName, userName, password, port);
}
public IRabbitMQService CreateService(string connectionString, string hostName, string userName, string password, int port)
diff --git a/src/Config/IRabbitMQServiceFactory.cs b/src/Config/IRabbitMQServiceFactory.cs
index 11a9c0d..9e05484 100644
--- a/src/Config/IRabbitMQServiceFactory.cs
+++ b/src/Config/IRabbitMQServiceFactory.cs
@@ -5,7 +5,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ
{
public interface IRabbitMQServiceFactory
{
- IRabbitMQService CreateService(string connectionString, string hostName, string queueName, string userName, string password, int port);
+ IRabbitMQService CreateService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port);
IRabbitMQService CreateService(string connectionString, string hostName, string userName, string password, int port);
}
diff --git a/src/Config/RabbitMQExtensionConfigProvider.cs b/src/Config/RabbitMQExtensionConfigProvider.cs
index e65c110..e7eff6a 100644
--- a/src/Config/RabbitMQExtensionConfigProvider.cs
+++ b/src/Config/RabbitMQExtensionConfigProvider.cs
@@ -73,13 +73,22 @@ public void ValidateBinding(RabbitMQAttribute attribute, Type type)
{
throw new InvalidOperationException("RabbitMQ username and password required if not connecting to localhost");
}
+
+ string queueName = Utility.FirstOrDefault(attribute.QueueName, _options.Value.QueueName);
+ string exchangeName = Utility.FirstOrDefault(attribute.ExchangeName, _options.Value.ExchangeName);
+ _logger.LogInformation($"Queue: {queueName} and Exchange {exchangeName}");
+ if (string.IsNullOrEmpty(queueName) && string.IsNullOrEmpty(exchangeName))
+ {
+ throw new InvalidOperationException("One of queueName or exchangeName should be provided");
+ }
}
internal RabbitMQContext CreateContext(RabbitMQAttribute attribute)
{
string connectionString = Utility.FirstOrDefault(attribute.ConnectionStringSetting, _options.Value.ConnectionString);
string hostName = Utility.FirstOrDefault(attribute.HostName, _options.Value.HostName);
- string queueName = Utility.FirstOrDefault(attribute.QueueName, _options.Value.QueueName);
+ string queueName = Utility.FirstOrDefault(attribute.QueueName, _options.Value.QueueName) ?? string.Empty;
+ string exchangeName = Utility.FirstOrDefault(attribute.ExchangeName, _options.Value.ExchangeName) ?? string.Empty;
string userName = Utility.FirstOrDefault(attribute.UserName, _options.Value.UserName);
string password = Utility.FirstOrDefault(attribute.Password, _options.Value.Password);
int port = Utility.FirstOrDefault(attribute.Port, _options.Value.Port);
@@ -92,12 +101,13 @@ internal RabbitMQContext CreateContext(RabbitMQAttribute attribute)
ConnectionStringSetting = connectionString,
HostName = hostName,
QueueName = queueName,
+ ExchangeName = exchangeName,
UserName = userName,
Password = password,
Port = port,
};
- service = GetService(connectionString, hostName, queueName, userName, password, port);
+ service = GetService(connectionString, queueName, exchangeName, hostName, userName, password, port);
return new RabbitMQContext
{
@@ -106,9 +116,9 @@ internal RabbitMQContext CreateContext(RabbitMQAttribute attribute)
};
}
- internal IRabbitMQService GetService(string connectionString, string hostName, string queueName, string userName, string password, int port)
+ internal IRabbitMQService GetService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port)
{
- return _rabbitMQServiceFactory.CreateService(connectionString, hostName, queueName, userName, password, port);
+ return _rabbitMQServiceFactory.CreateService(connectionString, queueName, exchangeName, hostName, userName, password, port);
}
// Overloaded method used only for getting the RabbitMQ client
diff --git a/src/Config/RabbitMQOptions.cs b/src/Config/RabbitMQOptions.cs
index 60fc6d9..adab625 100644
--- a/src/Config/RabbitMQOptions.cs
+++ b/src/Config/RabbitMQOptions.cs
@@ -27,6 +27,11 @@ public RabbitMQOptions()
///
public string QueueName { get; set; }
+ ///
+ /// Gets or sets the ExchangeName to enqueue messages to.
+ ///
+ public string ExchangeName { get; set; }
+
///
/// Gets or sets the UserName used to authenticate with RabbitMQ.
///
@@ -58,6 +63,7 @@ public string Format()
{
{ nameof(HostName), HostName },
{ nameof(QueueName), QueueName },
+ { nameof(ExchangeName), ExchangeName },
{ nameof(Port), Port },
{ nameof(PrefetchCount), PrefetchCount },
};
diff --git a/src/RabbitMQAttribute.cs b/src/RabbitMQAttribute.cs
index 899e4cf..e8b7509 100644
--- a/src/RabbitMQAttribute.cs
+++ b/src/RabbitMQAttribute.cs
@@ -52,5 +52,11 @@ public sealed class RabbitMQAttribute : Attribute
///
[ConnectionString]
public string ConnectionStringSetting { get; set; }
+
+ ///
+ /// Gets or sets the ExchangeName to send messages to.
+ ///
+ [AutoResolve]
+ public string ExchangeName { get; set; }
}
-}
+}
\ No newline at end of file
diff --git a/src/Services/RabbitMQService.cs b/src/Services/RabbitMQService.cs
index 63d2d89..67b3d5f 100644
--- a/src/Services/RabbitMQService.cs
+++ b/src/Services/RabbitMQService.cs
@@ -14,6 +14,7 @@ internal sealed class RabbitMQService : IRabbitMQService
private readonly string _connectionString;
private readonly string _hostName;
private readonly string _queueName;
+ private readonly string _exchangeName;
private readonly string _userName;
private readonly string _password;
private readonly int _port;
@@ -31,13 +32,23 @@ public RabbitMQService(string connectionString, string hostName, string userName
_model = connectionFactory.CreateConnection().CreateModel();
}
- public RabbitMQService(string connectionString, string hostName, string queueName, string userName, string password, int port)
+ public RabbitMQService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port)
: this(connectionString, hostName, userName, password, port)
{
_rabbitMQModel = new RabbitMQModel(_model);
_queueName = queueName ?? throw new ArgumentNullException(nameof(queueName));
+ _exchangeName = exchangeName ?? throw new ArgumentNullException(nameof(exchangeName));
+
+ if (!string.IsNullOrEmpty(_queueName))
+ {
+ _model.QueueDeclarePassive(_queueName);
+ }
+
+ if (!string.IsNullOrEmpty(_exchangeName))
+ {
+ _model.ExchangeDeclarePassive(_exchangeName); // Throws exception if exchange doesn't exist
+ }
- _model.QueueDeclarePassive(_queueName); // Throws exception if queue doesn't exist
_batch = _model.CreateBasicPublishBatch();
}
diff --git a/src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs b/src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs
index 148b254..a4018c0 100644
--- a/src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs
+++ b/src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs
@@ -66,7 +66,7 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex
throw new InvalidOperationException("RabbitMQ username and password required if not connecting to localhost");
}
- IRabbitMQService service = _provider.GetService(connectionString, hostName, queueName, userName, password, port);
+ IRabbitMQService service = _provider.GetService(connectionString, queueName, string.Empty, hostName, userName, password, port);
return Task.FromResult(new RabbitMQTriggerBinding(service, hostName, queueName, _logger, parameter.ParameterType, _options.Value.PrefetchCount));
}
diff --git a/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQOptionsTests.cs b/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQOptionsTests.cs
index ae5219f..b731737 100644
--- a/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQOptionsTests.cs
+++ b/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQOptionsTests.cs
@@ -19,6 +19,7 @@ private string GetFormattedOption(RabbitMQOptions option)
{
{ nameof(option.HostName), option.HostName },
{ nameof(option.QueueName), option.QueueName },
+ { nameof(option.ExchangeName), option.ExchangeName },
{ nameof(option.Port), option.Port },
{ nameof(option.PrefetchCount), option.PrefetchCount },
};
@@ -50,6 +51,7 @@ public void TestConfiguredRabbitMQOptions()
int expectedPort = 8080;
string expectedHostName = "someHostName";
string expectedQueueName = "someQueueName";
+ string expectedExchangeName = "someExchangeName";
string expectedUserName = "someUserName";
string expectedPassword = "somePassword";
string expectedConnectionString = "someConnectionString";
@@ -58,6 +60,7 @@ public void TestConfiguredRabbitMQOptions()
Port = expectedPort,
HostName = expectedHostName,
QueueName = expectedQueueName,
+ ExchangeName = expectedExchangeName,
UserName = expectedUserName,
Password = expectedPassword,
ConnectionString = expectedConnectionString,
@@ -68,6 +71,7 @@ public void TestConfiguredRabbitMQOptions()
Assert.Equal(expectedPort, options.Port);
Assert.Equal(expectedHostName, options.HostName);
Assert.Equal(expectedQueueName, options.QueueName);
+ Assert.Equal(expectedExchangeName, options.ExchangeName);
Assert.Equal(expectedUserName, options.UserName);
Assert.Equal(expectedPassword, options.Password);
Assert.Equal(expectedConnectionString, options.ConnectionString);