diff --git a/app_rx/lib/src/helpers.dart b/app_rx/lib/src/helpers.dart index d18122d..56d8dfc 100644 --- a/app_rx/lib/src/helpers.dart +++ b/app_rx/lib/src/helpers.dart @@ -1,23 +1,115 @@ -import 'dart:async'; - import 'package:rxdart/rxdart.dart'; +import 'package:tekartik_common_utils/common_utils_import.dart'; + +final _debug = false; /// Stream helpers extension TekartikRxStreamExt on Stream { /// Convert any stream to a behavior subject + @Deprecated('Use toBroadcastValueStream') BehaviorSubject toBehaviorSubject() { - late StreamSubscription subscription; + StreamSubscription? subscription; late BehaviorSubject subject; + if (_debug) { + print('toBehaviorSubject $hashCode'); + } subject = BehaviorSubject( onListen: () { + if (_debug) { + print('onListen $hashCode'); + } subscription = listen((event) { subject.add(event); }); }, onCancel: () { - subscription.cancel(); + if (_debug) { + print('onCancel $hashCode'); + } + subscription?.cancel(); }, sync: true); return subject; } + + /// It won't listen to the stream until the first listener is added and stop + /// when it is closed. + BroadcastValueStream toBroadcastValueStream() => + _BroadcastValueStream(this); +} + +/// Stream subscription is closed when then stream is closed. +abstract class BroadcastValueStream extends ValueStream { + Future close(); +} + +class _BroadcastValueStream extends Stream + implements BroadcastValueStream { + late final BehaviorSubject _subject; + StreamSubscription? subscription; + + _BroadcastValueStream(Stream stream) { + _subject = BehaviorSubject( + onListen: () { + if (_debug) { + print('onListen $hashCode'); + } + subscription ??= stream.listen((event) { + _subject.add(event); + }); + }, + onCancel: () { + if (_debug) { + print('onCancel $hashCode'); + } + }, + sync: true); + } + + @override + StreamSubscription listen( + void Function(T event)? onData, { + Function? onError, + void Function()? onDone, + bool? cancelOnError, + }) => + _subject.listen( + onData, + onError: onError, + onDone: onDone, + cancelOnError: cancelOnError, + ); + + @override + Future close() async { + if (_debug) { + print('close $hashCode'); + } + subscription?.cancel().unawait(); + await _subject.close(); + } + + @override + bool get isBroadcast => _subject.isBroadcast; + + @override + Object get error => _subject.error; + + @override + Object? get errorOrNull => _subject.errorOrNull; + + @override + bool get hasError => _subject.hasError; + + @override + bool get hasValue => _subject.hasValue; + + @override + StackTrace? get stackTrace => _subject.stackTrace; + + @override + T get value => _subject.value; + + @override + T? get valueOrNull => _subject.valueOrNull; } diff --git a/app_rx/test/helpers_test.dart b/app_rx/test/helpers_test.dart index bdcd3ec..773d9db 100644 --- a/app_rx/test/helpers_test.dart +++ b/app_rx/test/helpers_test.dart @@ -1,3 +1,5 @@ +// ignore_for_file: deprecated_member_use_from_same_package + import 'package:tekartik_app_rx/helpers.dart'; import 'package:tekartik_common_utils/common_utils_import.dart'; import 'package:test/test.dart'; @@ -31,5 +33,59 @@ void main() { expect(subject.value, 3); await subject.close(); }); + test('toBroadcastValueStream', () async { + var subject = Stream.fromIterable([1, 2, 3]).toBroadcastValueStream(); + + var completer = Completer(); + subject.first.then((value) { + expect(value, 1); + expect(subject.value, 1); + }).unawait(); + + StreamSubscription? subscription1; + subscription1 = subject.listen((data) { + if (data == 1) { + subscription1?.cancel(); + subscription1 = null; + } + }); + subject.listen((data) { + if (data == 3) { + completer.complete(); + } + }); + await completer.future; + expect(subscription1, isNull); + expect(await subject.first, 3); + expect(subject.value, 3); + await subject.close(); + }); + test('cancel toBehaviorSubject', () async { + var streamController = StreamController(sync: true); + var subject = streamController.stream.toBehaviorSubject(); + + streamController.add(1); + expect(await subject.first, 1); + await sleep(1); + + try { + // expect(await subject.first, 1); + } catch (e) { + print(e); + } + + await subject.close(); + }); + test('cancel toBroadcastValueStream', () async { + var streamController = StreamController(sync: true); + var subject = streamController.stream.toBroadcastValueStream(); + + streamController.add(1); + await subject.first; + await sleep(1); + await subject.first; + + await subject.close(); + }); }); }