From 253d5cb09e83ed31c9efd03911d8ca2cbe6276de Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 22 Jan 2025 20:35:46 +0800 Subject: [PATCH] fmt --- src/stream/Cargo.toml | 2 +- src/stream/src/executor/sync_kv_log_store.rs | 34 +++++++++++++------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 0d5ecd4229743..c70368ff88e5f 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -72,8 +72,8 @@ tokio-retry = "0.3" tokio-stream = { workspace = true } tonic = { workspace = true } tracing = "0.1" -url = "2" tracing-subscriber = "0.3.17" +url = "2" [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/stream/src/executor/sync_kv_log_store.rs b/src/stream/src/executor/sync_kv_log_store.rs index dcd88380ec385..cd800ac33fb98 100644 --- a/src/stream/src/executor/sync_kv_log_store.rs +++ b/src/stream/src/executor/sync_kv_log_store.rs @@ -218,7 +218,6 @@ impl SyncedKvLogStoreExecutor { metrics: &KvLogStoreMetrics, state_store: S, ) -> StreamExecutorResult> { - let init_epoch_pair = barrier.epoch; local_state_store .init(InitOptions::new(init_epoch_pair)) @@ -727,16 +726,18 @@ where #[cfg(test)] mod tests { use pretty_assertions::assert_eq; - use crate::executor::prelude::*; - use risingwave_common::test_prelude::*; use risingwave_common::catalog::Field; use risingwave_common::hash::VirtualNode; + use risingwave_common::test_prelude::*; use risingwave_common::util::epoch::test_epoch; use risingwave_storage::memory::MemoryStateStore; use super::*; - use crate::common::log_store_impl::kv_log_store::test_utils::{check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema}; + use crate::common::log_store_impl::kv_log_store::test_utils::{ + check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema, + }; use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO; + use crate::executor::prelude::*; use crate::executor::test_utils::MockSource; fn init_logger() { @@ -777,7 +778,8 @@ mod tests { 10, source, ) - .await.boxed(); + .await + .boxed(); // Init tx.push_barrier(test_epoch(1), false); @@ -871,7 +873,8 @@ mod tests { 10, source, ) - .await.boxed(); + .await + .boxed(); // Init tx.push_barrier(test_epoch(1), false); @@ -938,9 +941,10 @@ mod tests { let pk_info = &KV_LOG_STORE_V2_INFO; let column_descs = test_payload_schema(&pk_info); - let fields = column_descs.into_iter().map(|desc| { - Field::new(desc.name.clone(), desc.data_type.clone()) - }).collect_vec(); + let fields = column_descs + .into_iter() + .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone())) + .collect_vec(); let schema = Schema { fields }; let pk_indices = vec![0]; let (mut tx, source) = MockSource::channel(); @@ -962,7 +966,8 @@ mod tests { 0, source, ) - .await.boxed(); + .await + .boxed(); // Init tx.push_barrier(test_epoch(1), false); @@ -1014,9 +1019,14 @@ mod tests { - 6 10 - 8 10 U- 10 11 - U+ 10 10" + U+ 10 10", + ); + assert!( + check_stream_chunk_eq(&actual, &expected), + "Expected: {:#?}, got: {:#?}", + expected, + actual ); - assert!(check_stream_chunk_eq(&actual, &expected), "Expected: {:#?}, got: {:#?}", expected, actual); } other => panic!("Expected a chunk message, got {:?}", other), }