diff --git a/sea-streamer-file/src/consumer/mod.rs b/sea-streamer-file/src/consumer/mod.rs index 18121ac..e3d57a3 100644 --- a/sea-streamer-file/src/consumer/mod.rs +++ b/sea-streamer-file/src/consumer/mod.rs @@ -5,10 +5,7 @@ pub use future::StreamFuture as FileMessageStream; use flume::{r#async::RecvFut, Receiver, Sender, TrySendError}; use sea_streamer_types::{ - export::{ - async_trait, - futures::{Future, FutureExt}, - }, + export::futures::{Future, FutureExt}, Consumer as ConsumerTrait, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp, }; @@ -66,7 +63,6 @@ impl Drop for FileConsumer { } } -#[async_trait] impl ConsumerTrait for FileConsumer { type Error = FileErr; type Message<'a> = SharedMessage; diff --git a/sea-streamer-file/src/producer/mod.rs b/sea-streamer-file/src/producer/mod.rs index db328d9..72d4ac7 100644 --- a/sea-streamer-file/src/producer/mod.rs +++ b/sea-streamer-file/src/producer/mod.rs @@ -5,9 +5,8 @@ use std::{fmt::Debug, future::Future}; use crate::{Bytes, FileErr, FileId, FileResult}; use sea_streamer_types::{ - export::{async_trait, futures::FutureExt}, - Buffer, MessageHeader, Producer as ProducerTrait, ShardId, StreamErr, StreamKey, StreamResult, - Timestamp, + export::futures::FutureExt, Buffer, MessageHeader, Producer as ProducerTrait, ShardId, + StreamErr, StreamKey, StreamResult, Timestamp, }; pub(crate) use backend::{end_producer, new_producer}; @@ -75,7 +74,6 @@ impl Debug for SendFuture { } } -#[async_trait] impl ProducerTrait for FileProducer { type Error = FileErr; type SendFuture = SendFuture; diff --git a/sea-streamer-file/src/streamer.rs b/sea-streamer-file/src/streamer.rs index 1b76f97..1531473 100644 --- a/sea-streamer-file/src/streamer.rs +++ b/sea-streamer-file/src/streamer.rs @@ -7,7 +7,7 @@ use crate::{ DEFAULT_PREFETCH_MESSAGE, }; use sea_streamer_types::{ - export::async_trait, ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode, + ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode, ConsumerOptions as ConsumerOptionsTrait, ProducerOptions as ProducerOptionsTrait, StreamErr, StreamKey, StreamUrlErr, Streamer as StreamerTrait, StreamerUri, }; @@ -74,7 +74,6 @@ pub enum ConfigErr { InvalidBeaconInterval, } -#[async_trait] impl StreamerTrait for FileStreamer { type Error = FileErr; type Producer = FileProducer; diff --git a/sea-streamer-kafka/src/consumer.rs b/sea-streamer-kafka/src/consumer.rs index e4691d7..1a17278 100644 --- a/sea-streamer-kafka/src/consumer.rs +++ b/sea-streamer-kafka/src/consumer.rs @@ -8,13 +8,10 @@ use sea_streamer_runtime::spawn_blocking; use std::{collections::HashSet, fmt::Debug, time::Duration}; use sea_streamer_types::{ - export::{ - async_trait, - futures::{ - future::Map, - stream::{Map as StreamMap, StreamFuture}, - FutureExt, StreamExt, - }, + export::futures::{ + future::Map, + stream::{Map as StreamMap, StreamFuture}, + FutureExt, StreamExt, }, runtime_error, Consumer as ConsumerTrait, ConsumerGroup, ConsumerMode, ConsumerOptions, Message, Payload, SeqNo, SeqPos, ShardId, StreamErr, StreamKey, StreamerUri, Timestamp, @@ -251,7 +248,6 @@ impl std::fmt::Debug for KafkaConsumer { } } -#[async_trait] impl ConsumerTrait for KafkaConsumer { type Error = KafkaErr; type Message<'a> = KafkaMessage<'a>; diff --git a/sea-streamer-kafka/src/producer.rs b/sea-streamer-kafka/src/producer.rs index 3be4070..b7d4749 100644 --- a/sea-streamer-kafka/src/producer.rs +++ b/sea-streamer-kafka/src/producer.rs @@ -11,9 +11,8 @@ use rdkafka::{ pub use rdkafka::{consumer::ConsumerGroupMetadata, producer::FutureRecord, TopicPartitionList}; use sea_streamer_runtime::spawn_blocking; use sea_streamer_types::{ - export::{async_trait, futures::FutureExt}, - runtime_error, Buffer, MessageHeader, Producer, ProducerOptions, ShardId, StreamErr, StreamKey, - StreamResult, StreamerUri, Timestamp, + export::futures::FutureExt, runtime_error, Buffer, MessageHeader, Producer, ProducerOptions, + ShardId, StreamErr, StreamKey, StreamResult, StreamerUri, Timestamp, }; #[derive(Clone)] @@ -80,7 +79,6 @@ impl Default for CompressionType { } } -#[async_trait] impl Producer for KafkaProducer { type Error = KafkaErr; type SendFuture = SendFuture; diff --git a/sea-streamer-kafka/src/streamer.rs b/sea-streamer-kafka/src/streamer.rs index 59f1f9c..a01116e 100644 --- a/sea-streamer-kafka/src/streamer.rs +++ b/sea-streamer-kafka/src/streamer.rs @@ -6,8 +6,8 @@ use std::{ use sea_streamer_runtime::spawn_blocking; use sea_streamer_types::{ - export::async_trait, runtime_error, ConnectOptions, ConsumerGroup, ConsumerMode, - ConsumerOptions, StreamErr, StreamKey, Streamer, StreamerUri, + runtime_error, ConnectOptions, ConsumerGroup, ConsumerMode, ConsumerOptions, StreamErr, + StreamKey, Streamer, StreamerUri, }; use crate::{ @@ -196,7 +196,6 @@ impl_into_string!(BaseOptionKey); impl_into_string!(SecurityProtocol); impl_into_string!(SaslMechanism); -#[async_trait] impl Streamer for KafkaStreamer { type Error = KafkaErr; type Producer = KafkaProducer; diff --git a/sea-streamer-redis/src/consumer/mod.rs b/sea-streamer-redis/src/consumer/mod.rs index 66cbdbc..468b79a 100644 --- a/sea-streamer-redis/src/consumer/mod.rs +++ b/sea-streamer-redis/src/consumer/mod.rs @@ -20,10 +20,9 @@ use crate::{ }; use sea_streamer_runtime::{spawn_task, timeout}; use sea_streamer_types::{ - export::{async_trait, futures::FutureExt}, - Buffer, ConnectOptions, Consumer, ConsumerGroup, ConsumerId, ConsumerMode, ConsumerOptions, - Message, MessageHeader, SeqNo, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp, - SEA_STREAMER_INTERNAL, + export::futures::FutureExt, Buffer, ConnectOptions, Consumer, ConsumerGroup, ConsumerId, + ConsumerMode, ConsumerOptions, Message, MessageHeader, SeqNo, SeqPos, ShardId, SharedMessage, + StreamErr, StreamKey, Timestamp, SEA_STREAMER_INTERNAL, }; #[derive(Debug)] @@ -77,7 +76,6 @@ pub mod constants { pub const HEARTBEAT: Duration = Duration::from_secs(10); } -#[async_trait] impl Consumer for RedisConsumer { type Error = RedisErr; type Message<'a> = SharedMessage; diff --git a/sea-streamer-redis/src/producer.rs b/sea-streamer-redis/src/producer.rs index b553eaf..e769390 100644 --- a/sea-streamer-redis/src/producer.rs +++ b/sea-streamer-redis/src/producer.rs @@ -8,9 +8,8 @@ use crate::{ }; use sea_streamer_runtime::{sleep, spawn_task}; use sea_streamer_types::{ - export::{async_trait, futures::FutureExt}, - Buffer, MessageHeader, Producer, ProducerOptions, ShardId, StreamErr, StreamKey, Timestamp, - SEA_STREAMER_INTERNAL, + export::futures::FutureExt, Buffer, MessageHeader, Producer, ProducerOptions, ShardId, + StreamErr, StreamKey, Timestamp, SEA_STREAMER_INTERNAL, }; const MAX_RETRY: usize = 100; @@ -87,7 +86,6 @@ pub struct RoundRobinSharder { state: u32, } -#[async_trait] impl Producer for RedisProducer { type Error = RedisErr; type SendFuture = SendFuture; diff --git a/sea-streamer-redis/src/streamer.rs b/sea-streamer-redis/src/streamer.rs index 75bd6b4..4e7862d 100644 --- a/sea-streamer-redis/src/streamer.rs +++ b/sea-streamer-redis/src/streamer.rs @@ -5,7 +5,7 @@ use crate::{ RedisProducer, RedisProducerOptions, RedisResult, REDIS_PORT, }; use sea_streamer_types::{ - export::async_trait, ConnectOptions, StreamErr, StreamKey, StreamUrlErr, Streamer, StreamerUri, + ConnectOptions, StreamErr, StreamKey, StreamUrlErr, Streamer, StreamerUri, }; #[derive(Debug, Clone)] @@ -26,7 +26,6 @@ pub struct RedisConnectOptions { disable_hostname_verification: bool, } -#[async_trait] impl Streamer for RedisStreamer { type Error = RedisErr; type Producer = RedisProducer; diff --git a/sea-streamer-socket/src/consumer.rs b/sea-streamer-socket/src/consumer.rs index 9e2c7e0..f1e3141 100644 --- a/sea-streamer-socket/src/consumer.rs +++ b/sea-streamer-socket/src/consumer.rs @@ -9,10 +9,7 @@ use sea_streamer_stdio::StdioConsumer; use crate::{map_err, Backend, BackendErr, SeaMessage, SeaResult, SeaStreamerBackend}; use sea_streamer_types::{ - export::{ - async_trait, - futures::{FutureExt, Stream}, - }, + export::futures::{FutureExt, Stream}, Consumer, SeqPos, ShardId, StreamKey, StreamResult, Timestamp, }; use std::{fmt::Debug, future::Future, pin::Pin, task::Poll}; @@ -201,7 +198,6 @@ impl SeaStreamerBackend for SeaConsumer { } } -#[async_trait] impl Consumer for SeaConsumer { type Error = BackendErr; type Message<'a> = SeaMessage<'a>; diff --git a/sea-streamer-socket/src/producer.rs b/sea-streamer-socket/src/producer.rs index 0e5a232..781f7a3 100644 --- a/sea-streamer-socket/src/producer.rs +++ b/sea-streamer-socket/src/producer.rs @@ -9,8 +9,7 @@ use sea_streamer_stdio::StdioProducer; use crate::{map_err, Backend, BackendErr, SeaResult, SeaStreamerBackend}; use sea_streamer_types::{ - export::{async_trait, futures::FutureExt}, - Buffer, Producer, Receipt, StreamKey, StreamResult, + export::futures::FutureExt, Buffer, Producer, Receipt, StreamKey, StreamResult, }; use std::{future::Future, pin::Pin, task::Poll}; @@ -157,7 +156,6 @@ pub enum SendFuture { File(sea_streamer_file::SendFuture), } -#[async_trait] impl Producer for SeaProducer { type Error = BackendErr; diff --git a/sea-streamer-socket/src/streamer.rs b/sea-streamer-socket/src/streamer.rs index 94ae431..0525400 100644 --- a/sea-streamer-socket/src/streamer.rs +++ b/sea-streamer-socket/src/streamer.rs @@ -7,7 +7,7 @@ use sea_streamer_redis::RedisStreamer; #[cfg(feature = "backend-stdio")] use sea_streamer_stdio::StdioStreamer; -use sea_streamer_types::{export::async_trait, StreamErr, StreamKey, Streamer, StreamerUri}; +use sea_streamer_types::{StreamErr, StreamKey, Streamer, StreamerUri}; use crate::{ map_err, Backend, BackendErr, SeaConnectOptions, SeaConsumer, SeaConsumerBackend, @@ -145,7 +145,6 @@ impl SeaStreamerBackend for SeaStreamer { } } -#[async_trait] impl Streamer for SeaStreamer { type Error = BackendErr; type Producer = SeaProducer; diff --git a/sea-streamer-stdio/src/consumer.rs b/sea-streamer-stdio/src/consumer.rs index 684e357..1f6192a 100644 --- a/sea-streamer-stdio/src/consumer.rs +++ b/sea-streamer-stdio/src/consumer.rs @@ -5,10 +5,7 @@ use flume::{ use std::sync::Mutex; use sea_streamer_types::{ - export::{ - async_trait, - futures::{future::MapErr, stream::Map as StreamMap, StreamExt, TryFutureExt}, - }, + export::futures::{future::MapErr, stream::Map as StreamMap, StreamExt, TryFutureExt}, Consumer as ConsumerTrait, ConsumerGroup, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp, }; @@ -118,7 +115,6 @@ impl Drop for StdioConsumer { } } -#[async_trait] impl ConsumerTrait for StdioConsumer { type Error = StdioErr; type Message<'a> = SharedMessage; diff --git a/sea-streamer-stdio/src/producer.rs b/sea-streamer-stdio/src/producer.rs index e6c2008..efc1661 100644 --- a/sea-streamer-stdio/src/producer.rs +++ b/sea-streamer-stdio/src/producer.rs @@ -2,9 +2,8 @@ use flume::{bounded, r#async::RecvFut, unbounded, Sender}; use std::{collections::HashMap, fmt::Debug, future::Future, sync::Mutex}; use sea_streamer_types::{ - export::{async_trait, futures::FutureExt}, - Buffer, Message, MessageHeader, Producer as ProducerTrait, Receipt, SeqNo, ShardId, - SharedMessage, StreamErr, StreamKey, StreamResult, Timestamp, + export::futures::FutureExt, Buffer, Message, MessageHeader, Producer as ProducerTrait, Receipt, + SeqNo, ShardId, SharedMessage, StreamErr, StreamKey, StreamResult, Timestamp, }; use crate::{PartialHeader, StdioErr, StdioResult, BROADCAST, TIMESTAMP_FORMAT}; @@ -174,7 +173,6 @@ impl Debug for SendFuture { } } -#[async_trait] impl ProducerTrait for StdioProducer { type Error = StdioErr; type SendFuture = SendFuture; diff --git a/sea-streamer-stdio/src/streamer.rs b/sea-streamer-stdio/src/streamer.rs index a56a45b..80f4ac9 100644 --- a/sea-streamer-stdio/src/streamer.rs +++ b/sea-streamer-stdio/src/streamer.rs @@ -4,7 +4,7 @@ use crate::{ consumer, create_consumer, producer, StdioConsumer, StdioErr, StdioProducer, StdioResult, }; use sea_streamer_types::{ - export::async_trait, ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode, + ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode, ConsumerOptions as ConsumerOptionsTrait, ProducerOptions as ProducerOptionsTrait, StreamErr, StreamKey, Streamer as StreamerTrait, StreamerUri, }; @@ -28,7 +28,6 @@ pub struct StdioConsumerOptions { #[derive(Debug, Default, Clone)] pub struct StdioProducerOptions {} -#[async_trait] impl StreamerTrait for StdioStreamer { type Error = StdioErr; type Producer = StdioProducer; diff --git a/sea-streamer-types/Cargo.toml b/sea-streamer-types/Cargo.toml index 06bb78c..f8ef582 100644 --- a/sea-streamer-types/Cargo.toml +++ b/sea-streamer-types/Cargo.toml @@ -16,7 +16,6 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies] -async-trait = { version = "0.1" } futures = { version = "0.3", default-features = false, features = ["std", "alloc", "async-await"] } thiserror = { version = "1", default-features = false } time = { version = "0.3", default-features = false, features = ["std", "macros", "formatting"] } diff --git a/sea-streamer-types/src/consumer.rs b/sea-streamer-types/src/consumer.rs index 048a934..74c5f7f 100644 --- a/sea-streamer-types/src/consumer.rs +++ b/sea-streamer-types/src/consumer.rs @@ -1,5 +1,4 @@ use crate::{Message, SeqPos, ShardId, StreamKey, StreamResult, Timestamp}; -use async_trait::async_trait; use futures::{Future, Stream}; #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -50,7 +49,6 @@ pub trait ConsumerOptions: Default + Clone + Send { ) -> StreamResult<&mut Self, Self::Error>; } -#[async_trait] /// Common interface of consumers, to be implemented by all backends. pub trait Consumer: Sized + Send + Sync { type Error: std::error::Error; @@ -69,12 +67,16 @@ pub trait Consumer: Sized + Send + Sync { /// with a timestamp later than `to`. /// /// If the consumer is not already assigned, shard ZERO will be used. - async fn seek(&mut self, to: Timestamp) -> StreamResult<(), Self::Error>; + fn seek(&mut self, to: Timestamp) + -> impl Future> + Send; /// Rewind all streams to a particular sequence number. /// /// If the consumer is not already assigned, shard ZERO will be used. - async fn rewind(&mut self, offset: SeqPos) -> StreamResult<(), Self::Error>; + fn rewind( + &mut self, + offset: SeqPos, + ) -> impl Future> + Send; /// Assign this consumer to a particular shard. Can be called multiple times to assign /// to multiple shards. Returns error `StreamKeyNotFound` if the stream is not currently subscribed. diff --git a/sea-streamer-types/src/export.rs b/sea-streamer-types/src/export.rs index a9f7031..5b6c768 100644 --- a/sea-streamer-types/src/export.rs +++ b/sea-streamer-types/src/export.rs @@ -1,4 +1,3 @@ -pub use async_trait::async_trait; pub use futures; pub use time; pub use url; diff --git a/sea-streamer-types/src/producer.rs b/sea-streamer-types/src/producer.rs index ca0cd50..604c773 100644 --- a/sea-streamer-types/src/producer.rs +++ b/sea-streamer-types/src/producer.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use futures::Future; use crate::{Buffer, MessageHeader, StreamKey, StreamResult}; @@ -9,7 +8,6 @@ pub trait ProducerOptions: Default + Clone + Send {} /// Delivery receipt. pub type Receipt = MessageHeader; -#[async_trait] /// Common interface of producers, to be implemented by all backends. pub trait Producer: Clone + Send + Sync { type Error: std::error::Error; @@ -32,10 +30,10 @@ pub trait Producer: Clone + Send + Sync { } /// End this producer, only after flushing all it's pending messages. - async fn end(self) -> StreamResult<(), Self::Error>; + fn end(self) -> impl Future> + Send; /// Flush all pending messages. - async fn flush(&mut self) -> StreamResult<(), Self::Error>; + fn flush(&mut self) -> impl Future> + Send; /// Lock this producer to a particular stream. This function can only be called once. /// Subsequent calls should return `StreamErr::AlreadyAnchored` error. diff --git a/sea-streamer-types/src/streamer.rs b/sea-streamer-types/src/streamer.rs index 95c9ff3..cfed0d3 100644 --- a/sea-streamer-types/src/streamer.rs +++ b/sea-streamer-types/src/streamer.rs @@ -4,7 +4,7 @@ use crate::{ ConnectOptions, Consumer, ConsumerOptions, Producer, ProducerOptions, StreamKey, StreamResult, StreamUrlErr, }; -use async_trait::async_trait; +use futures::{Future, FutureExt, TryFutureExt}; use url::Url; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -38,7 +38,6 @@ pub struct StreamUrl { streams: Vec, } -#[async_trait] /// Common interface of streamer clients. pub trait Streamer: Sized { type Error: std::error::Error; @@ -49,37 +48,41 @@ pub trait Streamer: Sized { type ProducerOptions: ProducerOptions; /// Establish a connection to the streaming server. - async fn connect( + fn connect( streamer: StreamerUri, options: Self::ConnectOptions, - ) -> StreamResult; + ) -> impl Future> + Send; /// Flush and disconnect from the streaming server. - async fn disconnect(self) -> StreamResult<(), Self::Error>; + fn disconnect(self) -> impl Future> + Send; /// Create a producer that can stream to any stream key. - async fn create_generic_producer( + fn create_generic_producer( &self, options: Self::ProducerOptions, - ) -> StreamResult; + ) -> impl Future> + Send; /// Create a producer that streams to the specified stream. - async fn create_producer( + fn create_producer( &self, stream: StreamKey, options: Self::ProducerOptions, - ) -> StreamResult { - let mut producer = self.create_generic_producer(options).await?; - producer.anchor(stream)?; - Ok(producer) + ) -> impl Future> + Send { + self.create_generic_producer(options).map(|res| { + res.and_then(|mut producer| { + producer.anchor(stream)?; + + Ok(producer) + }) + }) } /// Create a consumer subscribing to the specified streams. - async fn create_consumer( + fn create_consumer( &self, streams: &[StreamKey], options: Self::ConsumerOptions, - ) -> StreamResult; + ) -> impl Future> + Send; } impl Display for StreamerUri {