From ffa1b612d2cd0e77a38ace14593dea3fc609fae3 Mon Sep 17 00:00:00 2001 From: Petr Waclawek Date: Wed, 25 Jul 2018 16:20:53 +0200 Subject: [PATCH 1/4] ISubsriber.WithPrefix and IConnectionMultiplexer.WithPrefix implementations --- .../SubscriberWrapperTests.cs | 76 ++++++++ .../ConnectionMultiplexerExtensions.cs | 23 +++ .../ConnectionMultiplexerWrapper.cs | 184 ++++++++++++++++++ .../KeyspaceIsolation/SubscriberExtensions.cs | 47 +++++ .../KeyspaceIsolation/SubscriberWrapper.cs | 122 ++++++++++++ 5 files changed, 452 insertions(+) create mode 100644 StackExchange.Redis.Tests/SubscriberWrapperTests.cs create mode 100644 StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerExtensions.cs create mode 100644 StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerWrapper.cs create mode 100644 StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberExtensions.cs create mode 100644 StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberWrapper.cs diff --git a/StackExchange.Redis.Tests/SubscriberWrapperTests.cs b/StackExchange.Redis.Tests/SubscriberWrapperTests.cs new file mode 100644 index 000000000..40ab1e9de --- /dev/null +++ b/StackExchange.Redis.Tests/SubscriberWrapperTests.cs @@ -0,0 +1,76 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using StackExchange.Redis.KeyspaceIsolation; +using Xunit; +using Xunit.Abstractions; + +namespace StackExchange.Redis.Tests +{ + public class SubscriberWrapperTests : TestBase + { + public SubscriberWrapperTests(ITestOutputHelper output) : base(output) + {} + + [Fact] + public async Task UsePrefixForChannel() + { + using (var client = Create(allowAdmin: true)) + { + const string prefix1 = "(p1)-"; + const string prefix2 = "(p2)-"; + + var s1 = client.GetSubscriber().WithChannelPrefix(prefix1); + var s12 = client.GetSubscriber().WithChannelPrefix(prefix1); + var s2 = client.GetSubscriber().WithChannelPrefix(prefix2); + var s = client.GetSubscriber(); + + var l1 = new List(); + var l12 = new List(); + var l2 = new List(); + var l = new List(); + var lAll = new List(); + var lT1 = new List(); + var c1 = new List(); + + const string channelName = "test-channel"; + s1.Subscribe(channelName, (channel, value) => + { + c1.Add(channel); + l1.Add(value); + }); + s12.Subscribe(channelName, (channel, value) => l12.Add(value)); + s2.Subscribe(channelName, (channel, value) => l2.Add(value)); + s.Subscribe(channelName, (channel, value) => l.Add(value)); + s.Subscribe("*" + channelName, (channel, value) => lAll.Add(value)); + s.Subscribe(prefix1 + channelName, (channel, value) => lT1.Add(value)); + + s1.Publish(channelName, "value1"); + s.Publish(channelName, "value"); + + // Give some time to pub-sub + await Task.Delay(500); + + Assert.Single(l1); + Assert.Equal("value1",l1[0]); + + Assert.Single(l12); + Assert.Equal("value1",l12[0]); + + Assert.Empty(l2); + + Assert.Single(l); + Assert.Equal("value",l[0]); + + Assert.Equal(2, lAll.Count); + Assert.Contains("value", lAll); + Assert.Contains("value1", lAll); + + Assert.Single(lT1); + Assert.Equal("value1",lT1[0]); + + Assert.Single(c1); + Assert.Equal(channelName,c1[0]); + } + } + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerExtensions.cs b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerExtensions.cs new file mode 100644 index 000000000..f8fb6c5f1 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerExtensions.cs @@ -0,0 +1,23 @@ +namespace StackExchange.Redis.KeyspaceIsolation +{ + /// + /// Provides the extension method to . + /// + public static class ConnectionMultiplexerExtensions + { + /// + /// Wraps the connection multiplexer to provide specific database and channels keyspace using the + /// + /// The multiplexer to wrap + /// The prefix for keys and channel names + /// + /// The caller is responsible for disposing the wrapped connection multiplexer + /// + /// + /// + public static IConnectionMultiplexer WithPrefix(this IConnectionMultiplexer inner, string prefix) + { + return new ConnectionMultiplexerWrapper(inner, prefix); + } + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerWrapper.cs b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerWrapper.cs new file mode 100644 index 000000000..1e39537fd --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerWrapper.cs @@ -0,0 +1,184 @@ +using System; +using System.IO; +using System.Net; +using System.Threading.Tasks; + +namespace StackExchange.Redis.KeyspaceIsolation +{ + /// + /// Uses prefix to provide specific database and channels keyspace + /// + internal class ConnectionMultiplexerWrapper : IConnectionMultiplexer + { + private readonly IConnectionMultiplexer _inner; + private readonly string _prefix; + + /// + /// Initializes a new instance of the class. + /// + /// The multiplexer to wrap + /// The prefix for keys and channel names + /// + /// The caller is responsible for releasing the wrapped connection multiplexer + /// + public ConnectionMultiplexerWrapper(IConnectionMultiplexer inner, string prefix) + { + if (inner == null) + { + throw new ArgumentNullException(nameof(inner)); + } + + if (prefix == null) + { + throw new ArgumentNullException(nameof(prefix)); + } + + if (prefix.Length == 0) + { + throw new ArgumentException("Prefix must not be empty"); + } + + _inner = inner; + _prefix = prefix; + } + + public void RegisterProfiler(IProfiler profiler) => _inner.RegisterProfiler(profiler); + + public void BeginProfiling(object forContext) => _inner.BeginProfiling(forContext); + + public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true) => _inner.FinishProfiling(forContext, allowCleanupSweep); + + public ServerCounters GetCounters() => _inner.GetCounters(); + + public EndPoint[] GetEndPoints(bool configuredOnly = false) => _inner.GetEndPoints(configuredOnly); + + public void Wait(Task task) => _inner.Wait(task); + + public T Wait(Task task) => _inner.Wait(task); + + public void WaitAll(params Task[] tasks) => _inner.WaitAll(tasks); + + public int HashSlot(RedisKey key) => _inner.HashSlot(key); + + public ISubscriber GetSubscriber(object asyncState = null) => _inner.GetSubscriber(asyncState).WithChannelPrefix(_prefix); + + public IDatabase GetDatabase(int db = -1, object asyncState = null) => _inner.GetDatabase(db, asyncState).WithKeyPrefix(_prefix); + + public IServer GetServer(string host, int port, object asyncState = null) => _inner.GetServer(host, port, asyncState); + + public IServer GetServer(string hostAndPort, object asyncState = null) => _inner.GetServer(hostAndPort, asyncState); + + public IServer GetServer(IPAddress host, int port) => _inner.GetServer(host, port); + + public IServer GetServer(EndPoint endpoint, object asyncState = null) => _inner.GetServer(endpoint, asyncState); + + public Task ConfigureAsync(TextWriter log = null) => _inner.ConfigureAsync(log); + + public bool Configure(TextWriter log = null) => _inner.Configure(log); + + public string GetStatus() => _inner.GetStatus(); + + public void GetStatus(TextWriter log) => _inner.GetStatus(log); + + public void Close(bool allowCommandsToComplete = true) => _inner.Close(allowCommandsToComplete); + + public Task CloseAsync(bool allowCommandsToComplete = true) => _inner.CloseAsync(allowCommandsToComplete); + + public string GetStormLog() => _inner.GetStormLog(); + + public void ResetStormLog() => _inner.ResetStormLog(); + + public long PublishReconfigure(CommandFlags flags = CommandFlags.None) => _inner.PublishReconfigure(flags); + + public Task PublishReconfigureAsync(CommandFlags flags = CommandFlags.None) => _inner.PublishReconfigureAsync(flags); + + public bool AllowConnect + { + get => _inner.AllowConnect; + set => _inner.AllowConnect = value; + } + + public bool IgnoreConnect + { + get => _inner.IgnoreConnect; + set => _inner.IgnoreConnect = value; + } + + public string ClientName => _inner.ClientName; + + public string Configuration => _inner.Configuration; + + public int TimeoutMilliseconds => _inner.TimeoutMilliseconds; + + public long OperationCount => _inner.OperationCount; + + public bool PreserveAsyncOrder + { + get => _inner.PreserveAsyncOrder; + set => _inner.PreserveAsyncOrder = value; + } + + public bool IsConnected => _inner.IsConnected; + + public bool IsConnecting => _inner.IsConnecting; + + public bool IncludeDetailInExceptions + { + get => _inner.IncludeDetailInExceptions; + set => _inner.IncludeDetailInExceptions = value; + } + + public int StormLogThreshold + { + get => _inner.StormLogThreshold; + set => _inner.StormLogThreshold = value; + } + + public event EventHandler ErrorMessage + { + add => _inner.ErrorMessage += value; + remove => _inner.ErrorMessage -= value; + } + + public event EventHandler ConnectionFailed + { + add => _inner.ConnectionFailed += value; + remove => _inner.ConnectionFailed -= value; + } + + public event EventHandler InternalError + { + add => _inner.InternalError += value; + remove => _inner.InternalError -= value; + } + + public event EventHandler ConnectionRestored + { + add => _inner.ConnectionRestored += value; + remove => _inner.ConnectionRestored -= value; + } + + public event EventHandler ConfigurationChanged + { + add => _inner.ConfigurationChanged += value; + remove => _inner.ConfigurationChanged -= value; + } + + public event EventHandler ConfigurationChangedBroadcast + { + add => _inner.ConfigurationChangedBroadcast += value; + remove => _inner.ConfigurationChangedBroadcast -= value; + } + + public event EventHandler HashSlotMoved + { + add => _inner.HashSlotMoved += value; + remove => _inner.HashSlotMoved -= value; + } + + public void Dispose() + { + // do not dispose wrapped multiplexer - creator is responsible to dispose it + } + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberExtensions.cs b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberExtensions.cs new file mode 100644 index 000000000..8c037efc6 --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberExtensions.cs @@ -0,0 +1,47 @@ +using System; + +namespace StackExchange.Redis.KeyspaceIsolation +{ + /// + /// Provides the extension method to . + /// + public static class SubscriberExtensions + { + /// + /// Creates a new instance that provides an isolated channel namespace + /// of the specified underyling subscriber instance. + /// + /// + /// The underlying subscriber instance that the returned instance shall use. + /// + /// + /// The prefix that defines a channel namespace isolation for the returned subscriber instance. + /// + /// + /// A new instance that invokes the specified underlying + /// but prepends the specified + /// to all channel names and thus forms a logical channel namespace isolation. + /// + public static ISubscriber WithChannelPrefix(this ISubscriber subscriber, RedisChannel channelPrefix) + { + if (subscriber == null) + { + throw new ArgumentNullException(nameof(subscriber)); + } + + if (channelPrefix.IsNullOrEmpty) + { + throw new ArgumentNullException(nameof(channelPrefix)); + } + + if (subscriber is SubscriberWrapper wrapper) + { + // combine the channel prefix in advance to minimize indirection + channelPrefix = wrapper.ToInner(channelPrefix); + subscriber = wrapper.Inner; + } + + return new SubscriberWrapper(subscriber, channelPrefix); + } + } +} diff --git a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberWrapper.cs b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberWrapper.cs new file mode 100644 index 000000000..08741c62f --- /dev/null +++ b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberWrapper.cs @@ -0,0 +1,122 @@ +using System; +using System.Net; +using System.Threading.Tasks; + +namespace StackExchange.Redis.KeyspaceIsolation +{ + internal class SubscriberWrapper : ISubscriber + { + public ISubscriber Inner { get; } + + internal RedisChannel Prefix { get; } + + public SubscriberWrapper(ISubscriber inner, byte[] prefix) + { + Inner = inner; + Prefix = new RedisChannel(prefix, RedisChannel.PatternMode.Literal); + } + + public ConnectionMultiplexer Multiplexer => Inner.Multiplexer; + + public bool TryWait(Task task) => Inner.TryWait(task); + + public void Wait(Task task) => Inner.Wait(task); + + public T Wait(Task task) => Inner.Wait(task); + + public void WaitAll(params Task[] tasks) => Inner.WaitAll(tasks); + + public string ClientGetName(CommandFlags flags = CommandFlags.None) => Inner.ClientGetName(flags); + + public Task ClientGetNameAsync(CommandFlags flags = CommandFlags.None) => Inner.ClientGetNameAsync(flags); + + public void Quit(CommandFlags flags = CommandFlags.None) => Inner.Quit(flags); + + public TimeSpan Ping(CommandFlags flags = CommandFlags.None) => Inner.Ping(flags); + + public Task PingAsync(CommandFlags flags = CommandFlags.None) => Inner.PingAsync(flags); + + public EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpoint(ToInner(channel), flags); + + public Task IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpointAsync(ToInner(channel), flags); + + public bool IsConnected(RedisChannel channel = default(RedisChannel)) => Inner.IsConnected(ToInner(channel)); + + public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) => Inner.Publish(ToInner(channel), message, flags); + + public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) => Inner.PublishAsync(ToInner(channel), message, flags); + + public void Subscribe(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) => + Inner.Subscribe(ToInner(channel), ToInner(handler), flags); + + public Task SubscribeAsync(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) => + Inner.SubscribeAsync(ToInner(channel), ToInner(handler), flags); + + public EndPoint SubscribedEndpoint(RedisChannel channel) => Inner.SubscribedEndpoint(ToInner(channel)); + + public void Unsubscribe(RedisChannel channel, Action handler = null, CommandFlags flags = CommandFlags.None) => + Inner.Unsubscribe(ToInner(channel), ToInner(handler), flags); + + public void UnsubscribeAll(CommandFlags flags = CommandFlags.None) + { + if (Prefix.IsNullOrEmpty) + Inner.UnsubscribeAll(flags); + else + Inner.Unsubscribe(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags); + } + + public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None) + { + if (Prefix.IsNullOrEmpty) + return Inner.UnsubscribeAllAsync(flags); + else + return Inner.UnsubscribeAsync(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags); + } + + public Task UnsubscribeAsync(RedisChannel channel, Action handler = null, CommandFlags flags = CommandFlags.None) => + Inner.UnsubscribeAsync(ToInner(channel), ToInner(handler), flags); + + public RedisChannel ToInner(RedisChannel outer) + { + if (Prefix.IsNullOrEmpty) return outer; + + if (outer.IsNullOrEmpty) return Prefix; + + byte[] outerArr = outer; + byte[] prefixArr = Prefix; + + var innerArr = new byte[prefixArr.Length + outerArr.Length]; + Buffer.BlockCopy(prefixArr, 0, innerArr, 0, prefixArr.Length); + Buffer.BlockCopy(outerArr, 0, innerArr, prefixArr.Length, outerArr.Length); + + var patternMode = outer.IsPatternBased ? RedisChannel.PatternMode.Pattern : RedisChannel.PatternMode.Literal; + + return new RedisChannel(innerArr, patternMode); + } + + protected Action ToInner(Action handler) => (channel, value) => handler(ToOuter(channel), value); + + public RedisChannel ToOuter(RedisChannel inner) + { + if (Prefix.IsNullOrEmpty || inner.IsNullOrEmpty) return inner; + + byte[] innerArr = inner; + byte[] prefixArr = Prefix; + + if (innerArr.Length <= prefixArr.Length) return inner; + + for (var i = 0; i < prefixArr.Length; i++) + { + if (prefixArr[i] != innerArr[i]) return inner; + } + + var outerLength = innerArr.Length - prefixArr.Length; + var outerArr = new byte[outerLength]; + Buffer.BlockCopy(innerArr, prefixArr.Length, outerArr, 0, outerLength); + + var patternMode = inner.IsPatternBased ? RedisChannel.PatternMode.Pattern : RedisChannel.PatternMode.Literal; + + return new RedisChannel(outerArr, patternMode); + } + } +} From cccde5a3298f9310627f859270bee6ca880e9f63 Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 29 Oct 2022 09:04:25 -0400 Subject: [PATCH 2/4] Simplify to KeyPrefixedSubscriber only --- .../ConnectionMultiplexerExtensions.cs | 23 --- .../ConnectionMultiplexerWrapper.cs | 184 ------------------ ...berWrapper.cs => KeyPrefixedSubscriber.cs} | 2 +- 3 files changed, 1 insertion(+), 208 deletions(-) delete mode 100644 StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerExtensions.cs delete mode 100644 StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerWrapper.cs rename StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/{SubscriberWrapper.cs => KeyPrefixedSubscriber.cs} (98%) diff --git a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerExtensions.cs b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerExtensions.cs deleted file mode 100644 index f8fb6c5f1..000000000 --- a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerExtensions.cs +++ /dev/null @@ -1,23 +0,0 @@ -namespace StackExchange.Redis.KeyspaceIsolation -{ - /// - /// Provides the extension method to . - /// - public static class ConnectionMultiplexerExtensions - { - /// - /// Wraps the connection multiplexer to provide specific database and channels keyspace using the - /// - /// The multiplexer to wrap - /// The prefix for keys and channel names - /// - /// The caller is responsible for disposing the wrapped connection multiplexer - /// - /// - /// - public static IConnectionMultiplexer WithPrefix(this IConnectionMultiplexer inner, string prefix) - { - return new ConnectionMultiplexerWrapper(inner, prefix); - } - } -} diff --git a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerWrapper.cs b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerWrapper.cs deleted file mode 100644 index 1e39537fd..000000000 --- a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/ConnectionMultiplexerWrapper.cs +++ /dev/null @@ -1,184 +0,0 @@ -using System; -using System.IO; -using System.Net; -using System.Threading.Tasks; - -namespace StackExchange.Redis.KeyspaceIsolation -{ - /// - /// Uses prefix to provide specific database and channels keyspace - /// - internal class ConnectionMultiplexerWrapper : IConnectionMultiplexer - { - private readonly IConnectionMultiplexer _inner; - private readonly string _prefix; - - /// - /// Initializes a new instance of the class. - /// - /// The multiplexer to wrap - /// The prefix for keys and channel names - /// - /// The caller is responsible for releasing the wrapped connection multiplexer - /// - public ConnectionMultiplexerWrapper(IConnectionMultiplexer inner, string prefix) - { - if (inner == null) - { - throw new ArgumentNullException(nameof(inner)); - } - - if (prefix == null) - { - throw new ArgumentNullException(nameof(prefix)); - } - - if (prefix.Length == 0) - { - throw new ArgumentException("Prefix must not be empty"); - } - - _inner = inner; - _prefix = prefix; - } - - public void RegisterProfiler(IProfiler profiler) => _inner.RegisterProfiler(profiler); - - public void BeginProfiling(object forContext) => _inner.BeginProfiling(forContext); - - public ProfiledCommandEnumerable FinishProfiling(object forContext, bool allowCleanupSweep = true) => _inner.FinishProfiling(forContext, allowCleanupSweep); - - public ServerCounters GetCounters() => _inner.GetCounters(); - - public EndPoint[] GetEndPoints(bool configuredOnly = false) => _inner.GetEndPoints(configuredOnly); - - public void Wait(Task task) => _inner.Wait(task); - - public T Wait(Task task) => _inner.Wait(task); - - public void WaitAll(params Task[] tasks) => _inner.WaitAll(tasks); - - public int HashSlot(RedisKey key) => _inner.HashSlot(key); - - public ISubscriber GetSubscriber(object asyncState = null) => _inner.GetSubscriber(asyncState).WithChannelPrefix(_prefix); - - public IDatabase GetDatabase(int db = -1, object asyncState = null) => _inner.GetDatabase(db, asyncState).WithKeyPrefix(_prefix); - - public IServer GetServer(string host, int port, object asyncState = null) => _inner.GetServer(host, port, asyncState); - - public IServer GetServer(string hostAndPort, object asyncState = null) => _inner.GetServer(hostAndPort, asyncState); - - public IServer GetServer(IPAddress host, int port) => _inner.GetServer(host, port); - - public IServer GetServer(EndPoint endpoint, object asyncState = null) => _inner.GetServer(endpoint, asyncState); - - public Task ConfigureAsync(TextWriter log = null) => _inner.ConfigureAsync(log); - - public bool Configure(TextWriter log = null) => _inner.Configure(log); - - public string GetStatus() => _inner.GetStatus(); - - public void GetStatus(TextWriter log) => _inner.GetStatus(log); - - public void Close(bool allowCommandsToComplete = true) => _inner.Close(allowCommandsToComplete); - - public Task CloseAsync(bool allowCommandsToComplete = true) => _inner.CloseAsync(allowCommandsToComplete); - - public string GetStormLog() => _inner.GetStormLog(); - - public void ResetStormLog() => _inner.ResetStormLog(); - - public long PublishReconfigure(CommandFlags flags = CommandFlags.None) => _inner.PublishReconfigure(flags); - - public Task PublishReconfigureAsync(CommandFlags flags = CommandFlags.None) => _inner.PublishReconfigureAsync(flags); - - public bool AllowConnect - { - get => _inner.AllowConnect; - set => _inner.AllowConnect = value; - } - - public bool IgnoreConnect - { - get => _inner.IgnoreConnect; - set => _inner.IgnoreConnect = value; - } - - public string ClientName => _inner.ClientName; - - public string Configuration => _inner.Configuration; - - public int TimeoutMilliseconds => _inner.TimeoutMilliseconds; - - public long OperationCount => _inner.OperationCount; - - public bool PreserveAsyncOrder - { - get => _inner.PreserveAsyncOrder; - set => _inner.PreserveAsyncOrder = value; - } - - public bool IsConnected => _inner.IsConnected; - - public bool IsConnecting => _inner.IsConnecting; - - public bool IncludeDetailInExceptions - { - get => _inner.IncludeDetailInExceptions; - set => _inner.IncludeDetailInExceptions = value; - } - - public int StormLogThreshold - { - get => _inner.StormLogThreshold; - set => _inner.StormLogThreshold = value; - } - - public event EventHandler ErrorMessage - { - add => _inner.ErrorMessage += value; - remove => _inner.ErrorMessage -= value; - } - - public event EventHandler ConnectionFailed - { - add => _inner.ConnectionFailed += value; - remove => _inner.ConnectionFailed -= value; - } - - public event EventHandler InternalError - { - add => _inner.InternalError += value; - remove => _inner.InternalError -= value; - } - - public event EventHandler ConnectionRestored - { - add => _inner.ConnectionRestored += value; - remove => _inner.ConnectionRestored -= value; - } - - public event EventHandler ConfigurationChanged - { - add => _inner.ConfigurationChanged += value; - remove => _inner.ConfigurationChanged -= value; - } - - public event EventHandler ConfigurationChangedBroadcast - { - add => _inner.ConfigurationChangedBroadcast += value; - remove => _inner.ConfigurationChangedBroadcast -= value; - } - - public event EventHandler HashSlotMoved - { - add => _inner.HashSlotMoved += value; - remove => _inner.HashSlotMoved -= value; - } - - public void Dispose() - { - // do not dispose wrapped multiplexer - creator is responsible to dispose it - } - } -} diff --git a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberWrapper.cs b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs similarity index 98% rename from StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberWrapper.cs rename to StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs index 08741c62f..ab611e3ca 100644 --- a/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/SubscriberWrapper.cs +++ b/StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs @@ -4,7 +4,7 @@ namespace StackExchange.Redis.KeyspaceIsolation { - internal class SubscriberWrapper : ISubscriber + internal class KeyPrefixedSubscriber : ISubscriber { public ISubscriber Inner { get; } From c03693dd6429f2a6c6eab252ea4954612652a6cc Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 29 Oct 2022 09:22:24 -0400 Subject: [PATCH 3/4] Fix up latest merge --- .../KeyPrefixedSubscriber.cs | 36 +++++++++---------- .../KeyspaceIsolation/SubscriberExtensions.cs | 6 ++-- .../PublicAPI/PublicAPI.Shipped.txt | 2 ++ .../KeyPrefixedSubscriberTests.cs | 16 ++++----- 4 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs index ab611e3ca..22389bb6f 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs @@ -10,13 +10,13 @@ internal class KeyPrefixedSubscriber : ISubscriber internal RedisChannel Prefix { get; } - public SubscriberWrapper(ISubscriber inner, byte[] prefix) + public KeyPrefixedSubscriber(ISubscriber inner, byte[] prefix) { Inner = inner; Prefix = new RedisChannel(prefix, RedisChannel.PatternMode.Literal); } - public ConnectionMultiplexer Multiplexer => Inner.Multiplexer; + public IConnectionMultiplexer Multiplexer => Inner.Multiplexer; public bool TryWait(Task task) => Inner.TryWait(task); @@ -26,19 +26,13 @@ public SubscriberWrapper(ISubscriber inner, byte[] prefix) public void WaitAll(params Task[] tasks) => Inner.WaitAll(tasks); - public string ClientGetName(CommandFlags flags = CommandFlags.None) => Inner.ClientGetName(flags); - - public Task ClientGetNameAsync(CommandFlags flags = CommandFlags.None) => Inner.ClientGetNameAsync(flags); - - public void Quit(CommandFlags flags = CommandFlags.None) => Inner.Quit(flags); - public TimeSpan Ping(CommandFlags flags = CommandFlags.None) => Inner.Ping(flags); public Task PingAsync(CommandFlags flags = CommandFlags.None) => Inner.PingAsync(flags); - public EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpoint(ToInner(channel), flags); + public EndPoint? IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpoint(ToInner(channel), flags); - public Task IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpointAsync(ToInner(channel), flags); + public Task IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpointAsync(ToInner(channel), flags); public bool IsConnected(RedisChannel channel = default(RedisChannel)) => Inner.IsConnected(ToInner(channel)); @@ -46,15 +40,21 @@ public SubscriberWrapper(ISubscriber inner, byte[] prefix) public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) => Inner.PublishAsync(ToInner(channel), message, flags); + public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None) => + Inner.Subscribe(ToInner(channel), flags); + public void Subscribe(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) => Inner.Subscribe(ToInner(channel), ToInner(handler), flags); + public Task SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) => + Inner.SubscribeAsync(ToInner(channel), flags); + public Task SubscribeAsync(RedisChannel channel, Action handler, CommandFlags flags = CommandFlags.None) => Inner.SubscribeAsync(ToInner(channel), ToInner(handler), flags); - public EndPoint SubscribedEndpoint(RedisChannel channel) => Inner.SubscribedEndpoint(ToInner(channel)); + public EndPoint? SubscribedEndpoint(RedisChannel channel) => Inner.SubscribedEndpoint(ToInner(channel)); - public void Unsubscribe(RedisChannel channel, Action handler = null, CommandFlags flags = CommandFlags.None) => + public void Unsubscribe(RedisChannel channel, Action? handler = null, CommandFlags flags = CommandFlags.None) => Inner.Unsubscribe(ToInner(channel), ToInner(handler), flags); public void UnsubscribeAll(CommandFlags flags = CommandFlags.None) @@ -73,7 +73,7 @@ public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None) return Inner.UnsubscribeAsync(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags); } - public Task UnsubscribeAsync(RedisChannel channel, Action handler = null, CommandFlags flags = CommandFlags.None) => + public Task UnsubscribeAsync(RedisChannel channel, Action? handler = null, CommandFlags flags = CommandFlags.None) => Inner.UnsubscribeAsync(ToInner(channel), ToInner(handler), flags); public RedisChannel ToInner(RedisChannel outer) @@ -82,8 +82,8 @@ public RedisChannel ToInner(RedisChannel outer) if (outer.IsNullOrEmpty) return Prefix; - byte[] outerArr = outer; - byte[] prefixArr = Prefix; + byte[] outerArr = outer!; + byte[] prefixArr = Prefix!; var innerArr = new byte[prefixArr.Length + outerArr.Length]; Buffer.BlockCopy(prefixArr, 0, innerArr, 0, prefixArr.Length); @@ -94,14 +94,14 @@ public RedisChannel ToInner(RedisChannel outer) return new RedisChannel(innerArr, patternMode); } - protected Action ToInner(Action handler) => (channel, value) => handler(ToOuter(channel), value); + protected Action ToInner(Action? handler) => (channel, value) => handler?.Invoke(ToOuter(channel), value); public RedisChannel ToOuter(RedisChannel inner) { if (Prefix.IsNullOrEmpty || inner.IsNullOrEmpty) return inner; - byte[] innerArr = inner; - byte[] prefixArr = Prefix; + byte[] innerArr = inner!; + byte[] prefixArr = Prefix!; if (innerArr.Length <= prefixArr.Length) return inner; diff --git a/src/StackExchange.Redis/KeyspaceIsolation/SubscriberExtensions.cs b/src/StackExchange.Redis/KeyspaceIsolation/SubscriberExtensions.cs index 8c037efc6..727ed4535 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/SubscriberExtensions.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/SubscriberExtensions.cs @@ -9,7 +9,7 @@ public static class SubscriberExtensions { /// /// Creates a new instance that provides an isolated channel namespace - /// of the specified underyling subscriber instance. + /// of the specified underlying subscriber instance. /// /// /// The underlying subscriber instance that the returned instance shall use. @@ -34,14 +34,14 @@ public static ISubscriber WithChannelPrefix(this ISubscriber subscriber, RedisCh throw new ArgumentNullException(nameof(channelPrefix)); } - if (subscriber is SubscriberWrapper wrapper) + if (subscriber is KeyPrefixedSubscriber wrapper) { // combine the channel prefix in advance to minimize indirection channelPrefix = wrapper.ToInner(channelPrefix); subscriber = wrapper.Inner; } - return new SubscriberWrapper(subscriber, channelPrefix); + return new KeyPrefixedSubscriber(subscriber, channelPrefix!); } } } diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index f1201d35f..1fe21b556 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -1126,6 +1126,7 @@ StackExchange.Redis.ITransaction.AddCondition(StackExchange.Redis.Condition! con StackExchange.Redis.ITransaction.Execute(StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> bool StackExchange.Redis.ITransaction.ExecuteAsync(StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! StackExchange.Redis.KeyspaceIsolation.DatabaseExtensions +StackExchange.Redis.KeyspaceIsolation.SubscriberExtensions StackExchange.Redis.LatencyHistoryEntry StackExchange.Redis.LatencyHistoryEntry.DurationMilliseconds.get -> int StackExchange.Redis.LatencyHistoryEntry.LatencyHistoryEntry() -> void @@ -1629,6 +1630,7 @@ static StackExchange.Redis.HashEntry.implicit operator System.Collections.Generi static StackExchange.Redis.HashEntry.operator !=(StackExchange.Redis.HashEntry x, StackExchange.Redis.HashEntry y) -> bool static StackExchange.Redis.HashEntry.operator ==(StackExchange.Redis.HashEntry x, StackExchange.Redis.HashEntry y) -> bool static StackExchange.Redis.KeyspaceIsolation.DatabaseExtensions.WithKeyPrefix(this StackExchange.Redis.IDatabase! database, StackExchange.Redis.RedisKey keyPrefix) -> StackExchange.Redis.IDatabase! +static StackExchange.Redis.KeyspaceIsolation.SubscriberExtensions.WithChannelPrefix(this StackExchange.Redis.ISubscriber! subscriber, StackExchange.Redis.RedisChannel channelPrefix) -> StackExchange.Redis.ISubscriber! static StackExchange.Redis.Lease.Create(int length, bool clear = true) -> StackExchange.Redis.Lease! static StackExchange.Redis.Lease.Empty.get -> StackExchange.Redis.Lease! static StackExchange.Redis.ListPopResult.Null.get -> StackExchange.Redis.ListPopResult diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedSubscriberTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedSubscriberTests.cs index c79121510..8ce7494c8 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedSubscriberTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedSubscriberTests.cs @@ -35,14 +35,14 @@ public async Task UsePrefixForChannel() const string channelName = "test-channel"; s1.Subscribe(channelName, (channel, value) => { - c1.Add(channel); - l1.Add(value); + c1.Add(channel!); + l1.Add(value!); }); - s12.Subscribe(channelName, (channel, value) => l12.Add(value)); - s2.Subscribe(channelName, (channel, value) => l2.Add(value)); - s.Subscribe(channelName, (channel, value) => l.Add(value)); - s.Subscribe("*" + channelName, (channel, value) => lAll.Add(value)); - s.Subscribe(prefix1 + channelName, (channel, value) => lT1.Add(value)); + s12.Subscribe(channelName, (_channel, value) => l12.Add(value!)); + s2.Subscribe(channelName, (_channel, value) => l2.Add(value!)); + s.Subscribe(channelName, (_channel, value) => l.Add(value!)); + s.Subscribe("*" + channelName, (_channel, value) => lAll.Add(value!)); + s.Subscribe(prefix1 + channelName, (_channel, value) => lT1.Add(value!)); s1.Publish(channelName, "value1"); s.Publish(channelName, "value"); @@ -55,7 +55,7 @@ public async Task UsePrefixForChannel() Assert.Single(l12); Assert.Equal("value1",l12[0]); - + Assert.Empty(l2); Assert.Single(l); From 0c9302b42ee8ee3c3fec586fc98b6177ec55e8fe Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sat, 29 Oct 2022 09:28:16 -0400 Subject: [PATCH 4/4] Ordering, add release notes --- docs/ReleaseNotes.md | 1 + .../KeyPrefixedSubscriber.cs | 40 +++++++++---------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 1bb9bf808..d4cd15b5d 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -11,6 +11,7 @@ Current package versions: - Adds: `last-in` and `cur-in` (bytes) to timeout exceptions to help identify timeouts that were just-behind another large payload off the wire ([#2276 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2276)) - Adds: general-purpose tunnel support, with HTTP proxy "connect" support included ([#2274 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2274)) - Removes: Package dependency (`System.Diagnostics.PerformanceCounter`) ([#2285 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2285)) +- Adds: `.WithChannelPrefix()` to `ISubscriber` for key-preifixing channels like the existing `.WithKeyPreifx()` on `IDatabase` ([#896 by pecanw](https://github.com/StackExchange/StackExchange.Redis/pull/896)) ## 2.6.70 diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs index 22389bb6f..b4e1b97b9 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs @@ -18,24 +18,16 @@ public KeyPrefixedSubscriber(ISubscriber inner, byte[] prefix) public IConnectionMultiplexer Multiplexer => Inner.Multiplexer; - public bool TryWait(Task task) => Inner.TryWait(task); - - public void Wait(Task task) => Inner.Wait(task); + public EndPoint? IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpoint(ToInner(channel), flags); - public T Wait(Task task) => Inner.Wait(task); + public Task IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpointAsync(ToInner(channel), flags); - public void WaitAll(params Task[] tasks) => Inner.WaitAll(tasks); + public bool IsConnected(RedisChannel channel = default(RedisChannel)) => Inner.IsConnected(ToInner(channel)); public TimeSpan Ping(CommandFlags flags = CommandFlags.None) => Inner.Ping(flags); public Task PingAsync(CommandFlags flags = CommandFlags.None) => Inner.PingAsync(flags); - public EndPoint? IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpoint(ToInner(channel), flags); - - public Task IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpointAsync(ToInner(channel), flags); - - public bool IsConnected(RedisChannel channel = default(RedisChannel)) => Inner.IsConnected(ToInner(channel)); - public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) => Inner.Publish(ToInner(channel), message, flags); public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) => Inner.PublishAsync(ToInner(channel), message, flags); @@ -54,28 +46,38 @@ public Task SubscribeAsync(RedisChannel channel, Action Inner.SubscribedEndpoint(ToInner(channel)); + public bool TryWait(Task task) => Inner.TryWait(task); + public void Unsubscribe(RedisChannel channel, Action? handler = null, CommandFlags flags = CommandFlags.None) => Inner.Unsubscribe(ToInner(channel), ToInner(handler), flags); public void UnsubscribeAll(CommandFlags flags = CommandFlags.None) { if (Prefix.IsNullOrEmpty) + { Inner.UnsubscribeAll(flags); + } else + { Inner.Unsubscribe(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags); + } } - public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None) - { - if (Prefix.IsNullOrEmpty) - return Inner.UnsubscribeAllAsync(flags); - else - return Inner.UnsubscribeAsync(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags); - } + public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None) => Prefix.IsNullOrEmpty + ? Inner.UnsubscribeAllAsync(flags) + : Inner.UnsubscribeAsync(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags); public Task UnsubscribeAsync(RedisChannel channel, Action? handler = null, CommandFlags flags = CommandFlags.None) => Inner.UnsubscribeAsync(ToInner(channel), ToInner(handler), flags); + public void Wait(Task task) => Inner.Wait(task); + + public T Wait(Task task) => Inner.Wait(task); + + public void WaitAll(params Task[] tasks) => Inner.WaitAll(tasks); + + protected Action ToInner(Action? handler) => (channel, value) => handler?.Invoke(ToOuter(channel), value); + public RedisChannel ToInner(RedisChannel outer) { if (Prefix.IsNullOrEmpty) return outer; @@ -94,8 +96,6 @@ public RedisChannel ToInner(RedisChannel outer) return new RedisChannel(innerArr, patternMode); } - protected Action ToInner(Action? handler) => (channel, value) => handler?.Invoke(ToOuter(channel), value); - public RedisChannel ToOuter(RedisChannel inner) { if (Prefix.IsNullOrEmpty || inner.IsNullOrEmpty) return inner;