diff --git a/src/Bindings/RabbitMQAsyncCollector.cs b/src/Bindings/RabbitMQAsyncCollector.cs index 7ec7282..4acfd95 100644 --- a/src/Bindings/RabbitMQAsyncCollector.cs +++ b/src/Bindings/RabbitMQAsyncCollector.cs @@ -30,7 +30,7 @@ public RabbitMQAsyncCollector(RabbitMQContext context, ILogger logger) public Task AddAsync(byte[] message, CancellationToken cancellationToken = default) { - _batch.Add(exchange: string.Empty, routingKey: _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message); + _batch.Add(exchange: _context.ResolvedAttribute.ExchangeName, routingKey: _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message); _logger.LogDebug($"Adding message to batch for publishing..."); return Task.CompletedTask; diff --git a/src/Config/DefaultRabbitMQServiceFactory.cs b/src/Config/DefaultRabbitMQServiceFactory.cs index d34e88a..8bfd695 100644 --- a/src/Config/DefaultRabbitMQServiceFactory.cs +++ b/src/Config/DefaultRabbitMQServiceFactory.cs @@ -5,9 +5,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ { internal class DefaultRabbitMQServiceFactory : IRabbitMQServiceFactory { - public IRabbitMQService CreateService(string connectionString, string hostName, string queueName, string userName, string password, int port) + public IRabbitMQService CreateService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port) { - return new RabbitMQService(connectionString, hostName, queueName, userName, password, port); + return new RabbitMQService(connectionString, queueName, exchangeName, hostName, userName, password, port); } public IRabbitMQService CreateService(string connectionString, string hostName, string userName, string password, int port) diff --git a/src/Config/IRabbitMQServiceFactory.cs b/src/Config/IRabbitMQServiceFactory.cs index 11a9c0d..9e05484 100644 --- a/src/Config/IRabbitMQServiceFactory.cs +++ b/src/Config/IRabbitMQServiceFactory.cs @@ -5,7 +5,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ { public interface IRabbitMQServiceFactory { - IRabbitMQService CreateService(string connectionString, string hostName, string queueName, string userName, string password, int port); + IRabbitMQService CreateService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port); IRabbitMQService CreateService(string connectionString, string hostName, string userName, string password, int port); } diff --git a/src/Config/RabbitMQExtensionConfigProvider.cs b/src/Config/RabbitMQExtensionConfigProvider.cs index e65c110..e7eff6a 100644 --- a/src/Config/RabbitMQExtensionConfigProvider.cs +++ b/src/Config/RabbitMQExtensionConfigProvider.cs @@ -73,13 +73,22 @@ public void ValidateBinding(RabbitMQAttribute attribute, Type type) { throw new InvalidOperationException("RabbitMQ username and password required if not connecting to localhost"); } + + string queueName = Utility.FirstOrDefault(attribute.QueueName, _options.Value.QueueName); + string exchangeName = Utility.FirstOrDefault(attribute.ExchangeName, _options.Value.ExchangeName); + _logger.LogInformation($"Queue: {queueName} and Exchange {exchangeName}"); + if (string.IsNullOrEmpty(queueName) && string.IsNullOrEmpty(exchangeName)) + { + throw new InvalidOperationException("One of queueName or exchangeName should be provided"); + } } internal RabbitMQContext CreateContext(RabbitMQAttribute attribute) { string connectionString = Utility.FirstOrDefault(attribute.ConnectionStringSetting, _options.Value.ConnectionString); string hostName = Utility.FirstOrDefault(attribute.HostName, _options.Value.HostName); - string queueName = Utility.FirstOrDefault(attribute.QueueName, _options.Value.QueueName); + string queueName = Utility.FirstOrDefault(attribute.QueueName, _options.Value.QueueName) ?? string.Empty; + string exchangeName = Utility.FirstOrDefault(attribute.ExchangeName, _options.Value.ExchangeName) ?? string.Empty; string userName = Utility.FirstOrDefault(attribute.UserName, _options.Value.UserName); string password = Utility.FirstOrDefault(attribute.Password, _options.Value.Password); int port = Utility.FirstOrDefault(attribute.Port, _options.Value.Port); @@ -92,12 +101,13 @@ internal RabbitMQContext CreateContext(RabbitMQAttribute attribute) ConnectionStringSetting = connectionString, HostName = hostName, QueueName = queueName, + ExchangeName = exchangeName, UserName = userName, Password = password, Port = port, }; - service = GetService(connectionString, hostName, queueName, userName, password, port); + service = GetService(connectionString, queueName, exchangeName, hostName, userName, password, port); return new RabbitMQContext { @@ -106,9 +116,9 @@ internal RabbitMQContext CreateContext(RabbitMQAttribute attribute) }; } - internal IRabbitMQService GetService(string connectionString, string hostName, string queueName, string userName, string password, int port) + internal IRabbitMQService GetService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port) { - return _rabbitMQServiceFactory.CreateService(connectionString, hostName, queueName, userName, password, port); + return _rabbitMQServiceFactory.CreateService(connectionString, queueName, exchangeName, hostName, userName, password, port); } // Overloaded method used only for getting the RabbitMQ client diff --git a/src/Config/RabbitMQOptions.cs b/src/Config/RabbitMQOptions.cs index 60fc6d9..adab625 100644 --- a/src/Config/RabbitMQOptions.cs +++ b/src/Config/RabbitMQOptions.cs @@ -27,6 +27,11 @@ public RabbitMQOptions() /// public string QueueName { get; set; } + /// + /// Gets or sets the ExchangeName to enqueue messages to. + /// + public string ExchangeName { get; set; } + /// /// Gets or sets the UserName used to authenticate with RabbitMQ. /// @@ -58,6 +63,7 @@ public string Format() { { nameof(HostName), HostName }, { nameof(QueueName), QueueName }, + { nameof(ExchangeName), ExchangeName }, { nameof(Port), Port }, { nameof(PrefetchCount), PrefetchCount }, }; diff --git a/src/RabbitMQAttribute.cs b/src/RabbitMQAttribute.cs index 899e4cf..e8b7509 100644 --- a/src/RabbitMQAttribute.cs +++ b/src/RabbitMQAttribute.cs @@ -52,5 +52,11 @@ public sealed class RabbitMQAttribute : Attribute /// [ConnectionString] public string ConnectionStringSetting { get; set; } + + /// + /// Gets or sets the ExchangeName to send messages to. + /// + [AutoResolve] + public string ExchangeName { get; set; } } -} +} \ No newline at end of file diff --git a/src/Services/RabbitMQService.cs b/src/Services/RabbitMQService.cs index 63d2d89..67b3d5f 100644 --- a/src/Services/RabbitMQService.cs +++ b/src/Services/RabbitMQService.cs @@ -14,6 +14,7 @@ internal sealed class RabbitMQService : IRabbitMQService private readonly string _connectionString; private readonly string _hostName; private readonly string _queueName; + private readonly string _exchangeName; private readonly string _userName; private readonly string _password; private readonly int _port; @@ -31,13 +32,23 @@ public RabbitMQService(string connectionString, string hostName, string userName _model = connectionFactory.CreateConnection().CreateModel(); } - public RabbitMQService(string connectionString, string hostName, string queueName, string userName, string password, int port) + public RabbitMQService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port) : this(connectionString, hostName, userName, password, port) { _rabbitMQModel = new RabbitMQModel(_model); _queueName = queueName ?? throw new ArgumentNullException(nameof(queueName)); + _exchangeName = exchangeName ?? throw new ArgumentNullException(nameof(exchangeName)); + + if (!string.IsNullOrEmpty(_queueName)) + { + _model.QueueDeclarePassive(_queueName); + } + + if (!string.IsNullOrEmpty(_exchangeName)) + { + _model.ExchangeDeclarePassive(_exchangeName); // Throws exception if exchange doesn't exist + } - _model.QueueDeclarePassive(_queueName); // Throws exception if queue doesn't exist _batch = _model.CreateBasicPublishBatch(); } diff --git a/src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs b/src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs index 148b254..a4018c0 100644 --- a/src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs +++ b/src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs @@ -66,7 +66,7 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex throw new InvalidOperationException("RabbitMQ username and password required if not connecting to localhost"); } - IRabbitMQService service = _provider.GetService(connectionString, hostName, queueName, userName, password, port); + IRabbitMQService service = _provider.GetService(connectionString, queueName, string.Empty, hostName, userName, password, port); return Task.FromResult(new RabbitMQTriggerBinding(service, hostName, queueName, _logger, parameter.ParameterType, _options.Value.PrefetchCount)); } diff --git a/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQOptionsTests.cs b/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQOptionsTests.cs index ae5219f..b731737 100644 --- a/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQOptionsTests.cs +++ b/test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQOptionsTests.cs @@ -19,6 +19,7 @@ private string GetFormattedOption(RabbitMQOptions option) { { nameof(option.HostName), option.HostName }, { nameof(option.QueueName), option.QueueName }, + { nameof(option.ExchangeName), option.ExchangeName }, { nameof(option.Port), option.Port }, { nameof(option.PrefetchCount), option.PrefetchCount }, }; @@ -50,6 +51,7 @@ public void TestConfiguredRabbitMQOptions() int expectedPort = 8080; string expectedHostName = "someHostName"; string expectedQueueName = "someQueueName"; + string expectedExchangeName = "someExchangeName"; string expectedUserName = "someUserName"; string expectedPassword = "somePassword"; string expectedConnectionString = "someConnectionString"; @@ -58,6 +60,7 @@ public void TestConfiguredRabbitMQOptions() Port = expectedPort, HostName = expectedHostName, QueueName = expectedQueueName, + ExchangeName = expectedExchangeName, UserName = expectedUserName, Password = expectedPassword, ConnectionString = expectedConnectionString, @@ -68,6 +71,7 @@ public void TestConfiguredRabbitMQOptions() Assert.Equal(expectedPort, options.Port); Assert.Equal(expectedHostName, options.HostName); Assert.Equal(expectedQueueName, options.QueueName); + Assert.Equal(expectedExchangeName, options.ExchangeName); Assert.Equal(expectedUserName, options.UserName); Assert.Equal(expectedPassword, options.Password); Assert.Equal(expectedConnectionString, options.ConnectionString);