diff --git a/.ci/ubuntu/cluster/gha-setup.sh b/.ci/ubuntu/cluster/gha-setup.sh index a03ffb4..d47e69b 100755 --- a/.ci/ubuntu/cluster/gha-setup.sh +++ b/.ci/ubuntu/cluster/gha-setup.sh @@ -19,7 +19,7 @@ function run_docker_compose docker compose --file "$script_dir/docker-compose.yml" $@ } -readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}" +readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}" if [[ ! -v GITHUB_ACTIONS ]] then diff --git a/.ci/ubuntu/cluster/rmq/21-enable-management-collector.conf b/.ci/ubuntu/cluster/rmq/21-enable-management-collector.conf index 913a49a..26710ae 100644 --- a/.ci/ubuntu/cluster/rmq/21-enable-management-collector.conf +++ b/.ci/ubuntu/cluster/rmq/21-enable-management-collector.conf @@ -1,3 +1,3 @@ -# This file might be required to test with images built from main. +# This file is required to test with images built from main. # Images in pivotalrabbitmq/rabbitmq disable the metrics collector by default. management_agent.disable_metrics_collector = false \ No newline at end of file diff --git a/.ci/ubuntu/one-node/gha-setup.sh b/.ci/ubuntu/one-node/gha-setup.sh index a4c7efa..92bd04a 100755 --- a/.ci/ubuntu/one-node/gha-setup.sh +++ b/.ci/ubuntu/one-node/gha-setup.sh @@ -9,7 +9,7 @@ readonly script_dir echo "[INFO] script_dir: '$script_dir'" -readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}" +readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}" diff --git a/RabbitMQ.AMQP.Client/Consts.cs b/RabbitMQ.AMQP.Client/Consts.cs index 25b4a8e..9a89a28 100644 --- a/RabbitMQ.AMQP.Client/Consts.cs +++ b/RabbitMQ.AMQP.Client/Consts.cs @@ -2,6 +2,8 @@ // and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +using Amqp.Types; + namespace RabbitMQ.AMQP.Client { public static class Consts @@ -21,5 +23,24 @@ public static class Consts /// The default virtual host, / /// public const string DefaultVirtualHost = "/"; + + // amqp:sql-filter + private const string AmqpSqlFilter = "amqp:sql-filter"; + internal static readonly Symbol s_streamSqlFilterSymbol = new(AmqpSqlFilter); + internal const string SqlFilter = "sql-filter"; + internal static readonly Symbol s_sqlFilterSymbol = new(SqlFilter); + + internal const string AmqpPropertiesFilter = "amqp:properties-filter"; + internal const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter"; + + // sql-filter + private const string RmqStreamFilter = "rabbitmq:stream-filter"; + private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec"; + private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered"; + + internal static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter); + internal static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec); + internal static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered); + } } diff --git a/RabbitMQ.AMQP.Client/FeatureFlags.cs b/RabbitMQ.AMQP.Client/FeatureFlags.cs index 7adbcb2..7ba4312 100644 --- a/RabbitMQ.AMQP.Client/FeatureFlags.cs +++ b/RabbitMQ.AMQP.Client/FeatureFlags.cs @@ -4,10 +4,25 @@ namespace RabbitMQ.AMQP.Client { - internal class FeatureFlags + public class FeatureFlags { - public bool IsSqlFeatureEnabled { get; set; } = false; - public bool IsBrokerCompatible { get; set; } = false; + + /// + /// Check if filter feature is enabled. + /// Filter feature is available in RabbitMQ 4.1 and later. + /// + public bool IsFilterFeatureEnabled { get; internal set; } = false; + + /// + /// Check if Sql feature is enabled. + /// Sql feature is available in RabbitMQ 4.2 and later. + /// + public bool IsSqlFeatureEnabled { get; internal set; } = false; + /// + /// Check if the client is compatible with the broker version. + /// The client requires RabbitMQ 4.0 or later to be compatible. + /// + public bool IsBrokerCompatible { get; internal set; } = false; public void Validate() { diff --git a/RabbitMQ.AMQP.Client/IConsumerBuilder.cs b/RabbitMQ.AMQP.Client/IConsumerBuilder.cs index 31f00cf..78bb1ac 100644 --- a/RabbitMQ.AMQP.Client/IConsumerBuilder.cs +++ b/RabbitMQ.AMQP.Client/IConsumerBuilder.cs @@ -194,6 +194,14 @@ public interface IStreamFilterOptions /// IStreamFilterOptions PropertySymbol(string key, string value); + /// + /// SQL filter expression. + /// + /// + /// Requires RabbitMQ 4.2 or more. + /// Documentation: SQL Filtering + IStreamFilterOptions Sql(string sql); + /// /// Return the stream options. /// diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs index 5d1d856..0222020 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs @@ -39,8 +39,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection private readonly IMetricsReporter? _metricsReporter; private readonly Dictionary _connectionProperties = new(); - private bool _areFilterExpressionsSupported = false; - private FeatureFlags _featureFlags = new FeatureFlags(); + internal readonly FeatureFlags _featureFlags = new FeatureFlags(); /// /// _publishersDict contains all the publishers created by the connection. @@ -261,8 +260,6 @@ protected override void Dispose(bool disposing) internal Connection? NativeConnection => _nativeConnection; - internal bool AreFilterExpressionsSupported => _areFilterExpressionsSupported; - // TODO this couples AmqpConnection with AmqpPublisher, yuck internal void AddPublisher(Guid id, IPublisher consumer) { @@ -674,7 +671,7 @@ private void HandleProperties(Fields properties) // this is a feature that was introduced in RabbitMQ 4.2.0 _featureFlags.IsSqlFeatureEnabled = Utils.Is4_2_OrMore(brokerVersion); - _areFilterExpressionsSupported = Utils.SupportsFilterExpressions(brokerVersion); + _featureFlags.IsFilterFeatureEnabled = Utils.SupportsFilterExpressions(brokerVersion); } } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs index 10330ac..39ebcaa 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs @@ -58,7 +58,7 @@ public override async Task OpenAsync() // ListenerContext will override only the filters the selected filters. if (_configuration.ListenerContext is not null) { - var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters, _amqpConnection.AreFilterExpressionsSupported); + var listenerStreamOptions = new ListenerStreamOptions(_configuration.Filters); var listenerContext = new IConsumerBuilder.ListenerContext(listenerStreamOptions); _configuration.ListenerContext(listenerContext); } @@ -88,6 +88,12 @@ void OnAttached(ILink argLink, Attach argAttach) // TODO configurable timeout var waitSpan = TimeSpan.FromSeconds(5); + + // TODO + // Even 10ms is enough to allow the links to establish, + // which tells me it allows the .NET runtime to process + await Task.Delay(10).ConfigureAwait(false); + _receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan) .ConfigureAwait(false); diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs index af0c637..1be732e 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs @@ -18,9 +18,12 @@ internal sealed class ConsumerConfiguration { public string Address { get; set; } = ""; public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib + public Map Filters { get; set; } = new(); + // TODO is a MessageHandler *really* optional??? public MessageHandler? Handler { get; set; } + // TODO re-name to ListenerContextAction? Callback? public Action? ListenerContext = null; } @@ -73,8 +76,7 @@ public IConsumerBuilder SubscriptionListener(Action BuildAndStartAsync(CancellationToken cancellationToken = default) @@ -84,6 +86,13 @@ public async Task BuildAndStartAsync(CancellationToken cancellationTo throw new ConsumerException("Message handler is not set"); } + if (_configuration.Filters[Consts.s_sqlFilterSymbol] is not null && + _amqpConnection._featureFlags.IsSqlFeatureEnabled == false) + { + throw new ConsumerException("SQL filter is not supported by the connection." + + "RabbitMQ 4.2.0 or later is required."); + } + AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter); // TODO pass cancellationToken @@ -104,27 +113,17 @@ public abstract class StreamOptions : IConsumerBuilder.IStreamOptions private static readonly Regex s_offsetValidator = new Regex("^[0-9]+[YMDhms]$", RegexOptions.Compiled | RegexOptions.CultureInvariant); - private const string RmqStreamFilter = "rabbitmq:stream-filter"; - private const string RmqStreamOffsetSpec = "rabbitmq:stream-offset-spec"; - private const string RmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered"; - - private static readonly Symbol s_streamFilterSymbol = new(RmqStreamFilter); - private static readonly Symbol s_streamOffsetSpecSymbol = new(RmqStreamOffsetSpec); - private static readonly Symbol s_streamMatchUnfilteredSymbol = new(RmqStreamMatchUnfiltered); - private readonly Map _filters; - private readonly bool _areFilterExpressionsSupported; - protected StreamOptions(Map filters, bool areFilterExpressionsSupported) + protected StreamOptions(Map filters) { _filters = filters; - _areFilterExpressionsSupported = areFilterExpressionsSupported; } public IConsumerBuilder.IStreamOptions Offset(long offset) { - _filters[s_streamOffsetSpecSymbol] = - new DescribedValue(s_streamOffsetSpecSymbol, offset); + _filters[Consts.s_streamOffsetSpecSymbol] = + new DescribedValue(Consts.s_streamOffsetSpecSymbol, offset); return this; } @@ -158,15 +157,15 @@ public IConsumerBuilder.IStreamOptions Offset(string interval) public IConsumerBuilder.IStreamOptions FilterValues(params string[] values) { - _filters[s_streamFilterSymbol] = - new DescribedValue(s_streamFilterSymbol, values.ToList()); + _filters[Consts.s_streamFilterSymbol] = + new DescribedValue(Consts.s_streamFilterSymbol, values.ToList()); return this; } public IConsumerBuilder.IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered) { - _filters[s_streamMatchUnfilteredSymbol] - = new DescribedValue(s_streamMatchUnfilteredSymbol, matchUnfiltered); + _filters[Consts.s_streamMatchUnfilteredSymbol] + = new DescribedValue(Consts.s_streamMatchUnfilteredSymbol, matchUnfiltered); return this; } @@ -174,8 +173,8 @@ public IConsumerBuilder.IStreamOptions FilterMatchUnfiltered(bool matchUnfiltere private void SetOffsetSpecificationFilter(object value) { - _filters[s_streamOffsetSpecSymbol] - = new DescribedValue(s_streamOffsetSpecSymbol, value); + _filters[Consts.s_streamOffsetSpecSymbol] + = new DescribedValue(Consts.s_streamOffsetSpecSymbol, value); } public IConsumerBuilder.IStreamFilterOptions Filter() @@ -198,8 +197,8 @@ public IConsumerBuilder.IStreamFilterOptions Filter() /// public class ListenerStreamOptions : StreamOptions { - public ListenerStreamOptions(Map filters, bool areFilterExpressionsSupported) - : base(filters, areFilterExpressionsSupported) + public ListenerStreamOptions(Map filters) + : base(filters) { } @@ -221,8 +220,8 @@ public class ConsumerBuilderStreamOptions : StreamOptions private readonly IConsumerBuilder _consumerBuilder; public ConsumerBuilderStreamOptions(IConsumerBuilder consumerBuilder, - Map filters, bool areFilterExpressionsSupported) - : base(filters, areFilterExpressionsSupported) + Map filters) + : base(filters) { _consumerBuilder = consumerBuilder; } @@ -239,8 +238,8 @@ public override IConsumerBuilder Builder() /// public class StreamFilterOptions : IConsumerBuilder.IStreamFilterOptions { - private IConsumerBuilder.IStreamOptions _streamOptions; - private Map _filters; + private readonly IConsumerBuilder.IStreamOptions _streamOptions; + private readonly Map _filters; public StreamFilterOptions(IConsumerBuilder.IStreamOptions streamOptions, Map filters) { @@ -248,6 +247,18 @@ public StreamFilterOptions(IConsumerBuilder.IStreamOptions streamOptions, Map fi _filters = filters; } + public IConsumerBuilder.IStreamFilterOptions Sql(string sql) + { + if (string.IsNullOrWhiteSpace(sql)) + { + throw new ArgumentNullException(nameof(sql)); + } + + _filters[Consts.s_sqlFilterSymbol] = + new DescribedValue(Consts.s_streamSqlFilterSymbol, sql); + return this; + } + public IConsumerBuilder.IStreamOptions Stream() { return _streamOptions; @@ -300,9 +311,8 @@ public IConsumerBuilder.IStreamFilterOptions PropertySymbol(string key, string v private StreamFilterOptions PropertyFilter(string propertyKey, object propertyValue) { - const string AmqpPropertiesFilter = "amqp:properties-filter"; - DescribedValue propertiesFilterValue = Filter(AmqpPropertiesFilter); + DescribedValue propertiesFilterValue = Filter(Consts.AmqpPropertiesFilter); Map propertiesFilter = (Map)propertiesFilterValue.Value; // Note: you MUST use a symbol as the key propertiesFilter.Add(new Symbol(propertyKey), propertyValue); @@ -311,9 +321,8 @@ private StreamFilterOptions PropertyFilter(string propertyKey, object propertyVa private StreamFilterOptions ApplicationPropertyFilter(string propertyKey, object propertyValue) { - const string AmqpApplicationPropertiesFilter = "amqp:application-properties-filter"; - DescribedValue applicationPropertiesFilterValue = Filter(AmqpApplicationPropertiesFilter); + DescribedValue applicationPropertiesFilterValue = Filter(Consts.AmqpApplicationPropertiesFilter); Map applicationPropertiesFilter = (Map)applicationPropertiesFilterValue.Value; // Note: do NOT put a symbol as the key applicationPropertiesFilter.Add(propertyKey, propertyValue); diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs index 405e8de..e8c47f0 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs @@ -67,6 +67,12 @@ void OnAttached(ILink argLink, Attach argAttach) // TODO configurable timeout var waitSpan = TimeSpan.FromSeconds(5); + // TODO + // Even 10ms is enough to allow the links to establish, + // which tells me it allows the .NET runtime to process + await Task.Delay(10) + .ConfigureAwait(false); + _senderLink = await attachCompletedTcs.Task.WaitAsync(waitSpan) .ConfigureAwait(false); diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 4850878..113932d 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -113,6 +113,12 @@ RabbitMQ.AMQP.Client.ExchangeType.DIRECT = 0 -> RabbitMQ.AMQP.Client.ExchangeTyp RabbitMQ.AMQP.Client.ExchangeType.FANOUT = 1 -> RabbitMQ.AMQP.Client.ExchangeType RabbitMQ.AMQP.Client.ExchangeType.HEADERS = 3 -> RabbitMQ.AMQP.Client.ExchangeType RabbitMQ.AMQP.Client.ExchangeType.TOPIC = 2 -> RabbitMQ.AMQP.Client.ExchangeType +RabbitMQ.AMQP.Client.FeatureFlags +RabbitMQ.AMQP.Client.FeatureFlags.FeatureFlags() -> void +RabbitMQ.AMQP.Client.FeatureFlags.IsBrokerCompatible.get -> bool +RabbitMQ.AMQP.Client.FeatureFlags.IsFilterFeatureEnabled.get -> bool +RabbitMQ.AMQP.Client.FeatureFlags.IsSqlFeatureEnabled.get -> bool +RabbitMQ.AMQP.Client.FeatureFlags.Validate() -> void RabbitMQ.AMQP.Client.IAddressBuilder RabbitMQ.AMQP.Client.IAddressBuilder.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> T RabbitMQ.AMQP.Client.IAddressBuilder.Exchange(string! exchangeName) -> T @@ -182,6 +188,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Property(string! key, RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! +RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.To(string! to) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! @@ -533,7 +540,7 @@ RabbitMQ.AMQP.Client.Impl.BindingSpecification._routingKey -> string! RabbitMQ.AMQP.Client.Impl.BindingSpecification._sourceName -> string! RabbitMQ.AMQP.Client.Impl.BindingSpecification._toQueue -> bool RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions -RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void +RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Address() -> string! RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.DefaultAddressBuilder() -> void @@ -557,7 +564,7 @@ RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Type() -> RabbitMQ.AMQP.Client.QueueT RabbitMQ.AMQP.Client.Impl.FieldNotSetException RabbitMQ.AMQP.Client.Impl.FieldNotSetException.FieldNotSetException() -> void RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions -RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void +RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters) -> void RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.MessageAddressBuilder(RabbitMQ.AMQP.Client.IMessage! message) -> void @@ -605,6 +612,7 @@ RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Property(string! key, object! valu RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.PropertySymbol(string! key, string! value) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyTo(string! replyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ReplyToGroupId(string! groupId) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! +RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Sql(string! sql) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.StreamFilterOptions(RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! streamOptions, Amqp.Types.Map! filters) -> void RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.Subject(string! subject) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions! @@ -618,7 +626,7 @@ RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(long offset) -> RabbitMQ.AMQP.Cli RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(RabbitMQ.AMQP.Client.StreamOffsetSpecification specification) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(string! interval) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! RabbitMQ.AMQP.Client.Impl.StreamOptions.Offset(System.DateTime timestamp) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! -RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters, bool areFilterExpressionsSupported) -> void +RabbitMQ.AMQP.Client.Impl.StreamOptions.StreamOptions(Amqp.Types.Map! filters) -> void RabbitMQ.AMQP.Client.InternalBugException RabbitMQ.AMQP.Client.InternalBugException.InternalBugException() -> void RabbitMQ.AMQP.Client.InternalBugException.InternalBugException(string! message) -> void diff --git a/Tests/Consumer/ConsumerSqlFilterTests.cs b/Tests/Consumer/ConsumerSqlFilterTests.cs new file mode 100644 index 0000000..db8c4ba --- /dev/null +++ b/Tests/Consumer/ConsumerSqlFilterTests.cs @@ -0,0 +1,121 @@ +// This source code is dual-licensed under the Apache License, version 2.0, +// and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using Xunit; +using Xunit.Abstractions; + +namespace Tests.Consumer +{ + public class ConsumerSqlFilterTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) + { + // This class is a placeholder for SQL filter tests. + // The actual implementation of SQL filter tests will depend on the specific requirements and context. + // For example, it could involve testing SQL queries against a mock database or validating SQL syntax. + + // Example test method (to be implemented): + [SkippableFact] + [Trait("Category", "SqlFilter")] + public async Task TestSqlFilterFunctionalityAsync() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + + // cast to AMQPConnection to use Skip.If + var amqpConnection = (_connection as AmqpConnection); + Skip.IfNot(amqpConnection is { _featureFlags.IsSqlFeatureEnabled: true }, + "SQL filter is not supported by the connection."); + + IQueueSpecification q = _management.Queue(_queueName).Stream().Queue(); + await q.DeclareAsync(); + TaskCompletionSource tcs = + new(TaskCreationOptions.RunContinuationsAsynchronously); + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(_queueName) + .Stream().Filter().Sql("properties.subject LIKE '%John%'").Stream() + .Offset(StreamOffsetSpecification.First) + .Builder().MessageHandler((IContext ctx, IMessage msg) => + { + tcs.SetResult(msg); + // Here you would implement the logic to handle messages that match the SQL filter. + // For example, you could validate that the message content matches expected SQL criteria. + return Task.CompletedTask; + }) + .BuildAndStartAsync(); + + IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + + // var msgNotInTheFilter = new AmqpMessage("Test message for SQL filter") + // .Property("user_id", "Gas"); // This property should not match the SQL filter + var msgNotInTheFilter = new AmqpMessage("Test message for SQL filter, should not match") + .Subject("Gas"); // This property should not match the SQL filter + await publisher.PublishAsync(msgNotInTheFilter); + var msgInTheFilter = new AmqpMessage("Test message for SQL filter") + .Subject("John"); // This property should match the SQL filter + await publisher.PublishAsync(msgInTheFilter); + await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); + + Assert.Equal("Test message for SQL filter", tcs.Task.Result.BodyAsString()); + Assert.Equal("John", tcs.Task.Result.Subject()); + Assert.Equal("Test message for SQL filter", tcs.Task.Result.BodyAsString()); + await consumer.CloseAsync(); + await publisher.CloseAsync(); + await q.DeleteAsync(); + await _connection.CloseAsync(); + } + + [SkippableTheory] + [Trait("Category", "SqlFilter")] + [InlineData("myP", "John")] + [InlineData("myP", "Doe")] + [InlineData("user_id", "Alice")] + [InlineData("user_id", "Bob")] + public async Task TestSqlFilterFunctionalityAsyncValues(string property, string value) + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + + // cast to AMQPConnection to use Skip.If + var amqpConnection = (_connection as AmqpConnection); + Skip.IfNot(amqpConnection is { _featureFlags.IsSqlFeatureEnabled: true }, + "SQL filter is not supported by the connection."); + IQueueSpecification q = _management.Queue(_queueName).Stream().Queue(); + await q.DeclareAsync(); + TaskCompletionSource tcs = + new(TaskCreationOptions.RunContinuationsAsynchronously); + IConsumer consumer = await _connection.ConsumerBuilder() + .Queue(_queueName) + .Stream().Filter().Sql($"{property} LIKE '%{value}'").Stream() + .Offset(StreamOffsetSpecification.First) + .Builder().MessageHandler((IContext ctx, IMessage msg) => + { + tcs.SetResult(msg); + // Here you would implement the logic to handle messages that match the SQL filter. + // For example, you could validate that the message content matches expected SQL criteria. + return Task.CompletedTask; + }) + .BuildAndStartAsync(); + + IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + await publisher.PublishAsync(new AmqpMessage($"NO") + .Property(property, "NO")); + + await publisher.PublishAsync(new AmqpMessage($"with property_{property} value {value}") + .Property(property, value)); + + await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10)); + Assert.Equal($"with property_{property} value {value}", tcs.Task.Result.BodyAsString()); + Assert.Equal(value, tcs.Task.Result.Property(property)); + await consumer.CloseAsync(); + await publisher.CloseAsync(); + await q.DeleteAsync(); + await _connection.CloseAsync(); + + } + } +} diff --git a/Tests/Consumer/StreamConsumerTests.cs b/Tests/Consumer/StreamConsumerTests.cs index b5fd8b4..c02eeea 100644 --- a/Tests/Consumer/StreamConsumerTests.cs +++ b/Tests/Consumer/StreamConsumerTests.cs @@ -964,6 +964,6 @@ public async Task FilterExpressionStringModifier() private void SkipIfNoFilterExpressions() { - Skip.IfNot(_areFilterExpressionsSupported, "At least RabbitMQ 4.1.0 required"); + Skip.IfNot(_featureFlags is { IsFilterFeatureEnabled: true }, "At least RabbitMQ 4.1.0 required"); } } diff --git a/Tests/IntegrationTest.cs b/Tests/IntegrationTest.cs index bdb3129..0fe0518 100644 --- a/Tests/IntegrationTest.cs +++ b/Tests/IntegrationTest.cs @@ -38,7 +38,7 @@ public abstract partial class IntegrationTest : IAsyncLifetime private readonly bool _setupConnectionAndManagement; protected readonly ConnectionSettingsBuilder _connectionSettingBuilder; - protected bool _areFilterExpressionsSupported = false; + protected FeatureFlags? _featureFlags = null; public IntegrationTest(ITestOutputHelper testOutputHelper, bool setupConnectionAndManagement = true) @@ -73,7 +73,7 @@ public virtual async Task InitializeAsync() _connection = await AmqpConnection.CreateAsync(_connectionSettings); if (_connection is AmqpConnection amqpConnection) { - _areFilterExpressionsSupported = amqpConnection.AreFilterExpressionsSupported; + _featureFlags = amqpConnection._featureFlags; } _management = _connection.Management(); } diff --git a/Tests/Management/ManagementTests.cs b/Tests/Management/ManagementTests.cs index b590183..59b0be4 100644 --- a/Tests/Management/ManagementTests.cs +++ b/Tests/Management/ManagementTests.cs @@ -77,7 +77,8 @@ public async Task DeclareQueueWithQueueInfoValidation( Assert.Equal(ClusterSize, queueInfo.Members().Count); } - Assert.NotNull(queueInfo.Leader()); + // restore when https://github.com/rabbitmq/rabbitmq-server/pull/14438 will be merged + // Assert.NotNull(queueInfo.Leader()); Assert.Equal(queueInfo.Durable(), durable); Assert.Equal(queueInfo.AutoDelete(), autoDelete); Assert.Equal(queueInfo.Exclusive(), exclusive); diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 588d607..74df7ba 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -16,8 +16,8 @@ namespace Tests.Rpc public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) { private string _requestQueueName = string.Empty; - private string _replyToName = $"queueReplyTo-{Now}-{Guid.NewGuid()}"; - private string _correlationId = $"my-correlation-id-{Guid.NewGuid()}"; + private readonly string _replyToName = $"queueReplyTo-{Now}-{Guid.NewGuid()}"; + private readonly string _correlationId = $"my-correlation-id-{Guid.NewGuid()}"; public override async Task InitializeAsync() {