Skip to content

Commit

Permalink
Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
fede1024 committed Feb 20, 2018
1 parent c89b769 commit 90e0f19
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 32 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ futures = "0.1.0"
log = "0.4.0"
rand = "0.4.0"
# rdkafka = "0.14.0"
rdkafka = { path = "/home/fede/rust/rust-rdkafka/" }
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git" }
serde = "1.0.0"
serde_derive = "1.0.0"
serde_yaml = "0.7.0"
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ pub struct ConsumerScenario {

fn or_expect<T: Clone>(first: &Option<T>, second: &Option<T>, name: &str) -> T {
first.as_ref()
.map(|config| config.clone())
.or(second.clone())
.cloned()
.or_else(|| second.clone())
.expect(&format!("Missing configuration parameter: {}", name))
}

Expand Down
4 changes: 2 additions & 2 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl ConsumerBenchmarkStats {
}

fn get_topic_partitions_count<X: ConsumerContext, C: Consumer<X>>(consumer: &C, topic_name: &str) -> Option<usize> {
let metadata = consumer.fetch_metadata(Some(topic_name), 30000)
let metadata = consumer.fetch_metadata(Some(topic_name), Duration::from_secs(30))
.expect("Failed to fetch metadata");

if metadata.topics().is_empty() {
Expand Down Expand Up @@ -76,7 +76,7 @@ fn run_base_consumer_benchmark(scenario: &ConsumerScenario) -> ConsumerBenchmark
let mut bytes = 0;

while messages < limit {
match consumer.poll(1000) {
match consumer.poll(Duration::from_secs(1)) {
None => {},
Some(Ok(message)) => {
if messages == 0 {
Expand Down
10 changes: 5 additions & 5 deletions src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn base_producer_thread(
.send_copy::<str, str>(&scenario.topic, None, Some("warmup"), None, (), None)
.expect("Producer error");
failure_counter.store(0, Ordering::Relaxed);
producer.flush(10_000);
producer.flush(Duration::from_secs(10));

let per_thread_messages = if thread_id == 0 {
scenario.message_count - scenario.message_count / scenario.threads * (scenario.threads - 1)
Expand All @@ -75,7 +75,7 @@ fn base_producer_thread(
None,
) {
Err(KafkaError::MessageProduction(RDKafkaError::QueueFull)) => {
producer.poll(10);
producer.poll(Duration::from_millis(10));
continue;
}
Err(e) => {
Expand All @@ -85,9 +85,9 @@ fn base_producer_thread(
Ok(_) => break,
}
}
producer.poll(0);
producer.poll(Duration::from_secs(0));
}
producer.flush(120_000);
producer.flush(Duration::from_secs(120));
ThreadStats::new(start.elapsed(), failure_counter.load(Ordering::Relaxed))
}

Expand Down Expand Up @@ -143,7 +143,7 @@ fn future_producer_thread(
}
}
failures += wait_all(futures);
producer.flush(120_000);
producer.flush(Duration::from_secs(120));
ThreadStats::new(start.elapsed(), failures)
}

Expand Down
22 changes: 0 additions & 22 deletions src/units.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@ impl fmt::Display for Seconds {
#[derive(Copy, Clone, Debug, PartialOrd, PartialEq)]
pub struct Messages(pub f64);

impl Messages {
pub fn zero() -> Messages {
Messages(0f64)
}

pub fn is_zero(&self) -> bool {
self.0 == 0f64
}
}

impl Default for Messages {
fn default() -> Messages {
Messages(0f64)
Expand Down Expand Up @@ -92,18 +82,6 @@ const KB: f64 = (1<<10) as f64;
#[derive(Copy, Clone, Debug, PartialOrd, PartialEq)]
pub struct Bytes(pub f64);

impl Bytes {
pub fn zero() -> Bytes {
Bytes(0f64)
}
}

impl Default for Bytes {
fn default() -> Bytes {
Bytes(0f64)
}
}

impl From<u64> for Bytes {
fn from(amount: u64) -> Bytes {
Bytes(amount as f64)
Expand Down

0 comments on commit 90e0f19

Please sign in to comment.