Skip to content

Commit 7e8a78e

Browse files
committed
Annotations
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 35cc6f6 commit 7e8a78e

File tree

4 files changed

+67
-0
lines changed

4 files changed

+67
-0
lines changed

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6+
using System.Collections.Generic;
67
using System.Threading.Tasks;
78

89
namespace RabbitMQ.AMQP.Client
@@ -27,6 +28,9 @@ public interface IContext
2728
{
2829
Task AcceptAsync();
2930
Task DiscardAsync();
31+
Task DiscardAsync(Dictionary<string, object> annotations);
32+
3033
Task RequeueAsync();
34+
Task RequeueAsync(Dictionary<string, object> annotations);
3135
}
3236
}

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System.Collections.Generic;
56
using System.Threading.Tasks;
67
using Amqp;
8+
using Amqp.Types;
79

810
namespace RabbitMQ.AMQP.Client.Impl
911
{
@@ -54,6 +56,29 @@ public Task DiscardAsync()
5456
return rejectTask;
5557
}
5658

59+
public Task DiscardAsync(Dictionary<string, object> annotations)
60+
{
61+
62+
if (_link.IsClosed)
63+
{
64+
throw new ConsumerException("Link is closed");
65+
}
66+
67+
Task rejectTask = Task.Run(() =>
68+
{
69+
Fields messageAnnotations = new();
70+
foreach (var kvp in annotations)
71+
{
72+
messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value);
73+
}
74+
_link.Modify(_message, true, true, messageAnnotations);
75+
_unsettledMessageCounter.Decrement();
76+
_message.Dispose();
77+
});
78+
79+
return rejectTask;
80+
81+
}
5782
public Task RequeueAsync()
5883
{
5984
if (_link.IsClosed)
@@ -70,5 +95,29 @@ public Task RequeueAsync()
7095

7196
return requeueTask;
7297
}
98+
99+
public Task RequeueAsync(Dictionary<string, object> annotations)
100+
{
101+
102+
if (_link.IsClosed)
103+
{
104+
throw new ConsumerException("Link is closed");
105+
}
106+
107+
Task requeueTask = Task.Run(() =>
108+
{
109+
Fields messageAnnotations = new();
110+
foreach (var kvp in annotations)
111+
{
112+
messageAnnotations.Add(new Symbol(kvp.Key), kvp.Value);
113+
}
114+
_link.Modify(_message, false, false, messageAnnotations);
115+
_unsettledMessageCounter.Decrement();
116+
_message.Dispose();
117+
});
118+
119+
return requeueTask;
120+
121+
}
73122
}
74123
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumer
125125
RabbitMQ.AMQP.Client.IContext
126126
RabbitMQ.AMQP.Client.IContext.AcceptAsync() -> System.Threading.Tasks.Task!
127127
RabbitMQ.AMQP.Client.IContext.DiscardAsync() -> System.Threading.Tasks.Task!
128+
RabbitMQ.AMQP.Client.IContext.DiscardAsync(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> System.Threading.Tasks.Task!
128129
RabbitMQ.AMQP.Client.IContext.RequeueAsync() -> System.Threading.Tasks.Task!
130+
RabbitMQ.AMQP.Client.IContext.RequeueAsync(System.Collections.Generic.Dictionary<string!, object!>! annotations) -> System.Threading.Tasks.Task!
129131
RabbitMQ.AMQP.Client.IEntityInfo
130132
RabbitMQ.AMQP.Client.IEntityInfoSpecification<T>
131133
RabbitMQ.AMQP.Client.IEntityInfoSpecification<T>.DeclareAsync() -> System.Threading.Tasks.Task<T>!

RabbitMQ.AMQP.Client/Utils.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6+
using System.Collections.Generic;
7+
using System.Linq;
68
using System.Security.Cryptography;
79
using System.Text;
810
using System.Web;
@@ -177,8 +179,18 @@ internal static bool CompareMap(Map map1, Map map2)
177179
return false;
178180
}
179181
}
182+
180183
return true;
181184
}
185+
186+
internal static void CheckMessageAnnotations(Dictionary<string, object> annotations)
187+
{
188+
foreach (var kvp in annotations.Where(kvp => !kvp.Key.StartsWith("x-")))
189+
{
190+
throw new ArgumentException(
191+
$"Message annotation keys must start with 'x-': {kvp.Key}");
192+
}
193+
}
182194
}
183195

184196
// TODO why can't we use normal HTTP encoding?

0 commit comments

Comments
 (0)