Reactive Extensions for the Bevy Game Engine!
rx_bevy abstracts away common event orchestration patterns under observables
and operators, so you can focus on building your logic, instead of boilerplate.
rx_bevy is a fairly low-level library, in the sense that it isn't a solution
to a specific problem, but a toolbox to implement solutions. Feel free to
build on top of rx_bevy and publish your work, such as extra operators and
observables, as a library!
Please choose your crate name carefully so that it does not block me from adding new features! Please refer to the external crate naming guide.
- To learn more about this crate, visit https://alexaegis.github.io/rx_bevy/
- To learn more about Rx in general, visit the ReactiveX Website!
If you want to jump straight to using rx_bevy, check out the numbered examples
that go through how observables can be used within Bevy:
Other examples on observables, operators and subjects can be found at
crates/rx_core/examples/. I recommend cloning the repository to check them
out!
Change the virtual time speed with keyboard input!

    use bevy::prelude::*;
    use rx_bevy::prelude::*;

    fn main() -> AppExit {
        App::new()
            .add_plugins((
                DefaultPlugins,
                RxPlugin,
                RxSchedulerPlugin::<Update, Virtual>::default(),
            ))
            .init_resource::<ExampleSubscriptions>()
            .add_systems(Startup, setup)
            .run()
    }

    // Holds the subscriptions so they are not dropped, as dropping unsubscribes them.
    #[derive(Resource, Default, Deref, DerefMut)]
    struct ExampleSubscriptions {
        subscriptions: SharedSubscription,
    }

    fn setup(
        rx_schedule: RxSchedule<Update, Virtual>,
        mut example_subscriptions: ResMut<ExampleSubscriptions>,
    ) {
        let subscription = KeyboardObservable::new(
            KeyboardObservableOptions::default(),
            rx_schedule.handle(),
        )
        // Only let the 1, 2 and 3 keys through.
        .filter(|key_code, _| {
            matches!(key_code, KeyCode::Digit1 | KeyCode::Digit2 | KeyCode::Digit3)
        })
        .subscribe(ResourceDestination::new(
            |mut virtual_time: Mut<'_, Time<Virtual>>, signal| {
                let speed = match signal {
                    ObserverNotification::Next(key_code) => match key_code {
                        KeyCode::Digit1 => 0.5,
                        KeyCode::Digit2 => 1.0,
                        KeyCode::Digit3 => 1.5,
                        // The filter above only forwards the three digit keys.
                        _ => unreachable!(),
                    },
                    // Fall back to normal speed on completion or error.
                    ObserverNotification::Complete | ObserverNotification::Error(_) => 1.0,
                };
                virtual_time.set_relative_speed(speed);
            },
            rx_schedule.handle(),
        ));
        example_subscriptions.add(subscription);
    }

Observables define a stream of emissions that is instantiated upon subscription.
- Bevy Specific:
- EventObservable - Observe events sent to an entity!
- KeyboardObservable - Observe the global key presses!
- MessageObservable - Observe messages written!
- ProxyObservable - Subscribes to another observable entity!
- ResourceObservable - Observe changes of a resource!
- Creation:
- CreateObservable - Define your own function that will interact with the subscriber!
- DeferredObservable - Subscribes to an observable returned by a function.
- Immediate Observables:
- JustObservable - Immediately emits a single value!
- EmptyObservable - Immediately completes!
- ThrowObservable - Immediately errors!
- ClosedObservable - Immediately unsubscribes!
- Miscellaneous Observables:
- NeverObservable - Never emits, never unsubscribes! Only once dropped! Warning: you need to handle subscriptions made to this yourself!
- Combination (Multi-Signal):
- CombineChangesObservable - Subscribes to two different observables and emits the latest of both values when either of them emits. It denotes which one has changed, and it emits even when one of them hasn't emitted yet.
- CombineLatestObservable - Subscribes to two observables and emits the latest of both values when either of them emits. It only starts emitting once both have emitted at least once.
- ZipObservable - Subscribes to two different observables and emits both values once both of them have emitted, pairing up emissions in the order they happened.
- JoinObservable - Subscribes to two different observables and emits the latest of both values once both of them have completed!
- Combination (Single-Signal):
- MergeObservable - Combine many observables of the same output type into a single observable, subscribing to all of them at once!
- ConcatObservable - Combine many observables of the same output type into a single observable, subscribing to them one-by-one in order!
- Timing:
- TimerObservable - Emit a () once the timer elapses!
- IntervalObservable - Emit a sequence of usize values every time the Duration of the interval rolls over.
- Iterators:
- IteratorObservable - Emits the values of an iterator immediately when subscribed to.
- IteratorOnTickObservable - Emits the values of an iterator once per every tick of the scheduler.
- Connectable:
- ConnectableObservable - Maintains an internal connector subject that subscribes to a source observable only when the connect function is called on it. Subscribers of the ConnectableObservable subscribe to this internal connector.
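To make the difference between the combine-latest and zip descriptions above concrete, here is a minimal sketch in plain Rust that only models the emission rules. It does not use rx_bevy's API: the `Event` enum and the `combine_latest` and `zip_pairs` functions are hypothetical names introduced for illustration, and the two sources are represented as one merged timeline of events.

```rust
use std::collections::VecDeque;

/// One event on a merged timeline of two sources (hypothetical model type).
#[derive(Clone, Copy, Debug)]
enum Event {
    Left(u32),
    Right(char),
}

/// Combine-latest rule: once both sides have emitted at least once,
/// emit the latest pair every time either side emits.
fn combine_latest(events: &[Event]) -> Vec<(u32, char)> {
    let mut left = None;
    let mut right = None;
    let mut out = Vec::new();
    for event in events {
        match *event {
            Event::Left(v) => left = Some(v),
            Event::Right(v) => right = Some(v),
        }
        if let (Some(l), Some(r)) = (left, right) {
            out.push((l, r));
        }
    }
    out
}

/// Zip rule: pair the n-th left value with the n-th right value,
/// buffering whichever side is ahead.
fn zip_pairs(events: &[Event]) -> Vec<(u32, char)> {
    let mut lefts = VecDeque::new();
    let mut rights = VecDeque::new();
    let mut out = Vec::new();
    for event in events {
        match *event {
            Event::Left(v) => lefts.push_back(v),
            Event::Right(v) => rights.push_back(v),
        }
        if !lefts.is_empty() && !rights.is_empty() {
            // Both queues are non-empty here, so both pops succeed.
            let l = lefts.pop_front().unwrap();
            let r = rights.pop_front().unwrap();
            out.push((l, r));
        }
    }
    out
}

fn main() {
    let timeline = [
        Event::Left(1),
        Event::Left(2),
        Event::Right('a'),
        Event::Left(3),
        Event::Right('b'),
    ];
    // prints [(2, 'a'), (3, 'a'), (3, 'b')]
    println!("{:?}", combine_latest(&timeline));
    // prints [(1, 'a'), (2, 'b')]
    println!("{:?}", zip_pairs(&timeline));
}
```

Note how the combine-latest rule drops the first left value because nothing had arrived on the right yet, while the zip rule keeps it buffered until a partner shows up.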
RxObservers (Not to be confused with Bevy's Observers!) are the destinations of subscriptions! They are the last stations of a signal.
- Bevy Specific:
- EntityDestination - Send observed signals to an entity as events!
- ResourceDestination - Write into a resource when observing signals!
- PrintObserver - A simple observer that prints all signals to the console using println!.
- FnObserver - A custom observer that uses user-supplied functions to handle signals. All signal handlers must be defined up-front.
- DynFnObserver - A custom observer that uses user-supplied functions to handle signals. Not all signal handlers have to be defined, but it will panic if it observes an error without an error handler defined.
- NoopObserver - Ignores all signals. Will panic in debug mode if it observes an error.
Subjects are both Observers and Observables at the same time. Subjects multicast the signals they observe across all subscribers.
- PublishSubject - Observed signals are forwarded to all active subscribers. It does not replay values to late subscribers, but terminal state (complete/error) is always replayed! Other subjects are built on top of this.
- BehaviorSubject - Always holds a value that is replayed to late subscribers.
- ReplaySubject - Buffers the last N values and replays them to late subscribers.
- AsyncSubject - Reduces observed values into one and emits it to active subscribers once completed. Once completed, it also replays the result to late subscribers.
- ProvenanceSubject - A BehaviorSubject that also stores an additional value that can be used for filtering. Useful to track the origin of a value as some subscribers may only be interested in certain origins while some are interested in all values regardless of origin.
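The replay rules described above can be compared side by side with a small model. This is a sketch in plain Rust, not rx_bevy's subject API: `SubjectKind` and `replayed_to_late_subscriber` are hypothetical names, terminal signals are left out, and it assumes a behavior subject starts with an initial value (which "always holds a value" suggests).

```rust
/// The replay policy of a subject (hypothetical model type, values only).
#[derive(Clone, Copy, Debug)]
enum SubjectKind {
    /// Forwards to active subscribers only, replays no values.
    Publish,
    /// Always holds one value; `initial` is used before anything was observed.
    Behavior { initial: i32 },
    /// Buffers the last `capacity` values.
    Replay { capacity: usize },
}

/// Returns what a subscriber receives immediately if it subscribes after
/// the subject has already observed `history`.
fn replayed_to_late_subscriber(kind: SubjectKind, history: &[i32]) -> Vec<i32> {
    match kind {
        SubjectKind::Publish => Vec::new(),
        SubjectKind::Behavior { initial } => {
            vec![history.last().copied().unwrap_or(initial)]
        }
        SubjectKind::Replay { capacity } => {
            let start = history.len().saturating_sub(capacity);
            history[start..].to_vec()
        }
    }
}

fn main() {
    let history = [1, 2, 3];
    // prints []
    println!("{:?}", replayed_to_late_subscriber(SubjectKind::Publish, &history));
    // prints [3]
    println!(
        "{:?}",
        replayed_to_late_subscriber(SubjectKind::Behavior { initial: 0 }, &history)
    );
    // prints [2, 3]
    println!(
        "{:?}",
        replayed_to_late_subscriber(SubjectKind::Replay { capacity: 2 }, &history)
    );
}
```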
Operators take an observable as input and return a new observable as output, enhancing the original observable with new behavior.
- Mapping:
- MapOperator - Transform each value with a mapping function.
- MapIntoOperator - Map each value using Into.
- MapErrorOperator - Transform error values into another error value.
- MapNeverOperator - Re-type Never next/error channels into concrete types, as they are always unreachable.
- MaterializeOperator - Turn next/error/complete into notification values, rendering terminal signals ineffective.
- DematerializeOperator - Convert notifications back into real signals.
- EnumerateOperator - Attach a running index to each emission.
- PairwiseOperator - Emit the previous and current values together.
- Filtering Operators (Multi-Signal):
- FilterOperator - Keep values that satisfy a predicate.
- FilterMapOperator - Map values to an Option and keep only the Some values.
- TakeOperator - Emit only the first n values, then complete.
- SkipOperator - Drop the first n values.
- LiftOptionOperator - Filter out None and forward Some values.
- Filtering Operators (Single-Signal):
- FirstOperator - Emit the very first value, then complete.
- FindOperator - Emit the first value matching a predicate, then complete.
- FindIndexOperator - Emit the index of the first matching value, then complete.
- ElementAtOperator - Emit the value at the given index then complete.
- IsEmptyOperator - Emit a single boolean indicating if the source emitted anything before it had completed.
- Higher-Order (Flatten Observable Observables):
- ConcatAllOperator - Subscribes to all upstream observables one at a time in order.
- MergeAllOperator - Subscribes to all upstream observables and merges their emissions concurrently.
- SwitchAllOperator - Subscribes to the latest upstream observable, unsubscribing from previous ones.
- ExhaustAllOperator - Subscribes to an upstream observable only if there is no active subscription.
- Higher-Order (Mapper):
- ConcatMapOperator - Maps upstream signals into an observable, then subscribes to them one at a time in order.
- MergeMapOperator - Maps upstream signals into an observable, then subscribes to them and merges their emissions concurrently.
- SwitchMapOperator - Maps upstream signals into an observable, then subscribes to the latest one, unsubscribing previous ones.
- ExhaustMapOperator - Maps upstream signals into an observable, then subscribes to them only if there is no active subscription.
- Combination:
- WithLatestFromOperator - Combine each source emission with the latest value from another observable.
- Buffering:
- BufferCountOperator - Collect values into fixed-size buffers before emitting them.
- Multicasting:
- ShareOperator - Multicast a source through a connector so downstream subscribers share one upstream subscription. The connector can be any subject.
- Accumulator (Multi-Signal):
- ScanOperator - Accumulate state and emit every intermediate result.
- Accumulator (Single-Signal):
- CountOperator - Count values emitted by upstream.
- ReduceOperator - Fold values and emit only the final accumulator on completion.
- Side-Effects:
- TapOperator - Mirror values into another observer while letting them pass through.
- TapNextOperator - Run a callback for each next without touching errors or completion.
- OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
- OnSubscribeOperator - Run a callback when a subscription is established.
- FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.
- Producing:
- StartWithOperator - Emit a value first when subscribing to the source.
- EndWithOperator - Emit a value on completion.
- Error Handling:
- CatchOperator - On error, switch to a recovery observable.
- RetryOperator - Resubscribe on error up to the configured retry count.
- IntoResultOperator - Capture next/error signals as Result values.
- LiftResultOperator - Split Result values into next and error signals.
- ErrorBoundaryOperator - Enforce Never as the error type to guard pipelines at compile time.
- Timing Operators:
- AdsrOperator - Convert trigger signals into an ADSR envelope driven by the scheduler.
- DebounceTimeOperator - Emit the most recent value after a period of silence.
- DelayOperator - Shift emissions forward in time using the scheduler.
- FallbackWhenSilentOperator - Emit a fallback value on ticks where the source stayed silent.
- ObserveOnOperator - Re-emit upstream signals with the provided scheduler.
- SubscribeOnOperator - Schedule upstream subscription on the provided scheduler.
- ThrottleTimeOperator - Limit the frequency of downstream emissions.
- Composite Operators:
- CompositeOperator - Build reusable operator chains without needing a source observable!
- IdentityOperator -
A no-op operator, used mainly as the entry point of a
CompositeOperator.
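The split between multi-signal and single-signal accumulators in the list above (scan emits every intermediate result, reduce and count emit once on completion) mirrors what Rust's standard iterators already offer. The sketch below uses `Iterator::scan`, `Iterator::fold` and `Iterator::count` from the standard library as stand-ins; it is not rx_bevy's operator API, and the function names are made up for illustration.

```rust
/// Scan-like: one output per input, each carrying the running sum.
fn running_totals(values: &[i32]) -> Vec<i32> {
    values
        .iter()
        .scan(0, |acc, &v| {
            *acc += v;
            Some(*acc)
        })
        .collect()
}

/// Reduce-like: a single output, available only after the input has ended.
fn final_total(values: &[i32]) -> i32 {
    values.iter().fold(0, |acc, &v| acc + v)
}

/// Count-like: a single output with the number of inputs seen.
fn value_count(values: &[i32]) -> usize {
    values.iter().count()
}

fn main() {
    let values = [1, 2, 3, 4];
    // prints [1, 3, 6, 10]
    println!("{:?}", running_totals(&values));
    // prints 10
    println!("{}", final_total(&values));
    // prints 4
    println!("{}", value_count(&values));
}
```

The practical consequence for a pipeline is that anything downstream of a single-signal accumulator hears nothing until the source completes, so on a source that never completes a scan-style operator is usually the one you want.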
For every primitive, there is a derive macro available to ease implementation.
They mostly implement traits defining associated types like Out and
OutError. They may also provide trivial default implementations where
applicable.
See the individual macros for more information:
- RxExecutor - Derive macro for Executors.
- RxObservable - Derive macro for Observables.
- RxObserver - Derive macro for RxObservers.
- RxOperator - Derive macro for Operators.
- RxScheduler - Derive macro for Schedulers.
- RxSubject - Derive macro for Subjects.
- RxSubscriber - Derive macro for Subscribers.
- RxSubscription - Derive macro for Subscriptions.
- RxWork - Derive macro for schedulable work.
The rx_core_testing crate provides utilities to test your Observables and
Operators.
- MockExecutor & Scheduler - Control the passage of time manually.
- MockObserver & NotificationCollector - Collect all observed notifications and perform assertions over them.
- TestHarness - Perform more complex assertions to ensure proper behavior.
- Not everything needs to be an Observable! rx_bevy is meant to orchestrate events; if something isn't meant to be an event, don't make it one without good reason! This doesn't mean you can't express most of your game logic with observables, go ahead, it's fun! But performance-critical parts should prioritize performance over a choice of API. And this doesn't mean rx_bevy isn't performant either, but everything comes at a cost!
- Observables do not necessarily have to fully live inside the ECS to be used with Bevy:
  - The Observable you subscribe to can be an actual observable implementation as is, or an entity holding an ObservableComponent with one.
  - The "destination", the observer you establish a subscription towards, can also be either an RxObserver directly, or an entity that can observe RxSignals using an actual Bevy Observer.
  - The subscriptions made could also be used as is (just make sure you don't drop them unless you want to, as that automatically unsubscribes them!), or as an entity that will unsubscribe only when despawned!
  - The scheduler used can be anything too; nothing stops you from using a completely different scheduler implementation than the provided one!

  And you can mix and match these aspects however you like! Whatever is more comfortable in a given situation!
- All subscriptions unsubscribe when dropped! Make sure you put them somewhere safe.
- "Shared" types, like all Subjects, are actually bundles of Arcs internally, so you can just Clone them. (This isn't for convenience; the implementation relies on having multiple locks.)
- If the behavior of an operator or observable isn't clear, and the provided documentation doesn't help, check out the implementation! For example, say you're not sure whether the delay operator delays errors too or just regular signals. Besides reading delay's readme, jumping to the DelaySubscriber answers that at a glance, as the error impl simply calls error on the destination!
- Pipelines don't have to be one big pile of operators; feel free to separate them into segments stored in different variables.
- Using the share operator you can "cache" upstream calculations if you use the ReplaySubject as its connector!
- Be careful with filtering operators. If you filter out a signal, nothing will trigger anything downstream! This can be a problem if you need some "reset" logic there!
  Let's say you have an observable pipeline that sets the color of a light based on an upstream signal Color. Then you introduce the concept of a power outage, so you add a CombineLatestObservable and combine the color and has_power states. You may instinctively reach for the filter operator to prevent the color from being set if there is no power. But that would mean you can't turn the lamp off, and during a power outage it stays on its last observed color! In these situations, instead of filter, you actually need a map, and you need to represent an entirely new state, in this case off.
- It's very easy to tangle yourself up in a web of pipelines. Try to keep things simple! While Rx introduces an entirely new dimension to programming (time), that also comes with complexity!
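The lamp scenario from the filtering tip can be sketched with plain Rust iterators standing in for the combined color and has_power stream. None of this is rx_bevy's API: `Color`, `LampState`, `with_filter` and `with_map` are hypothetical names, and each `(Color, bool)` tuple models one emission of the combined observable.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Color {
    Red,
    Green,
}

/// The extra state that the filter-based version cannot express.
#[derive(Clone, Copy, Debug, PartialEq)]
enum LampState {
    Off,
    On(Color),
}

/// Filter approach: emissions without power are dropped, so the lamp
/// never receives anything that would tell it to turn off.
fn with_filter(inputs: &[(Color, bool)]) -> Vec<Color> {
    inputs
        .iter()
        .filter(|&&(_, has_power)| has_power)
        .map(|&(color, _)| color)
        .collect()
}

/// Map approach: every emission reaches the lamp, and the outage is
/// represented explicitly as `LampState::Off`.
fn with_map(inputs: &[(Color, bool)]) -> Vec<LampState> {
    inputs
        .iter()
        .map(|&(color, has_power)| {
            if has_power {
                LampState::On(color)
            } else {
                LampState::Off
            }
        })
        .collect()
}

fn main() {
    let inputs = [
        (Color::Red, true),
        (Color::Green, true),
        (Color::Green, false), // power outage
        (Color::Red, true),
    ];
    // prints [Red, Green, Red]
    println!("{:?}", with_filter(&inputs));
    // prints [On(Red), On(Green), Off, On(Red)]
    println!("{:?}", with_map(&inputs));
}
```

With the filter version the third emission simply vanishes, so whatever consumes the stream keeps showing Green during the outage. The map version emits the same number of signals as it receives, which is what lets downstream reset logic run.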
Only the latest version is maintained. Please do not submit issues for older versions!
| Bevy | rx_bevy |
|---|---|
| 0.18 | 0.3 |
| 0.17 | 0.2 |
| 0.16 | 0.1 |
rx_bevy has been in closed development in one form or another since the release of Bevy 0.16, and first released with the release of Bevy 0.18. rx_bevy versions 0.1 and 0.2 therefore had no users and received no post-release bugfixes. They are there so you can try rx_bevy out even if you're a little behind on updates!
See contributing.md
