Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Split operation #33

Merged
merged 8 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ async-task = { version = "4.7.1", optional = true }
# bevy_tasks::Task, so we're leaving it as a mandatory dependency for now.
bevy_tasks = { version = "0.12", features = ["multi-threaded"] }

arrayvec = "0.7"
itertools = "0.13"
smallvec = "1.13"
tokio = { version = "1.39", features = ["sync"]}
Expand Down
33 changes: 25 additions & 8 deletions src/buffer/bufferable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,28 +207,28 @@ impl<T: Bufferable, const N: usize> Bufferable for [T; N] {
}

pub trait IterBufferable {
type BufferType: Buffered;
type BufferElement: Buffered;

/// Convert an iterable collection of bufferable workflow elements into
/// buffers if they are not buffers already.
fn into_buffer_vec<const N: usize>(
self,
builder: &mut Builder,
) -> SmallVec<[Self::BufferType; N]>;
) -> SmallVec<[Self::BufferElement; N]>;

/// Join an iterable collection of bufferable workflow elements.
///
/// Performance is best if you can choose an `N` which is equal to the
/// number of buffers inside the iterable, but this will work even if `N`
/// does not match the number.
fn join_vec<const N: usize>(
fn join_vec<'w, 's, 'a, 'b, const N: usize>(
self,
builder: &mut Builder,
) -> Output<SmallVec<[<Self::BufferType as Buffered>::Item; N]>>
builder: &'b mut Builder<'w, 's, 'a>,
) -> Chain<'w, 's, 'a, 'b, SmallVec<[<Self::BufferElement as Buffered>::Item; N]>>
where
Self: Sized,
Self::BufferType: 'static + Send + Sync,
<Self::BufferType as Buffered>::Item: 'static + Send + Sync,
Self::BufferElement: 'static + Send + Sync,
<Self::BufferElement as Buffered>::Item: 'static + Send + Sync,
{
let buffers = self.into_buffer_vec::<N>(builder);
let join = builder.commands.spawn(()).id();
Expand All @@ -239,6 +239,23 @@ pub trait IterBufferable {
Join::new(buffers, target),
));

Output::new(builder.scope, target)
Output::new(builder.scope, target).chain(builder)
}
}

impl<T> IterBufferable for T
where
T: IntoIterator,
T::Item: Bufferable,
{
type BufferElement = <T::Item as Bufferable>::BufferType;

fn into_buffer_vec<const N: usize>(
self,
builder: &mut Builder,
) -> SmallVec<[Self::BufferElement; N]> {
SmallVec::<[Self::BufferElement; N]>::from_iter(
self.into_iter().map(|e| e.into_buffer(builder)),
)
}
}
27 changes: 24 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ use crate::{
AddOperation, AsMap, BeginCleanupWorkflow, Buffer, BufferItem, BufferKeys, BufferSettings,
Bufferable, Buffered, Chain, Collect, ForkClone, ForkCloneOutput, ForkTargetStorage, Gate,
GateRequest, Injection, InputSlot, IntoAsyncMap, IntoBlockingMap, Node, OperateBuffer,
OperateBufferAccess, OperateDynamicGate, OperateScope, OperateStaticGate, Output, Provider,
RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints, ScopeSettings, ScopeSettingsStorage,
Sendish, Service, StreamPack, StreamTargetMap, StreamsOfMap, Trim, TrimBranch, UnusedTarget,
OperateBufferAccess, OperateDynamicGate, OperateScope, OperateSplit, OperateStaticGate, Output,
Provider, RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints, ScopeSettings,
ScopeSettingsStorage, Sendish, Service, SplitOutputs, Splittable, StreamPack, StreamTargetMap,
StreamsOfMap, Trim, TrimBranch, UnusedTarget,
};

pub(crate) mod connect;
Expand Down Expand Up @@ -339,6 +340,26 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
self.create_collect(n, Some(n))
}

/// Create a new split operation in the workflow. The [`InputSlot`] can take
/// in values that you want to split, and [`SplitOutputs::build`] will let
/// you build connections to the split value.
pub fn create_split<T>(&mut self) -> (InputSlot<T>, SplitOutputs<T>)
where
T: 'static + Send + Sync + Splittable,
{
let source = self.commands.spawn(()).id();
self.commands.add(AddOperation::new(
Some(self.scope),
source,
OperateSplit::<T>::default(),
));

(
InputSlot::new(self.scope, source),
SplitOutputs::new(self.scope, source),
)
}

/// This method allows you to define a cleanup workflow that branches off of
/// this scope that will activate during the scope's cleanup phase. The
/// input to the cleanup workflow will be a key to access to one or more
Expand Down
126 changes: 123 additions & 3 deletions src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use crate::{
make_option_branching, make_result_branching, AddOperation, AsMap, Buffer, BufferKey,
BufferKeys, Bufferable, Buffered, Builder, Collect, CreateCancelFilter, CreateDisposalFilter,
ForkTargetStorage, Gate, GateRequest, InputSlot, IntoAsyncMap, IntoBlockingCallback,
IntoBlockingMap, Node, Noop, OperateBufferAccess, OperateDynamicGate, OperateStaticGate,
Output, ProvideOnce, Provider, Scope, ScopeSettings, Sendish, Service, Spread, StreamOf,
StreamPack, StreamTargetMap, Trim, TrimBranch, UnusedTarget,
IntoBlockingMap, Node, Noop, OperateBufferAccess, OperateDynamicGate, OperateSplit,
OperateStaticGate, Output, ProvideOnce, Provider, Scope, ScopeSettings, Sendish, Service,
Spread, StreamOf, StreamPack, StreamTargetMap, Trim, TrimBranch, UnusedTarget,
};

pub mod fork_clone_builder;
Expand All @@ -38,6 +38,9 @@ pub use fork_clone_builder::*;
pub(crate) mod premade;
use premade::*;

pub mod split;
pub use split::*;

pub mod unzip;
pub use unzip::*;

Expand Down Expand Up @@ -601,6 +604,71 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
self.map_async(|r| r)
}

/// If the chain's response implements the [`Splittable`] trait, then this
/// will insert a split operation and provide your `build` function with the
/// [`SplitBuilder`] for it. This returns the return value of your build
/// function.
pub fn split<U>(self, build: impl FnOnce(SplitBuilder<T>) -> U) -> U
where
T: Splittable,
{
let source = self.target;
self.builder.commands.add(AddOperation::new(
Some(self.builder.scope),
source,
OperateSplit::<T>::default(),
));

build(SplitBuilder::new(source, self.builder))
}

/// If the chain's response implements the [`Splittable`] trait, then this
/// will insert a split and provide a container for its available outputs.
/// To build connections to these outputs later, use [`SplitOutputs::build`].
Comment on lines +625 to +627
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would help in the docs to indicate that split_outputs, split_as_list and split_as_list_outputs, split_as_map, split_as_map_outputs etc are convenient wrappers and is equivalent to some short snippets of code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 8d53e85

///
/// This is equivalent to
/// ```text
/// .split(|split| split.outputs())
/// ```
pub fn split_outputs(self) -> SplitOutputs<T>
where
T: Splittable,
{
self.split(|b| b.outputs())
}

/// If the chain's response can be turned into an iterator with an appropriate
/// item type, this will allow it to be split in a list-like way.
///
/// This is equivalent to
/// ```text
/// .map_block(SplitAsList::new).split(build)
/// ```
pub fn split_as_list<U>(self, build: impl FnOnce(SplitBuilder<SplitAsList<T>>) -> U) -> U
where
T: IntoIterator,
T::Item: 'static + Send + Sync,
{
self.map_block(SplitAsList::new).split(build)
}

/// If the chain's response can be turned into an iterator with an appropriate
/// item type, this will insert a split and provide a container for its
/// available outputs. To build connections to these outputs later, use
/// [`SplitOutputs::build`].
///
/// This is equivalent to
/// ```text
/// .split_as_list(|split| split.outputs())
/// ```
pub fn split_as_list_outputs(self) -> SplitOutputs<SplitAsList<T>>
where
T: IntoIterator,
T::Item: 'static + Send + Sync,
{
self.split_as_list(|b| b.outputs())
}

/// Add a [no-op][1] to the current end of the chain.
///
/// As the name suggests, a no-op will not actually do anything, but it adds
Expand Down Expand Up @@ -633,10 +701,12 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
}
}

/// The scope that the chain is building inside of.
pub fn scope(&self) -> Entity {
self.builder.scope
}

/// The target where the chain will be sending its latest output.
pub fn target(&self) -> Entity {
self.target
}
Expand Down Expand Up @@ -942,6 +1012,38 @@ where
}
}

impl<'w, 's, 'a, 'b, K, V, T> Chain<'w, 's, 'a, 'b, T>
where
K: 'static + Send + Sync + Eq + std::hash::Hash + Clone + std::fmt::Debug,
V: 'static + Send + Sync,
T: 'static + Send + Sync + IntoIterator<Item = (K, V)>,
{
/// If the chain's response type can be turned into an iterator that returns
/// `(key, value)` pairs, then this will split it in a map-like way, whether
/// or not it is a conventional map data structure.
///
/// This is equivalent to
/// ```text
/// .map_block(SplitAsMap::new).split(build)
/// ```
pub fn split_as_map<U>(self, build: impl FnOnce(SplitBuilder<SplitAsMap<K, V, T>>) -> U) -> U {
self.map_block(SplitAsMap::new).split(build)
}

/// If the chain's response type can be turned into an iterator that returns
/// `(key, value)` pairs, then this will split it in a map-like way and
/// provide a container for its available outputs. To build connections to
/// these outputs later, use [`SplitOutputs::build`].
///
/// This is equivalent to
/// ```text
/// .split_as_map(|split| split.outputs())
/// ```
pub fn split_as_map_outputs(self) -> SplitOutputs<SplitAsMap<K, V, T>> {
self.split_as_map(|b| b.outputs())
}
}

impl<'w, 's, 'a, 'b, Request, Response, Streams>
Chain<'w, 's, 'a, 'b, (Request, Service<Request, Response, Streams>)>
where
Expand Down Expand Up @@ -1030,6 +1132,24 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
}
}

impl<'w, 's, 'a, 'b, K, V> Chain<'w, 's, 'a, 'b, (K, V)>
where
K: 'static + Send + Sync,
V: 'static + Send + Sync,
{
/// If the chain's response contains a `(key, value)` pair, get the `key`
/// component from it (the first element of the tuple).
pub fn key(self) -> Chain<'w, 's, 'a, 'b, K> {
self.map_block(|(key, _)| key)
}

/// If the chain's response contains a `(key, value)` pair, get the `value`
/// component from it (the second element of the tuple).
pub fn value(self) -> Chain<'w, 's, 'a, 'b, V> {
self.map_block(|(_, value)| value)
}
}

#[cfg(test)]
mod tests {
use crate::{prelude::*, testing::*};
Expand Down
Loading
Loading