diff --git a/.ci/ubuntu/cluster/gha-setup.sh b/.ci/ubuntu/cluster/gha-setup.sh
index a03ffb4..d47e69b 100755
--- a/.ci/ubuntu/cluster/gha-setup.sh
+++ b/.ci/ubuntu/cluster/gha-setup.sh
@@ -19,7 +19,7 @@ function run_docker_compose
docker compose --file "$script_dir/docker-compose.yml" $@
}
-readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
+readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
if [[ ! -v GITHUB_ACTIONS ]]
then
diff --git a/.ci/ubuntu/cluster/rmq/21-enable-management-collector.conf b/.ci/ubuntu/cluster/rmq/21-enable-management-collector.conf
index 913a49a..26710ae 100644
--- a/.ci/ubuntu/cluster/rmq/21-enable-management-collector.conf
+++ b/.ci/ubuntu/cluster/rmq/21-enable-management-collector.conf
@@ -1,3 +1,3 @@
-# This file might be required to test with images built from main.
+# This file is required to test with images built from main.
# Images in pivotalrabbitmq/rabbitmq disable the metrics collector by default.
management_agent.disable_metrics_collector = false
\ No newline at end of file
diff --git a/.ci/ubuntu/one-node/gha-setup.sh b/.ci/ubuntu/one-node/gha-setup.sh
index a4c7efa..92bd04a 100755
--- a/.ci/ubuntu/one-node/gha-setup.sh
+++ b/.ci/ubuntu/one-node/gha-setup.sh
@@ -9,7 +9,7 @@ readonly script_dir
echo "[INFO] script_dir: '$script_dir'"
-readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
+readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
diff --git a/RabbitMQ.AMQP.Client/Consts.cs b/RabbitMQ.AMQP.Client/Consts.cs
index 25b4a8e..9a89a28 100644
--- a/RabbitMQ.AMQP.Client/Consts.cs
+++ b/RabbitMQ.AMQP.Client/Consts.cs
@@ -2,6 +2,8 @@
// and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+using Amqp.Types;
+
namespace RabbitMQ.AMQP.Client
{
public static class Consts
@@ -21,5 +23,24 @@ public static class Consts
/// The default virtual host, /
///
public const string DefaultVirtualHost = "/";
+
+ // amqp:sql-filter
+ private const string AmqpSqlFilter = "amqp:sql-filter";
+ internal static readonly Symbol s_streamSqlFilterSymbol = new(AmqpSqlFilter);
+ internal const string SqlFilter = "sql-filter";
+ internal static readonly Symbol s_sqlFilterSymbol = new(SqlFilter);
+
+ internal const string AmqpPropertiesFilter = "amqp:properties-filter";
+ internal const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";
+
+ // sql-filter
+ private const string RmqStreamFilter = "rabbitmq:stream-filter";
+ private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
+ private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";
+
+ internal static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
+ internal static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
+ internal static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);
+
}
}
diff --git a/RabbitMQ.AMQP.Client/FeatureFlags.cs b/RabbitMQ.AMQP.Client/FeatureFlags.cs
index 7adbcb2..7ba4312 100644
--- a/RabbitMQ.AMQP.Client/FeatureFlags.cs
+++ b/RabbitMQ.AMQP.Client/FeatureFlags.cs
@@ -4,10 +4,25 @@
namespace RabbitMQ.AMQP.Client
{
- internal class FeatureFlags
+ public class FeatureFlags
{
- public bool IsSqlFeatureEnabled { get; set; } = false;
- public bool IsBrokerCompatible { get; set; } = false;
+
+ ///
+ /// Check if filter feature is enabled.
+ /// Filter feature is available in RabbitMQ 4.1 and later.
+ ///
+ public bool IsFilterFeatureEnabled { get; internal set; } = false;
+
+ ///
+ /// Check if Sql feature is enabled.
+ /// Sql feature is available in RabbitMQ 4.2 and later.
+ ///
+ public bool IsSqlFeatureEnabled { get; internal set; } = false;
+ ///
+ /// Check if the client is compatible with the broker version.
+ /// The client requires RabbitMQ 4.0 or later to be compatible.
+ ///
+ public bool IsBrokerCompatible { get; internal set; } = false;
public void Validate()
{
diff --git a/RabbitMQ.AMQP.Client/IConsumerBuilder.cs b/RabbitMQ.AMQP.Client/IConsumerBuilder.cs
index 31f00cf..78bb1ac 100644
--- a/RabbitMQ.AMQP.Client/IConsumerBuilder.cs
+++ b/RabbitMQ.AMQP.Client/IConsumerBuilder.cs
@@ -194,6 +194,14 @@ public interface IStreamFilterOptions
///
IStreamFilterOptions PropertySymbol(string key, string value);
+ ///
+ /// SQL filter expression.
+ ///
+ ///
+ /// Requires RabbitMQ 4.2 or more.
+ /// Documentation: SQL Filtering
+ IStreamFilterOptions Sql(string sql);
+
///
/// Return the stream options.
///
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
index 5d1d856..0222020 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
@@ -39,8 +39,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly IMetricsReporter? _metricsReporter;
private readonly Dictionary _connectionProperties = new();
- private bool _areFilterExpressionsSupported = false;
- private FeatureFlags _featureFlags = new FeatureFlags();
+ internal readonly FeatureFlags _featureFlags = new FeatureFlags();
///
/// _publishersDict contains all the publishers created by the connection.
@@ -261,8 +260,6 @@ protected override void Dispose(bool disposing)
internal Connection? NativeConnection => _nativeConnection;
- internal bool AreFilterExpressionsSupported => _areFilterExpressionsSupported;
-
// TODO this couples AmqpConnection with AmqpPublisher, yuck
internal void AddPublisher(Guid id, IPublisher consumer)
{
@@ -674,7 +671,7 @@ private void HandleProperties(Fields properties)
// this is a feature that was introduced in RabbitMQ 4.2.0
_featureFlags.IsSqlFeatureEnabled = Utils.Is4_2_OrMore(brokerVersion);
- _areFilterExpressionsSupported = Utils.SupportsFilterExpressions(brokerVersion);
+ _featureFlags.IsFilterFeatureEnabled = Utils.SupportsFilterExpressions(brokerVersion);
}
}
}
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
index 10330ac..39ebcaa 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
@@ -58,7 +58,7 @@ public override async Task OpenAsync()
// ListenerContext will override only the filters the selected filters.
if (_configuration.ListenerContext is not null)
{
- var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters, _amqpConnection.AreFilterExpressionsSupported);
+ var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters);
var listenerContext = new IConsumerBuilder.ListenerContext(listenerStreamOptions);
_configuration.ListenerContext(listenerContext);
}
@@ -88,6 +88,12 @@ void OnAttached(ILink argLink, Attach argAttach)
// TODO configurable timeout
var waitSpan = TimeSpan.FromSeconds(5);
+
+ // TODO
+ // Even 10ms is enough to allow the links to establish,
+ // which tells me it allows the .NET runtime to process
+ await Task.Delay(10).ConfigureAwait(false);
+
_receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
.ConfigureAwait(false);
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
index af0c637..1be732e 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
@@ -18,9 +18,12 @@ internal sealed class ConsumerConfiguration
{
public string Address { get; set; } = "";
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
+
public Map Filters { get; set; } = new();
+
// TODO is a MessageHandler *really* optional???
public MessageHandler? Handler { get; set; }
+
// TODO re-name to ListenerContextAction? Callback?
public Action? ListenerContext = null;
}
@@ -73,8 +76,7 @@ public IConsumerBuilder SubscriptionListener(Action BuildAndStartAsync(CancellationToken cancellationToken = default)
@@ -84,6 +86,13 @@ public async Task BuildAndStartAsync(CancellationToken cancellationTo
throw new ConsumerException("Message handler is not set");
}
+ if (_configuration.Filters[Consts.s_sqlFilterSymbol] is not null &&
+ _amqpConnection._featureFlags.IsSqlFeatureEnabled == false)
+ {
+ throw new ConsumerException("SQL filter is not supported by the connection." +
+ "RabbitMQ 4.2.0 or later is required.");
+ }
+
AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);
// TODO pass cancellationToken
@@ -104,27 +113,17 @@ public abstract class StreamOptions : IConsumerBuilder.IStreamOptions
private static readonly Regex s_offsetValidator = new Regex("^[0-9]+[YMDhms]$",
RegexOptions.Compiled | RegexOptions.CultureInvariant);
- private const string RmqStreamFilter = "rabbitmq:stream-filter";
- private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec";
- private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered";
-
- private static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter);
- private static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec);
- private static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered);
-
private readonly Map _filters;
- private readonly bool _areFilterExpressionsSupported;
- protected StreamOptions(Map filters, bool areFilterExpressionsSupported)
+ protected StreamOptions(Map filters)
{
_filters = filters;
- _areFilterExpressionsSupported = areFilterExpressionsSupported;
}
public IConsumerBuilder.IStreamOptions Offset(long offset)
{
- _filters[s_streamOffsetSpecSymbol] =
- new DescribedValue(s_streamOffsetSpecSymbol, offset);
+ _filters[Consts.s_streamOffsetSpecSymbol] =
+ new DescribedValue(Consts.s_streamOffsetSpecSymbol, offset);
return this;
}
@@ -158,15 +157,15 @@ public IConsumerBuilder.IStreamOptions Offset(string interval)
public IConsumerBuilder.IStreamOptions FilterValues(params string[] values)
{
- _filters[s_streamFilterSymbol] =
- new DescribedValue(s_streamFilterSymbol, values.ToList());
+ _filters[Consts.s_streamFilterSymbol] =
+ new DescribedValue(Consts.s_streamFilterSymbol, values.ToList());
return this;
}
public IConsumerBuilder.IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered)
{
- _filters[s_streamMatchUnfilteredSymbol]
- = new DescribedValue(s_streamMatchUnfilteredSymbol, matchUnfiltered);
+ _filters[Consts.s_streamMatchUnfilteredSymbol]
+ = new DescribedValue(Consts.s_streamMatchUnfilteredSymbol, matchUnfiltered);
return this;
}
@@ -174,8 +173,8 @@ public IConsumerBuilder.IStreamOptions FilterMatchUnfiltered(bool matchUnfiltere
private void SetOffsetSpecificationFilter(object value)
{
- _filters[s_streamOffsetSpecSymbol]
- = new DescribedValue(s_streamOffsetSpecSymbol, value);
+ _filters[Consts.s_streamOffsetSpecSymbol]
+ = new DescribedValue(Consts.s_streamOffsetSpecSymbol, value);
}
public IConsumerBuilder.IStreamFilterOptions Filter()
@@ -198,8 +197,8 @@ public IConsumerBuilder.IStreamFilterOptions Filter()
///
public class ListenerStreamOptions : StreamOptions
{
- public ListenerStreamOptions(Map filters, bool areFilterExpressionsSupported)
- : base(filters, areFilterExpressionsSupported)
+ public ListenerStreamOptions(Map filters)
+ : base(filters)
{
}
@@ -221,8 +220,8 @@ public class ConsumerBuilderStreamOptions : StreamOptions
private readonly IConsumerBuilder _consumerBuilder;
public ConsumerBuilderStreamOptions(IConsumerBuilder consumerBuilder,
- Map filters, bool areFilterExpressionsSupported)
- : base(filters, areFilterExpressionsSupported)
+ Map filters)
+ : base(filters)
{
_consumerBuilder = consumerBuilder;
}
@@ -239,8 +238,8 @@ public override IConsumerBuilder Builder()
///
public class StreamFilterOptions : IConsumerBuilder.IStreamFilterOptions
{
- private IConsumerBuilder.IStreamOptions _streamOptions;
- private Map _filters;
+ private readonly IConsumerBuilder.IStreamOptions _streamOptions;
+ private readonly Map _filters;
public StreamFilterOptions(IConsumerBuilder.IStreamOptions streamOptions, Map filters)
{
@@ -248,6 +247,18 @@ public StreamFilterOptions(IConsumerBuilder.IStreamOptions streamOptions, Map fi
_filters = filters;
}
+ public IConsumerBuilder.IStreamFilterOptions Sql(string sql)
+ {
+ if (string.IsNullOrWhiteSpace(sql))
+ {
+ throw new ArgumentNullException(nameof(sql));
+ }
+
+ _filters[Consts.s_sqlFilterSymbol] =
+ new DescribedValue(Consts.s_streamSqlFilterSymbol, sql);
+ return this;
+ }
+
public IConsumerBuilder.IStreamOptions Stream()
{
return _streamOptions;
@@ -300,9 +311,8 @@ public IConsumerBuilder.IStreamFilterOptions PropertySymbol(string key, string v
private StreamFilterOptions PropertyFilter(string propertyKey, object propertyValue)
{
- const string AmqpPropertiesFilter = "amqp:properties-filter";
- DescribedValue propertiesFilterValue = Filter(AmqpPropertiesFilter);
+ DescribedValue propertiesFilterValue = Filter(Consts.AmqpPropertiesFilter);
Map propertiesFilter = (Map)propertiesFilterValue.Value;
// Note: you MUST use a symbol as the key
propertiesFilter.Add(new Symbol(propertyKey), propertyValue);
@@ -311,9 +321,8 @@ private StreamFilterOptions PropertyFilter(string propertyKey, object propertyVa
private StreamFilterOptions ApplicationPropertyFilter(string propertyKey, object propertyValue)
{
- const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter";
- DescribedValue applicationPropertiesFilterValue = Filter(AmqpApplicationPropertiesFilter);
+ DescribedValue applicationPropertiesFilterValue = Filter(Consts.AmqpApplicationPropertiesFilter);
Map applicationPropertiesFilter = (Map)applicationPropertiesFilterValue.Value;
// Note: do NOT put a symbol as the key
applicationPropertiesFilter.Add(propertyKey, propertyValue);
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
index 405e8de..e8c47f0 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
@@ -67,6 +67,12 @@ void OnAttached(ILink argLink, Attach argAttach)
// TODO configurable timeout
var waitSpan = TimeSpan.FromSeconds(5);
+ // TODO
+ // Even 10ms is enough to allow the links to establish,
+ // which tells me it allows the .NET runtime to process
+ await Task.Delay(10)
+ .ConfigureAwait(false);
+
_senderLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
.ConfigureAwait(false);
diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
index 4850878..113932d 100644
--- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
+++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
@@ -113,6 +113,12 @@ RabbitMQ.AMQP.Client.ExchangeType.DIRECT = 0 -> RabbitMQ.AMQP.Client.ExchangeTyp
RabbitMQ.AMQP.Client.ExchangeType.FANOUT = 1 -> RabbitMQ.AMQP.Client.ExchangeType
RabbitMQ.AMQP.Client.ExchangeType.HEADERS = 3 -> RabbitMQ.AMQP.Client.ExchangeType
RabbitMQ.AMQP.Client.ExchangeType.TOPIC = 2 -> RabbitMQ.AMQP.Client.ExchangeType
+RabbitMQ.AMQP.Client.FeatureFlags
+RabbitMQ.AMQP.Client.FeatureFlags.FeatureFlags() -> void
+RabbitMQ.AMQP.Client.FeatureFlags.IsBrokerCompatible.get -> bool
+RabbitMQ.AMQP.Client.FeatureFlags.IsFilterFeatureEnabled.get -> bool
+RabbitMQ.AMQP.Client.FeatureFlags.IsSqlFeatureEnabled.get -> bool
+RabbitMQ.AMQP.Client.FeatureFlags.Validate() -> void
RabbitMQ.AMQP.Client.IAddressBuilder
RabbitMQ.AMQP.Client.IAddressBuilder.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> T
RabbitMQ.AMQP.Client.IAddressBuilder.Exchange(string! exchangeName) -> T
@@ -182,6 +188,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Property(string! key,
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
+RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.To(string! to) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -533,7 +540,7 @@ RabbitMQ.AMQP.Client.Impl.BindingSpecification._routingKey -> string!
RabbitMQ.AMQP.Client.Impl.BindingSpecification._sourceName -> string!
RabbitMQ.AMQP.Client.Impl.BindingSpecification._toQueue -> bool
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions
-RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void
+RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Address() -> string!
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.DefaultAddressBuilder() -> void
@@ -557,7 +564,7 @@ RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Type() -> RabbitMQ.AMQP.Client.QueueT
RabbitMQ.AMQP.Client.Impl.FieldNotSetException
RabbitMQ.AMQP.Client.Impl.FieldNotSetException.FieldNotSetException() -> void
RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions
-RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void
+RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.MessageAddressBuilder(RabbitMQ.AMQP.Client.IMessage! message) -> void
@@ -605,6 +612,7 @@ RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Property(string! key, object! valu
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
+RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.StreamFilterOptions(RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! streamOptions, Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -618,7 +626,7 @@ RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(long offset) -> RabbitMQ.AMQP.Cli
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(System.DateTime timestamp) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
-RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void
+RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.InternalBugException
RabbitMQ.AMQP.Client.InternalBugException.InternalBugException() -> void
RabbitMQ.AMQP.Client.InternalBugException.InternalBugException(string! message) -> void
diff --git a/Tests/Consumer/ConsumerSqlFilterTests.cs b/Tests/Consumer/ConsumerSqlFilterTests.cs
new file mode 100644
index 0000000..db8c4ba
--- /dev/null
+++ b/Tests/Consumer/ConsumerSqlFilterTests.cs
@@ -0,0 +1,121 @@
+// This source code is dual-licensed under the Apache License, version 2.0,
+// and the Mozilla Public License, version 2.0.
+// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using RabbitMQ.AMQP.Client;
+using RabbitMQ.AMQP.Client.Impl;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Tests.Consumer
+{
+ public class ConsumerSqlFilterTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
+ {
+ // This class is a placeholder for SQL filter tests.
+ // The actual implementation of SQL filter tests will depend on the specific requirements and context.
+ // For example, it could involve testing SQL queries against a mock database or validating SQL syntax.
+
+ // Example test method (to be implemented):
+ [SkippableFact]
+ [Trait("Category", "SqlFilter")]
+ public async Task TestSqlFilterFunctionalityAsync()
+ {
+ Assert.NotNull(_connection);
+ Assert.NotNull(_management);
+
+ // cast to AMQPConnection to use Skip.If
+ var amqpConnection = (_connection as AmqpConnection);
+ Skip.IfNot(amqpConnection is { _featureFlags.IsSqlFeatureEnabled: true },
+ "SQL filter is not supported by the connection.");
+
+ IQueueSpecification q = _management.Queue(_queueName).Stream().Queue();
+ await q.DeclareAsync();
+ TaskCompletionSource tcs =
+ new(TaskCreationOptions.RunContinuationsAsynchronously);
+ IConsumer consumer = await _connection.ConsumerBuilder()
+ .Queue(_queueName)
+ .Stream().Filter().Sql("properties.subject LIKE '%John%'").Stream()
+ .Offset(StreamOffsetSpecification.First)
+ .Builder().MessageHandler((IContext ctx, IMessage msg) =>
+ {
+ tcs.SetResult(msg);
+ // Here you would implement the logic to handle messages that match the SQL filter.
+ // For example, you could validate that the message content matches expected SQL criteria.
+ return Task.CompletedTask;
+ })
+ .BuildAndStartAsync();
+
+ IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync();
+
+ // var msgNotInTheFilter = new AmqpMessage("Test message for SQL filter")
+ // .Property("user_id", "Gas"); // This property should not match the SQL filter
+ var msgNotInTheFilter = new AmqpMessage("Test message for SQL filter, should not match")
+ .Subject("Gas"); // This property should not match the SQL filter
+ await publisher.PublishAsync(msgNotInTheFilter);
+ var msgInTheFilter = new AmqpMessage("Test message for SQL filter")
+ .Subject("John"); // This property should match the SQL filter
+ await publisher.PublishAsync(msgInTheFilter);
+ await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10));
+
+ Assert.Equal("Test message for SQL filter", tcs.Task.Result.BodyAsString());
+ Assert.Equal("John", tcs.Task.Result.Subject());
+ Assert.Equal("Test message for SQL filter", tcs.Task.Result.BodyAsString());
+ await consumer.CloseAsync();
+ await publisher.CloseAsync();
+ await q.DeleteAsync();
+ await _connection.CloseAsync();
+ }
+
+ [SkippableTheory]
+ [Trait("Category", "SqlFilter")]
+ [InlineData("myP", "John")]
+ [InlineData("myP", "Doe")]
+ [InlineData("user_id", "Alice")]
+ [InlineData("user_id", "Bob")]
+ public async Task TestSqlFilterFunctionalityAsyncValues(string property, string value)
+ {
+ Assert.NotNull(_connection);
+ Assert.NotNull(_management);
+
+ // cast to AMQPConnection to use Skip.If
+ var amqpConnection = (_connection as AmqpConnection);
+ Skip.IfNot(amqpConnection is { _featureFlags.IsSqlFeatureEnabled: true },
+ "SQL filter is not supported by the connection.");
+ IQueueSpecification q = _management.Queue(_queueName).Stream().Queue();
+ await q.DeclareAsync();
+ TaskCompletionSource tcs =
+ new(TaskCreationOptions.RunContinuationsAsynchronously);
+ IConsumer consumer = await _connection.ConsumerBuilder()
+ .Queue(_queueName)
+ .Stream().Filter().Sql($"{property} LIKE '%{value}'").Stream()
+ .Offset(StreamOffsetSpecification.First)
+ .Builder().MessageHandler((IContext ctx, IMessage msg) =>
+ {
+ tcs.SetResult(msg);
+ // Here you would implement the logic to handle messages that match the SQL filter.
+ // For example, you could validate that the message content matches expected SQL criteria.
+ return Task.CompletedTask;
+ })
+ .BuildAndStartAsync();
+
+ IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync();
+ await publisher.PublishAsync(new AmqpMessage($"NO")
+ .Property(property, "NO"));
+
+ await publisher.PublishAsync(new AmqpMessage($"with property_{property} value {value}")
+ .Property(property, value));
+
+ await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10));
+ Assert.Equal($"with property_{property} value {value}", tcs.Task.Result.BodyAsString());
+ Assert.Equal(value, tcs.Task.Result.Property(property));
+ await consumer.CloseAsync();
+ await publisher.CloseAsync();
+ await q.DeleteAsync();
+ await _connection.CloseAsync();
+
+ }
+ }
+}
diff --git a/Tests/Consumer/StreamConsumerTests.cs b/Tests/Consumer/StreamConsumerTests.cs
index b5fd8b4..c02eeea 100644
--- a/Tests/Consumer/StreamConsumerTests.cs
+++ b/Tests/Consumer/StreamConsumerTests.cs
@@ -964,6 +964,6 @@ public async Task FilterExpressionStringModifier()
private void SkipIfNoFilterExpressions()
{
- Skip.IfNot(_areFilterExpressionsSupported, "At least RabbitMQ 4.1.0 required");
+ Skip.IfNot(_featureFlags is { IsFilterFeatureEnabled: true }, "At least RabbitMQ 4.1.0 required");
}
}
diff --git a/Tests/IntegrationTest.cs b/Tests/IntegrationTest.cs
index bdb3129..0fe0518 100644
--- a/Tests/IntegrationTest.cs
+++ b/Tests/IntegrationTest.cs
@@ -38,7 +38,7 @@ public abstract partial class IntegrationTest : IAsyncLifetime
private readonly bool _setupConnectionAndManagement;
protected readonly ConnectionSettingsBuilder _connectionSettingBuilder;
- protected bool _areFilterExpressionsSupported = false;
+ protected FeatureFlags? _featureFlags = null;
public IntegrationTest(ITestOutputHelper testOutputHelper,
bool setupConnectionAndManagement = true)
@@ -73,7 +73,7 @@ public virtual async Task InitializeAsync()
_connection = await AmqpConnection.CreateAsync(_connectionSettings);
if (_connection is AmqpConnection amqpConnection)
{
- _areFilterExpressionsSupported = amqpConnection.AreFilterExpressionsSupported;
+ _featureFlags = amqpConnection._featureFlags;
}
_management = _connection.Management();
}
diff --git a/Tests/Management/ManagementTests.cs b/Tests/Management/ManagementTests.cs
index b590183..59b0be4 100644
--- a/Tests/Management/ManagementTests.cs
+++ b/Tests/Management/ManagementTests.cs
@@ -77,7 +77,8 @@ public async Task DeclareQueueWithQueueInfoValidation(
Assert.Equal(ClusterSize, queueInfo.Members().Count);
}
- Assert.NotNull(queueInfo.Leader());
+ // restore when https://github.com/rabbitmq/rabbitmq-server/pull/14438 will be merged
+ // Assert.NotNull(queueInfo.Leader());
Assert.Equal(queueInfo.Durable(), durable);
Assert.Equal(queueInfo.AutoDelete(), autoDelete);
Assert.Equal(queueInfo.Exclusive(), exclusive);
diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs
index 588d607..74df7ba 100644
--- a/Tests/Rpc/RpcServerTests.cs
+++ b/Tests/Rpc/RpcServerTests.cs
@@ -16,8 +16,8 @@ namespace Tests.Rpc
public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
{
private string _requestQueueName = string.Empty;
- private string _replyToName = $"queueReplyTo-{Now}-{Guid.NewGuid()}";
- private string _correlationId = $"my-correlation-id-{Guid.NewGuid()}";
+ private readonly string _replyToName = $"queueReplyTo-{Now}-{Guid.NewGuid()}";
+ private readonly string _correlationId = $"my-correlation-id-{Guid.NewGuid()}";
public override async Task InitializeAsync()
{