Skip to content

Commit 2336d2e

Browse files
committed
runtime: support for unsealing and starting local connectors
Support for local connectors largely mirrors that of image connectors. For a local connector, we unseal and unnest its endpoint configuration, then start the configured program and drive it using its configured protocol. Also relax unsealing (sops decryption) to pass through non-object documents, which can happen if the config wasn't resolved (for example, because it's being generated).
1 parent 34d1aaf commit 2336d2e

File tree

12 files changed

+507
-22
lines changed

12 files changed

+507
-22
lines changed

crates/connector-init/src/codec.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,12 @@ impl Codec {
6060
let Some(bound) = buf.iter().position(|b| *b == b'\n') else { break };
6161
let bound = bound + 1; // Byte index after '\n'.
6262

63-
out.push(
64-
serde_json::from_slice::<M>(&buf[..bound]).context("parsing JSON message")?,
65-
);
63+
out.push(serde_json::from_slice::<M>(&buf[..bound]).with_context(|| {
64+
format!(
65+
"could not parse '{}' into JSON response",
66+
String::from_utf8_lossy(&buf[..bound])
67+
)
68+
})?);
6669
consumed += bound;
6770

6871
buf = &buf[bound..];

crates/locate-bin/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ pub fn locate(binary: &str) -> anyhow::Result<PathBuf> {
55
// Look for binary alongside this program.
66
let this_program = std::env::args().next().unwrap();
77

8-
tracing::debug!(%this_program, "attempting for find '{binary}'");
8+
tracing::debug!(%this_program, "attempting to find '{binary}'");
99
let mut bin = std::path::Path::new(&this_program)
1010
.parent()
1111
.unwrap()
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use super::extract_endpoint;
2+
use crate::{
3+
local_connector::{Connector, UnsealFuture, Unsealed},
4+
unseal,
5+
};
6+
use futures::{channel::mpsc, FutureExt, Stream};
7+
use proto_flow::{
8+
capture::{Request, Response},
9+
runtime::CaptureRequestExt,
10+
};
11+
12+
fn unseal(mut request: Request) -> Result<UnsealFuture<Request>, Request> {
13+
if !matches!(
14+
request,
15+
Request { spec: Some(_), .. }
16+
| Request {
17+
discover: Some(_),
18+
..
19+
}
20+
| Request {
21+
validate: Some(_),
22+
..
23+
}
24+
| Request { apply: Some(_), .. }
25+
| Request { open: Some(_), .. }
26+
) {
27+
return Err(request); // Not an unseal-able request.
28+
};
29+
30+
Ok(async move {
31+
let (endpoint, config_json) = extract_endpoint(&mut request)?;
32+
33+
let models::CaptureEndpoint::Local(models::LocalConfig {
34+
command,
35+
config: sealed_config,
36+
env,
37+
protobuf,
38+
}) = endpoint else {
39+
anyhow::bail!("task connector type has changed and is no longer an image")
40+
};
41+
42+
*config_json = unseal::decrypt_sops(&sealed_config).await?.to_string();
43+
44+
let log_level = match request.get_internal() {
45+
Ok(CaptureRequestExt {
46+
labels: Some(labels),
47+
..
48+
}) => Some(labels.log_level()),
49+
_ => None,
50+
};
51+
52+
Ok(Unsealed {
53+
command,
54+
env,
55+
log_level,
56+
protobuf,
57+
request,
58+
})
59+
}
60+
.boxed())
61+
}
62+
63+
pub fn connector<L, R>(log_handler: L, request_rx: R) -> mpsc::Receiver<tonic::Result<Response>>
64+
where
65+
L: Fn(&ops::Log) + Clone + Send + Sync + 'static,
66+
R: Stream<Item = tonic::Result<Request>> + Send + Unpin + 'static,
67+
{
68+
let (connector, response_rx) = Connector::new(log_handler, request_rx, unseal);
69+
tokio::spawn(async move { connector.run().await });
70+
response_rx
71+
}

crates/runtime/src/capture/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::sync::Arc;
2626
// Forward Checkpoint enriched with stats.
2727

2828
mod image;
29+
mod local;
2930

3031
pub type BoxStream = futures::stream::BoxStream<'static, tonic::Result<Response>>;
3132

@@ -93,7 +94,9 @@ where
9394
"Local connectors are not permitted in this context",
9495
))
9596
}
96-
models::CaptureEndpoint::Local(_) => todo!(),
97+
models::CaptureEndpoint::Local(_) => {
98+
local::connector(self.log_handler, request_rx).boxed()
99+
}
97100
};
98101

99102
Ok(response_rx)

crates/runtime/src/derive/local.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use super::extract_endpoint;
2+
use crate::{
3+
local_connector::{Connector, UnsealFuture, Unsealed},
4+
unseal,
5+
};
6+
use futures::{channel::mpsc, FutureExt, Stream};
7+
use proto_flow::{
8+
derive::{Request, Response},
9+
runtime::DeriveRequestExt,
10+
};
11+
12+
fn unseal(mut request: Request) -> Result<UnsealFuture<Request>, Request> {
13+
if !matches!(
14+
request,
15+
Request { spec: Some(_), .. }
16+
| Request {
17+
validate: Some(_),
18+
..
19+
}
20+
| Request { open: Some(_), .. }
21+
) {
22+
return Err(request); // Not an unseal-able request.
23+
};
24+
25+
Ok(async move {
26+
let (endpoint, config_json) = extract_endpoint(&mut request)?;
27+
28+
let models::DeriveUsing::Local(models::LocalConfig {
29+
command,
30+
config: sealed_config,
31+
env,
32+
protobuf,
33+
}) = endpoint else {
34+
anyhow::bail!("task connector type has changed and is no longer an image")
35+
};
36+
*config_json = unseal::decrypt_sops(&sealed_config).await?.to_string();
37+
38+
let log_level = match request.get_internal() {
39+
Ok(DeriveRequestExt {
40+
labels: Some(labels),
41+
..
42+
}) => Some(labels.log_level()),
43+
_ => None,
44+
};
45+
46+
Ok(Unsealed {
47+
command,
48+
env,
49+
log_level,
50+
protobuf,
51+
request,
52+
})
53+
}
54+
.boxed())
55+
}
56+
57+
pub fn connector<L, R>(log_handler: L, request_rx: R) -> mpsc::Receiver<tonic::Result<Response>>
58+
where
59+
L: Fn(&ops::Log) + Clone + Send + Sync + 'static,
60+
R: Stream<Item = tonic::Result<Request>> + Send + Unpin + 'static,
61+
{
62+
let (connector, response_rx) = Connector::new(log_handler, request_rx, unseal);
63+
tokio::spawn(async move { connector.run().await });
64+
response_rx
65+
}

crates/runtime/src/derive/mod.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ use proto_flow::runtime::DeriveRequestExt;
88
use std::pin::Pin;
99
use std::sync::Arc;
1010

11-
pub mod combine;
12-
pub mod image;
13-
pub mod rocksdb;
11+
mod combine;
12+
mod image;
13+
mod local;
14+
mod rocksdb;
1415

1516
pub type BoxStream = futures::stream::BoxStream<'static, tonic::Result<Response>>;
1617

@@ -108,7 +109,21 @@ where
108109
"Local connectors are not permitted in this context",
109110
))
110111
}
111-
models::DeriveUsing::Local(_) => todo!(),
112+
models::DeriveUsing::Local(_) => {
113+
// Request interceptor for stateful RocksDB storage.
114+
let (request_rx, rocks_back) = rocksdb::adapt_requests(&peek_request, request_rx)
115+
.map_err(crate::anyhow_to_status)?;
116+
117+
// Invoke the underlying local connector.
118+
let response_rx = local::connector(self.log_handler, request_rx);
119+
120+
// Response interceptor for stateful RocksDB storage.
121+
let response_rx = rocks_back.adapt_responses(response_rx);
122+
// Response interceptor for combining over documents.
123+
let response_rx = combine_back.adapt_responses(response_rx);
124+
125+
response_rx.boxed()
126+
}
112127
models::DeriveUsing::Sqlite(_) => {
113128
// Invoke the underlying SQLite connector.
114129
let response_rx = ::derive_sqlite::connector(&peek_request, request_rx)?;
@@ -125,7 +140,7 @@ where
125140
}
126141
}
127142

128-
pub fn adjust_log_level<R>(
143+
fn adjust_log_level<R>(
129144
request_rx: R,
130145
set_log_level: Option<Arc<dyn Fn(ops::log::Level) + Send + Sync>>,
131146
) -> impl Stream<Item = tonic::Result<Request>>

crates/runtime/src/image_connector.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ enum State<Request> {
7878
container_status: JoinHandle<tonic::Result<()>>,
7979
unseal: UnsealFuture<Request>,
8080
},
81-
// Requests reach EOF. We've sent EOF into the container and are
81+
// Requests reached EOF. We've sent EOF into the container and are
8282
// draining its final responses.
8383
Draining {
8484
container_status: JoinHandle<tonic::Result<()>>,
@@ -111,15 +111,15 @@ where
111111
(
112112
Self {
113113
attach_container,
114-
unseal,
114+
log_handler,
115115
network: network.to_string(),
116116
request_rx,
117-
start_rpc,
118117
response_tx,
118+
start_rpc,
119119
state: State::<Request>::Idle,
120120
task_name: task_name.to_string(),
121-
log_handler,
122121
task_type,
122+
unseal,
123123
},
124124
response_rx,
125125
)

crates/runtime/src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod capture;
44
mod container;
55
mod derive;
66
mod image_connector;
7+
mod local_connector;
78
mod materialize;
89
mod task_service;
910
mod tokio_context;
@@ -17,6 +18,13 @@ pub use tokio_context::TokioContext;
1718
// See go/protocols/flow/document_extensions.go.
1819
pub const UUID_PLACEHOLDER: &str = "DocUUIDPlaceholder-329Bb50aa48EAa9ef";
1920

21+
/// CHANNEL_BUFFER is the standard buffer size used for holding documents in an
22+
/// asynchronous processing pipeline. User documents can be large -- up to 64MB --
23+
/// so this value should be small. At the same time, processing steps such as
24+
/// schema validation are greatly accelerated when they can loop over multiple
25+
/// documents without yielding, so it should not be *too* small.
26+
pub const CHANNEL_BUFFER: usize = 8;
27+
2028
fn anyhow_to_status(err: anyhow::Error) -> tonic::Status {
2129
tonic::Status::internal(format!("{err:?}"))
2230
}
@@ -80,5 +88,3 @@ where
8088
)
8189
}
8290
}
83-
84-
const CHANNEL_BUFFER: usize = 8;

0 commit comments

Comments
 (0)