Skip to content

Commit

Permalink
Save the error til the end
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Aug 9, 2024
1 parent 6999a2f commit b0ac96d
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions sea-streamer-fuse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ where
#[pin]
muxed: S,
keys: Keys<M>,
key_keys: Vec<StreamKey>,
ended: bool,
err: Option<E>,
}

impl<S, M, E> StreamJoin<S, M, E>
Expand All @@ -54,13 +56,16 @@ where
Self {
muxed,
keys: Default::default(),
key_keys: Default::default(),
ended: false,
err: None,
}
}

/// Add a stream key that needs to be joined. You can call this multiple times.
pub fn align(&mut self, stream_key: StreamKey) {
self.keys.insert(stream_key, Default::default());
self.keys.insert(stream_key.clone(), Default::default());
self.key_keys.push(stream_key);
}

fn next(keys: &mut Keys<M>) -> Option<M> {
Expand All @@ -87,6 +92,17 @@ where
None
}
}

fn check(keys: &Keys<M>, key_keys: &[StreamKey]) -> bool {
// if none of the key streams are empty
for kk in key_keys {
if keys.get(kk).expect("Already inserted").is_empty() {
return false;
}
}
// if anyone got anything
keys.values().any(|ms| !ms.is_empty())
}
}

impl<S, M, E> Stream for StreamJoin<S, M, E>
Expand All @@ -107,15 +123,16 @@ where
Poll::Ready(Some(Ok(mes))) => {
let key = mes.stream_key();
this.keys.entry(key).or_default().push_back(mes);
if !this.keys.values().any(|ms| ms.is_empty()) {
// if none of the streams are empty
if Self::check(&this.keys, &this.key_keys) {
// if we can yield
break;
}
// keep polling
}
Poll::Ready(Some(Err(err))) => {
*this.ended = true;
return Poll::Ready(Some(Err(err)));
*this.err = Some(err);
break;
}
Poll::Ready(None) => {
*this.ended = true;
Expand All @@ -127,8 +144,11 @@ where
}
}
}
if *this.ended || !this.keys.values().any(|ms| ms.is_empty()) {
return Poll::Ready(Self::next(this.keys).map(Ok));
if *this.ended || Self::check(&this.keys, &this.key_keys) {
Poll::Ready(match Self::next(this.keys) {
Some(item) => Some(Ok(item)),
None => this.err.take().map(Err),
})
} else {
Poll::Pending
}
Expand Down

0 comments on commit b0ac96d

Please sign in to comment.