Skip to content

Commit 04f4b03

Browse files
committed
crates/runtime: enhance harness to drive multiple Apply requests
Build and parse a complete ops::ShardLabeling and use that to create Apply and Open requests for harness sessions. Perform an Apply immediately before each Open.
1 parent 2f18a0e commit 04f4b03

File tree

6 files changed

+115
-106
lines changed

6 files changed

+115
-106
lines changed

crates/runtime/src/harness/capture.rs

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -24,58 +24,31 @@ pub fn run_capture<L: LogHandler>(
2424
let spec = spec.clone();
2525
let state_dir = state_dir.to_owned();
2626

27-
// TODO(johnny): extract from spec?
28-
let version = super::unique_version();
29-
3027
coroutines::try_coroutine(move |mut co| async move {
3128
let (mut request_tx, request_rx) = mpsc::channel(crate::CHANNEL_BUFFER);
3229
let response_rx = runtime.serve_capture(request_rx);
3330
tokio::pin!(response_rx);
3431

35-
// Send Apply.
36-
let apply = Request {
37-
apply: Some(request::Apply {
38-
capture: Some(spec.clone()),
39-
dry_run: false,
40-
version: version.clone(),
41-
last_capture: None,
42-
last_version: String::new(),
43-
}),
44-
..Default::default()
45-
};
46-
request_tx.try_send(Ok(apply)).expect("sender is empty");
47-
48-
// Receive Applied.
49-
match response_rx.try_next().await? {
50-
Some(applied) if applied.applied.is_some() => {
51-
() = co.yield_(applied).await;
52-
}
53-
response => return verify("runtime", "Applied").fail(response),
54-
}
55-
5632
let state_dir = state_dir.to_str().context("tempdir is not utf8")?;
57-
let rocksdb_desc = Some(runtime::RocksDbDescriptor {
33+
let rocksdb_desc = runtime::RocksDbDescriptor {
5834
rocksdb_env_memptr: 0,
5935
rocksdb_path: state_dir.to_owned(),
60-
});
61-
let open_ext = capture_request_ext::Open {
62-
rocksdb_descriptor: rocksdb_desc.clone(),
6336
};
6437

6538
let sessions_len = sessions.len();
66-
for (index, target_transactions) in sessions.into_iter().enumerate() {
39+
for (sessions_index, target_transactions) in sessions.into_iter().enumerate() {
6740
() = run_session(
6841
&mut co,
6942
delay,
70-
index == sessions_len - 1,
71-
&open_ext,
7243
&mut request_tx,
7344
&mut response_rx,
45+
&rocksdb_desc,
46+
sessions_index,
47+
sessions_len,
7448
&spec,
7549
&mut state,
7650
target_transactions,
7751
timeout,
78-
&version,
7952
)
8053
.await?;
8154
}
@@ -84,7 +57,7 @@ pub fn run_capture<L: LogHandler>(
8457
verify("runtime", "EOF").is_eof(response_rx.try_next().await?)?;
8558

8659
// Re-open RocksDB.
87-
let rocksdb = RocksDB::open(rocksdb_desc).await?;
60+
let rocksdb = RocksDB::open(Some(rocksdb_desc)).await?;
8861

8962
let checkpoint = rocksdb.load_checkpoint().await?;
9063
tracing::debug!(checkpoint = ?::ops::DebugJson(checkpoint), "final runtime checkpoint");
@@ -113,21 +86,50 @@ pub fn run_capture<L: LogHandler>(
11386
async fn run_session(
11487
co: &mut coroutines::Suspend<Response, ()>,
11588
delay: std::time::Duration,
116-
last_session: bool,
117-
open_ext: &capture_request_ext::Open,
11889
request_tx: &mut mpsc::Sender<anyhow::Result<Request>>,
11990
response_rx: &mut Pin<&mut impl ResponseStream>,
91+
rocksdb_desc: &runtime::RocksDbDescriptor,
92+
sessions_index: usize,
93+
sessions_len: usize,
12094
spec: &flow::CaptureSpec,
12195
state: &mut models::RawValue,
12296
target_transactions: usize,
12397
timeout: std::time::Duration,
124-
version: &str,
12598
) -> anyhow::Result<()> {
99+
let labeling = crate::parse_shard_labeling(spec.shard_template.as_ref())?;
100+
101+
// Send Apply.
102+
let apply = Request {
103+
apply: Some(request::Apply {
104+
capture: Some(spec.clone()),
105+
dry_run: false,
106+
version: labeling.build.clone(),
107+
last_capture: None,
108+
last_version: String::new(),
109+
}),
110+
..Default::default()
111+
}
112+
.with_internal(|internal| {
113+
if sessions_index == 0 {
114+
internal.rocksdb_descriptor = Some(rocksdb_desc.clone());
115+
}
116+
internal.set_log_level(labeling.log_level());
117+
});
118+
request_tx.try_send(Ok(apply)).expect("sender is empty");
119+
120+
// Receive Applied.
121+
match response_rx.try_next().await? {
122+
Some(applied) if applied.applied.is_some() => {
123+
() = co.yield_(applied).await;
124+
}
125+
response => return verify("runtime", "Applied").fail(response),
126+
}
127+
126128
// Send Open.
127129
let open = Request {
128130
open: Some(request::Open {
129131
capture: Some(spec.clone()),
130-
version: version.to_string(),
132+
version: labeling.build.clone(),
131133
range: Some(flow::RangeSpec {
132134
key_begin: 0,
133135
key_end: u32::MAX,
@@ -138,9 +140,7 @@ async fn run_session(
138140
}),
139141
..Default::default()
140142
}
141-
.with_internal(|internal| {
142-
internal.open = Some(open_ext.clone());
143-
});
143+
.with_internal(|internal| internal.set_log_level(labeling.log_level()));
144144
request_tx.try_send(Ok(open)).expect("sender is empty");
145145

146146
// Receive Opened.
@@ -195,7 +195,7 @@ async fn run_session(
195195
match poll_result {
196196
PollResult::Invalid => return verify.fail(poll_response),
197197
PollResult::Ready => true,
198-
PollResult::CoolOff if last_session => break,
198+
PollResult::CoolOff if sessions_index + 1 == sessions_len => break,
199199
PollResult::CoolOff | PollResult::NotReady => false,
200200
PollResult::Restart => break,
201201
}

crates/runtime/src/harness/derive.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,29 @@ pub fn run_derive<L: LogHandler>(
2020
let spec = spec.clone();
2121
let state_dir = state_dir.to_owned();
2222

23-
// TODO(johnny): extract from spec?
24-
let version = super::unique_version();
25-
2623
coroutines::try_coroutine(move |mut co| async move {
2724
let (mut request_tx, request_rx) = mpsc::channel(crate::CHANNEL_BUFFER);
2825
let response_rx = runtime.serve_derive(request_rx);
2926
tokio::pin!(response_rx);
3027

3128
let state_dir = state_dir.to_str().context("tempdir is not utf8")?;
32-
let rocksdb_desc = Some(runtime::RocksDbDescriptor {
29+
let rocksdb_desc = runtime::RocksDbDescriptor {
3330
rocksdb_env_memptr: 0,
3431
rocksdb_path: state_dir.to_owned(),
35-
});
36-
let open_ext = derive_request_ext::Open {
37-
rocksdb_descriptor: rocksdb_desc.clone(),
38-
sqlite_vfs_uri: format!("file://{state_dir}/sqlite.db"),
3932
};
4033

41-
for target_transactions in sessions {
34+
for (sessions_index, target_transactions) in sessions.into_iter().enumerate() {
4235
() = run_session(
4336
&mut co,
44-
&open_ext,
4537
reader.clone(),
4638
&mut request_tx,
4739
&mut response_rx,
40+
&rocksdb_desc,
41+
sessions_index,
4842
&spec,
4943
&mut state,
5044
target_transactions,
5145
timeout,
52-
&version,
5346
)
5447
.await?;
5548
}
@@ -58,7 +51,7 @@ pub fn run_derive<L: LogHandler>(
5851
verify("runtime", "EOF").is_eof(response_rx.try_next().await?)?;
5952

6053
// Re-open RocksDB.
61-
let rocksdb = RocksDB::open(rocksdb_desc).await?;
54+
let rocksdb = RocksDB::open(Some(rocksdb_desc)).await?;
6255

6356
let checkpoint = rocksdb.load_checkpoint().await?;
6457
tracing::debug!(checkpoint = ?::ops::DebugJson(checkpoint), "final runtime checkpoint");
@@ -86,21 +79,27 @@ pub fn run_derive<L: LogHandler>(
8679

8780
async fn run_session(
8881
co: &mut coroutines::Suspend<Response, ()>,
89-
open_ext: &derive_request_ext::Open,
9082
reader: impl Reader,
9183
request_tx: &mut mpsc::Sender<anyhow::Result<Request>>,
9284
response_rx: &mut Pin<&mut impl ResponseStream>,
85+
rocksdb_desc: &runtime::RocksDbDescriptor,
86+
sessions_index: usize,
9387
spec: &flow::CollectionSpec,
9488
state: &mut models::RawValue,
9589
target_transactions: usize,
9690
timeout: std::time::Duration,
97-
version: &str,
9891
) -> anyhow::Result<()> {
92+
let labeling = crate::parse_shard_labeling(
93+
spec.derivation
94+
.as_ref()
95+
.and_then(|d| d.shard_template.as_ref()),
96+
)?;
97+
9998
// Send Open.
10099
let open = Request {
101100
open: Some(request::Open {
102101
collection: Some(spec.clone()),
103-
version: version.to_string(),
102+
version: labeling.build.clone(),
104103
range: Some(flow::RangeSpec {
105104
key_begin: 0,
106105
key_end: u32::MAX,
@@ -112,7 +111,13 @@ async fn run_session(
112111
..Default::default()
113112
}
114113
.with_internal(|internal| {
115-
internal.open = Some(open_ext.clone());
114+
if sessions_index == 0 {
115+
internal.rocksdb_descriptor = Some(rocksdb_desc.clone());
116+
}
117+
internal.open = Some(derive_request_ext::Open {
118+
sqlite_vfs_uri: format!("file://{}/sqlite.db", &rocksdb_desc.rocksdb_path),
119+
});
120+
internal.set_log_level(labeling.log_level());
116121
});
117122
request_tx.try_send(Ok(open)).expect("sender is empty");
118123

crates/runtime/src/harness/materialize.rs

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use anyhow::Context;
55
use futures::{channel::mpsc, TryStreamExt};
66
use proto_flow::flow;
77
use proto_flow::materialize::{request, response, Request, Response};
8-
use proto_flow::runtime::{self, materialize_request_ext};
8+
use proto_flow::runtime;
99
use std::pin::Pin;
1010

1111
pub fn run_materialize<L: LogHandler>(
@@ -20,56 +20,29 @@ pub fn run_materialize<L: LogHandler>(
2020
let spec = spec.clone();
2121
let state_dir = state_dir.to_owned();
2222

23-
// TODO(johnny): extract from spec?
24-
let version = super::unique_version();
25-
2623
coroutines::try_coroutine(move |mut co| async move {
2724
let (mut request_tx, request_rx) = mpsc::channel(crate::CHANNEL_BUFFER);
2825
let response_rx = runtime.serve_materialize(request_rx);
2926
tokio::pin!(response_rx);
3027

31-
// Send Apply.
32-
let apply = Request {
33-
apply: Some(request::Apply {
34-
materialization: Some(spec.clone()),
35-
dry_run: false,
36-
version: version.clone(),
37-
last_materialization: None,
38-
last_version: String::new(),
39-
}),
40-
..Default::default()
41-
};
42-
request_tx.try_send(Ok(apply)).expect("sender is empty");
43-
44-
// Receive Applied.
45-
match response_rx.try_next().await? {
46-
Some(applied) if applied.applied.is_some() => {
47-
() = co.yield_(applied).await;
48-
}
49-
response => return verify("runtime", "Applied").fail(response),
50-
}
51-
5228
let state_dir = state_dir.to_str().context("tempdir is not utf8")?;
53-
let rocksdb_desc = Some(runtime::RocksDbDescriptor {
29+
let rocksdb_desc = runtime::RocksDbDescriptor {
5430
rocksdb_env_memptr: 0,
5531
rocksdb_path: state_dir.to_owned(),
56-
});
57-
let open_ext = materialize_request_ext::Open {
58-
rocksdb_descriptor: rocksdb_desc.clone(),
5932
};
6033

61-
for target_transactions in sessions {
34+
for (sessions_index, target_transactions) in sessions.into_iter().enumerate() {
6235
() = run_session(
6336
&mut co,
64-
&open_ext,
6537
reader.clone(),
6638
&mut request_tx,
6739
&mut response_rx,
40+
&rocksdb_desc,
41+
sessions_index,
6842
&spec,
6943
&mut state,
7044
target_transactions,
7145
timeout,
72-
&version,
7346
)
7447
.await?;
7548
}
@@ -78,7 +51,7 @@ pub fn run_materialize<L: LogHandler>(
7851
verify("runtime", "EOF").is_eof(response_rx.try_next().await?)?;
7952

8053
// Re-open RocksDB.
81-
let rocksdb = RocksDB::open(rocksdb_desc).await?;
54+
let rocksdb = RocksDB::open(Some(rocksdb_desc)).await?;
8255

8356
let checkpoint = rocksdb.load_checkpoint().await?;
8457
tracing::debug!(checkpoint = ?::ops::DebugJson(checkpoint), "final runtime checkpoint");
@@ -106,21 +79,50 @@ pub fn run_materialize<L: LogHandler>(
10679

10780
async fn run_session(
10881
co: &mut coroutines::Suspend<Response, ()>,
109-
open_ext: &materialize_request_ext::Open,
11082
reader: impl Reader,
11183
request_tx: &mut mpsc::Sender<anyhow::Result<Request>>,
11284
response_rx: &mut Pin<&mut impl ResponseStream>,
85+
rocksdb_desc: &runtime::RocksDbDescriptor,
86+
sessions_index: usize,
11387
spec: &flow::MaterializationSpec,
11488
state: &mut models::RawValue,
11589
target_transactions: usize,
11690
timeout: std::time::Duration,
117-
version: &str,
11891
) -> anyhow::Result<()> {
92+
let labeling = crate::parse_shard_labeling(spec.shard_template.as_ref())?;
93+
94+
// Send Apply.
95+
let apply = Request {
96+
apply: Some(request::Apply {
97+
materialization: Some(spec.clone()),
98+
dry_run: false,
99+
version: labeling.build.clone(),
100+
last_materialization: None,
101+
last_version: String::new(),
102+
}),
103+
..Default::default()
104+
}
105+
.with_internal(|internal| {
106+
if sessions_index == 0 {
107+
internal.rocksdb_descriptor = Some(rocksdb_desc.clone());
108+
}
109+
internal.set_log_level(labeling.log_level());
110+
});
111+
request_tx.try_send(Ok(apply)).expect("sender is empty");
112+
113+
// Receive Applied.
114+
match response_rx.try_next().await? {
115+
Some(applied) if applied.applied.is_some() => {
116+
() = co.yield_(applied).await;
117+
}
118+
response => return verify("runtime", "Applied").fail(response),
119+
}
120+
119121
// Send Open.
120122
let open = Request {
121123
open: Some(request::Open {
122124
materialization: Some(spec.clone()),
123-
version: version.to_string(),
125+
version: labeling.build.clone(),
124126
range: Some(flow::RangeSpec {
125127
key_begin: 0,
126128
key_end: u32::MAX,
@@ -131,9 +133,7 @@ async fn run_session(
131133
}),
132134
..Default::default()
133135
}
134-
.with_internal(|internal| {
135-
internal.open = Some(open_ext.clone());
136-
});
136+
.with_internal(|internal| internal.set_log_level(labeling.log_level()));
137137
request_tx.try_send(Ok(open)).expect("sender is empty");
138138

139139
// Receive Opened.

0 commit comments

Comments
 (0)