Skip to content

Commit 5891541

Browse files
committed
thread through --allow-local flag to enable/disable local connectors
We'd like them to be available in local contexts, like `flowctl` and also local stacks, while being disabled in production deployments.
1 parent 1c9abc7 commit 5891541

File tree

18 files changed

+167
-87
lines changed

18 files changed

+167
-87
lines changed

crates/agent/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ struct Args {
4343
/// Email address of user which provisions and maintains tenant accounts.
4444
#[clap(long = "accounts-email", default_value = "[email protected]")]
4545
accounts_email: String,
46+
/// Allow local connectors. True for local stacks, and false otherwise.
47+
#[clap(long = "allow-local", default_value = "false")]
48+
allow_local: bool,
4649
}
4750

4851
#[tokio::main]
@@ -96,6 +99,7 @@ async fn main() -> Result<(), anyhow::Error> {
9699
vec![
97100
Box::new(agent::PublishHandler::new(
98101
&args.accounts_email,
102+
args.allow_local,
99103
&bindir,
100104
&args.broker_address,
101105
&builds_root,

crates/agent/src/publications.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ impl JobStatus {
6060
/// A PublishHandler is a Handler which publishes catalog specifications.
6161
pub struct PublishHandler {
6262
agent_user_email: String,
63+
allow_local: bool,
6364
bindir: String,
6465
broker_address: url::Url,
6566
builds_root: url::Url,
@@ -72,6 +73,7 @@ pub struct PublishHandler {
7273
impl PublishHandler {
7374
pub fn new(
7475
agent_user_email: impl Into<String>,
76+
allow_local: bool,
7577
bindir: &str,
7678
broker_address: &url::Url,
7779
builds_root: &url::Url,
@@ -82,6 +84,7 @@ impl PublishHandler {
8284
) -> Self {
8385
Self {
8486
agent_user_email: agent_user_email.into(),
87+
allow_local,
8588
bindir: bindir.to_string(),
8689
broker_address: broker_address.clone(),
8790
builds_root: builds_root.clone(),
@@ -321,6 +324,7 @@ impl PublishHandler {
321324
let tmpdir = tmpdir_handle.path();
322325

323326
let build_output = builds::build_catalog(
327+
self.allow_local,
324328
&self.builds_root,
325329
&draft_catalog,
326330
&self.connector_network,

crates/agent/src/publications/builds.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ impl BuildOutput {
117117
}
118118

119119
pub async fn build_catalog(
120+
allow_local: bool,
120121
builds_root: &url::Url,
121122
catalog: &models::Catalog,
122123
connector_network: &str,
@@ -146,6 +147,7 @@ pub async fn build_catalog(
146147
let source = url::Url::parse("file:///flow.json").unwrap();
147148

148149
let managed_build = build::managed_build(
150+
allow_local,
149151
build_id.clone(),
150152
connector_network.to_string(),
151153
Box::new(control_plane),

crates/agent/src/publications/specs.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,7 @@ mod test {
754754

755755
let mut handler = PublishHandler::new(
756756
757+
false,
757758
"",
758759
&bs_url,
759760
&bs_url,

crates/build/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ pub async fn load(source: &url::Url, file_root: &Path) -> tables::Sources {
9494
/// * If `generate_ops_collections` is set, then ops collections are added into `sources`.
9595
/// * If any of `noop_*` is true, then validations are skipped for connectors of that type.
9696
pub async fn validate<L>(
97+
allow_local: bool,
9798
build_id: &str,
9899
connector_network: &str,
99100
control_plane: &dyn validation::ControlPlane,
@@ -116,6 +117,7 @@ where
116117
::sources::inline_sources(&mut sources);
117118

118119
let runtime = runtime::Runtime::new(
120+
allow_local,
119121
connector_network.to_string(),
120122
log_handler,
121123
None,
@@ -167,6 +169,7 @@ where
167169
/// components but not the `flowctl` CLI, which requires finer-grain
168170
/// control over build behavior.
169171
pub async fn managed_build<L>(
172+
allow_local: bool,
170173
build_id: String,
171174
connector_network: String,
172175
control_plane: Box<dyn validation::ControlPlane>,
@@ -179,6 +182,7 @@ where
179182
L: Fn(&ops::Log) + Send + Sync + Clone + 'static,
180183
{
181184
let (sources, validations) = validate(
185+
allow_local,
182186
&build_id,
183187
&connector_network,
184188
&*control_plane,

crates/flowctl/src/local_specs.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ async fn validate(
7777
let project_root = build::project_root(source);
7878

7979
let (sources, mut validations) = build::validate(
80+
true, // Allow local connectors.
8081
"local-build",
8182
"", // Use default connector network.
8283
&Resolver { client },

crates/flowctl/src/preview/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ impl Preview {
150150
.await?;
151151

152152
let mut responses_rx = runtime::Runtime::new(
153+
true, // Allow local connectors.
153154
String::new(),
154155
ops::tracing_log_handler,
155156
None,

crates/flowctl/src/raw/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ async fn do_build(ctx: &mut crate::CliContext, build: &Build) -> anyhow::Result<
206206
let project_root = build::project_root(&source_url);
207207

208208
let build_result = build::managed_build(
209+
true, // Allow local connectors.
209210
build_id.clone(),
210211
connector_network.clone(),
211212
Box::new(local_specs::Resolver { client }),

crates/proto-flow/src/runtime.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub struct TaskServiceConfig {
99
pub uds_path: ::prost::alloc::string::String,
1010
#[prost(string, tag = "4")]
1111
pub container_network: ::prost::alloc::string::String,
12+
#[prost(bool, tag = "5")]
13+
pub allow_local: bool,
1214
}
1315
/// ShuffleRequest is the request message of a Shuffle RPC.
1416
/// It's a description of a document shuffle,

crates/runtime/src/capture/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ where
8888
&self.task_name,
8989
)
9090
.boxed(),
91+
models::CaptureEndpoint::Local(_) if !self.allow_local => {
92+
return Err(tonic::Status::failed_precondition(
93+
"Local connectors are not permitted in this context",
94+
))
95+
}
9196
models::CaptureEndpoint::Local(_) => todo!(),
9297
};
9398

0 commit comments

Comments
 (0)