diff --git a/sea-streamer-file/src/consumer/group.rs b/sea-streamer-file/src/consumer/group.rs index 1ef31fd..ca09260 100644 --- a/sea-streamer-file/src/consumer/group.rs +++ b/sea-streamer-file/src/consumer/group.rs @@ -8,8 +8,8 @@ use std::{ use super::{CtrlMsg, FileConsumer}; use crate::{ - is_end_of_stream, is_pulse, pulse_message, ConfigErr, FileErr, FileId, MessageSource, - StreamMode, + is_end_of_stream, is_internal, is_pulse, is_wildcard, pulse_message, ConfigErr, FileErr, + FileId, MessageSource, StreamMode, }; use sea_streamer_types::{ export::futures::{select, FutureExt}, @@ -497,7 +497,10 @@ impl Subscribers { } for (stream_key, sid) in map.ungrouped.iter() { - if is_pulse(&message) || stream_key == message.header().stream_key() { + if is_pulse(&message) + || stream_key == message.header().stream_key() + || is_wildcard(stream_key) && !is_internal(&message) + { let sender = map.senders.get(sid).unwrap(); sender.send(Ok(message.clone())).ok(); } diff --git a/sea-streamer-file/src/lib.rs b/sea-streamer-file/src/lib.rs index 76b8e78..944fabf 100644 --- a/sea-streamer-file/src/lib.rs +++ b/sea-streamer-file/src/lib.rs @@ -92,3 +92,6 @@ pub use surveyor::*; pub const DEFAULT_BEACON_INTERVAL: u32 = 1024 * 1024; // 1MB pub const DEFAULT_FILE_SIZE_LIMIT: u64 = 16 * 1024 * 1024 * 1024; // 16GB pub const DEFAULT_PREFETCH_MESSAGE: usize = 1000; + +/// Reserved by SeaStreamer. Avoid using this as StreamKey. +pub const SEA_STREAMER_WILDCARD: &str = "SEA_STREAMER_WILDCARD"; diff --git a/sea-streamer-file/src/messages.rs b/sea-streamer-file/src/messages.rs index f033354..98a1553 100644 --- a/sea-streamer-file/src/messages.rs +++ b/sea-streamer-file/src/messages.rs @@ -10,6 +10,7 @@ use crate::{ format::{Beacon, Checksum, FormatErr, Header, Marker, Message, RunningChecksum}, AsyncFile, BeaconReader, ByteBuffer, ByteSource, Bytes, DynFileSource, FileErr, FileId, FileReader, FileSink, FileSourceType, SeekErr, StreamMode, SurveyResult, Surveyor, + SEA_STREAMER_WILDCARD, }; pub const END_OF_STREAM: &str = "EOS"; @@ -665,37 +666,37 @@ impl MessageSink { } } -/// This can be written to a file to properly end the stream -pub fn end_of_stream() -> OwnedMessage { - let header = MessageHeader::new( - StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(), - ShardId::new(0), - 0, - Timestamp::now_utc(), - ); - OwnedMessage::new(header, END_OF_STREAM.into_bytes()) -} - -pub trait MessageWithHeader: MessageTrait { +pub trait HasMessageHeader: MessageTrait { fn header(&self) -> &MessageHeader; } -impl MessageWithHeader for SharedMessage { +impl HasMessageHeader for SharedMessage { fn header(&self) -> &MessageHeader { self.header() } } -impl MessageWithHeader for OwnedMessage { +impl HasMessageHeader for OwnedMessage { fn header(&self) -> &MessageHeader { self.header() } } -pub fn is_end_of_stream(mess: &M) -> bool { +/// This can be written to a file to properly end the stream +pub fn end_of_stream() -> OwnedMessage { + let header = MessageHeader::new( + StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(), + ShardId::new(0), + 0, + Timestamp::now_utc(), + ); + OwnedMessage::new(header, END_OF_STREAM.into_bytes()) +} + +pub fn is_end_of_stream(mess: &M) -> bool { mess.header().stream_key().name() == SEA_STREAMER_INTERNAL && mess.message().as_bytes() == END_OF_STREAM.as_bytes() } -/// This should not be written on file +/// This should never be written on file pub fn pulse_message() -> OwnedMessage { let header = MessageHeader::new( StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(), @@ -710,3 +711,11 @@ pub fn is_pulse(mess: &SharedMessage) -> bool { mess.header().stream_key().name() == SEA_STREAMER_INTERNAL && mess.message().as_bytes() == PULSE_MESSAGE.as_bytes() } + +pub fn is_internal(mess: &SharedMessage) -> bool { + mess.header().stream_key().name() == SEA_STREAMER_INTERNAL +} + +pub fn is_wildcard(key: &StreamKey) -> bool { + key.name() == SEA_STREAMER_WILDCARD +} diff --git a/sea-streamer-file/src/streamer.rs b/sea-streamer-file/src/streamer.rs index 0d6a236..2354db0 100644 --- a/sea-streamer-file/src/streamer.rs +++ b/sea-streamer-file/src/streamer.rs @@ -4,7 +4,7 @@ use thiserror::Error; use crate::{ consumer::new_consumer, end_producer, format::Header, new_producer, AsyncFile, FileConsumer, FileErr, FileId, FileProducer, FileResult, DEFAULT_BEACON_INTERVAL, DEFAULT_FILE_SIZE_LIMIT, - DEFAULT_PREFETCH_MESSAGE, + DEFAULT_PREFETCH_MESSAGE, SEA_STREAMER_WILDCARD, }; use sea_streamer_types::{ ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode, @@ -72,6 +72,8 @@ pub enum ConfigErr { SameGroupSameMode, #[error("Please choose a 'better aligned' beacon interval")] InvalidBeaconInterval, + #[error("Wildcard stream can only be subscribed in ungrouped Consumer")] + NoWildcardInGroup, } impl StreamerTrait for FileStreamer { @@ -126,6 +128,8 @@ impl StreamerTrait for FileStreamer { .map_err(StreamErr::Backend) } + /// To subscribe to all streams in a file, you can use the magic [`SEA_STREAMER_WILDCARD`] stream key. + /// For `RealTime` consumer only. async fn create_consumer( &self, streams: &[StreamKey], @@ -139,7 +143,7 @@ impl StreamerTrait for FileStreamer { } ConsumerMode::Resumable => { return Err(StreamErr::Unsupported( - "File does not support Resumable".to_owned(), + "File does not support Resumable yet".to_owned(), )) } ConsumerMode::LoadBalanced => { @@ -171,6 +175,12 @@ impl StreamerTrait for FileStreamer { } }; + if options.group.is_some() && streams.iter().any(|s| s.name() == SEA_STREAMER_WILDCARD) { + return Err(StreamErr::Backend(FileErr::ConfigErr( + ConfigErr::NoWildcardInGroup, + ))); + } + let consumer = new_consumer( self.file_id.clone(), stream_mode, diff --git a/sea-streamer-file/tests/consumer.rs b/sea-streamer-file/tests/consumer.rs index d0880a5..bca5304 100644 --- a/sea-streamer-file/tests/consumer.rs +++ b/sea-streamer-file/tests/consumer.rs @@ -11,7 +11,7 @@ static INIT: std::sync::Once = std::sync::Once::new(); async fn consumer() -> anyhow::Result<()> { use sea_streamer_file::{ AutoStreamReset, FileConsumerOptions, FileErr, FileStreamer, MessageSink, - DEFAULT_FILE_SIZE_LIMIT, + DEFAULT_FILE_SIZE_LIMIT, SEA_STREAMER_WILDCARD, }; use sea_streamer_types::{ export::futures::TryStreamExt, Buffer, Consumer, Message, MessageHeader, OwnedMessage, @@ -21,78 +21,92 @@ async fn consumer() -> anyhow::Result<()> { const TEST: &str = "consumer"; INIT.call_once(env_logger::init); - let now = Timestamp::now_utc(); - let file_id = temp_file(format!("{}-{}", TEST, millis_of(&now)).as_str())?; - println!("{file_id}"); - let stream_key = StreamKey::new("hello")?; - let shard = ShardId::new(1); - - let message = |i: u64| -> OwnedMessage { - let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc()); - OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes()) - }; - let check = |i: u64, mess: SharedMessage| { - assert_eq!(mess.header().stream_key(), &stream_key); - assert_eq!(mess.header().shard_id(), &shard); - assert_eq!(mess.header().sequence(), &i); - assert_eq!( - mess.message().as_str().unwrap(), - format!("{}-{}", stream_key.name(), i) - ); - }; + run("a", false).await?; + run("b", true).await?; - let streamer = FileStreamer::connect(file_id.to_streamer_uri()?, Default::default()).await?; + async fn run(suffix: &'static str, wildcard: bool) -> anyhow::Result<()> { + println!("wildcard = {wildcard:?}"); + let now = Timestamp::now_utc(); + let file_id = temp_file(format!("{}-{}-{}", TEST, millis_of(&now), suffix).as_str())?; + println!("{file_id}"); + let stream_key = StreamKey::new("hello")?; + let shard = ShardId::new(1); - let mut sink = MessageSink::new( - file_id.clone(), - 1024, // 1KB - DEFAULT_FILE_SIZE_LIMIT, - ) - .await?; + let message = |i: u64| -> OwnedMessage { + let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc()); + OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes()) + }; + let check = |i: u64, mess: SharedMessage| { + assert_eq!(mess.header().stream_key(), &stream_key); + assert_eq!(mess.header().shard_id(), &shard); + assert_eq!(mess.header().sequence(), &i); + assert_eq!( + mess.message().as_str().unwrap(), + format!("{}-{}", stream_key.name(), i) + ); + }; - for i in 0..50 { - sink.write(message(i))?; - } - sink.flush().await?; + let streamer = + FileStreamer::connect(file_id.to_streamer_uri()?, Default::default()).await?; - let mut options = FileConsumerOptions::default(); - options.set_auto_stream_reset(AutoStreamReset::Earliest); - let mut earliest = streamer - .create_consumer(&[stream_key.clone()], options.clone()) + let mut sink = MessageSink::new( + file_id.clone(), + 1024, // 1KB + DEFAULT_FILE_SIZE_LIMIT, + ) .await?; - for i in 0..25 { - check(i, earliest.next().await?); - } - { - let mut stream = earliest.stream(); - for i in 25..50 { - check(i, stream.try_next().await?.unwrap()); + for i in 0..50 { + sink.write(message(i))?; } - } - println!("Stream from earliest ... ok"); + sink.flush().await?; - options.set_auto_stream_reset(AutoStreamReset::Latest); - let latest = streamer - .create_consumer(&[stream_key.clone()], options) - .await?; + let stream_key_to_subscribe = [if wildcard { + StreamKey::new(SEA_STREAMER_WILDCARD)? + } else { + stream_key.clone() + }]; - for i in 50..100 { - sink.write(message(i))?; - } - sink.flush().await?; + let mut options = FileConsumerOptions::default(); + options.set_auto_stream_reset(AutoStreamReset::Earliest); + let mut earliest = streamer + .create_consumer(&stream_key_to_subscribe, options.clone()) + .await?; - for i in 50..100 { - check(i, earliest.next().await?); - check(i, latest.next().await?); - } - println!("Stream from latest ... ok"); + for i in 0..25 { + check(i, earliest.next().await?); + } + { + let mut stream = earliest.stream(); + for i in 25..50 { + check(i, stream.try_next().await?.unwrap()); + } + } + println!("Stream from earliest ... ok"); + + options.set_auto_stream_reset(AutoStreamReset::Latest); + let latest = streamer + .create_consumer(&stream_key_to_subscribe, options) + .await?; + + for i in 50..100 { + sink.write(message(i))?; + } + sink.flush().await?; - sink.end(true).await?; - let ended = |e| matches!(e, Err(StreamErr::Backend(FileErr::StreamEnded))); - assert!(ended(earliest.next().await)); - assert!(ended(latest.next().await)); + for i in 50..100 { + check(i, earliest.next().await?); + check(i, latest.next().await?); + } + println!("Stream from latest ... ok"); + sink.end(true).await?; + let ended = |e| matches!(e, Err(StreamErr::Backend(FileErr::StreamEnded))); + assert!(ended(earliest.next().await)); + assert!(ended(latest.next().await)); + + Ok(()) + } Ok(()) }