Skip to content

Commit 627b413

Browse files
ishank12Ishank Gupta
andauthored
support manualAck attribute for in-proc (#255)
Co-authored-by: Ishank Gupta <[email protected]>
1 parent f164d31 commit 627b413

15 files changed

+305
-58
lines changed

extension/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQClientBuilderTests.cs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,31 @@ public void Opens_Connection()
2222
var options = new OptionsWrapper<RabbitMQOptions>(new RabbitMQOptions());
2323
var mockServiceFactory = new Mock<IRabbitMQServiceFactory>();
2424
var config = new RabbitMQExtensionConfigProvider(options, new Mock<INameResolver>().Object, mockServiceFactory.Object, new LoggerFactory(), EmptyConfig, DrainModeManager);
25-
mockServiceFactory.Setup(m => m.CreateService(It.IsAny<string>(), false)).Returns(new Mock<IRabbitMQService>().Object);
25+
mockServiceFactory.Setup(m => m.CreateService(It.IsAny<string>(), false, It.IsAny<ILogger>())).Returns(new Mock<IRabbitMQService>().Object);
2626
RabbitMQAttribute attr = GetTestAttribute();
2727

2828
var clientBuilder = new RabbitMQClientBuilder(config, options);
29-
IModel model = clientBuilder.Convert(attr);
29+
IRabbitMQService service = clientBuilder.Convert(attr);
3030

31-
mockServiceFactory.Verify(m => m.CreateService(It.IsAny<string>(), false), Times.Exactly(1));
31+
mockServiceFactory.Verify(m => m.CreateService(It.IsAny<string>(), false, It.IsAny<ILogger>()), Times.Exactly(1));
3232
}
3333

3434
[Fact]
3535
public void TestWhetherConnectionIsPooled()
3636
{
3737
var options = new OptionsWrapper<RabbitMQOptions>(new RabbitMQOptions());
3838
var mockServiceFactory = new Mock<IRabbitMQServiceFactory>();
39-
mockServiceFactory.SetupSequence(m => m.CreateService(It.IsAny<string>(), false))
39+
mockServiceFactory.SetupSequence(m => m.CreateService(It.IsAny<string>(), false, It.IsAny<ILogger>()))
4040
.Returns(GetRabbitMQService());
4141
var config = new RabbitMQExtensionConfigProvider(options, new Mock<INameResolver>().Object, mockServiceFactory.Object, new LoggerFactory(), EmptyConfig, DrainModeManager);
4242
RabbitMQAttribute attr = GetTestAttribute();
4343

4444
var clientBuilder = new RabbitMQClientBuilder(config, options);
4545

46-
IModel model = clientBuilder.Convert(attr);
47-
IModel model2 = clientBuilder.Convert(attr);
46+
IRabbitMQService service = clientBuilder.Convert(attr);
47+
IRabbitMQService service2 = clientBuilder.Convert(attr);
4848

49-
Assert.Equal(model, model2);
49+
Assert.Equal(service, service2);
5050
}
5151

5252
private static RabbitMQAttribute GetTestAttribute()
@@ -60,7 +60,6 @@ private static RabbitMQAttribute GetTestAttribute()
6060
private static IRabbitMQService GetRabbitMQService()
6161
{
6262
var mockService = new Mock<IRabbitMQService>();
63-
mockService.Setup(a => a.Model).Returns(new Mock<IModel>().Object);
6463
return mockService.Object;
6564
}
6665
}

extension/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQExtensionConfigProviderTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using Microsoft.Azure.WebJobs.Host;
55
using Microsoft.Extensions.Configuration;
6+
using Microsoft.Extensions.Logging;
67
using Microsoft.Extensions.Logging.Abstractions;
78
using Microsoft.Extensions.Options;
89
using Moq;
@@ -18,11 +19,11 @@ public void TestConnectionPooling()
1819
var rabbitmqServiceFactory = new Mock<IRabbitMQServiceFactory>();
1920

2021
rabbitmqServiceFactory
21-
.SetupSequence(a => a.CreateService(It.IsAny<string>(), It.IsAny<string>(), false))
22+
.SetupSequence(a => a.CreateService(It.IsAny<string>(), It.IsAny<string>(), false, It.IsAny<ILogger>()))
2223
.Returns(new Mock<IRabbitMQService>().Object);
2324

2425
rabbitmqServiceFactory
25-
.SetupSequence(a => a.CreateService(It.IsAny<string>(), false))
26+
.SetupSequence(a => a.CreateService(It.IsAny<string>(), false, It.IsAny<ILogger>()))
2627
.Returns(new Mock<IRabbitMQService>().Object);
2728

2829
var extensionConfigProvider = new RabbitMQExtensionConfigProvider(

extension/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQTriggerBindingTests.cs

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Microsoft.Azure.WebJobs.Host;
10+
using Microsoft.Azure.WebJobs.Host.Executors;
11+
using Microsoft.Extensions.Logging;
12+
using Moq;
613
using RabbitMQ.Client;
714
using RabbitMQ.Client.Events;
815
using Xunit;
@@ -23,6 +30,7 @@ public void Verify_BindingDataContract_Types()
2330
["RoutingKey"] = typeof(string),
2431
["BasicProperties"] = typeof(IBasicProperties),
2532
["Body"] = typeof(ReadOnlyMemory<byte>),
33+
["MessageActions"] = typeof(RabbitMQMessageActions),
2634
};
2735

2836
IReadOnlyDictionary<string, Type> actualContract = RabbitMQTriggerBinding.CreateBindingDataContract();
@@ -44,6 +52,7 @@ public void Verify_BindingDataContract_Values()
4452

4553
ReadOnlyMemory<byte> body = buffer;
4654
var eventArgs = new BasicDeliverEventArgs("ConsumerName", deliveryTag, false, "n/a", "QueueName", null, body);
55+
var messageActions = new RabbitMQMessageActions(Mock.Of<IRabbitMQService>(), eventArgs);
4756

4857
var data = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
4958
{
@@ -54,13 +63,84 @@ public void Verify_BindingDataContract_Values()
5463
["Body"] = body,
5564
["Exchange"] = eventArgs.Exchange,
5665
["BasicProperties"] = eventArgs.BasicProperties,
66+
["MessageActions"] = messageActions,
5767
};
5868

59-
IReadOnlyDictionary<string, object> actualContract = RabbitMQTriggerBinding.CreateBindingData(eventArgs);
69+
IReadOnlyDictionary<string, object> actualContract = RabbitMQTriggerBinding.CreateBindingData(eventArgs, messageActions);
6070

6171
foreach (KeyValuePair<string, object> item in actualContract)
6272
{
6373
Assert.Equal(data[item.Key], item.Value);
6474
}
6575
}
76+
77+
[Theory]
78+
[InlineData(true)]
79+
[InlineData(false)]
80+
public async Task RabbitMQTrigger_ManualAck_BasicAckBehavior(bool disableAck)
81+
{
82+
// Arrange
83+
var mockservice = new Mock<IRabbitMQService>();
84+
var mockModel = new Mock<IModel>();
85+
mockservice.Setup(a => a.CreateConsumer()).Returns(new AsyncEventingBasicConsumer(mockModel.Object));
86+
87+
var mockExecutor = new Mock<ITriggeredFunctionExecutor>();
88+
var mockLogger = new Mock<ILogger>();
89+
var mockDrainModeManager = new Mock<IDrainModeManager>();
90+
var mockBasicProperties = new Mock<IBasicProperties>();
91+
92+
// Simulate successful function execution
93+
mockExecutor
94+
.Setup(executor => executor.TryExecuteAsync(It.IsAny<TriggeredFunctionData>(), It.IsAny<CancellationToken>()))
95+
.ReturnsAsync(new FunctionResult(true));
96+
97+
var listener = new RabbitMQListener(
98+
mockservice.Object,
99+
mockExecutor.Object,
100+
mockLogger.Object,
101+
functionId: "test-function",
102+
queueName: "test-queue",
103+
disableAck: disableAck,
104+
prefetchCount: 10,
105+
drainModeManager: mockDrainModeManager.Object);
106+
107+
var eventArgs = new BasicDeliverEventArgs
108+
{
109+
DeliveryTag = 1,
110+
Body = new ReadOnlyMemory<byte>([0x01, 0x02, 0x03]),
111+
BasicProperties = mockBasicProperties.Object,
112+
};
113+
114+
// Act
115+
await listener.StartAsync(CancellationToken.None);
116+
117+
// Find the Consumer instance passed to RabbitMQService.Consume method
118+
IInvocation consumeInvocation = mockservice.Invocations
119+
.FirstOrDefault(invocation => invocation.Method.Name == "Consume");
120+
121+
Assert.NotNull(consumeInvocation);
122+
123+
var consumer = consumeInvocation.Arguments[2] as AsyncEventingBasicConsumer;
124+
Assert.NotNull(consumer);
125+
126+
// Simulate message delivery
127+
await consumer.HandleBasicDeliver(
128+
consumerTag: "ctag",
129+
deliveryTag: eventArgs.DeliveryTag,
130+
redelivered: false,
131+
exchange: string.Empty,
132+
routingKey: string.Empty,
133+
properties: eventArgs.BasicProperties,
134+
body: eventArgs.Body.ToArray());
135+
136+
// Assert
137+
if (disableAck)
138+
{
139+
mockservice.Verify(channel => channel.Acknowledge(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<string>()), Times.Never, "BasicAck should not be called when DisableAck is true.");
140+
}
141+
else
142+
{
143+
mockservice.Verify(channel => channel.Acknowledge(It.IsAny<ulong>(), It.IsAny<bool>(), It.IsAny<string>()), Times.Once, "BasicAck should be called when DisableAck is false.");
144+
}
145+
}
66146
}

extension/WebJobs.Extensions.RabbitMQ.Tests/Trigger/RabbitMQListenerTests.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,12 @@ public void ScaleMonitorGetScaleStatus_CountNotIncreasingOrDecreasing_ReturnsNon
205205
private static RabbitMQListener GetScaleMonitor(string functionId, string queueName)
206206
{
207207
return new RabbitMQListener(
208-
Mock.Of<IModel>(),
208+
Mock.Of<IRabbitMQService>(),
209209
Mock.Of<ITriggeredFunctionExecutor>(),
210210
Mock.Of<ILogger>(),
211211
functionId,
212212
queueName,
213+
false,
213214
7357,
214215
DrainModeManager);
215216
}
@@ -219,11 +220,12 @@ private static (IScaleMonitor<RabbitMQTriggerMetrics> Monitor, List<string> LogM
219220
(Mock<ILogger> mockLogger, List<string> logMessages) = CreateMockLogger();
220221

221222
IScaleMonitor<RabbitMQTriggerMetrics> monitor = new RabbitMQListener(
222-
Mock.Of<IModel>(),
223+
Mock.Of<IRabbitMQService>(),
223224
Mock.Of<ITriggeredFunctionExecutor>(),
224225
mockLogger.Object,
225226
"testFunctionId",
226227
"testQueueName",
228+
false,
227229
7357,
228230
DrainModeManager);
229231

extension/WebJobs.Extensions.RabbitMQ/Bindings/RabbitMQClientBuilder.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@
77

88
namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ;
99

10-
internal class RabbitMQClientBuilder(RabbitMQExtensionConfigProvider configProvider, IOptions<RabbitMQOptions> options) : IConverter<RabbitMQAttribute, IModel>
10+
internal class RabbitMQClientBuilder(RabbitMQExtensionConfigProvider configProvider, IOptions<RabbitMQOptions> options) : IConverter<RabbitMQAttribute, IRabbitMQService>
1111
{
1212
private readonly RabbitMQExtensionConfigProvider configProvider = configProvider;
1313
private readonly IOptions<RabbitMQOptions> options = options;
1414

15-
public IModel Convert(RabbitMQAttribute attribute)
15+
public IRabbitMQService Convert(RabbitMQAttribute attribute)
1616
{
1717
return this.CreateModelFromAttribute(attribute);
1818
}
1919

20-
private IModel CreateModelFromAttribute(RabbitMQAttribute attribute)
20+
private IRabbitMQService CreateModelFromAttribute(RabbitMQAttribute attribute)
2121
{
2222
if (attribute == null)
2323
{
@@ -27,8 +27,6 @@ private IModel CreateModelFromAttribute(RabbitMQAttribute attribute)
2727
string resolvedConnectionString = Utility.FirstOrDefault(attribute.ConnectionStringSetting, this.options.Value.ConnectionString);
2828
bool resolvedDisableCertificateValidation = Utility.FirstOrDefault(attribute.DisableCertificateValidation, this.options.Value.DisableCertificateValidation);
2929

30-
IRabbitMQService service = this.configProvider.GetService(resolvedConnectionString, resolvedDisableCertificateValidation);
31-
32-
return service.Model;
30+
return this.configProvider.GetService(resolvedConnectionString, resolvedDisableCertificateValidation);
3331
}
3432
}
Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using Microsoft.Extensions.Logging;
5+
46
namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ;
57

68
internal class DefaultRabbitMQServiceFactory : IRabbitMQServiceFactory
79
{
8-
public IRabbitMQService CreateService(string connectionString, string queueName, bool disableCertificateValidation)
10+
public IRabbitMQService CreateService(string connectionString, string queueName, bool disableCertificateValidation, ILogger logger)
911
{
10-
return new RabbitMQService(connectionString, queueName, disableCertificateValidation);
12+
return new RabbitMQService(connectionString, queueName, disableCertificateValidation, logger);
1113
}
1214

13-
public IRabbitMQService CreateService(string connectionString, bool disableCertificateValidation)
15+
public IRabbitMQService CreateService(string connectionString, bool disableCertificateValidation, ILogger logger)
1416
{
15-
return new RabbitMQService(connectionString, disableCertificateValidation);
17+
return new RabbitMQService(connectionString, disableCertificateValidation, logger);
1618
}
1719
}
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using Microsoft.Extensions.Logging;
5+
46
namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ;
57

68
public interface IRabbitMQServiceFactory
79
{
8-
IRabbitMQService CreateService(string connectionString, string queueName, bool disableCertificateValidation);
10+
IRabbitMQService CreateService(string connectionString, string queueName, bool disableCertificateValidation, ILogger logger);
911

10-
IRabbitMQService CreateService(string connectionString, bool disableCertificateValidation);
12+
IRabbitMQService CreateService(string connectionString, bool disableCertificateValidation, ILogger logger);
1113
}

extension/WebJobs.Extensions.RabbitMQ/Config/RabbitMQExtensionConfigProvider.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System;
@@ -84,7 +84,7 @@ internal IRabbitMQService GetService(string connectionString, string queueName,
8484
string[] keyArray =
8585
[connectionString, queueName, disableCertificateValidation.ToString()];
8686
string key = string.Join(",", keyArray);
87-
return this.connectionParametersToService.GetOrAdd(key, _ => this.rabbitMQServiceFactory.CreateService(connectionString, queueName, disableCertificateValidation));
87+
return this.connectionParametersToService.GetOrAdd(key, _ => this.rabbitMQServiceFactory.CreateService(connectionString, queueName, disableCertificateValidation, this.logger));
8888
}
8989

9090
// Overloaded method used only for getting the RabbitMQ client.
@@ -93,6 +93,6 @@ internal IRabbitMQService GetService(string connectionString, bool disableCertif
9393
string[] keyArray =
9494
[connectionString, disableCertificateValidation.ToString()];
9595
string key = string.Join(",", keyArray);
96-
return this.connectionParametersToService.GetOrAdd(key, _ => this.rabbitMQServiceFactory.CreateService(connectionString, disableCertificateValidation));
96+
return this.connectionParametersToService.GetOrAdd(key, _ => this.rabbitMQServiceFactory.CreateService(connectionString, disableCertificateValidation, this.logger));
9797
}
9898
}
Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,37 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

4+
using System;
45
using RabbitMQ.Client;
6+
using RabbitMQ.Client.Events;
57

68
namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ;
79

810
public interface IRabbitMQService
911
{
10-
IModel Model { get; }
11-
1212
IBasicPublishBatch BasicPublishBatch { get; }
1313

1414
object PublishBatchLock { get; }
1515

1616
void ResetPublishBatch();
17+
18+
void ConfigureQos(uint prefetchSize, ushort prefetchCount, bool global);
19+
20+
QueueDeclareOk GetQueueInfo(string queueName);
21+
22+
AsyncEventingBasicConsumer CreateConsumer();
23+
24+
string Consume(string queue, bool autoAck, AsyncEventingBasicConsumer consumer);
25+
26+
void OnMessageConsumed(string consumerTag, ulong deliveryTag);
27+
28+
void Acknowledge(ulong deliveryTag, bool multiple, string logDetails);
29+
30+
void Reject(ulong deliveryTag, bool requeue, string logDetails);
31+
32+
void Publish(string exchange, string routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body);
33+
34+
void Cancel(string consumerTag);
35+
36+
void Close();
1737
}

0 commit comments

Comments
 (0)