|
1 | | -# Extensions |
2 | | - |
3 | | -## Overview |
4 | | -Extensions for concerns found in System.Reactive that make consuming the library and using it to build an application easier. |
5 | | - |
6 | | -## The library contains the following extensions |
7 | | - |
8 | | -### `ReactiveMarbles.Extensions` |
9 | | - |
10 | | -#### `ReactiveExtensions` |
11 | | - |
12 | | -- WhereIsNotNull |
13 | | -- AsSignal |
14 | | -- SyncTimer |
15 | | -- BufferUntil |
16 | | -- CatchIgnore |
17 | | -- CombineLatestValuesAreAllFalse |
18 | | -- CombineLatestValuesAreAllTrue |
19 | | -- GetMax |
20 | | -- GetMin |
21 | | -- DetectStale |
22 | | -- Conflate |
23 | | -- Heartbeat |
24 | | -- WithLimitedConcurrency |
25 | | -- OnNext |
26 | | -- ObserveOnSafe |
27 | | -- Start |
28 | | -- ForEach |
29 | | -- ScheduleSafe |
30 | | -- FromArray |
31 | | -- Using |
32 | | -- While |
33 | | -- Schedule |
34 | | -- Filter |
35 | | -- Shuffle |
36 | | -- OnErrorRetry |
37 | | -- TakeUntil |
38 | | -- SyncronizeAsync |
39 | | -- SubscribeAsync |
40 | | -- SyncronizeSynchronous |
41 | | -- SubscribeSynchronous |
| 1 | +# ReactiveMarbles.Extensions |
| 2 | + |
| 3 | +A focused collection of high–value Reactive Extensions (Rx) operators that do **not** ship with `System.Reactive` but are commonly needed when building reactive .NET applications. |
| 4 | + |
| 5 | +The goal of this library is to: |
| 6 | +- Reduce boilerplate for frequent reactive patterns (timers, buffering, throttling, heartbeats, etc.) |
| 7 | +- Provide pragmatic, allocation?aware helpers for performance sensitive scenarios |
| 8 | +- Avoid additional dependencies – only `System.Reactive` is required |
| 9 | + |
| 10 | +Supported Target Frameworks: `.NET Standard 2.0`, `.NET 8`, `.NET 9`, `.NET 10`. |
| 11 | + |
| 12 | +--- |
| 13 | +## Table of Contents |
| 14 | +1. [Installation](#installation) |
| 15 | +2. [Quick Start](#quick-start) |
| 16 | +3. [API Catalog](#api-catalog) |
| 17 | +4. [Operator Categories & Examples](#operator-categories--examples) |
| 18 | + - [Null / Signal Helpers](#null--signal-helpers) |
| 19 | + - [Timing, Scheduling & Flow Control](#timing-scheduling--flow-control) |
| 20 | + - [Inactivity / Liveness](#inactivity--liveness) |
| 21 | + - [Error Handling & Resilience](#error-handling--resilience) |
| 22 | + - [Combining, Partitioning & Logical Helpers](#combining-partitioning--logical-helpers) |
| 23 | + - [Async / Task Integration](#async--task-integration) |
| 24 | + - [Backpressure / Conflation](#backpressure--conflation) |
| 25 | + - [Selective & Conditional Emission](#selective--conditional-emission) |
| 26 | + - [Buffering & Transformation](#buffering--transformation) |
| 27 | + - [Subscription / Side Effects](#subscription--side-effects) |
| 28 | + - [Utility & Miscellaneous](#utility--miscellaneous) |
| 29 | +5. [Performance Notes](#performance-notes) |
| 30 | +6. [Thread Safety](#thread-safety) |
| 31 | +7. [License](#license) |
| 32 | + |
| 33 | +--- |
| 34 | +## Installation |
| 35 | +```bash |
| 36 | +# Package coming soon (example) |
| 37 | +dotnet add package ReactiveMarbles.Extensions |
| 38 | +``` |
| 39 | +Reference the project directly while developing locally. |
| 40 | + |
| 41 | +--- |
| 42 | +## Quick Start |
| 43 | +```csharp |
| 44 | +using System; |
| 45 | +using System.Reactive.Linq; |
| 46 | +using ReactiveMarbles.Extensions; |
| 47 | + |
| 48 | +var source = Observable.Interval(TimeSpan.FromMilliseconds(120)) |
| 49 | + .Take(10) |
| 50 | + .Select(i => (long?) (i % 3 == 0 ? null : i)); |
| 51 | + |
| 52 | +// 1. Filter nulls + convert to a Unit signal. |
| 53 | +var signal = source.WhereIsNotNull().AsSignal(); |
| 54 | + |
| 55 | +// 2. Add a heartbeat if the upstream goes quiet for 500ms. |
| 56 | +var withHeartbeat = source.WhereIsNotNull() |
| 57 | + .Heartbeat(TimeSpan.FromMilliseconds(500), Scheduler.Default); |
| 58 | + |
| 59 | +// 3. Retry with exponential backoff up to 5 times. |
| 60 | +var resilient = Observable.Defer(() => |
| 61 | + Observable.Throw<long>(new InvalidOperationException("Boom"))) |
| 62 | + .RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100)); |
| 63 | + |
| 64 | +// 4. Conflate bursty updates. |
| 65 | +var conflated = source.Conflate(TimeSpan.FromMilliseconds(300), Scheduler.Default); |
| 66 | + |
| 67 | +using (conflated.Subscribe(Console.WriteLine)) |
| 68 | +{ |
| 69 | + Console.ReadLine(); |
| 70 | +} |
| 71 | +``` |
| 72 | + |
| 73 | +--- |
| 74 | +## API Catalog |
| 75 | +Below is the full list of extension methods (grouped logically). |
| 76 | +Some overloads omitted for brevity. |
| 77 | + |
| 78 | +| Category | Operators | |
| 79 | +|----------|-----------| |
| 80 | +| Null & Signal | `WhereIsNotNull`, `AsSignal` | |
| 81 | +| Timing & Scheduling | `SyncTimer`, `Schedule` (overloads), `ScheduleSafe`, `ThrottleFirst`, `DebounceImmediate` | |
| 82 | +| Inactivity / Liveness | `Heartbeat`, `DetectStale`, `BufferUntilInactive` | |
| 83 | +| Error Handling | `CatchIgnore`, `CatchAndReturn`, `OnErrorRetry` (overloads), `RetryWithBackoff` | |
| 84 | +| Combining & Aggregation | `CombineLatestValuesAreAllTrue`, `CombineLatestValuesAreAllFalse`, `GetMax`, `GetMin`, `Partition` | |
| 85 | +| Logical / Boolean | `Not`, `WhereTrue`, `WhereFalse` | |
| 86 | +| Async / Task | `SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`, `SubscribeAsync` (overloads), `SynchronizeSynchronous`, `SynchronizeAsync`, `SubscribeSynchronous` (overloads) | |
| 87 | +| Backpressure | `Conflate` | |
| 88 | +| Filtering / Conditional | `Filter` (Regex), `TakeUntil` (predicate), `WaitUntil` | |
| 89 | +| Buffering | `BufferUntil`, `BufferUntilInactive` | |
| 90 | +| Transformation & Utility | `Shuffle`, `ForEach`, `FromArray`, `Using`, `While`, `Start`, `OnNext` (params helper), `DoOnSubscribe`, `DoOnDispose` | |
| 91 | + |
| 92 | +--- |
| 93 | +## Operator Categories & Examples |
| 94 | +### Null / Signal Helpers |
| 95 | +```csharp |
| 96 | +IObservable<string?> raw = GetPossiblyNullStream(); |
| 97 | +IObservable<string> cleaned = raw.WhereIsNotNull(); |
| 98 | +IObservable<Unit> signal = cleaned.AsSignal(); |
| 99 | +``` |
| 100 | + |
| 101 | +### Timing, Scheduling & Flow Control |
| 102 | +```csharp |
| 103 | +// Shared timer for a given period (one underlying timer per distinct TimeSpan) |
| 104 | +var sharedTimer = ReactiveExtensions.SyncTimer(TimeSpan.FromSeconds(1)); |
| 105 | + |
| 106 | +// Delay emission of a single value |
| 107 | +42.Schedule(TimeSpan.FromMilliseconds(250), Scheduler.Default) |
| 108 | + .Subscribe(v => Console.WriteLine($"Delayed: {v}")); |
| 109 | + |
| 110 | +// Safe scheduling when a scheduler may be null |
| 111 | +IScheduler? maybeScheduler = null; |
| 112 | +maybeScheduler.ScheduleSafe(() => Console.WriteLine("Ran inline")); |
| 113 | + |
| 114 | +// ThrottleFirst: allow first item per window, ignore rest |
| 115 | +var throttled = Observable.Interval(TimeSpan.FromMilliseconds(50)) |
| 116 | + .ThrottleFirst(TimeSpan.FromMilliseconds(200)); |
| 117 | + |
| 118 | +// DebounceImmediate: emit first immediately then debounce rest |
| 119 | +var debounced = Observable.Interval(TimeSpan.FromMilliseconds(40)) |
| 120 | + .DebounceImmediate(TimeSpan.FromMilliseconds(250)); |
| 121 | +``` |
| 122 | + |
| 123 | +### Inactivity / Liveness |
| 124 | +```csharp |
| 125 | +// Heartbeat emits IHeartbeat<T> where IsHeartbeat == true during quiet periods |
| 126 | +var heartbeats = Observable.Interval(TimeSpan.FromMilliseconds(400)) |
| 127 | + .Take(5) |
| 128 | + .Heartbeat(TimeSpan.FromMilliseconds(300), Scheduler.Default); |
| 129 | + |
| 130 | +// DetectStale emits IStale<T>: one stale marker after inactivity, or fresh update wrappers |
| 131 | +var staleAware = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500)) |
| 132 | + .Take(3) |
| 133 | + .DetectStale(TimeSpan.FromMilliseconds(300), Scheduler.Default); |
| 134 | + |
| 135 | +// BufferUntilInactive groups events separated by inactivity |
| 136 | +var bursts = Observable.Interval(TimeSpan.FromMilliseconds(60)).Take(20); |
| 137 | +var groups = bursts.BufferUntilInactive(TimeSpan.FromMilliseconds(200)); |
| 138 | +``` |
| 139 | + |
| 140 | +### Error Handling & Resilience |
| 141 | +```csharp |
| 142 | +var flaky = Observable.Create<int>(o => |
| 143 | +{ |
| 144 | + o.OnNext(1); |
| 145 | + o.OnError(new InvalidOperationException("Fail")); |
| 146 | + return () => { }; |
| 147 | +}); |
| 148 | + |
| 149 | +// Ignore all errors and complete silently |
| 150 | +a flakySafe = flaky.CatchIgnore(); |
| 151 | + |
| 152 | +// Replace error with a fallback value |
| 153 | +var withFallback = flaky.CatchAndReturn(-1); |
| 154 | + |
| 155 | +// Retry only specific exception type with logging |
| 156 | +var retried = flaky.OnErrorRetry<int, InvalidOperationException>(ex => Console.WriteLine(ex.Message), retryCount: 3); |
| 157 | + |
| 158 | +// Retry with exponential backoff |
| 159 | +var backoff = flaky.RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100)); |
| 160 | +``` |
| 161 | + |
| 162 | +### Combining, Partitioning & Logical Helpers |
| 163 | +```csharp |
| 164 | +var a = Observable.Interval(TimeSpan.FromMilliseconds(150)).Select(i => i % 2 == 0); |
| 165 | +var b = Observable.Interval(TimeSpan.FromMilliseconds(170)).Select(i => i % 3 == 0); |
| 166 | + |
| 167 | +var allTrue = new[] { a, b }.CombineLatestValuesAreAllTrue(); |
| 168 | +var allFalse = new[] { a, b }.CombineLatestValuesAreAllFalse(); |
| 169 | + |
| 170 | +var numbers = Observable.Range(1, 10); |
| 171 | +var (even, odd) = numbers.Partition(n => n % 2 == 0); // Partition stream |
| 172 | +
|
| 173 | +var toggles = a.Not(); // Negate booleans |
| 174 | +``` |
| 175 | + |
| 176 | +### Async / Task Integration |
| 177 | +```csharp |
| 178 | +IObservable<int> inputs = Observable.Range(1, 5); |
| 179 | + |
| 180 | +// Sequential (preserves order) |
| 181 | +var seq = inputs.SelectAsyncSequential(async i => { await Task.Delay(50); return i * 2; }); |
| 182 | + |
| 183 | +// Latest only (cancels previous) |
| 184 | +var latest = inputs.SelectLatestAsync(async i => { await Task.Delay(100); return i; }); |
| 185 | + |
| 186 | +// Limited parallelism |
| 187 | +var concurrent = inputs.SelectAsyncConcurrent(async i => { await Task.Delay(100); return i; }, maxConcurrency: 2); |
| 188 | + |
| 189 | +// Asynchronous subscription (serializing tasks) |
| 190 | +inputs.SubscribeAsync(async i => await Task.Delay(10)); |
| 191 | + |
| 192 | +// Synchronous gate: ensures per-item async completion before next is emitted |
| 193 | +a inputs.SubscribeSynchronous(async i => await Task.Delay(25)); |
| 194 | +``` |
| 195 | + |
| 196 | +### Backpressure / Conflation |
| 197 | +```csharp |
| 198 | +// Conflate: enforce minimum spacing between emissions while always outputting the most recent value |
| 199 | +a var noisy = Observable.Interval(TimeSpan.FromMilliseconds(20)).Take(30); |
| 200 | +var conflated = noisy.Conflate(TimeSpan.FromMilliseconds(200), Scheduler.Default); |
| 201 | +``` |
| 202 | + |
| 203 | +### Selective & Conditional Emission |
| 204 | +```csharp |
| 205 | +// TakeUntil predicate (inclusive) |
| 206 | +var untilFive = Observable.Range(1, 100).TakeUntil(x => x == 5); |
| 207 | + |
| 208 | +// WaitUntil first match then complete |
| 209 | +var firstEven = Observable.Range(1, 10).WaitUntil(x => x % 2 == 0); |
| 210 | +``` |
| 211 | + |
| 212 | +### Buffering & Transformation |
| 213 | +```csharp |
| 214 | +// BufferUntil - collect chars between delimiters |
| 215 | +var chars = "<a><bc><d>".ToCharArray().ToObservable(); |
| 216 | +var frames = chars.BufferUntil('<', '>'); // emits "<a>", "<bc>", "<d>" |
| 217 | +
|
| 218 | +// Shuffle arrays in-place |
| 219 | +var arrays = Observable.Return(new[] { 1, 2, 3, 4, 5 }); |
| 220 | +var shuffled = arrays.Shuffle(); |
| 221 | +``` |
| 222 | + |
| 223 | +### Subscription & Side Effects |
| 224 | +```csharp |
| 225 | +var stream = Observable.Range(1, 3) |
| 226 | + .DoOnSubscribe(() => Console.WriteLine("Subscribed")) |
| 227 | + .DoOnDispose(() => Console.WriteLine("Disposed")); |
| 228 | + |
| 229 | +using (stream.Subscribe(Console.WriteLine)) |
| 230 | +{ |
| 231 | + // auto dispose at using end |
| 232 | +} |
| 233 | +``` |
| 234 | + |
| 235 | +### Utility & Miscellaneous |
| 236 | +```csharp |
| 237 | +// Emit list contents quickly with low allocations |
| 238 | +var listSource = Observable.Return<IEnumerable<int>>(new List<int> { 1, 2, 3 }); |
| 239 | +listSource.ForEach().Subscribe(Console.WriteLine); |
| 240 | + |
| 241 | +// Using helper for deterministic disposal |
| 242 | +var value = new MemoryStream().Using(ms => ms.Length); |
| 243 | + |
| 244 | +// While loop (reactive) |
| 245 | +var counter = 0; |
| 246 | +ReactiveExtensions.While(() => counter++ < 3, () => Console.WriteLine(counter)) |
| 247 | + .Subscribe(); |
| 248 | + |
| 249 | +// Batch push with OnNext params |
| 250 | +var subj = new Subject<int>(); |
| 251 | +subj.OnNext(1, 2, 3, 4); |
| 252 | +``` |
| 253 | + |
| 254 | +--- |
| 255 | +## Performance Notes |
| 256 | +- `FastForEach` path avoids iterator allocations for `List<T>`, `IList<T>`, and arrays. |
| 257 | +- `SyncTimer` ensures only one shared timer per period reducing timer overhead. |
| 258 | +- `Conflate` helps tame high–frequency producers without dropping the final value of a burst. |
| 259 | +- `Heartbeat` and `DetectStale` use lightweight scheduling primitives. |
| 260 | +- Most operators avoid capturing lambdas in hot loops where practical. |
| 261 | + |
| 262 | +## Thread Safety |
| 263 | +- All operators are pure functional transformations unless documented otherwise. |
| 264 | +- `SyncTimer` uses a `ConcurrentDictionary` and returns a hot `IConnectableObservable` that connects once per unique `TimeSpan`. |
| 265 | +- Methods returning shared observables (`SyncTimer`, `Partition` result sequences) are safe for multi-subscriber usage unless the upstream is inherently side-effecting. |
| 266 | + |
| 267 | +## License |
| 268 | +MIT – see LICENSE file. |
| 269 | + |
| 270 | +--- |
| 271 | +## Contributing |
| 272 | +Issues / PRs welcome. Please keep additions dependency–free and focused on broadly useful reactive patterns. |
| 273 | + |
| 274 | +--- |
| 275 | +## Change Log (Excerpt) |
| 276 | +(Keep this section updated as the library evolves.) |
| 277 | +- Added async task projection helpers (`SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`). |
| 278 | +- Added liveness operators (`Heartbeat`, `DetectStale`, `BufferUntilInactive`). |
| 279 | +- Added resilience (`RetryWithBackoff`, expanded `OnErrorRetry` overloads). |
| 280 | +- Added flow control (`Conflate`, `ThrottleFirst`, `DebounceImmediate`). |
| 281 | + |
| 282 | +--- |
| 283 | +Happy reactive coding! ?? |
0 commit comments