Skip to content

Commit 12605c2

Browse files
committed
Issue #9 - Dead Letter Exchange, Dead Letter Routing and Alternative Exchange Policies
1 parent 8930b81 commit 12605c2

File tree

12 files changed

+833
-43
lines changed

12 files changed

+833
-43
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@
107107
<version>${junit.jupiter.version}</version>
108108
<scope>test</scope>
109109
</dependency>
110+
<dependency>
111+
<groupId>org.projectlombok</groupId>
112+
<artifactId>lombok</artifactId>
113+
<version>1.18.8</version>
114+
<scope>provided</scope>
115+
</dependency>
110116
</dependencies>
111117

112118
<build>

src/main/java/com/github/fridujo/rabbitmq/mock/AmqArguments.java

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package com.github.fridujo.rabbitmq.mock;
22

3+
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer;
4+
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveInteger;
5+
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveLong;
6+
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsPositiveShort;
7+
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString;
38
import static java.util.Collections.emptyMap;
49

510
import java.util.Arrays;
@@ -14,7 +19,7 @@ public class AmqArguments {
1419
public static final String QUEUE_MAX_LENGTH_BYTES_KEY = "x-max-length-bytes";
1520
public static final String OVERFLOW_KEY = "x-overflow";
1621
public static final String MAX_PRIORITY_KEY = "x-max-priority";
17-
private final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange";
22+
public static final String ALTERNATE_EXCHANGE_KEY = "alternate-exchange";
1823
private final Map<String, Object> arguments;
1924

2025
public static AmqArguments empty() {
@@ -26,58 +31,37 @@ public AmqArguments(Map<String, Object> arguments) {
2631
}
2732

2833
public Optional<ReceiverPointer> getAlternateExchange() {
29-
return string(ALTERNATE_EXCHANGE_KEY)
30-
.map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName));
34+
return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE_KEY, arguments);
3135
}
3236

3337
public Optional<ReceiverPointer> getDeadLetterExchange() {
34-
return string(DEAD_LETTER_EXCHANGE_KEY)
35-
.map(aeName -> new ReceiverPointer(ReceiverPointer.Type.EXCHANGE, aeName));
38+
return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE_KEY, arguments);
3639
}
3740

3841
public Optional<String> getDeadLetterRoutingKey() {
39-
return string(DEAD_LETTER_ROUTING_KEY_KEY);
42+
return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY_KEY, arguments);
4043
}
4144

4245
public Optional<Integer> queueLengthLimit() {
43-
return positiveInteger(QUEUE_MAX_LENGTH_KEY);
46+
return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_KEY, arguments);
4447
}
4548

4649
public Optional<Integer> queueLengthBytesLimit() {
47-
return positiveInteger(QUEUE_MAX_LENGTH_BYTES_KEY);
50+
return getParameterAsPositiveInteger.apply(QUEUE_MAX_LENGTH_BYTES_KEY, arguments);
4851
}
4952

5053
public Overflow overflow() {
51-
return string(OVERFLOW_KEY)
54+
return getParameterAsString.apply(OVERFLOW_KEY, arguments)
5255
.flatMap(Overflow::parse)
5356
.orElse(Overflow.DROP_HEAD);
5457
}
5558

5659
public Optional<Long> getMessageTtlOfQueue() {
57-
return Optional.ofNullable(arguments.get(MESSAGE_TTL_KEY))
58-
.filter(aeObject -> aeObject instanceof Number)
59-
.map(Number.class::cast)
60-
.map(number -> number.longValue());
60+
return getParameterAsPositiveLong.apply(MESSAGE_TTL_KEY, arguments);
6161
}
6262

6363
public Optional<Short> queueMaxPriority() {
64-
return positiveInteger(MAX_PRIORITY_KEY)
65-
.filter(i -> i < 256)
66-
.map(Integer::shortValue);
67-
}
68-
69-
private Optional<Integer> positiveInteger(String key) {
70-
return Optional.ofNullable(arguments.get(key))
71-
.filter(aeObject -> aeObject instanceof Number)
72-
.map(Number.class::cast)
73-
.map(num -> num.intValue())
74-
.filter(i -> i > 0);
75-
}
76-
77-
private Optional<String> string(String key) {
78-
return Optional.ofNullable(arguments.get(key))
79-
.filter(aeObject -> aeObject instanceof String)
80-
.map(String.class::cast);
64+
return getParameterAsPositiveShort.apply(MAX_PRIORITY_KEY, arguments);
8165
}
8266

8367
public enum Overflow {

src/main/java/com/github/fridujo/rabbitmq/mock/MockConnectionFactory.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,22 @@
22

33
import com.github.fridujo.rabbitmq.mock.metrics.MetricsCollectorWrapper;
44
import com.rabbitmq.client.AddressResolver;
5-
import com.rabbitmq.client.ConnectionFactory;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
67

8+
import java.util.Collection;
9+
import java.util.HashMap;
10+
import java.util.HashSet;
711
import java.util.concurrent.ExecutorService;
812

13+
import static java.lang.String.format;
14+
915
public class MockConnectionFactory extends ConfigurableConnectionFactory<MockConnectionFactory> {
1016

17+
private static final Logger LOGGER = LoggerFactory.getLogger(MockConnectionFactory.class);
18+
19+
private HashMap<String, MockPolicy> policies = new HashMap<>();
20+
1121
public MockConnectionFactory() {
1222
setAutomaticRecoveryEnabled(false);
1323
}
@@ -22,4 +32,22 @@ public MockConnection newConnection() {
2232
MockConnection mockConnection = new MockConnection(mockNode, metricsCollectorWrapper);
2333
return mockConnection;
2434
}
35+
36+
public void setPolicy(MockPolicy policy) {
37+
policies.put(policy.getName(), policy);
38+
mockNode.applyPolicies(new HashSet(policies.values()));
39+
}
40+
41+
public void deletePolicy(String policyName) {
42+
if(policies.remove(policyName) == null) {
43+
LOGGER.error(format("Error deleting, policy with name %s was not found", policyName));
44+
} else {
45+
mockNode.applyPolicies(new HashSet(policies.values()));
46+
}
47+
}
48+
49+
public Collection<MockPolicy> listPolicies() {
50+
return policies.values();
51+
}
52+
2553
}

src/main/java/com/github/fridujo/rabbitmq/mock/MockNode.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.github.fridujo.rabbitmq.mock;
22

3+
import java.util.Collection;
34
import java.util.Map;
45
import java.util.Optional;
6+
import java.util.Set;
57
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.function.Function;
69
import java.util.function.Supplier;
710

811
import com.rabbitmq.client.AMQP;
@@ -189,4 +192,20 @@ public void close(MockConnection mockConnection) {
189192
public Configuration getConfiguration() {
190193
return configuration;
191194
}
195+
196+
public void applyPolicies(Set<MockPolicy> policies) {
197+
applyPolicyToReceivers(policies, exchanges.values());
198+
applyPolicyToReceivers(policies, queues.values());
199+
}
200+
201+
private <T extends Receiver> void applyPolicyToReceivers(Set<MockPolicy> policies, Collection<T> receivers) {
202+
Function<T, Optional<MockPolicy>> calculateHighestPriorityPolicy = r -> policies.stream()
203+
.sorted(MockPolicy.comparator)
204+
.filter(p -> p.receiverMatchesPolicyPattern.test(r))
205+
.findFirst();
206+
207+
receivers.stream()
208+
.filter(r -> !MockDefaultExchange.class.isInstance(r))
209+
.forEach(r -> r.setPolicy(calculateHighestPriorityPolicy.apply(r)));
210+
}
192211
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.github.fridujo.rabbitmq.mock;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
import lombok.NonNull;
6+
import lombok.Singular;
7+
import lombok.ToString;
8+
9+
import java.util.Comparator;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.Optional;
13+
import java.util.function.Predicate;
14+
15+
import static com.github.fridujo.rabbitmq.mock.MockPolicy.ApplyTo.ALL;
16+
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsExchangePointer;
17+
import static com.github.fridujo.rabbitmq.mock.tool.ParameterMarshaller.getParameterAsString;
18+
import static java.util.Arrays.asList;
19+
import static java.util.Collections.singletonList;
20+
21+
@Getter
22+
@ToString
23+
public class MockPolicy {
24+
public static final String ALTERNATE_EXCHANGE = "alternate-exchange";
25+
public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";
26+
public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter-routing-key";
27+
28+
private String name;
29+
private String pattern;
30+
private Integer priority;
31+
private Map<String, Object> definitions;
32+
private ApplyTo applyTo;
33+
34+
static final Comparator<MockPolicy> comparator = Comparator.comparing(MockPolicy::getPriority).reversed();
35+
36+
final Predicate<Receiver> receiverMatchesPolicyPattern =
37+
r -> r.pointer().name.matches(this.pattern) && this.applyTo.matches(r) ;
38+
39+
@Builder(toBuilder = true)
40+
public MockPolicy(@NonNull String name, @NonNull String pattern, @NonNull @Singular Map<String, Object> definitions,
41+
Integer priority, ApplyTo applyTo) {
42+
this.name = name;
43+
this.pattern = pattern;
44+
this.definitions = definitions;
45+
this.priority = priority == null ? 0 : priority;
46+
this.applyTo = applyTo == null ? ALL : applyTo;
47+
}
48+
49+
public Optional<ReceiverPointer> getAlternateExchange() {
50+
return getParameterAsExchangePointer.apply(ALTERNATE_EXCHANGE, definitions);
51+
}
52+
53+
public Optional<ReceiverPointer> getDeadLetterExchange() {
54+
return getParameterAsExchangePointer.apply(DEAD_LETTER_EXCHANGE, definitions);
55+
}
56+
57+
public Optional<String> getDeadLetterRoutingKey() {
58+
return getParameterAsString.apply(DEAD_LETTER_ROUTING_KEY, definitions);
59+
}
60+
61+
public enum ApplyTo {
62+
ALL(asList(ReceiverPointer.Type.QUEUE, ReceiverPointer.Type.EXCHANGE)),
63+
EXCHANGE(singletonList(ReceiverPointer.Type.EXCHANGE)),
64+
QUEUE(singletonList(ReceiverPointer.Type.QUEUE));
65+
66+
private List<ReceiverPointer.Type> supportedTypes;
67+
68+
ApplyTo(List<ReceiverPointer.Type> supportTypes) {
69+
this.supportedTypes = supportTypes;
70+
}
71+
72+
public boolean matches(Receiver r) {
73+
return supportedTypes.contains(r.pointer().type);
74+
}
75+
}
76+
}
77+

src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndEatExceptions;
44
import static com.github.fridujo.rabbitmq.mock.tool.Exceptions.runAndTransformExceptions;
5+
import static java.lang.String.format;
56

67
import java.io.IOException;
78
import java.time.Instant;
@@ -50,6 +51,7 @@ public class MockQueue implements Receiver {
5051
private final Map<Long, Message> unackedMessagesByDeliveryTag = new LinkedHashMap<>();
5152
private final AtomicBoolean running = new AtomicBoolean(true);
5253
private final Map<String, Set<Long>> unackedDeliveryTagsByConsumerTag = new LinkedHashMap<>();
54+
private Optional<MockPolicy> mockPolicy = Optional.empty();
5355

5456
public MockQueue(String name, AmqArguments arguments, ReceiverRegistry receiverRegistry, MockChannel mockChannel) {
5557
this.name = name;
@@ -377,15 +379,23 @@ public String toString() {
377379
}
378380

379381
private void deadLetterWithReason(Message message, DeadLettering.ReasonType reason) {
382+
383+
String routingKey = arguments.getDeadLetterRoutingKey()
384+
.map(Optional::of)
385+
.orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterRoutingKey))
386+
.orElse(message.routingKey);
387+
380388
arguments.getDeadLetterExchange()
389+
.map(Optional::of)
390+
.orElse(mockPolicy.flatMap(MockPolicy::getDeadLetterExchange))
381391
.flatMap(receiverRegistry::getReceiver)
382392
.ifPresent(deadLetterExchange -> {
383393
LOGGER.debug(localized("dead-lettered to " + deadLetterExchange + ": " + message));
384394
DeadLettering.Event event = new DeadLettering.Event(name, reason, message, 1);
385395
BasicProperties props = event.prependOn(message.props);
386396
deadLetterExchange.publish(
387397
message.exchangeName,
388-
arguments.getDeadLetterRoutingKey().orElse(message.routingKey),
398+
routingKey,
389399
props,
390400
message.body);
391401
}
@@ -400,6 +410,11 @@ public List<Message> getUnackedMessages() {
400410
return new ArrayList<>(unackedMessagesByDeliveryTag.values());
401411
}
402412

413+
public void setPolicy(Optional<MockPolicy> mockPolicy) {
414+
mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p))));
415+
this.mockPolicy = mockPolicy;
416+
}
417+
403418
static class ConsumerAndTag {
404419

405420
private final String tag;

src/main/java/com/github/fridujo/rabbitmq/mock/Receiver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.rabbitmq.client.AMQP;
44

5+
import java.util.Optional;
6+
57
/**
68
* Leverage the receiving capability of both Queues and Exchanges.
79
*/
@@ -14,4 +16,6 @@ public interface Receiver {
1416
boolean publish(String exchangeName, String routingKey, AMQP.BasicProperties props, byte[] body);
1517

1618
ReceiverPointer pointer();
19+
20+
void setPolicy(Optional<MockPolicy> policy);
1721
}

src/main/java/com/github/fridujo/rabbitmq/mock/exchange/BindableMockExchange.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
package com.github.fridujo.rabbitmq.mock.exchange;
22

3+
import com.github.fridujo.rabbitmq.mock.AmqArguments;
4+
import com.github.fridujo.rabbitmq.mock.MockPolicy;
5+
import com.github.fridujo.rabbitmq.mock.Receiver;
6+
import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
7+
import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
8+
import com.rabbitmq.client.AMQP;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
312
import java.util.Collections;
413
import java.util.LinkedHashSet;
514
import java.util.Map;
@@ -9,25 +18,18 @@
918
import java.util.stream.Collectors;
1019
import java.util.stream.Stream;
1120

12-
import org.slf4j.Logger;
13-
import org.slf4j.LoggerFactory;
14-
15-
import com.github.fridujo.rabbitmq.mock.AmqArguments;
16-
import com.github.fridujo.rabbitmq.mock.MockQueue;
17-
import com.github.fridujo.rabbitmq.mock.Receiver;
18-
import com.github.fridujo.rabbitmq.mock.ReceiverPointer;
19-
import com.github.fridujo.rabbitmq.mock.ReceiverRegistry;
20-
import com.rabbitmq.client.AMQP;
21+
import static java.lang.String.format;
2122

2223
public abstract class BindableMockExchange implements MockExchange {
23-
private static final Logger LOGGER = LoggerFactory.getLogger(MockQueue.class);
24+
private static final Logger LOGGER = LoggerFactory.getLogger(BindableMockExchange.class);
2425

2526
protected final Set<BindConfiguration> bindConfigurations = new LinkedHashSet<>();
2627
private final String name;
2728
private final String type;
2829
private final AmqArguments arguments;
2930
private final ReceiverPointer pointer;
3031
private final ReceiverRegistry receiverRegistry;
32+
private Optional<MockPolicy> mockPolicy = Optional.empty();
3133

3234
protected BindableMockExchange(String name, String type, AmqArguments arguments, ReceiverRegistry receiverRegistry) {
3335
this.name = name;
@@ -66,7 +68,12 @@ public boolean publish(String previousExchangeName, String routingKey, AMQP.Basi
6668
}
6769

6870
private Optional<Receiver> getAlternateExchange() {
69-
return arguments.getAlternateExchange().flatMap(receiverRegistry::getReceiver);
71+
Optional<ReceiverPointer> policyAlternativeExchange = mockPolicy.flatMap(MockPolicy::getAlternateExchange);
72+
Optional<ReceiverPointer> exchangeArgumentAlternativeExchange = arguments.getAlternateExchange();
73+
74+
return exchangeArgumentAlternativeExchange.map(Optional::of)
75+
.orElse(policyAlternativeExchange)
76+
.flatMap(receiverRegistry::getReceiver);
7077
}
7178

7279
protected abstract Stream<ReceiverPointer> matchingReceivers(String routingKey, AMQP.BasicProperties props);
@@ -85,6 +92,12 @@ public void unbind(ReceiverPointer receiver, String routingKey) {
8592
bindConfigurations.remove(new BindConfiguration(routingKey, receiver, Collections.emptyMap()));
8693
}
8794

95+
@Override
96+
public void setPolicy(Optional<MockPolicy> mockPolicy) {
97+
mockPolicy.ifPresent(p -> LOGGER.info(localized(format("Applied policy %s", p))));
98+
this.mockPolicy = mockPolicy;
99+
}
100+
88101
@Override
89102
public ReceiverPointer pointer() {
90103
return pointer;

0 commit comments

Comments
 (0)