RxDart is a reactive functional programming library for Google Dart, based on ReactiveX.
Google Dart comes with a very decent Streams API out-of-the-box; rather than attempting to provide an alternative to this API, RxDart adds functionality on top of it.
Dart 1.0 is supported up to release 0.15.x. Version 0.16.x is not backwards compatible and requires Dart SDK 2.0.
import 'dart:html';

import 'package:collection/collection.dart';
import 'package:rxdart/rxdart.dart';

void main() {
  // The key sequence to detect.
  const konamiKeyCodes = <int>[
    KeyCode.UP,
    KeyCode.UP,
    KeyCode.DOWN,
    KeyCode.DOWN,
    KeyCode.LEFT,
    KeyCode.RIGHT,
    KeyCode.LEFT,
    KeyCode.RIGHT,
    KeyCode.B,
    KeyCode.A,
  ];
  final result = querySelector('#result');
  final keyUp = Observable<KeyboardEvent>(document.onKeyUp);

  keyUp
      .map((event) => event.keyCode)
      // Group the key codes in buffers of 10, starting a new buffer on every key.
      .bufferCount(10, 1)
      .where((lastTenKeyCodes) => const IterableEquality<int>()
          .equals(lastTenKeyCodes, konamiKeyCodes))
      .listen((_) => result.innerHtml = 'KONAMI!');
}
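The Konami example appears to rely on `bufferCount(10, 1)` producing overlapping groups of the most recent key codes. As a rough illustration of that sliding-window idea, here is a minimal sketch that uses only `dart:async`. The `slidingWindow` helper is hypothetical and is not part of RxDart; it is not the library's implementation and may differ from `bufferCount` in edge cases such as stream completion.

```dart
import 'dart:async';

/// Hypothetical helper (not part of RxDart): once [size] events have been
/// seen, emits the most recent [size] events each time a new event arrives.
Stream<List<T>> slidingWindow<T>(Stream<T> source, int size) async* {
  final window = <T>[];
  await for (final event in source) {
    window.add(event);
    if (window.length > size) {
      window.removeAt(0); // Drop the oldest event.
    }
    if (window.length == size) {
      // Emit a copy so later mutation of `window` is not visible downstream.
      yield List<T>.from(window);
    }
  }
}

Future<void> main() async {
  final windows =
      await slidingWindow(Stream.fromIterable([1, 2, 3, 4]), 3).toList();
  print(windows); // [[1, 2, 3], [2, 3, 4]]
}
```

Each emitted list can then be compared against a target sequence, which is what the `where` step does in the example above.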
RxDart's Observables extend the Stream class. This has two major implications:
- All methods defined on the Stream class exist on RxDart's Observables as well.
- All Observables can be passed to any API that expects a Dart Stream as an input.
Additional important distinctions are documented as part of the Observable class.
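A minimal sketch of both implications, assuming an RxDart release from the 0.16.x line (where the `Observable` class is available) and Dart SDK 2.0. The `sum` function is a hypothetical stand-in for any API that accepts a plain Dart `Stream`.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

// Hypothetical function that only knows about plain Dart Streams.
Future<int> sum(Stream<int> numbers) =>
    numbers.fold<int>(0, (total, n) => total + n);

Future<void> main() async {
  final observable = Observable<int>(Stream.fromIterable([1, 2, 3]));

  // Stream methods such as `where` are available on the Observable...
  final odds = observable.where((n) => n.isOdd);

  // ...and the result can be handed to an API that expects a Stream.
  print(await sum(odds)); // 4
}
```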
Finally, the Observable class & operators are simple wrappers around `Stream` and `StreamTransformer` classes. All underlying implementations can be used free of the Observable class, and are exposed in their own libraries. They are linked to below.
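As a sketch of using the underlying classes directly, the following applies `MergeStream` and `StartWithStreamTransformer` to plain Dart streams without creating an `Observable`. It assumes an RxDart release from the 0.16.x line in which both classes are exported from `package:rxdart/rxdart.dart`; the input values are made up for illustration.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

Future<void> main() async {
  // A Stream class used directly, without going through Observable.merge.
  final merged = MergeStream<int>([
    Stream.fromIterable([1, 2]),
    Stream.fromIterable([3, 4]),
  ]);

  // A StreamTransformer applied with the plain Stream.transform method.
  final withStart = merged.transform(StartWithStreamTransformer<int>(0));

  final values = await withStart.toList();
  print(values.first); // The start value is emitted before any merged event.
  print(values.length); // The start value plus the four merged events.
}
```

The relative order of the merged events depends on how the source streams interleave, so the sketch only inspects the first value and the total count.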
Generally speaking, creating a new Observable is either done by wrapping a Dart Stream using the top-level constructor `new Observable()`, or by calling a factory method on the Observable class.
But to better support Dart's strong mode, `combineLatest` and `zip` have been pulled apart into fixed-length constructors.
These methods are supplied as static methods, since Dart's factory methods don't support generic types.
var myObservable = new Observable(myStream);
- concat / ConcatStream
- concatEager / ConcatEagerStream
- defer / DeferStream
- error / ErrorStream
- just
- merge / MergeStream
- never / NeverStream
- periodic
- race / RaceStream
- repeat / RepeatStream
- retry / RetryStream
- retryWhen / RetryWhenStream
- sequenceEqual / SequenceEqualStream
- switchLatest / SwitchLatestStream
- timer / TimerStream
var myObservable = new Observable.merge([myFirstStream, mySecondStream]);
- combineLatest / CombineLatestStream (combineLatest2, combineLatest... combineLatest9)
- forkJoin / ForkJoinStream (forkJoin2, forkJoin... forkJoin9)
- range / RangeStream
- tween / TweenStream
- zip / ZipStream (zip2, zip3, zip4, ..., zip9)
var myObservable = Observable.combineLatest3(
    myFirstStream,
    mySecondStream,
    myThirdStream,
    // The combiner's return value becomes the event emitted by myObservable.
    (firstData, secondData, thirdData) => "$firstData $secondData $thirdData");
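The fixed-length static methods let each input stream keep its own type. A minimal sketch with `zip2`, assuming an RxDart release from the 0.16.x line; the input values are made up for illustration.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

Future<void> main() async {
  final numbers = Stream.fromIterable([1, 2]);
  final letters = Stream.fromIterable(['a', 'b']);

  // zip2 pairs the nth event of each stream; the zipper is typed per input.
  final zipped = Observable.zip2(
      numbers, letters, (int number, String letter) => '$number$letter');

  zipped.listen(print);
}
```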
- buffer / BufferStreamTransformer
- bufferCount / BufferStreamTransformer / onCount
- bufferTest / BufferStreamTransformer / onTest
- bufferTime / BufferStreamTransformer / onTime
- concatMap (alias for asyncExpand)
- concatWith
- debounce / DebounceStreamTransformer
- debounceTime / DebounceStreamTransformer
- delay / DelayStreamTransformer
- dematerialize / DematerializeStreamTransformer
- distinctUnique / DistinctUniqueStreamTransformer
- doOnCancel / DoStreamTransformer
- doOnData / DoStreamTransformer
- doOnDone / DoStreamTransformer
- doOnEach / DoStreamTransformer
- doOnError / DoStreamTransformer
- doOnListen / DoStreamTransformer
- doOnPause / DoStreamTransformer
- doOnResume / DoStreamTransformer
- exhaustMap / ExhaustMapStreamTransformer
- flatMap / FlatMapStreamTransformer
- flatMapIterable
- groupBy / GroupByStreamTransformer
- interval / IntervalStreamTransformer
- mapTo / MapToStreamTransformer
- materialize / MaterializeStreamTransformer
- mergeWith
- max / StreamMaxFuture
- min / StreamMinFuture
- onErrorResume / OnErrorResumeStreamTransformer
- onErrorResumeNext / OnErrorResumeStreamTransformer
- onErrorReturn / OnErrorResumeStreamTransformer
- onErrorReturnWith / OnErrorResumeStreamTransformer
- sample / SampleStreamTransformer
- sampleTime / SampleStreamTransformer
- scan / ScanStreamTransformer
- skipUntil / SkipUntilStreamTransformer
- startWith / StartWithStreamTransformer
- startWithMany / StartWithManyStreamTransformer
- switchMap / SwitchMapStreamTransformer
- takeUntil / TakeUntilStreamTransformer
- timeInterval / TimeIntervalStreamTransformer
- timestamp / TimestampStreamTransformer
- throttle / ThrottleStreamTransformer
- throttleTime / ThrottleStreamTransformer
- window / WindowStreamTransformer
- windowCount / WindowStreamTransformer / onCount
- windowTest / WindowStreamTransformer / onTest
- windowTime / WindowStreamTransformer / onTime
- withLatestFrom / WithLatestFromStreamTransformer
- zipWith
A full list of all methods and properties, including those provided by the Dart Stream API (such as `first`, `asyncMap`, etc.), can be seen by examining the DartDocs.
var myObservable = new Observable(myStream)
    .bufferCount(5)
    .distinct();
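For a version that can be run end to end, here is a small sketch that chains an RxDart operator (`startWith`) with a Dart Stream method (`distinct`). It assumes an RxDart release from the 0.16.x line; the input values are made up for illustration.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

Future<void> main() async {
  final values = await Observable<int>(Stream.fromIterable([1, 2, 2, 3]))
      .startWith(0) // RxDart operator: emit 0 before the source events.
      .distinct() // Stream method: skip events equal to the previous one.
      .toList();

  print(values); // [0, 1, 2, 3]
}
```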
Web and command-line examples can be found in the `example` folder.
In order to run the web examples, please follow these steps:
- Clone this repo and enter the directory
- Run `pub get`
- Run `pub run build_runner serve example`
- Navigate to http://localhost:8080/web/ in your browser
In order to run the command line example, please follow these steps:
- Clone this repo and enter the directory
- Run `pub get`
- Run `dart example/example.dart 10`
In order to run the Flutter example, you must have Flutter installed. For installation instructions, view the online documentation.
- Open up an Android Emulator, the iOS Simulator, or connect an appropriate mobile device for debugging.
- Open up a terminal
- `cd` into the `example/flutter/github_search` directory
- Run `flutter doctor` to ensure you have all Flutter dependencies working.
- Run `flutter packages get`
- Run `flutter run`
Refer to the Changelog to get all release notes.