Skip to content

Commit

Permalink
refactor: migrate to clap from structopt
Browse files Browse the repository at this point in the history
  • Loading branch information
negezor committed Apr 12, 2024
1 parent e3efec3 commit 38495f3
Show file tree
Hide file tree
Showing 25 changed files with 115 additions and 115 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Here is a basic [stream consumer](https://github.com/SeaQL/sea-streamer/tree/mai
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand All @@ -76,7 +76,7 @@ Here is a basic [stream producer](https://github.com/SeaQL/sea-streamer/tree/mai
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down Expand Up @@ -105,7 +105,7 @@ See also other [advanced stream processors](https://github.com/SeaQL/sea-streame
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
Expand Down
8 changes: 4 additions & 4 deletions benchmark/src/bin/baseline.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use sea_streamer::{runtime::sleep, StreamUrl};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -18,7 +18,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();
std::hint::black_box(stream);

for i in 0..100_000 {
Expand Down
8 changes: 4 additions & 4 deletions benchmark/src/bin/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use sea_streamer::{
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, SeaConsumer, SeaConsumerOptions,
SeaMessage, SeaStreamReset, SeaStreamer, StreamUrl, Streamer,
};
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -20,7 +20,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down
8 changes: 4 additions & 4 deletions benchmark/src/bin/producer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use sea_streamer::{Producer, SeaProducer, SeaStreamer, StreamUrl, Streamer};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -18,7 +18,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down
10 changes: 5 additions & 5 deletions benchmark/src/bin/relay.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use anyhow::Result;
use sea_streamer::stdio::StdioStreamer;
use sea_streamer::{Consumer, Message, Producer, StreamKey, Streamer, StreamerUri};
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(long, help = "Stream key of input")]
#[clap(long, help = "Stream key of input")]
input: StreamKey,
#[structopt(long, help = "Stream key of output")]
#[clap(long, help = "Stream key of output")]
output: StreamKey,
}

#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

let streamer = StdioStreamer::connect(StreamerUri::zero(), Default::default()).await?;
let consumer = streamer
Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use anyhow::Result;
use flume::bounded;
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

use sea_streamer::{
runtime::{sleep, spawn_task},
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, SharedMessage, StreamUrl, Streamer,
};

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`"
)]
input: StreamUrl,
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `stdio:///my_stream`"
)]
Expand All @@ -30,7 +30,7 @@ const NUM_THREADS: usize = 4; // Every one has at least 2 cores with 2 hyperthre
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

// The queue
let (sender, receiver) = bounded(1024);
Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use anyhow::Result;
use flume::bounded;
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

use sea_streamer::{
runtime::{sleep, spawn_task},
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, SharedMessage, StreamUrl, Streamer,
};

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`"
)]
input: StreamUrl,
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `stdio:///my_stream`"
)]
Expand All @@ -28,7 +28,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

// The queue
let (sender, receiver) = bounded(1024);
Expand Down
8 changes: 4 additions & 4 deletions examples/src/bin/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use sea_streamer::{
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, SeaConsumer, SeaConsumerOptions,
SeaMessage, SeaStreamReset, SeaStreamer, StreamUrl, Streamer,
};
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -20,7 +20,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ use sea_streamer::{
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, StreamUrl, Streamer,
};
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`"
)]
input: StreamUrl,
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `stdio:///my_stream`"
)]
Expand All @@ -24,7 +24,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
Expand Down
8 changes: 4 additions & 4 deletions examples/src/bin/producer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use sea_streamer::{Producer, SeaProducer, SeaStreamer, StreamUrl, Streamer};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -18,7 +18,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/resumable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ use sea_streamer::{
Streamer,
};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

const TRANSACTION: bool = true;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`"
)]
input: StreamUrl,
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `stdio:///my_stream`"
)]
Expand All @@ -30,7 +30,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
let mut options = SeaConsumerOptions::new(ConsumerMode::Resumable);
Expand Down
10 changes: 5 additions & 5 deletions sea-streamer-file/src/bin/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use anyhow::{anyhow, Result};
use sea_streamer_file::{FileId, FileStreamer};
use sea_streamer_types::{Producer, StreamKey, Streamer};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(long, help = "Stream to this file")]
#[clap(long, help = "Stream to this file")]
file: FileId,
#[structopt(long, parse(try_from_str = parse_duration), help = "Period of the clock. e.g. 1s, 100ms")]
#[clap(long, parse(try_from_str = parse_duration), help = "Period of the clock. e.g. 1s, 100ms")]
interval: Duration,
}

Expand All @@ -28,7 +28,7 @@ fn parse_duration(src: &str) -> Result<Duration> {
async fn main() -> Result<()> {
env_logger::init();

let Args { file, interval } = Args::from_args();
let Args { file, interval } = Args::parse();

let stream_key = StreamKey::new("clock")?;
let streamer = FileStreamer::connect(file.to_streamer_uri()?, Default::default()).await?;
Expand Down
12 changes: 6 additions & 6 deletions sea-streamer-file/src/bin/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ use sea_streamer_file::{
};
use sea_streamer_types::{Buffer, Message, TIMESTAMP_FORMAT};
use std::str::FromStr;
use structopt::StructOpt;
use clap::Parser;

#[derive(StructOpt)]
#[derive(Parser)]
struct Args {
#[structopt(long, help = "Decode this file")]
#[clap(long, help = "Decode this file")]
file: FileId,
#[structopt(long, help = "If set, skip printing the payload")]
#[clap(long, help = "If set, skip printing the payload")]
header_only: bool,
#[structopt(long, help = "The output format", default_value = "log")]
#[clap(long, help = "The output format", default_value = "log")]
format: Format,
}

Expand All @@ -46,7 +46,7 @@ async fn main() -> Result<()> {
file,
header_only,
format,
} = Args::from_args();
} = Args::parse();

let mut source = MessageSource::new(file, StreamMode::Replay).await?;

Expand Down
10 changes: 5 additions & 5 deletions sea-streamer-file/src/bin/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use anyhow::{anyhow, Result};
use sea_streamer_file::{FileId, MessageSink, DEFAULT_BEACON_INTERVAL, DEFAULT_FILE_SIZE_LIMIT};
use sea_streamer_types::{MessageHeader, OwnedMessage, ShardId, StreamKey, Timestamp};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(long, help = "Stream to this file")]
#[clap(long, help = "Stream to this file")]
file: FileId,
#[structopt(long, parse(try_from_str = parse_duration), help = "Period of the clock. e.g. 1s, 100ms")]
#[clap(long, parse(try_from_str = parse_duration), help = "Period of the clock. e.g. 1s, 100ms")]
interval: Duration,
}

Expand All @@ -28,7 +28,7 @@ fn parse_duration(src: &str) -> Result<Duration> {
async fn main() -> Result<()> {
env_logger::init();

let Args { file, interval } = Args::from_args();
let Args { file, interval } = Args::parse();
let mut sink = MessageSink::new(
file.clone(),
DEFAULT_BEACON_INTERVAL,
Expand Down
Loading

0 comments on commit 38495f3

Please sign in to comment.