diff --git a/examples/flutter/github_search/lib/bloc/search_bloc.dart b/examples/flutter/github_search/lib/bloc/search_bloc.dart index 911281ab..b9eb022b 100644 --- a/examples/flutter/github_search/lib/bloc/search_bloc.dart +++ b/examples/flutter/github_search/lib/bloc/search_bloc.dart @@ -1,13 +1,13 @@ import 'dart:async'; -import 'package:rxdart_ext/state_stream.dart'; +import 'package:rxdart/rxdart.dart'; import '../api/github_api.dart'; import 'search_state.dart'; class SearchBloc { final Sink onTextChanged; - final StateStream state; + final ValueStream state; final StreamSubscription _subscription; @@ -25,7 +25,7 @@ class SearchBloc { // to the View. .switchMap((String term) => _search(term, api)) // The initial state to deliver to the screen. - .publishState(const SearchNoTerm()); + .publishValueSeeded(const SearchNoTerm()); final subscription = state.connect(); diff --git a/examples/flutter/github_search/lib/search_screen.dart b/examples/flutter/github_search/lib/search_screen.dart index 0a19be9b..8cc3e13c 100644 --- a/examples/flutter/github_search/lib/search_screen.dart +++ b/examples/flutter/github_search/lib/search_screen.dart @@ -22,9 +22,7 @@ class SearchScreen extends StatefulWidget { const SearchScreen({super.key, required this.api}); @override - SearchScreenState createState() { - return SearchScreenState(); - } + SearchScreenState createState() => SearchScreenState(); } class SearchScreenState extends State { @@ -47,6 +45,7 @@ class SearchScreenState extends State { Widget build(BuildContext context) { return ValueStreamBuilder( stream: bloc.state, + buildWhen: (previous, current) => previous != current, builder: (context, state, child) { return Scaffold( appBar: AppBar( diff --git a/examples/flutter/github_search/pubspec.lock b/examples/flutter/github_search/pubspec.lock index db1c28b8..b22266cb 100644 --- a/examples/flutter/github_search/pubspec.lock +++ b/examples/flutter/github_search/pubspec.lock @@ -174,14 +174,6 @@ packages: url: "https://pub.dev" source: hosted version: "3.0.6" - dart_either: - dependency: transitive - description: - name: dart_either - sha256: "928895b8266ac5906eb4e2993fead563a73b17fc86eec6b40172100d56ca2507" - url: "https://pub.dev" - source: hosted - version: "1.0.0" dart_style: dependency: transitive description: @@ -463,14 +455,6 @@ packages: relative: true source: path version: "0.28.0" - rxdart_ext: - dependency: "direct main" - description: - name: rxdart_ext - sha256: "95df7e8b13140e2c3fdb3b943569a51f18090e82aaaf6ca6e8e6437e434a6fb0" - url: "https://pub.dev" - source: hosted - version: "0.3.0" rxdart_flutter: dependency: "direct main" description: diff --git a/examples/flutter/github_search/pubspec.yaml b/examples/flutter/github_search/pubspec.yaml index 1f870d63..5fdd3759 100644 --- a/examples/flutter/github_search/pubspec.yaml +++ b/examples/flutter/github_search/pubspec.yaml @@ -15,7 +15,6 @@ dependencies: path: ../../../packages/rxdart_flutter http: ^0.13.3 flutter_spinkit: ^5.1.0 - rxdart_ext: ^0.3.0 equatable: ^2.0.5 dev_dependencies: diff --git a/packages/rxdart/lib/src/streams/connectable_stream.dart b/packages/rxdart/lib/src/streams/connectable_stream.dart index ec939a8f..b830f53d 100644 --- a/packages/rxdart/lib/src/streams/connectable_stream.dart +++ b/packages/rxdart/lib/src/streams/connectable_stream.dart @@ -175,6 +175,9 @@ class ValueConnectableStream @override StreamNotification? get lastEventOrNull => _subject.lastEventOrNull; + + @override + bool get isReplayValueStream => _subject.isReplayValueStream; } /// A [ConnectableStream] that converts a single-subscription Stream into diff --git a/packages/rxdart/lib/src/streams/replay_stream.dart b/packages/rxdart/lib/src/streams/replay_stream.dart index c43dd174..20d0ff6c 100644 --- a/packages/rxdart/lib/src/streams/replay_stream.dart +++ b/packages/rxdart/lib/src/streams/replay_stream.dart @@ -1,7 +1,9 @@ import 'package:rxdart/src/utils/collection_extensions.dart'; import 'package:rxdart/src/utils/error_and_stacktrace.dart'; -/// An [Stream] that provides synchronous access to the emitted values +/// A [Stream] that provides synchronous access to the emitted values, +/// and replays its emitted events (data and error events) +/// to new listeners when they subscribe. abstract class ReplayStream implements Stream { /// Synchronously get the values stored in Subject. May be empty. List get values; diff --git a/packages/rxdart/lib/src/streams/value_stream.dart b/packages/rxdart/lib/src/streams/value_stream.dart index 557c9e23..0fa8582b 100644 --- a/packages/rxdart/lib/src/streams/value_stream.dart +++ b/packages/rxdart/lib/src/streams/value_stream.dart @@ -43,6 +43,12 @@ abstract class ValueStream implements Stream { /// Returns the last emitted event (either data/value or error event). /// `null` if no value or error events have been emitted yet. StreamNotification? get lastEventOrNull; + + /// Returns `true` if this [ValueStream] replays its last emitted event + /// (either a value/data event or an error event) to new listeners when they subscribe. + /// + /// See also [lastEventOrNull]. + bool get isReplayValueStream; } /// Extension methods on [ValueStream] related to [lastEventOrNull]. diff --git a/packages/rxdart/lib/src/subjects/behavior_subject.dart b/packages/rxdart/lib/src/subjects/behavior_subject.dart index a70d43c6..eba4c063 100644 --- a/packages/rxdart/lib/src/subjects/behavior_subject.dart +++ b/packages/rxdart/lib/src/subjects/behavior_subject.dart @@ -187,6 +187,9 @@ class BehaviorSubject extends Subject implements ValueStream { // no event return null; } + + @override + bool get isReplayValueStream => true; } class _Wrapper { @@ -272,4 +275,7 @@ class _BehaviorSubjectStream extends Stream implements ValueStream { @override StreamNotification? get lastEventOrNull => _subject.lastEventOrNull; + + @override + bool get isReplayValueStream => _subject.isReplayValueStream; } diff --git a/packages/rxdart/test/subject/behavior_subject_test.dart b/packages/rxdart/test/subject/behavior_subject_test.dart index 26ab292a..152a8da5 100644 --- a/packages/rxdart/test/subject/behavior_subject_test.dart +++ b/packages/rxdart/test/subject/behavior_subject_test.dart @@ -1471,5 +1471,14 @@ void main() { ); }); }); + + test('isReplayValueStream', () { + expect(BehaviorSubject().isReplayValueStream, isTrue); + expect(BehaviorSubject.seeded(42).isReplayValueStream, isTrue); + + expect(BehaviorSubject().stream.isReplayValueStream, isTrue); + expect( + BehaviorSubject.seeded(42).stream.isReplayValueStream, isTrue); + }); }); } diff --git a/packages/rxdart_flutter/lib/src/value_stream_builder.dart b/packages/rxdart_flutter/lib/src/value_stream_builder.dart index b466ca29..96883673 100644 --- a/packages/rxdart_flutter/lib/src/value_stream_builder.dart +++ b/packages/rxdart_flutter/lib/src/value_stream_builder.dart @@ -1,6 +1,6 @@ import 'package:flutter/foundation.dart'; -import 'package:rxdart/rxdart.dart'; import 'package:flutter/widgets.dart'; +import 'package:rxdart/rxdart.dart'; import 'errors.dart'; import 'value_stream_listener.dart'; @@ -94,7 +94,7 @@ class ValueStreamBuilder extends StatefulWidget { required this.builder, this.buildWhen, this.child, - this.isReplayValueStream = true, + this.isReplayValueStream, }) : super(key: key); /// The [ValueStream] that the [ValueStreamBuilder] will interact with. @@ -122,9 +122,12 @@ class ValueStreamBuilder extends StatefulWidget { /// Whether or not the [stream] emits the last value /// like [BehaviorSubject] does. + /// See [ValueStream.isReplayValueStream] for more information. + /// + /// If this argument is `null`, the [ValueStream.isReplayValueStream] is used instead. /// - /// Defaults to `true`. - final bool isReplayValueStream; + /// Defaults to `null`. + final bool? isReplayValueStream; @override State> createState() => _ValueStreamBuilderState(); @@ -134,8 +137,8 @@ class ValueStreamBuilder extends StatefulWidget { super.debugFillProperties(properties); properties ..add(DiagnosticsProperty>('stream', stream)) - ..add( - DiagnosticsProperty('isReplayValueStream', isReplayValueStream)) + ..add(DiagnosticsProperty('isReplayValueStream', + isReplayValueStream ?? stream.isReplayValueStream)) ..add(ObjectFlagProperty?>.has( 'buildWhen', buildWhen, diff --git a/packages/rxdart_flutter/lib/src/value_stream_consumer.dart b/packages/rxdart_flutter/lib/src/value_stream_consumer.dart index a2f9fae4..b1e03389 100644 --- a/packages/rxdart_flutter/lib/src/value_stream_consumer.dart +++ b/packages/rxdart_flutter/lib/src/value_stream_consumer.dart @@ -85,7 +85,7 @@ class ValueStreamConsumer extends StatefulWidget { required this.builder, this.buildWhen, this.child, - this.isReplayValueStream = true, + this.isReplayValueStream, }) : super(key: key); /// The [ValueStream] that the [ValueStreamConsumer] will interact with. @@ -117,9 +117,12 @@ class ValueStreamConsumer extends StatefulWidget { /// Whether or not the [stream] emits the last value /// like [BehaviorSubject] does. + /// See [ValueStream.isReplayValueStream] for more information. /// - /// Defaults to `true`. - final bool isReplayValueStream; + /// If this argument is `null`, the [ValueStream.isReplayValueStream] is used instead. + /// + /// Defaults to `null`. + final bool? isReplayValueStream; @override State> createState() => _ValueStreamConsumerState(); @@ -129,8 +132,8 @@ class ValueStreamConsumer extends StatefulWidget { super.debugFillProperties(properties); properties ..add(DiagnosticsProperty>('stream', stream)) - ..add( - DiagnosticsProperty('isReplayValueStream', isReplayValueStream)) + ..add(DiagnosticsProperty('isReplayValueStream', + isReplayValueStream ?? stream.isReplayValueStream)) ..add(ObjectFlagProperty>.has( 'builder', builder, diff --git a/packages/rxdart_flutter/lib/src/value_stream_listener.dart b/packages/rxdart_flutter/lib/src/value_stream_listener.dart index 307484dd..6c882c94 100644 --- a/packages/rxdart_flutter/lib/src/value_stream_listener.dart +++ b/packages/rxdart_flutter/lib/src/value_stream_listener.dart @@ -51,7 +51,7 @@ class ValueStreamListener extends StatefulWidget { required this.stream, required this.listener, required this.child, - this.isReplayValueStream = true, + this.isReplayValueStream, }) : super(key: key); /// The [ValueStream] that the [ValueStreamConsumer] will interact with. @@ -67,9 +67,12 @@ class ValueStreamListener extends StatefulWidget { /// Whether or not the [stream] emits the last value /// like [BehaviorSubject] does. + /// See [ValueStream.isReplayValueStream] for more information. /// - /// Defaults to `true`. - final bool isReplayValueStream; + /// If this argument is `null`, the [ValueStream.isReplayValueStream] is used instead. + /// + /// Defaults to `null`. + final bool? isReplayValueStream; @override State> createState() => _ValueStreamListenerState(); @@ -79,8 +82,8 @@ class ValueStreamListener extends StatefulWidget { super.debugFillProperties(properties); properties ..add(DiagnosticsProperty>('stream', stream)) - ..add( - DiagnosticsProperty('isReplayValueStream', isReplayValueStream)) + ..add(DiagnosticsProperty('isReplayValueStream', + isReplayValueStream ?? stream.isReplayValueStream)) ..add(ObjectFlagProperty>.has( 'listener', listener)) ..add(ObjectFlagProperty.has('child', child)); @@ -127,21 +130,28 @@ class _ValueStreamListenerState extends State> { _currentValue = stream.value; } - final int skipCount; - - if (widget.isReplayValueStream) { - skipCount = _initialized ? 0 : 1; + if (widget.isReplayValueStream ?? stream.isReplayValueStream) { + final skipCount = _initialized ? 0 : 1; + _subscribeIfNeeded(skipCount > 0 ? stream.skip(skipCount) : stream); } else { - skipCount = 0; if (_initialized) { _ambiguate(WidgetsBinding.instance)!.addPostFrameCallback((_) { + if (widget.stream != stream) { + return; + } _notifyListener(stream.value); + _subscribeIfNeeded(stream); }); + } else { + _subscribeIfNeeded(stream); } } + } - final streamToListen = skipCount > 0 ? stream.skip(skipCount) : stream; - + void _subscribeIfNeeded(Stream streamToListen) { + if (_subscription != null) { + return; + } _subscription = streamToListen.listen( (value) { if (!mounted) return; @@ -164,6 +174,7 @@ class _ValueStreamListenerState extends State> { void _unsubscribe() { _subscription?.cancel(); + _subscription = null; } @override diff --git a/packages/rxdart_flutter/pubspec.yaml b/packages/rxdart_flutter/pubspec.yaml index aa111ab1..1205bede 100644 --- a/packages/rxdart_flutter/pubspec.yaml +++ b/packages/rxdart_flutter/pubspec.yaml @@ -17,6 +17,11 @@ dev_dependencies: sdk: flutter flutter_lints: ^1.0.4 +# TODO: Remove this +dependency_overrides: + rxdart: + path: ../rxdart + topics: - rxdart - reactive-programming diff --git a/packages/rxdart_flutter/test/src/rxdart_ext/value_subject.dart b/packages/rxdart_flutter/test/src/rxdart_ext/value_subject.dart index f788da7a..f3f2496b 100644 --- a/packages/rxdart_flutter/test/src/rxdart_ext/value_subject.dart +++ b/packages/rxdart_flutter/test/src/rxdart_ext/value_subject.dart @@ -123,6 +123,9 @@ class ValueSubject extends Subject implements ValueStream { // no event return null; } + + @override + bool get isReplayValueStream => false; } class _ValueSubjectStream extends Stream implements ValueStream { @@ -184,6 +187,9 @@ class _ValueSubjectStream extends Stream implements ValueStream { @override T? get valueOrNull => _subject.valueOrNull; + + @override + bool get isReplayValueStream => _subject.isReplayValueStream; } /// Class that holds latest value and lasted error emitted from Stream.