Skip to content

Commit

Permalink
Migrate to clap (#19)
Browse files Browse the repository at this point in the history
* chore(deps): replace structopt by clap

* refactor: migrate to clap from structopt
  • Loading branch information
negezor authored Apr 13, 2024
1 parent 5830b31 commit 088ba70
Show file tree
Hide file tree
Showing 33 changed files with 128 additions and 128 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Here is a basic [stream consumer](https://github.com/SeaQL/sea-streamer/tree/mai
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand All @@ -76,7 +76,7 @@ Here is a basic [stream producer](https://github.com/SeaQL/sea-streamer/tree/mai
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down Expand Up @@ -105,7 +105,7 @@ See also other [advanced stream processors](https://github.com/SeaQL/sea-streame
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ anyhow = { version = "1" }
async-std = { version = "1", features = ["attributes"], optional = true }
env_logger = { version = "0.9" }
flume = { version = "0.10", default-features = false, features = ["async"] }
structopt = { version = "0.3" }
clap = { version = "4.5", features = ["derive", "env"] }
tokio = { version = "1.10", features = ["full"], optional = true }

[dependencies.sea-streamer]
Expand Down
8 changes: 4 additions & 4 deletions benchmark/src/bin/baseline.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use sea_streamer::{runtime::sleep, StreamUrl};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -18,7 +18,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();
std::hint::black_box(stream);

for i in 0..100_000 {
Expand Down
8 changes: 4 additions & 4 deletions benchmark/src/bin/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use sea_streamer::{
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, SeaConsumer, SeaConsumerOptions,
SeaMessage, SeaStreamReset, SeaStreamer, StreamUrl, Streamer,
};
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -20,7 +20,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down
8 changes: 4 additions & 4 deletions benchmark/src/bin/producer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use sea_streamer::{Producer, SeaProducer, SeaStreamer, StreamUrl, Streamer};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -18,7 +18,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down
10 changes: 5 additions & 5 deletions benchmark/src/bin/relay.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use anyhow::Result;
use sea_streamer::stdio::StdioStreamer;
use sea_streamer::{Consumer, Message, Producer, StreamKey, Streamer, StreamerUri};
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(long, help = "Stream key of input")]
#[clap(long, help = "Stream key of input")]
input: StreamKey,
#[structopt(long, help = "Stream key of output")]
#[clap(long, help = "Stream key of output")]
output: StreamKey,
}

#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

let streamer = StdioStreamer::connect(StreamerUri::zero(), Default::default()).await?;
let consumer = streamer
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ anyhow = { version = "1" }
async-std = { version = "1", features = ["attributes"], optional = true }
env_logger = { version = "0.9" }
flume = { version = "0.10", default-features = false, features = ["async"] }
structopt = { version = "0.3" }
clap = { version = "4.5", features = ["derive", "env"] }
tokio = { version = "1.10", features = ["full"], optional = true }

[dependencies.sea-streamer]
Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use anyhow::Result;
use flume::bounded;
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

use sea_streamer::{
runtime::{sleep, spawn_task},
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, SharedMessage, StreamUrl, Streamer,
};

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`"
)]
input: StreamUrl,
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `stdio:///my_stream`"
)]
Expand All @@ -30,7 +30,7 @@ const NUM_THREADS: usize = 4; // Every one has at least 2 cores with 2 hyperthre
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

// The queue
let (sender, receiver) = bounded(1024);
Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use anyhow::Result;
use flume::bounded;
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

use sea_streamer::{
runtime::{sleep, spawn_task},
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, SharedMessage, StreamUrl, Streamer,
};

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`"
)]
input: StreamUrl,
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `stdio:///my_stream`"
)]
Expand All @@ -28,7 +28,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

// The queue
let (sender, receiver) = bounded(1024);
Expand Down
8 changes: 4 additions & 4 deletions examples/src/bin/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use sea_streamer::{
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, SeaConsumer, SeaConsumerOptions,
SeaMessage, SeaStreamReset, SeaStreamer, StreamUrl, Streamer,
};
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -20,7 +20,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ use sea_streamer::{
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, StreamUrl, Streamer,
};
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`"
)]
input: StreamUrl,
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `stdio:///my_stream`"
)]
Expand All @@ -24,7 +24,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
Expand Down
8 changes: 4 additions & 4 deletions examples/src/bin/producer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use sea_streamer::{Producer, SeaProducer, SeaStreamer, StreamUrl, Streamer};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `kafka://localhost:9092/my_topic`",
env = "STREAM_URL"
Expand All @@ -18,7 +18,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { stream } = Args::from_args();
let Args { stream } = Args::parse();

let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;

Expand Down
10 changes: 5 additions & 5 deletions examples/src/bin/resumable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ use sea_streamer::{
Streamer,
};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

const TRANSACTION: bool = true;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key(s), i.e. try `kafka://localhost:9092/my_topic`"
)]
input: StreamUrl,
#[structopt(
#[clap(
long,
help = "Streamer URI with stream key, i.e. try `stdio:///my_stream`"
)]
Expand All @@ -30,7 +30,7 @@ struct Args {
async fn main() -> Result<()> {
env_logger::init();

let Args { input, output } = Args::from_args();
let Args { input, output } = Args::parse();

let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
let mut options = SeaConsumerOptions::new(ConsumerMode::Resumable);
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sea-streamer-types = { version = "0.3", path = "../sea-streamer-types" }
sea-streamer-runtime = { version = "0.3", path = "../sea-streamer-runtime", features = ["file"]}
serde = { version = "1", optional = true, features = ["derive"] }
serde_json = { version = "1", optional = true }
structopt = { version = "0.3", optional = true }
clap = { version = "4.5", features = ["derive"], optional = true }
thiserror = { version = "1", default-features = false }
tokio = { version = "1.10.0", optional = true }

Expand All @@ -37,7 +37,7 @@ tokio = { version = "1.10.0", optional = true }
[features]
default = []
test = ["anyhow", "async-std?/attributes", "tokio?/full", "env_logger"]
executables = ["anyhow", "tokio/full", "env_logger", "structopt", "sea-streamer-runtime/runtime-tokio", "serde", "serde_json", "sea-streamer-types/serde"]
executables = ["anyhow", "tokio/full", "env_logger", "clap", "sea-streamer-runtime/runtime-tokio", "serde", "serde_json", "sea-streamer-types/serde"]
runtime-async-std = ["async-std", "sea-streamer-runtime/runtime-async-std"]
runtime-tokio = ["tokio", "sea-streamer-runtime/runtime-tokio"]

Expand Down
10 changes: 5 additions & 5 deletions sea-streamer-file/src/bin/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use anyhow::{anyhow, Result};
use sea_streamer_file::{FileId, FileStreamer};
use sea_streamer_types::{Producer, StreamKey, Streamer};
use std::time::Duration;
use structopt::StructOpt;
use clap::Parser;

#[derive(Debug, StructOpt)]
#[derive(Debug, Parser)]
struct Args {
#[structopt(long, help = "Stream to this file")]
#[clap(long, help = "Stream to this file")]
file: FileId,
#[structopt(long, parse(try_from_str = parse_duration), help = "Period of the clock. e.g. 1s, 100ms")]
#[clap(long, parse(try_from_str = parse_duration), help = "Period of the clock. e.g. 1s, 100ms")]
interval: Duration,
}

Expand All @@ -28,7 +28,7 @@ fn parse_duration(src: &str) -> Result<Duration> {
async fn main() -> Result<()> {
env_logger::init();

let Args { file, interval } = Args::from_args();
let Args { file, interval } = Args::parse();

let stream_key = StreamKey::new("clock")?;
let streamer = FileStreamer::connect(file.to_streamer_uri()?, Default::default()).await?;
Expand Down
Loading

0 comments on commit 088ba70

Please sign in to comment.