Skip to content

Commit

Permalink
Introduce Split operation (#33)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael X. Grey <[email protected]>
  • Loading branch information
mxgrey authored Nov 27, 2024
1 parent 72fcd72 commit 85501df
Show file tree
Hide file tree
Showing 9 changed files with 1,370 additions and 15 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ async-task = { version = "4.7.1", optional = true }
# bevy_tasks::Task, so we're leaving it as a mandatory dependency for now.
bevy_tasks = { version = "0.12", features = ["multi-threaded"] }

arrayvec = "0.7"
itertools = "0.13"
smallvec = "1.13"
tokio = { version = "1.39", features = ["sync"]}
Expand Down
33 changes: 25 additions & 8 deletions src/buffer/bufferable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,28 +207,28 @@ impl<T: Bufferable, const N: usize> Bufferable for [T; N] {
}

pub trait IterBufferable {
type BufferType: Buffered;
type BufferElement: Buffered;

/// Convert an iterable collection of bufferable workflow elements into
/// buffers if they are not buffers already.
fn into_buffer_vec<const N: usize>(
self,
builder: &mut Builder,
) -> SmallVec<[Self::BufferType; N]>;
) -> SmallVec<[Self::BufferElement; N]>;

/// Join an iterable collection of bufferable workflow elements.
///
/// Performance is best if you can choose an `N` which is equal to the
/// number of buffers inside the iterable, but this will work even if `N`
/// does not match the number.
fn join_vec<const N: usize>(
fn join_vec<'w, 's, 'a, 'b, const N: usize>(
self,
builder: &mut Builder,
) -> Output<SmallVec<[<Self::BufferType as Buffered>::Item; N]>>
builder: &'b mut Builder<'w, 's, 'a>,
) -> Chain<'w, 's, 'a, 'b, SmallVec<[<Self::BufferElement as Buffered>::Item; N]>>
where
Self: Sized,
Self::BufferType: 'static + Send + Sync,
<Self::BufferType as Buffered>::Item: 'static + Send + Sync,
Self::BufferElement: 'static + Send + Sync,
<Self::BufferElement as Buffered>::Item: 'static + Send + Sync,
{
let buffers = self.into_buffer_vec::<N>(builder);
let join = builder.commands.spawn(()).id();
Expand All @@ -239,6 +239,23 @@ pub trait IterBufferable {
Join::new(buffers, target),
));

Output::new(builder.scope, target)
Output::new(builder.scope, target).chain(builder)
}
}

impl<T> IterBufferable for T
where
T: IntoIterator,
T::Item: Bufferable,
{
type BufferElement = <T::Item as Bufferable>::BufferType;

fn into_buffer_vec<const N: usize>(
self,
builder: &mut Builder,
) -> SmallVec<[Self::BufferElement; N]> {
SmallVec::<[Self::BufferElement; N]>::from_iter(
self.into_iter().map(|e| e.into_buffer(builder)),
)
}
}
27 changes: 24 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ use crate::{
AddOperation, AsMap, BeginCleanupWorkflow, Buffer, BufferItem, BufferKeys, BufferSettings,
Bufferable, Buffered, Chain, Collect, ForkClone, ForkCloneOutput, ForkTargetStorage, Gate,
GateRequest, Injection, InputSlot, IntoAsyncMap, IntoBlockingMap, Node, OperateBuffer,
OperateBufferAccess, OperateDynamicGate, OperateScope, OperateStaticGate, Output, Provider,
RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints, ScopeSettings, ScopeSettingsStorage,
Sendish, Service, StreamPack, StreamTargetMap, StreamsOfMap, Trim, TrimBranch, UnusedTarget,
OperateBufferAccess, OperateDynamicGate, OperateScope, OperateSplit, OperateStaticGate, Output,
Provider, RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints, ScopeSettings,
ScopeSettingsStorage, Sendish, Service, SplitOutputs, Splittable, StreamPack, StreamTargetMap,
StreamsOfMap, Trim, TrimBranch, UnusedTarget,
};

pub(crate) mod connect;
Expand Down Expand Up @@ -339,6 +340,26 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
self.create_collect(n, Some(n))
}

/// Create a new split operation in the workflow. The [`InputSlot`] can take
/// in values that you want to split, and [`SplitOutputs::build`] will let
/// you build connections to the split value.
pub fn create_split<T>(&mut self) -> (InputSlot<T>, SplitOutputs<T>)
where
T: 'static + Send + Sync + Splittable,
{
let source = self.commands.spawn(()).id();
self.commands.add(AddOperation::new(
Some(self.scope),
source,
OperateSplit::<T>::default(),
));

(
InputSlot::new(self.scope, source),
SplitOutputs::new(self.scope, source),
)
}

/// This method allows you to define a cleanup workflow that branches off of
/// this scope that will activate during the scope's cleanup phase. The
/// input to the cleanup workflow will be a key to access to one or more
Expand Down
126 changes: 123 additions & 3 deletions src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use crate::{
make_option_branching, make_result_branching, AddOperation, AsMap, Buffer, BufferKey,
BufferKeys, Bufferable, Buffered, Builder, Collect, CreateCancelFilter, CreateDisposalFilter,
ForkTargetStorage, Gate, GateRequest, InputSlot, IntoAsyncMap, IntoBlockingCallback,
IntoBlockingMap, Node, Noop, OperateBufferAccess, OperateDynamicGate, OperateStaticGate,
Output, ProvideOnce, Provider, Scope, ScopeSettings, Sendish, Service, Spread, StreamOf,
StreamPack, StreamTargetMap, Trim, TrimBranch, UnusedTarget,
IntoBlockingMap, Node, Noop, OperateBufferAccess, OperateDynamicGate, OperateSplit,
OperateStaticGate, Output, ProvideOnce, Provider, Scope, ScopeSettings, Sendish, Service,
Spread, StreamOf, StreamPack, StreamTargetMap, Trim, TrimBranch, UnusedTarget,
};

pub mod fork_clone_builder;
Expand All @@ -38,6 +38,9 @@ pub use fork_clone_builder::*;
pub(crate) mod premade;
use premade::*;

pub mod split;
pub use split::*;

pub mod unzip;
pub use unzip::*;

Expand Down Expand Up @@ -601,6 +604,71 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
self.map_async(|r| r)
}

/// If the chain's response implements the [`Splittable`] trait, then this
/// will insert a split operation and provide your `build` function with the
/// [`SplitBuilder`] for it. This returns the return value of your build
/// function.
pub fn split<U>(self, build: impl FnOnce(SplitBuilder<T>) -> U) -> U
where
T: Splittable,
{
let source = self.target;
self.builder.commands.add(AddOperation::new(
Some(self.builder.scope),
source,
OperateSplit::<T>::default(),
));

build(SplitBuilder::new(source, self.builder))
}

/// If the chain's response implements the [`Splittable`] trait, then this
/// will insert a split and provide a container for its available outputs.
/// To build connections to these outputs later, use [`SplitOutputs::build`].
///
/// This is equivalent to
/// ```text
/// .split(|split| split.outputs())
/// ```
pub fn split_outputs(self) -> SplitOutputs<T>
where
T: Splittable,
{
self.split(|b| b.outputs())
}

/// If the chain's response can be turned into an iterator with an appropriate
/// item type, this will allow it to be split in a list-like way.
///
/// This is equivalent to
/// ```text
/// .map_block(SplitAsList::new).split(build)
/// ```
pub fn split_as_list<U>(self, build: impl FnOnce(SplitBuilder<SplitAsList<T>>) -> U) -> U
where
T: IntoIterator,
T::Item: 'static + Send + Sync,
{
self.map_block(SplitAsList::new).split(build)
}

/// If the chain's response can be turned into an iterator with an appropriate
/// item type, this will insert a split and provide a container for its
/// available outputs. To build connections to these outputs later, use
/// [`SplitOutputs::build`].
///
/// This is equivalent to
/// ```text
/// .split_as_list(|split| split.outputs())
/// ```
pub fn split_as_list_outputs(self) -> SplitOutputs<SplitAsList<T>>
where
T: IntoIterator,
T::Item: 'static + Send + Sync,
{
self.split_as_list(|b| b.outputs())
}

/// Add a [no-op][1] to the current end of the chain.
///
/// As the name suggests, a no-op will not actually do anything, but it adds
Expand Down Expand Up @@ -633,10 +701,12 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
}
}

/// The scope that the chain is building inside of.
pub fn scope(&self) -> Entity {
self.builder.scope
}

/// The target where the chain will be sending its latest output.
pub fn target(&self) -> Entity {
self.target
}
Expand Down Expand Up @@ -942,6 +1012,38 @@ where
}
}

impl<'w, 's, 'a, 'b, K, V, T> Chain<'w, 's, 'a, 'b, T>
where
K: 'static + Send + Sync + Eq + std::hash::Hash + Clone + std::fmt::Debug,
V: 'static + Send + Sync,
T: 'static + Send + Sync + IntoIterator<Item = (K, V)>,
{
/// If the chain's response type can be turned into an iterator that returns
/// `(key, value)` pairs, then this will split it in a map-like way, whether
/// or not it is a conventional map data structure.
///
/// This is equivalent to
/// ```text
/// .map_block(SplitAsMap::new).split(build)
/// ```
pub fn split_as_map<U>(self, build: impl FnOnce(SplitBuilder<SplitAsMap<K, V, T>>) -> U) -> U {
self.map_block(SplitAsMap::new).split(build)
}

/// If the chain's response type can be turned into an iterator that returns
/// `(key, value)` pairs, then this will split it in a map-like way and
/// provide a container for its available outputs. To build connections to
/// these outputs later, use [`SplitOutputs::build`].
///
/// This is equivalent to
/// ```text
/// .split_as_map(|split| split.outputs())
/// ```
pub fn split_as_map_outputs(self) -> SplitOutputs<SplitAsMap<K, V, T>> {
self.split_as_map(|b| b.outputs())
}
}

impl<'w, 's, 'a, 'b, Request, Response, Streams>
Chain<'w, 's, 'a, 'b, (Request, Service<Request, Response, Streams>)>
where
Expand Down Expand Up @@ -1030,6 +1132,24 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
}
}

impl<'w, 's, 'a, 'b, K, V> Chain<'w, 's, 'a, 'b, (K, V)>
where
K: 'static + Send + Sync,
V: 'static + Send + Sync,
{
/// If the chain's response contains a `(key, value)` pair, get the `key`
/// component from it (the first element of the tuple).
pub fn key(self) -> Chain<'w, 's, 'a, 'b, K> {
self.map_block(|(key, _)| key)
}

/// If the chain's response contains a `(key, value)` pair, get the `value`
/// component from it (the second element of the tuple).
pub fn value(self) -> Chain<'w, 's, 'a, 'b, V> {
self.map_block(|(_, value)| value)
}
}

#[cfg(test)]
mod tests {
use crate::{prelude::*, testing::*};
Expand Down
Loading

0 comments on commit 85501df

Please sign in to comment.