Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jan 22, 2025
1 parent b18d3d8 commit 253d5cb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ tokio-retry = "0.3"
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = "0.1"
url = "2"
tracing-subscriber = "0.3.17"
url = "2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }
Expand Down
34 changes: 22 additions & 12 deletions src/stream/src/executor/sync_kv_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ impl<S: StateStore, LS: LocalStateStore> SyncedKvLogStoreExecutor<S, LS> {
metrics: &KvLogStoreMetrics,
state_store: S,
) -> StreamExecutorResult<StateStoreStream<S>> {

let init_epoch_pair = barrier.epoch;
local_state_store
.init(InitOptions::new(init_epoch_pair))
Expand Down Expand Up @@ -727,16 +726,18 @@ where
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use crate::executor::prelude::*;
use risingwave_common::test_prelude::*;
use risingwave_common::catalog::Field;
use risingwave_common::hash::VirtualNode;
use risingwave_common::test_prelude::*;
use risingwave_common::util::epoch::test_epoch;
use risingwave_storage::memory::MemoryStateStore;

use super::*;
use crate::common::log_store_impl::kv_log_store::test_utils::{check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema};
use crate::common::log_store_impl::kv_log_store::test_utils::{
check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
};
use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
use crate::executor::prelude::*;
use crate::executor::test_utils::MockSource;

fn init_logger() {
Expand Down Expand Up @@ -777,7 +778,8 @@ mod tests {
10,
source,
)
.await.boxed();
.await
.boxed();

// Init
tx.push_barrier(test_epoch(1), false);
Expand Down Expand Up @@ -871,7 +873,8 @@ mod tests {
10,
source,
)
.await.boxed();
.await
.boxed();

// Init
tx.push_barrier(test_epoch(1), false);
Expand Down Expand Up @@ -938,9 +941,10 @@ mod tests {

let pk_info = &KV_LOG_STORE_V2_INFO;
let column_descs = test_payload_schema(&pk_info);
let fields = column_descs.into_iter().map(|desc| {
Field::new(desc.name.clone(), desc.data_type.clone())
}).collect_vec();
let fields = column_descs
.into_iter()
.map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
.collect_vec();
let schema = Schema { fields };
let pk_indices = vec![0];
let (mut tx, source) = MockSource::channel();
Expand All @@ -962,7 +966,8 @@ mod tests {
0,
source,
)
.await.boxed();
.await
.boxed();

// Init
tx.push_barrier(test_epoch(1), false);
Expand Down Expand Up @@ -1014,9 +1019,14 @@ mod tests {
- 6 10
- 8 10
U- 10 11
U+ 10 10"
U+ 10 10",
);
assert!(
check_stream_chunk_eq(&actual, &expected),
"Expected: {:#?}, got: {:#?}",
expected,
actual
);
assert!(check_stream_chunk_eq(&actual, &expected), "Expected: {:#?}, got: {:#?}", expected, actual);
}
other => panic!("Expected a chunk message, got {:?}", other),
}
Expand Down

0 comments on commit 253d5cb

Please sign in to comment.