From 6999a2fc5ede6f339353c0fd61fd1392b193f665 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Thu, 8 Aug 2024 12:29:59 +0100 Subject: [PATCH] Fix polling logic --- sea-streamer-fuse/src/lib.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/sea-streamer-fuse/src/lib.rs b/sea-streamer-fuse/src/lib.rs index 4747c61..838aa0c 100644 --- a/sea-streamer-fuse/src/lib.rs +++ b/sea-streamer-fuse/src/lib.rs @@ -107,6 +107,11 @@ where Poll::Ready(Some(Ok(mes))) => { let key = mes.stream_key(); this.keys.entry(key).or_default().push_back(mes); + if !this.keys.values().any(|ms| ms.is_empty()) { + // if none of the streams are empty + break; + } + // keep polling } Poll::Ready(Some(Err(err))) => { *this.ended = true; @@ -116,14 +121,17 @@ where *this.ended = true; break; } - Poll::Pending => return Poll::Pending, - } - if !this.keys.values().any(|ms| ms.is_empty()) { - // if none of the streams are empty - break; + Poll::Pending => { + // take a break + break; + } } } - Poll::Ready(Self::next(this.keys).map(Ok)) + if *this.ended || !this.keys.values().any(|ms| ms.is_empty()) { + return Poll::Ready(Self::next(this.keys).map(Ok)); + } else { + Poll::Pending + } } }