From fbf0720991700e577229998e732aed085fc7dc19 Mon Sep 17 00:00:00 2001 From: Takeru Sato Date: Fri, 27 Jun 2025 23:52:55 +0900 Subject: [PATCH] test: add test_grouped_emitters --- src/queue.rs | 60 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index c5c2c86..030a08d 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -59,7 +59,9 @@ impl QueueHandler { } } - fn grouped_emitters(&mut self, size: usize) -> Vec { + fn grouped_emitters(&mut self, size: Option) -> Vec { + let mes_len = self.messages.len(); + let size = cmp::min(mes_len, size.unwrap_or(mes_len)); let messages = self.messages.drain(0..size).collect::>(); messages @@ -90,8 +92,6 @@ impl Queue for QueueHandler { return; } - let mes_len = self.messages.len(); - let size = cmp::min(mes_len, size.unwrap_or(mes_len)); let mut emitters = self.grouped_emitters(size); // If there are any failed emitters, we need to re-emit them @@ -138,6 +138,60 @@ pub enum HandleResult { mod tests { use super::*; + #[test] + fn test_grouped_emitters() { + struct W; + impl WriteRead for W { + fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), StreamError> { + Ok(()) + } + } + + let mut queue = QueueHandler { + messages: VecDeque::new(), + failed_emitters: VecDeque::new(), + flusher: W, + recovery_settings: RecoverySettings::new(RecoveryMode::Discard), + }; + + let now = SystemTime::now(); + + queue.push("a".to_string(), now, vec![0u8, 9u8]); + queue.push("b".to_string(), now, vec![1u8, 8u8]); + queue.push("a".to_string(), now, vec![2u8, 7u8]); + queue.push("b".to_string(), now, vec![3u8, 6u8]); + queue.push("c".to_string(), now, vec![4u8, 5u8]); + + assert_eq!( + queue + .grouped_emitters(Some(3)) + .iter() + .map(|v| (v.tag().to_string(), v.entries().clone())) + .sorted() + .collect::>(), + vec![ + ( + "a".to_string(), + vec![(now, vec![0u8, 9u8]), (now, vec![2u8, 7u8])] + ), + ("b".to_string(), vec![(now, vec![1u8, 8u8])]), + ] + ); + + assert_eq!( + queue + .grouped_emitters(Some(3)) + .iter() + .map(|v| (v.tag().to_string(), v.entries().clone())) + .sorted() + .collect::>(), + vec![ + ("b".to_string(), vec![(now, vec![3u8, 6u8])]), + ("c".to_string(), vec![(now, vec![4u8, 5u8])]), + ] + ); + } + #[test] fn test_push_flush() { struct W;