Skip to content

Commit

Permalink
File: wildcard stream
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-numeus committed Jul 6, 2024
1 parent 916bd4a commit f64ea68
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 82 deletions.
9 changes: 6 additions & 3 deletions sea-streamer-file/src/consumer/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use std::{

use super::{CtrlMsg, FileConsumer};
use crate::{
is_end_of_stream, is_pulse, pulse_message, ConfigErr, FileErr, FileId, MessageSource,
StreamMode,
is_end_of_stream, is_internal, is_pulse, is_wildcard, pulse_message, ConfigErr, FileErr,
FileId, MessageSource, StreamMode,
};
use sea_streamer_types::{
export::futures::{select, FutureExt},
Expand Down Expand Up @@ -497,7 +497,10 @@ impl Subscribers {
}

for (stream_key, sid) in map.ungrouped.iter() {
if is_pulse(&message) || stream_key == message.header().stream_key() {
if is_pulse(&message)
|| stream_key == message.header().stream_key()
|| is_wildcard(stream_key) && !is_internal(&message)
{
let sender = map.senders.get(sid).unwrap();
sender.send(Ok(message.clone())).ok();
}
Expand Down
3 changes: 3 additions & 0 deletions sea-streamer-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,6 @@ pub use surveyor::*;
pub const DEFAULT_BEACON_INTERVAL: u32 = 1024 * 1024; // 1MB
pub const DEFAULT_FILE_SIZE_LIMIT: u64 = 16 * 1024 * 1024 * 1024; // 16GB
pub const DEFAULT_PREFETCH_MESSAGE: usize = 1000;

/// Reserved by SeaStreamer. Avoid using this as StreamKey.
pub const SEA_STREAMER_WILDCARD: &str = "SEA_STREAMER_WILDCARD";
41 changes: 25 additions & 16 deletions sea-streamer-file/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
format::{Beacon, Checksum, FormatErr, Header, Marker, Message, RunningChecksum},
AsyncFile, BeaconReader, ByteBuffer, ByteSource, Bytes, DynFileSource, FileErr, FileId,
FileReader, FileSink, FileSourceType, SeekErr, StreamMode, SurveyResult, Surveyor,
SEA_STREAMER_WILDCARD,
};

pub const END_OF_STREAM: &str = "EOS";
Expand Down Expand Up @@ -665,37 +666,37 @@ impl MessageSink {
}
}

/// This can be written to a file to properly end the stream
pub fn end_of_stream() -> OwnedMessage {
let header = MessageHeader::new(
StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(),
ShardId::new(0),
0,
Timestamp::now_utc(),
);
OwnedMessage::new(header, END_OF_STREAM.into_bytes())
}

pub trait MessageWithHeader: MessageTrait {
pub trait HasMessageHeader: MessageTrait {
fn header(&self) -> &MessageHeader;
}
impl MessageWithHeader for SharedMessage {
impl HasMessageHeader for SharedMessage {
fn header(&self) -> &MessageHeader {
self.header()
}
}
impl MessageWithHeader for OwnedMessage {
impl HasMessageHeader for OwnedMessage {
fn header(&self) -> &MessageHeader {
self.header()
}
}

pub fn is_end_of_stream<M: MessageWithHeader>(mess: &M) -> bool {
/// This can be written to a file to properly end the stream
pub fn end_of_stream() -> OwnedMessage {
let header = MessageHeader::new(
StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(),
ShardId::new(0),
0,
Timestamp::now_utc(),
);
OwnedMessage::new(header, END_OF_STREAM.into_bytes())
}

pub fn is_end_of_stream<M: HasMessageHeader>(mess: &M) -> bool {
mess.header().stream_key().name() == SEA_STREAMER_INTERNAL
&& mess.message().as_bytes() == END_OF_STREAM.as_bytes()
}

/// This should not be written on file
/// This should never be written on file
pub fn pulse_message() -> OwnedMessage {
let header = MessageHeader::new(
StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(),
Expand All @@ -710,3 +711,11 @@ pub fn is_pulse(mess: &SharedMessage) -> bool {
mess.header().stream_key().name() == SEA_STREAMER_INTERNAL
&& mess.message().as_bytes() == PULSE_MESSAGE.as_bytes()
}

pub fn is_internal(mess: &SharedMessage) -> bool {
mess.header().stream_key().name() == SEA_STREAMER_INTERNAL
}

pub fn is_wildcard(key: &StreamKey) -> bool {
key.name() == SEA_STREAMER_WILDCARD
}
14 changes: 12 additions & 2 deletions sea-streamer-file/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use thiserror::Error;
use crate::{
consumer::new_consumer, end_producer, format::Header, new_producer, AsyncFile, FileConsumer,
FileErr, FileId, FileProducer, FileResult, DEFAULT_BEACON_INTERVAL, DEFAULT_FILE_SIZE_LIMIT,
DEFAULT_PREFETCH_MESSAGE,
DEFAULT_PREFETCH_MESSAGE, SEA_STREAMER_WILDCARD,
};
use sea_streamer_types::{
ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode,
Expand Down Expand Up @@ -72,6 +72,8 @@ pub enum ConfigErr {
SameGroupSameMode,
#[error("Please choose a 'better aligned' beacon interval")]
InvalidBeaconInterval,
#[error("Wildcard stream can only be subscribed in ungrouped Consumer")]
NoWildcardInGroup,
}

impl StreamerTrait for FileStreamer {
Expand Down Expand Up @@ -126,6 +128,8 @@ impl StreamerTrait for FileStreamer {
.map_err(StreamErr::Backend)
}

/// To subscribe to all streams in a file, you can use the magic [`SEA_STREAMER_WILDCARD`] stream key.
/// For `RealTime` consumer only.
async fn create_consumer(
&self,
streams: &[StreamKey],
Expand All @@ -139,7 +143,7 @@ impl StreamerTrait for FileStreamer {
}
ConsumerMode::Resumable => {
return Err(StreamErr::Unsupported(
"File does not support Resumable".to_owned(),
"File does not support Resumable yet".to_owned(),
))
}
ConsumerMode::LoadBalanced => {
Expand Down Expand Up @@ -171,6 +175,12 @@ impl StreamerTrait for FileStreamer {
}
};

if options.group.is_some() && streams.iter().any(|s| s.name() == SEA_STREAMER_WILDCARD) {
return Err(StreamErr::Backend(FileErr::ConfigErr(
ConfigErr::NoWildcardInGroup,
)));
}

let consumer = new_consumer(
self.file_id.clone(),
stream_mode,
Expand Down
136 changes: 75 additions & 61 deletions sea-streamer-file/tests/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ static INIT: std::sync::Once = std::sync::Once::new();
async fn consumer() -> anyhow::Result<()> {
use sea_streamer_file::{
AutoStreamReset, FileConsumerOptions, FileErr, FileStreamer, MessageSink,
DEFAULT_FILE_SIZE_LIMIT,
DEFAULT_FILE_SIZE_LIMIT, SEA_STREAMER_WILDCARD,
};
use sea_streamer_types::{
export::futures::TryStreamExt, Buffer, Consumer, Message, MessageHeader, OwnedMessage,
Expand All @@ -21,78 +21,92 @@ async fn consumer() -> anyhow::Result<()> {
const TEST: &str = "consumer";
INIT.call_once(env_logger::init);

let now = Timestamp::now_utc();
let file_id = temp_file(format!("{}-{}", TEST, millis_of(&now)).as_str())?;
println!("{file_id}");
let stream_key = StreamKey::new("hello")?;
let shard = ShardId::new(1);

let message = |i: u64| -> OwnedMessage {
let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc());
OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes())
};
let check = |i: u64, mess: SharedMessage| {
assert_eq!(mess.header().stream_key(), &stream_key);
assert_eq!(mess.header().shard_id(), &shard);
assert_eq!(mess.header().sequence(), &i);
assert_eq!(
mess.message().as_str().unwrap(),
format!("{}-{}", stream_key.name(), i)
);
};
run("a", false).await?;
run("b", true).await?;

let streamer = FileStreamer::connect(file_id.to_streamer_uri()?, Default::default()).await?;
async fn run(suffix: &'static str, wildcard: bool) -> anyhow::Result<()> {
println!("wildcard = {wildcard:?}");
let now = Timestamp::now_utc();
let file_id = temp_file(format!("{}-{}-{}", TEST, millis_of(&now), suffix).as_str())?;
println!("{file_id}");
let stream_key = StreamKey::new("hello")?;
let shard = ShardId::new(1);

let mut sink = MessageSink::new(
file_id.clone(),
1024, // 1KB
DEFAULT_FILE_SIZE_LIMIT,
)
.await?;
let message = |i: u64| -> OwnedMessage {
let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc());
OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes())
};
let check = |i: u64, mess: SharedMessage| {
assert_eq!(mess.header().stream_key(), &stream_key);
assert_eq!(mess.header().shard_id(), &shard);
assert_eq!(mess.header().sequence(), &i);
assert_eq!(
mess.message().as_str().unwrap(),
format!("{}-{}", stream_key.name(), i)
);
};

for i in 0..50 {
sink.write(message(i))?;
}
sink.flush().await?;
let streamer =
FileStreamer::connect(file_id.to_streamer_uri()?, Default::default()).await?;

let mut options = FileConsumerOptions::default();
options.set_auto_stream_reset(AutoStreamReset::Earliest);
let mut earliest = streamer
.create_consumer(&[stream_key.clone()], options.clone())
let mut sink = MessageSink::new(
file_id.clone(),
1024, // 1KB
DEFAULT_FILE_SIZE_LIMIT,
)
.await?;

for i in 0..25 {
check(i, earliest.next().await?);
}
{
let mut stream = earliest.stream();
for i in 25..50 {
check(i, stream.try_next().await?.unwrap());
for i in 0..50 {
sink.write(message(i))?;
}
}
println!("Stream from earliest ... ok");
sink.flush().await?;

options.set_auto_stream_reset(AutoStreamReset::Latest);
let latest = streamer
.create_consumer(&[stream_key.clone()], options)
.await?;
let stream_key_to_subscribe = [if wildcard {
StreamKey::new(SEA_STREAMER_WILDCARD)?
} else {
stream_key.clone()
}];

for i in 50..100 {
sink.write(message(i))?;
}
sink.flush().await?;
let mut options = FileConsumerOptions::default();
options.set_auto_stream_reset(AutoStreamReset::Earliest);
let mut earliest = streamer
.create_consumer(&stream_key_to_subscribe, options.clone())
.await?;

for i in 50..100 {
check(i, earliest.next().await?);
check(i, latest.next().await?);
}
println!("Stream from latest ... ok");
for i in 0..25 {
check(i, earliest.next().await?);
}
{
let mut stream = earliest.stream();
for i in 25..50 {
check(i, stream.try_next().await?.unwrap());
}
}
println!("Stream from earliest ... ok");

options.set_auto_stream_reset(AutoStreamReset::Latest);
let latest = streamer
.create_consumer(&stream_key_to_subscribe, options)
.await?;

for i in 50..100 {
sink.write(message(i))?;
}
sink.flush().await?;

sink.end(true).await?;
let ended = |e| matches!(e, Err(StreamErr::Backend(FileErr::StreamEnded)));
assert!(ended(earliest.next().await));
assert!(ended(latest.next().await));
for i in 50..100 {
check(i, earliest.next().await?);
check(i, latest.next().await?);
}
println!("Stream from latest ... ok");

sink.end(true).await?;
let ended = |e| matches!(e, Err(StreamErr::Backend(FileErr::StreamEnded)));
assert!(ended(earliest.next().await));
assert!(ended(latest.next().await));

Ok(())
}
Ok(())
}

Expand Down

0 comments on commit f64ea68

Please sign in to comment.