Skip to content

Commit

Permalink
feat: add toBroadcastValueStream
Browse files Browse the repository at this point in the history
  • Loading branch information
alextekartik committed May 27, 2024
1 parent de21018 commit 358cf04
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 4 deletions.
100 changes: 96 additions & 4 deletions app_rx/lib/src/helpers.dart
Original file line number Diff line number Diff line change
@@ -1,23 +1,115 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:tekartik_common_utils/common_utils_import.dart';

final _debug = false;

/// Stream helpers
extension TekartikRxStreamExt<T> on Stream<T> {
/// Convert any stream to a behavior subject
@Deprecated('Use toBroadcastValueStream')
BehaviorSubject<T> toBehaviorSubject() {
late StreamSubscription<T> subscription;
StreamSubscription<T>? subscription;
late BehaviorSubject<T> subject;
if (_debug) {
print('toBehaviorSubject $hashCode');
}
subject = BehaviorSubject<T>(
onListen: () {
if (_debug) {
print('onListen $hashCode');
}
subscription = listen((event) {
subject.add(event);
});
},
onCancel: () {
subscription.cancel();
if (_debug) {
print('onCancel $hashCode');
}
subscription?.cancel();
},
sync: true);
return subject;
}

/// It won't listen to the stream until the first listener is added and stop
/// when it is closed.
BroadcastValueStream<T> toBroadcastValueStream() =>
_BroadcastValueStream(this);
}

/// Stream subscription is closed when then stream is closed.
abstract class BroadcastValueStream<T> extends ValueStream<T> {
Future<void> close();
}

class _BroadcastValueStream<T> extends Stream<T>
implements BroadcastValueStream<T> {
late final BehaviorSubject<T> _subject;
StreamSubscription<T>? subscription;

_BroadcastValueStream(Stream<T> stream) {
_subject = BehaviorSubject<T>(
onListen: () {
if (_debug) {
print('onListen $hashCode');
}
subscription ??= stream.listen((event) {
_subject.add(event);
});
},
onCancel: () {
if (_debug) {
print('onCancel $hashCode');
}
},
sync: true);
}

@override
StreamSubscription<T> listen(
void Function(T event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
_subject.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);

@override
Future<void> close() async {
if (_debug) {
print('close $hashCode');
}
subscription?.cancel().unawait();
await _subject.close();
}

@override
bool get isBroadcast => _subject.isBroadcast;

@override
Object get error => _subject.error;

@override
Object? get errorOrNull => _subject.errorOrNull;

@override
bool get hasError => _subject.hasError;

@override
bool get hasValue => _subject.hasValue;

@override
StackTrace? get stackTrace => _subject.stackTrace;

@override
T get value => _subject.value;

@override
T? get valueOrNull => _subject.valueOrNull;
}
56 changes: 56 additions & 0 deletions app_rx/test/helpers_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// ignore_for_file: deprecated_member_use_from_same_package

import 'package:tekartik_app_rx/helpers.dart';
import 'package:tekartik_common_utils/common_utils_import.dart';
import 'package:test/test.dart';
Expand Down Expand Up @@ -31,5 +33,59 @@ void main() {
expect(subject.value, 3);
await subject.close();
});
test('toBroadcastValueStream', () async {
var subject = Stream.fromIterable([1, 2, 3]).toBroadcastValueStream();

var completer = Completer<void>();
subject.first.then((value) {
expect(value, 1);
expect(subject.value, 1);
}).unawait();

StreamSubscription? subscription1;
subscription1 = subject.listen((data) {
if (data == 1) {
subscription1?.cancel();
subscription1 = null;
}
});
subject.listen((data) {
if (data == 3) {
completer.complete();
}
});
await completer.future;
expect(subscription1, isNull);
expect(await subject.first, 3);
expect(subject.value, 3);
await subject.close();
});
test('cancel toBehaviorSubject', () async {
var streamController = StreamController<int>(sync: true);
var subject = streamController.stream.toBehaviorSubject();

streamController.add(1);
expect(await subject.first, 1);
await sleep(1);

try {
// expect(await subject.first, 1);
} catch (e) {
print(e);
}

await subject.close();
});
test('cancel toBroadcastValueStream', () async {
var streamController = StreamController<int>(sync: true);
var subject = streamController.stream.toBroadcastValueStream();

streamController.add(1);
await subject.first;
await sleep(1);
await subject.first;

await subject.close();
});
});
}

0 comments on commit 358cf04

Please sign in to comment.