Skip to content

Commit 726e283

Browse files
committed
add tests
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 7e8a78e commit 726e283

File tree

4 files changed

+94
-7
lines changed

4 files changed

+94
-7
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public IMessage Annotation(string key, object value)
130130
public object Annotation(string key)
131131
{
132132
ThrowIfAnnotationsNotSet();
133-
return NativeMessage.MessageAnnotations[key];
133+
return NativeMessage.MessageAnnotations[new Symbol(key)];
134134
}
135135
}
136136
}

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,27 +58,29 @@ public Task DiscardAsync()
5858

5959
public Task DiscardAsync(Dictionary<string, object> annotations)
6060
{
61-
6261
if (_link.IsClosed)
6362
{
6463
throw new ConsumerException("Link is closed");
6564
}
6665

66+
Utils.ValidateMessageAnnotations(annotations);
67+
6768
Task rejectTask = Task.Run(() =>
6869
{
6970
Fields messageAnnotations = new();
7071
foreach (var kvp in annotations)
7172
{
7273
messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value);
7374
}
75+
7476
_link.Modify(_message, true, true, messageAnnotations);
7577
_unsettledMessageCounter.Decrement();
7678
_message.Dispose();
7779
});
7880

7981
return rejectTask;
80-
8182
}
83+
8284
public Task RequeueAsync()
8385
{
8486
if (_link.IsClosed)
@@ -98,26 +100,25 @@ public Task RequeueAsync()
98100

99101
public Task RequeueAsync(Dictionary<string, object> annotations)
100102
{
101-
102103
if (_link.IsClosed)
103104
{
104105
throw new ConsumerException("Link is closed");
105106
}
106-
107+
Utils.ValidateMessageAnnotations(annotations);
107108
Task requeueTask = Task.Run(() =>
108109
{
109110
Fields messageAnnotations = new();
110111
foreach (var kvp in annotations)
111112
{
112113
messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value);
113114
}
115+
114116
_link.Modify(_message, false, false, messageAnnotations);
115117
_unsettledMessageCounter.Decrement();
116118
_message.Dispose();
117119
});
118120

119121
return requeueTask;
120-
121122
}
122123
}
123124
}

RabbitMQ.AMQP.Client/Utils.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ internal static bool CompareMap(Map map1, Map map2)
183183
return true;
184184
}
185185

186-
internal static void CheckMessageAnnotations(Dictionary<string, object> annotations)
186+
internal static void ValidateMessageAnnotations(Dictionary<string, object> annotations)
187187
{
188188
foreach (var kvp in annotations.Where(kvp => !kvp.Key.StartsWith("x-")))
189189
{
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using RabbitMQ.AMQP.Client;
5+
using RabbitMQ.AMQP.Client.Impl;
6+
using Xunit;
7+
using Xunit.Abstractions;
8+
9+
namespace Tests.Consumer;
10+
11+
public class ConsumerOutcomeTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
12+
{
13+
[Fact]
14+
public void ValidateAnnotations()
15+
{
16+
const string wrongAnnotationKey = "missing-the-start-x-annotation-key";
17+
const string annotationValue = "annotation-value";
18+
// This should throw an exception because the annotation key does not start with "x-"
19+
Assert.Throws<ArgumentException>(() =>
20+
Utils.ValidateMessageAnnotations(new Dictionary<string, object>
21+
{
22+
{ wrongAnnotationKey, annotationValue }
23+
}));
24+
25+
const string correctAnnotationKey = "x-otp-annotation-key";
26+
// This should not throw an exception because the annotation key starts with "x-"
27+
Utils.ValidateMessageAnnotations(new Dictionary<string, object>
28+
{
29+
{ correctAnnotationKey, annotationValue }
30+
});
31+
}
32+
33+
[Fact]
34+
public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotationsWhenConfigured()
35+
{
36+
string dlqQueueName = $"dlq_{_queueName}";
37+
await DeclareDeadLetterTopology(_queueName, dlqQueueName);
38+
39+
40+
Assert.NotNull(_connection);
41+
Assert.NotNull(_management);
42+
43+
const string annotationKey = "x-opt-annotation-key";
44+
const string annotationValue = "annotation-value";
45+
TaskCompletionSource<bool> tcs =
46+
new(TaskCreationOptions.RunContinuationsAsynchronously);
47+
IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync();
48+
IConsumer consumer = await _connection.ConsumerBuilder().MessageHandler(
49+
async (context, _) =>
50+
{
51+
await context.DiscardAsync(new Dictionary<string, object> { { annotationKey, annotationValue } });
52+
tcs.SetResult(true);
53+
}
54+
).Queue(_queueName).BuildAndStartAsync();
55+
56+
IMessage message = new AmqpMessage($"message");
57+
PublishResult pr = await publisher.PublishAsync(message);
58+
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
59+
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
60+
await consumer.CloseAsync();
61+
TaskCompletionSource<IMessage> tcsDl =
62+
new(TaskCreationOptions.RunContinuationsAsynchronously);
63+
IConsumer dlConsumer = await _connection.ConsumerBuilder().MessageHandler(async (context, message1) =>
64+
{
65+
await context.AcceptAsync();
66+
tcsDl.SetResult(message1);
67+
}).Queue(dlqQueueName).BuildAndStartAsync();
68+
69+
IMessage mResult = await tcsDl.Task.WaitAsync(TimeSpan.FromSeconds(5));
70+
71+
Assert.NotNull(mResult);
72+
Assert.Equal(mResult.Annotation(annotationKey), annotationValue);
73+
await dlConsumer.CloseAsync();
74+
}
75+
76+
77+
private async Task DeclareDeadLetterTopology(string queueName, string dlxQueueName)
78+
{
79+
string dlx = $"{queueName}.dlx";
80+
Assert.NotNull(_management);
81+
await _management.Queue().Name(queueName).Type(QueueType.QUORUM).DeadLetterExchange(dlx).DeclareAsync();
82+
await _management.Exchange(dlx).Type(ExchangeType.FANOUT).AutoDelete(true).DeclareAsync();
83+
await _management.Queue(dlxQueueName).Exclusive(true).DeclareAsync();
84+
await _management.Binding().SourceExchange(dlx).DestinationQueue(dlxQueueName).BindAsync();
85+
}
86+
}

0 commit comments

Comments
 (0)