Skip to content

Commit 98f33ff

Browse files
committed
[AMQ-9692] Support destination gc sweep of destinations with only wildcard consumers
1 parent 2eb0d66 commit 98f33ff

File tree

3 files changed

+85
-34
lines changed

3 files changed

+85
-34
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public abstract class BaseDestination implements Destination {
105105
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
106106
private boolean gcIfInactive;
107107
private boolean gcWithNetworkConsumers;
108-
private long lastActiveTime=0l;
108+
private boolean gcWithOnlyWildcardConsumers;
109+
private long lastActiveTime = 0L;
109110
private boolean reduceMemoryFootprint = false;
110111
protected final Scheduler scheduler;
111112
private boolean disposed = false;
@@ -311,12 +312,22 @@ public final MessageStore getMessageStore() {
311312

312313
@Override
313314
public boolean isActive() {
314-
boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
315-
destinationStatistics.getProducers().getCount() > 0;
316-
if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) {
317-
isActive = hasRegularConsumers(getConsumers());
315+
var destinationActive = (destinationStatistics.getConsumers().getCount() > 0 &&
316+
destinationStatistics.getProducers().getCount() > 0);
317+
318+
if (destinationActive &&
319+
destinationStatistics.getConsumers().getCount() > 0) {
320+
321+
if (isGcWithNetworkConsumers()) {
322+
destinationActive = hasRegularConsumers(getConsumers());
323+
}
324+
325+
if (destinationActive &&
326+
isGcWithOnlyWildcardConsumers()) {
327+
destinationActive = !getConsumers().stream().allMatch(Subscription::isWildcard);
328+
}
318329
}
319-
return isActive;
330+
return destinationActive;
320331
}
321332

322333
@Override
@@ -824,19 +835,37 @@ public boolean isGcWithNetworkConsumers() {
824835
return gcWithNetworkConsumers;
825836
}
826837

838+
/**
839+
* Indicate if it is ok to gc destinations that have only wildcard consumers
840+
* @param gcWithOnlyWildcardConsumers
841+
*/
842+
public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) {
843+
this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers;
844+
}
845+
846+
public boolean isGcWithOnlyWildcardConsumers() {
847+
return gcWithOnlyWildcardConsumers;
848+
}
849+
827850
@Override
828851
public void markForGC(long timeStamp) {
829-
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
830-
&& destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
852+
if (isGcIfInactive()
853+
&& this.lastActiveTime == 0
854+
&& destinationStatistics.getMessages().getCount() == 0
855+
&& getInactiveTimeoutBeforeGC() > 0L
856+
&& !isActive()) {
831857
this.lastActiveTime = timeStamp;
832858
}
833859
}
834860

835861
@Override
836862
public boolean canGC() {
837-
boolean result = false;
838-
final long currentLastActiveTime = this.lastActiveTime;
839-
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) {
863+
var result = false;
864+
final var currentLastActiveTime = this.lastActiveTime;
865+
if (isGcIfInactive()
866+
&& currentLastActiveTime != 0L
867+
&& destinationStatistics.getMessages().getCount() == 0L
868+
&& !isActive()) {
840869
if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
841870
result = true;
842871
}
@@ -904,6 +933,14 @@ protected boolean hasRegularConsumers(List<Subscription> consumers) {
904933
return hasRegularConsumers;
905934
}
906935

936+
protected boolean hasOnlyWildCardConsumers(List<Subscription> consumers) {
937+
boolean result = false;
938+
if (!consumers.isEmpty()) {
939+
result = consumers.stream().allMatch(Subscription::isWildcard);
940+
}
941+
return result;
942+
}
943+
907944
public ConnectionContext createConnectionContext() {
908945
ConnectionContext answer = new ConnectionContext();
909946
answer.setBroker(this.broker);

activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public class PolicyEntry extends DestinationMapEntry {
9999
private boolean prioritizedMessages;
100100
private boolean allConsumersExclusiveByDefault;
101101
private boolean gcInactiveDestinations;
102+
private boolean gcWithOnlyWildcardConsumers;
102103
private boolean gcWithNetworkConsumers;
103104
private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
104105
private boolean reduceMemoryFootprint;
@@ -263,6 +264,9 @@ public void baseUpdate(BaseDestination destination, Set<String> includedProperti
263264
if (isUpdate("gcInactiveDestinations", includedProperties)) {
264265
destination.setGcIfInactive(isGcInactiveDestinations());
265266
}
267+
if (isUpdate("gcWithOnlyWildcardConsumers", includedProperties)) {
268+
destination.setGcWithOnlyWildcardConsumers(isGcWithOnlyWildcardConsumers());
269+
}
266270
if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
267271
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
268272
}
@@ -1082,6 +1086,14 @@ public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
10821086
this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
10831087
}
10841088

1089+
public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) {
1090+
this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers;
1091+
}
1092+
1093+
public boolean isGcWithOnlyWildcardConsumers() {
1094+
return gcWithOnlyWildcardConsumers;
1095+
}
1096+
10851097
public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
10861098
this.gcWithNetworkConsumers = gcWithNetworkConsumers;
10871099
}

activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws
662662

663663
//initial config
664664
setAllDestPolicyProperties(entry, true, true, 10,
665-
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
665+
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
666666
30, true, true, true, true, true, true, true, true, true);
667667
setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100,
668668
100, true, true);
@@ -675,15 +675,15 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws
675675

676676
//validate config
677677
assertAllDestPolicyProperties(getQueue("Before"), true, true, 10,
678-
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
678+
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
679679
30, true, true, true,true, true, true, true, true, true);
680680
assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100,
681681
100, true, true);
682682

683683

684684
//change config
685685
setAllDestPolicyProperties(entry, false, false, 100,
686-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
686+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
687687
300, false, false, false,false, false, false, false, false, false);
688688
setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000,
689689
1000, false, false);
@@ -692,14 +692,14 @@ private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws
692692
TimeUnit.SECONDS.sleep(SLEEP);
693693

694694
assertAllDestPolicyProperties(getQueue("Before"), false, false, 100,
695-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
695+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
696696
300, false, false, false,false, false, false, false, false, false);
697697
assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000,
698698
1000, false, false);
699699

700700
//check new dest
701701
assertAllDestPolicyProperties(getQueue("After"), false, false, 100,
702-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
702+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
703703
300, false, false, false, false, false, false, false, false, false);
704704
assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000,
705705
1000, false, false);
@@ -713,7 +713,7 @@ private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws
713713

714714
//initial config
715715
setAllDestPolicyProperties(entry, true, true, 10,
716-
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
716+
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
717717
30, true, true, true, true, true, true, true, true, true);
718718
setAllTopicPolicyProperties(entry, 10000, true);
719719

@@ -725,28 +725,28 @@ private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws
725725

726726
//validate config
727727
assertAllDestPolicyProperties(getTopic("Before"), true, true, 10,
728-
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
728+
100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true,
729729
30, true, true, true, true, true, true, true, true, true);
730730
assertAllTopicPolicyProperties(getTopic("Before"), 10000, true);
731731

732732

733733
//change config
734734
setAllDestPolicyProperties(entry, false, false, 100,
735-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
735+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
736736
300, false, false, false, false, false, false, false, false, false);
737737
setAllTopicPolicyProperties(entry, 100000, false);
738738

739739
javaConfigBroker.modifyPolicyEntry(entry, false, properties);
740740
TimeUnit.SECONDS.sleep(SLEEP);
741741

742742
assertAllDestPolicyProperties(getTopic("Before"), false, false, 100,
743-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
743+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
744744
300, false, false, false, false, false, false, false, false, false);
745745
assertAllTopicPolicyProperties(getTopic("Before"), 100000, false);
746746

747747
//check new dest
748748
assertAllDestPolicyProperties(getTopic("After"), false, false, 100,
749-
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
749+
1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false,
750750
300, false, false, false, false, false, false, false, false, false);
751751
assertAllTopicPolicyProperties(getTopic("After"), 100000, false);
752752
}
@@ -820,6 +820,7 @@ private Set<String> getDestPropertySet() {
820820
properties.add("cursorMemoryHighWaterMark");
821821
properties.add("storeUsageHighWaterMark");
822822
properties.add("gcInactiveDestinations");
823+
properties.add("gcWithOnlyWildcardConsumers");
823824
properties.add("gcWithNetworkConsumers");
824825
properties.add("inactiveTimeoutBeforeGC");
825826
properties.add("reduceMemoryFootprint");
@@ -862,12 +863,12 @@ private void setAllTopicPolicyProperties(PolicyEntry entry, long memoryLimit, bo
862863
private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowControl,
863864
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
864865
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
865-
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
866-
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
867-
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
868-
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
869-
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
870-
boolean sendAdvisoryIfNoConsumers) {
866+
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers,
867+
boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint,
868+
boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed,
869+
boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages,
870+
boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull,
871+
boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
871872

872873
entry.setProducerFlowControl(producerFlowControl);
873874
entry.setAlwaysRetroactive(alwaysRetroactive);
@@ -879,6 +880,7 @@ private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowC
879880
entry.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
880881
entry.setStoreUsageHighWaterMark(storeUsageHighWaterMark);
881882
entry.setGcInactiveDestinations(gcInactiveDestinations);
883+
entry.setGcWithOnlyWildcardConsumers(gcWithOnlyWildcardConsumers);
882884
entry.setGcWithNetworkConsumers(gcWithNetworkConsumers);
883885
entry.setInactiveTimeoutBeforeGC(inactiveTimeoutBeforeGC);
884886
entry.setReduceMemoryFootprint(reduceMemoryFootprint);
@@ -920,13 +922,12 @@ private void assertAllTopicPolicyProperties(Topic topic, long memoryLimit, boole
920922
private void assertAllDestPolicyProperties(BaseDestination dest, boolean producerFlowControl,
921923
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
922924
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
923-
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
924-
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
925-
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
926-
boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers,
927-
boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory,
928-
boolean sendAdvisoryIfNoConsumers) {
929-
925+
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers,
926+
boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint,
927+
boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed,
928+
boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages,
929+
boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull,
930+
boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
930931

931932
assertEquals(producerFlowControl, dest.isProducerFlowControl());
932933
assertEquals(alwaysRetroactive, dest.isAlwaysRetroactive());
@@ -938,6 +939,7 @@ private void assertAllDestPolicyProperties(BaseDestination dest, boolean produce
938939
assertEquals(cursorMemoryHighWaterMark, dest.getCursorMemoryHighWaterMark());
939940
assertEquals(storeUsageHighWaterMark, dest.getStoreUsageHighWaterMark());
940941
assertEquals(gcInactiveDestinations, dest.isGcIfInactive());
942+
assertEquals(gcWithOnlyWildcardConsumers, dest.isGcWithOnlyWildcardConsumers());
941943
assertEquals(gcWithNetworkConsumers, dest.isGcWithNetworkConsumers());
942944
assertEquals(inactiveTimeoutBeforeGC, dest.getInactiveTimeoutBeforeGC());
943945
assertEquals(reduceMemoryFootprint, dest.isReduceMemoryFootprint());

0 commit comments

Comments
 (0)