From 45db053cc003def43c39af6984d69b9d80b256f5 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 6 Sep 2024 16:44:16 -0400 Subject: [PATCH 1/3] feat(source): support webhook source table --- Cargo.lock | 7 + proto/catalog.proto | 8 + proto/expr.proto | 48 ++-- src/common/secret/src/secret_manager.rs | 24 ++ src/connector/src/source/mod.rs | 2 + src/expr/impl/Cargo.toml | 1 + src/expr/impl/src/scalar/hmac.rs | 78 ++++++ src/expr/impl/src/scalar/mod.rs | 1 + src/frontend/Cargo.toml | 11 + src/frontend/planner_test/src/lib.rs | 2 + .../binder/expr/function/builtin_scalar.rs | 1 + src/frontend/src/binder/expr/function/mod.rs | 2 +- src/frontend/src/binder/expr/mod.rs | 15 +- src/frontend/src/binder/mod.rs | 19 ++ src/frontend/src/catalog/table_catalog.rs | 8 +- src/frontend/src/expr/pure.rs | 1 + src/frontend/src/handler/create_source.rs | 15 +- src/frontend/src/handler/create_table.rs | 77 +++++- src/frontend/src/handler/create_table_as.rs | 1 + src/frontend/src/handler/explain.rs | 2 + src/frontend/src/handler/mod.rs | 2 + src/frontend/src/lib.rs | 6 + src/frontend/src/optimizer/mod.rs | 4 +- .../src/optimizer/plan_expr_visitor/strong.rs | 1 + .../optimizer/plan_node/stream_materialize.rs | 6 + src/frontend/src/optimizer/plan_node/utils.rs | 1 + .../src/scheduler/distributed/query.rs | 1 + src/frontend/src/webhook/mod.rs | 223 ++++++++++++++++++ src/frontend/src/webhook/utils.rs | 77 ++++++ src/meta/model/migration/src/lib.rs | 3 + .../src/m20241001_013810_webhook_source.rs | 35 +++ src/meta/model/src/lib.rs | 1 + src/meta/model/src/table.rs | 4 +- src/meta/src/controller/mod.rs | 1 + src/prost/build.rs | 1 + src/sqlparser/src/ast/ddl.rs | 12 +- src/sqlparser/src/ast/mod.rs | 9 +- src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 65 ++++- 39 files changed, 732 insertions(+), 44 deletions(-) create mode 100644 src/expr/impl/src/scalar/hmac.rs create mode 100644 src/frontend/src/webhook/mod.rs create mode 100644 src/frontend/src/webhook/utils.rs create mode 100644 src/meta/model/migration/src/m20241001_013810_webhook_source.rs diff --git a/Cargo.lock b/Cargo.lock index 2e84e822497b..60ba38463f40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11291,6 +11291,7 @@ dependencies = [ "futures-util", "ginepro", "hex", + "hmac", "icelake", "itertools 0.13.0", "jsonbb", @@ -11343,6 +11344,7 @@ dependencies = [ "async-trait", "auto_enums", "auto_impl", + "axum", "base64 0.22.0", "bk-tree", "bytes", @@ -11359,6 +11361,8 @@ dependencies = [ "fixedbitset 0.5.0", "futures", "futures-async-stream", + "hex", + "hmac", "iana-time-zone", "iceberg", "icelake", @@ -11403,6 +11407,7 @@ dependencies = [ "rw_futures_util", "serde", "serde_json", + "sha1", "sha2", "smallvec", "speedate", @@ -11411,6 +11416,8 @@ dependencies = [ "thiserror-ext", "tokio-postgres", "tokio-stream 0.1.15", + "tower 0.4.13", + "tower-http", "tracing", "uuid", "workspace-hack", diff --git a/proto/catalog.proto b/proto/catalog.proto index 5383104e9c0f..2286b20ccf64 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -94,6 +94,12 @@ message StreamSourceInfo { map format_encode_secret_refs = 16; } +message WebhookSourceInfo { + secret.SecretRef secret_ref = 1; + string header_key = 2; + expr.ExprNode signature_expr = 3; +} + message Source { // For shared source, this is the same as the job id. // For non-shared source and table with connector, this is a different oid. @@ -433,6 +439,8 @@ message Table { // for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`. optional uint32 maybe_vnode_count = 40; + optional WebhookSourceInfo webhook_info = 41; + // Per-table catalog version, used by schema change. `None` for internal // tables and tests. Not to be confused with the global catalog version for // notification service. diff --git a/proto/expr.proto b/proto/expr.proto index 533084351284..3eaba5f6e55e 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -12,8 +12,9 @@ message ExprNode { message NowRexNode {} // TODO: move this into `FunctionCall`. enum Type { - // `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the viriant of `rex_node`. - // Their types are therefore deprecated and should be `UNSPECIFIED` instead. + // `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the + // viriant of `rex_node`. Their types are therefore deprecated and should be + // `UNSPECIFIED` instead. reserved 1, 2, 3000; reserved "INPUT_REF", "CONSTANT_VALUE", "UDF"; @@ -127,7 +128,7 @@ message ExprNode { LPAD = 238; RPAD = 239; REVERSE = 240; - STRPOS = 241 [deprecated = true]; // duplicated with POSITION + STRPOS = 241 [ deprecated = true ]; // duplicated with POSITION TO_ASCII = 242; TO_HEX = 243; QUOTE_IDENT = 244; @@ -197,6 +198,7 @@ message ExprNode { INET_NTOA = 329; QUOTE_LITERAL = 330; QUOTE_NULLABLE = 331; + HMAC = 332; // Unary operators NEG = 401; @@ -329,7 +331,8 @@ message ExprNode { // EXTERNAL ICEBERG_TRANSFORM = 2201; } - // Only use this field for function call. For other types of expression, it should be UNSPECIFIED. + // Only use this field for function call. For other types of expression, it + // should be UNSPECIFIED. Type function_type = 1; data.DataType return_type = 3; oneof rex_node { @@ -390,8 +393,8 @@ message Constant { // The items which can occur in the select list of `ProjectSet` operator. // -// When there are table functions in the SQL query `SELECT ...`, it will be planned as `ProjectSet`. -// Otherwise it will be planned as `Project`. +// When there are table functions in the SQL query `SELECT ...`, it will be +// planned as `ProjectSet`. Otherwise it will be planned as `Project`. // // # Examples // @@ -421,9 +424,7 @@ message ProjectSetSelectItem { } } -message FunctionCall { - repeated ExprNode children = 1; -} +message FunctionCall { repeated ExprNode children = 1; } // Aggregate Function Calls for Aggregation message AggCall { @@ -460,7 +461,8 @@ message AggCall { // user defined aggregate function USER_DEFINED = 100; - // wraps a scalar function that takes a list as input as an aggregate function. + // wraps a scalar function that takes a list as input as an aggregate + // function. WRAP_SCALAR = 101; } Kind kind = 1; @@ -494,7 +496,8 @@ message WindowFrame { enum Type { TYPE_UNSPECIFIED = 0; - TYPE_ROWS_LEGACY = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame. + TYPE_ROWS_LEGACY = 2 + [ deprecated = true ]; // Deprecated since we introduced `RANGE` frame. TYPE_ROWS = 5; TYPE_RANGE = 10; @@ -554,8 +557,10 @@ message WindowFrame { Type type = 1; - Bound start = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame. - Bound end = 3 [deprecated = true]; // Deprecated since we introduced `RANGE` frame. + Bound start = 2 + [ deprecated = true ]; // Deprecated since we introduced `RANGE` frame. + Bound end = 3 + [ deprecated = true ]; // Deprecated since we introduced `RANGE` frame. Exclusion exclusion = 4; @@ -589,8 +594,9 @@ message WindowFunction { WindowFrame frame = 5; } -// Note: due to historic reasons, UserDefinedFunction is a oneof variant parallel to FunctionCall, -// while UserDefinedFunctionMetadata is embedded as a field in TableFunction and AggCall. +// Note: due to historic reasons, UserDefinedFunction is a oneof variant +// parallel to FunctionCall, while UserDefinedFunctionMetadata is embedded as a +// field in TableFunction and AggCall. message UserDefinedFunction { repeated ExprNode children = 1; @@ -601,8 +607,10 @@ message UserDefinedFunction { // The link to the external function service. optional string link = 5; // An unique identifier to the function. - // - If `link` is not empty, the name of the function in the external function service. - // - If `language` is `rust` or `wasm`, the name of the function in the wasm binary file. + // - If `link` is not empty, the name of the function in the external function + // service. + // - If `language` is `rust` or `wasm`, the name of the function in the wasm + // binary file. // - If `language` is `javascript`, the name of the function. optional string identifier = 6; // - If `language` is `javascript`, the source code of the function. @@ -610,9 +618,11 @@ message UserDefinedFunction { // - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary. optional bytes compressed_binary = 10; bool always_retry_on_network_error = 9; - // The runtime used when javascript is used as the language. Could be "quickjs" or "deno". + // The runtime used when javascript is used as the language. Could be + // "quickjs" or "deno". optional string runtime = 11; - // The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator" + // The function type, which is used to execute the function. Could be "sync", + // "async", "generator" or "async_generator" optional string function_type = 12; } diff --git a/src/common/secret/src/secret_manager.rs b/src/common/secret/src/secret_manager.rs index b6a71a4c3ebe..d8215b34454d 100644 --- a/src/common/secret/src/secret_manager.rs +++ b/src/common/secret/src/secret_manager.rs @@ -142,6 +142,30 @@ impl LocalSecretManager { Ok(options) } + pub fn fill_secret(&self, secret_ref: PbSecretRef) -> SecretResult { + let secret_guard = self.secrets.read(); + let secret_id = secret_ref.secret_id; + let pb_secret_bytes = secret_guard + .get(&secret_id) + .ok_or(SecretError::ItemNotFound(secret_id))?; + let secret_value_bytes = Self::get_secret_value(pb_secret_bytes)?; + match secret_ref.ref_as() { + RefAsType::Text => { + // We converted the secret string from sql to bytes using `as_bytes` in frontend. + // So use `from_utf8` here to convert it back to string. + return Ok(String::from_utf8(secret_value_bytes.clone())?); + } + RefAsType::File => { + let path_str = + self.get_or_init_secret_file(secret_id, secret_value_bytes.clone())?; + return Ok(path_str); + } + RefAsType::Unspecified => { + return Err(SecretError::UnspecifiedRefType(secret_id)); + } + } + } + /// Get the secret file for the given secret id and return the path string. If the file does not exist, create it. /// WARNING: This method should be called only when the secret manager is locked. fn get_or_init_secret_file( diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 899fc2a2379f..d9f1d039e7ed 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -55,6 +55,8 @@ pub use crate::source::filesystem::S3_CONNECTOR; pub use crate::source::nexmark::NEXMARK_CONNECTOR; pub use crate::source::pulsar::PULSAR_CONNECTOR; +pub const WEBHOOK_CONNECTOR: &str = "webhook"; + pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool { const PREFIXES: &[&str] = &[ "schema.registry", diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 257cf19b77e7..cfd0ed70155a 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -43,6 +43,7 @@ futures-async-stream = { workspace = true } futures-util = "0.3" ginepro = "0.8" hex = "0.4" +hmac = "0.12" icelake = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } diff --git a/src/expr/impl/src/scalar/hmac.rs b/src/expr/impl/src/scalar/hmac.rs new file mode 100644 index 000000000000..c8bc0f8dc80f --- /dev/null +++ b/src/expr/impl/src/scalar/hmac.rs @@ -0,0 +1,78 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance witmuth the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use hex::encode; +use hmac::{Hmac, Mac}; +use risingwave_expr::function; +use sha1::Sha1; +use sha2::Sha256; + +#[function("hmac(varchar, bytea, varchar) -> bytea")] +pub fn hmac<'a>(secret: &str, payload: &[u8], sha_type: &str) -> Box<[u8]> { + if sha_type == "sha1" { + sha1_hmac(secret, payload) + } else if sha_type == "sha256" { + sha256_hmac(secret, payload) + } else { + panic!("Unsupported SHA type: {}", sha_type) + } +} + +fn sha256_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> { + let mut mac = + Hmac::::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); + + mac.update(payload); + + let result = mac.finalize(); + let code_bytes = result.into_bytes(); + let computed_signature = format!("sha256={}", encode(code_bytes)); + computed_signature.as_bytes().into() +} + +fn sha1_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> { + let mut mac = + Hmac::::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); + + mac.update(payload); + + let result = mac.finalize(); + let code_bytes = result.into_bytes(); + let computed_signature = format!("sha1={}", encode(code_bytes)); + computed_signature.as_bytes().into() +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[tokio::test] + async fn test_verify_signature_hmac_sha256() -> anyhow::Result<()> { + let secret = "your_secret_key"; + let payload = b"your_webhook_payload"; + let signature = b"sha256=cef8b98a91902c492b85d97f049aa4bfc5e7e3f9b8b7bf7cb49c5f829d2dac85"; // 替换为 + assert!(*sha256_hmac(secret, payload) == *signature); + Ok(()) + } + + #[tokio::test] + async fn test_verify_signature_hmac_sha1() -> anyhow::Result<()> { + let secret = "your_secret_key"; + let payload = b"your_webhook_payload"; + let signature = b"sha1=65cb920a4b8c6ab8e2eab861a096a7bc2c05d8ba"; // 替换为 + assert!(*sha1_hmac(secret, payload) == *signature); + Ok(()) + } +} diff --git a/src/expr/impl/src/scalar/mod.rs b/src/expr/impl/src/scalar/mod.rs index fbf9b512ea86..ea5ce353fac4 100644 --- a/src/expr/impl/src/scalar/mod.rs +++ b/src/expr/impl/src/scalar/mod.rs @@ -47,6 +47,7 @@ mod extract; mod field; mod format; mod format_type; +mod hmac; mod in_; mod int256; mod jsonb_access; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 502eaeba2f68..072b590d1f9a 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -21,6 +21,7 @@ async-recursion = "1.1.0" async-trait = "0.1" auto_enums = { workspace = true } auto_impl = "1" +axum = { workspace = true } base64 = "0.22" bk-tree = "0.5.0" bytes = "1" @@ -36,6 +37,8 @@ fancy-regex = "0.14.0" fixedbitset = "0.5" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } +hex = "0.4" +hmac = "0.12" iana-time-zone = "0.1" iceberg = { workspace = true } icelake = { workspace = true } @@ -79,6 +82,7 @@ risingwave_variables = { workspace = true } rw_futures_util = { workspace = true } serde = { version = "1", features = ["derive"] } serde_json = "1" +sha1 = "0.10.6" sha2 = "0.10.7" smallvec = { version = "1.13.1", features = ["serde"] } speedate = "0.15.0" @@ -97,10 +101,17 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ tokio-postgres = "0.7" tokio-stream = { workspace = true } tonic = { workspace = true } +tower = { version = "0.4", features = ["util", "load-shed"] } +tower-http = { version = "0.6", features = [ + "add-extension", + "cors", + "compression-gzip", +] } tracing = "0.1" uuid = "1" zstd = { version = "0.13", default-features = false } + [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 528fa88ef350..d854ae662232 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -435,6 +435,7 @@ impl TestCase { cdc_table_info, include_column_options, wildcard_idx, + webhook_info, .. } => { let format_encode = format_encode.map(|schema| schema.into_v2_with_warning()); @@ -453,6 +454,7 @@ impl TestCase { with_version_column, cdc_table_info, include_column_options, + webhook_info, ) .await?; } diff --git a/src/frontend/src/binder/expr/function/builtin_scalar.rs b/src/frontend/src/binder/expr/function/builtin_scalar.rs index 68b37a3fee4e..200abf27bed7 100644 --- a/src/frontend/src/binder/expr/function/builtin_scalar.rs +++ b/src/frontend/src/binder/expr/function/builtin_scalar.rs @@ -294,6 +294,7 @@ impl Binder { ("sha512", raw_call(ExprType::Sha512)), ("encrypt", raw_call(ExprType::Encrypt)), ("decrypt", raw_call(ExprType::Decrypt)), + ("hmac", raw_call(ExprType::Hmac)), ("left", raw_call(ExprType::Left)), ("right", raw_call(ExprType::Right)), ("inet_aton", raw_call(ExprType::InetAton)), diff --git a/src/frontend/src/binder/expr/function/mod.rs b/src/frontend/src/binder/expr/function/mod.rs index f7a4007ffd46..dfe2b5d2c99e 100644 --- a/src/frontend/src/binder/expr/function/mod.rs +++ b/src/frontend/src/binder/expr/function/mod.rs @@ -146,7 +146,7 @@ impl Binder { ); return self.bind_array_transform(arg_list.args); } - + let mut args: Vec<_> = arg_list .args .iter() diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 85ed93c7dc0c..2a0641c8f1fe 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -90,7 +90,20 @@ impl Binder { ) } } else { - self.bind_column(&[ident]) + if let Some(ctx) = self.secure_compare_context.as_ref() { + if ident.real_value() == ctx.secret_name { + Ok(InputRef::new(0, DataType::Varchar).into()) + } else if ident.real_value() == ctx.column_name { + Ok(InputRef::new(1, DataType::Bytea).into()) + } else { + Err( + ErrorCode::ItemNotFound(format!("Unknown arg: {}", ident.real_value())) + .into(), + ) + } + } else { + self.bind_column(&[ident]) + } } } Expr::CompoundIdentifier(idents) => self.bind_column(&idents), diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index adb7a1b9d0f2..8ebb464b887a 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -127,6 +127,15 @@ pub struct Binder { /// The temporary sources that will be used during binding phase temporary_source_manager: TemporarySourceManager, + + /// information about secure_compare + secure_compare_context: Option, +} + +#[derive(Default, Clone, Debug)] +pub struct SecureCompareContext { + pub column_name: String, + pub secret_name: String, } #[derive(Clone, Debug, Default)] @@ -326,6 +335,7 @@ impl Binder { param_types: ParameterTypes::new(param_types), udf_context: UdfContext::new(), temporary_source_manager: session.temporary_source_manager(), + secure_compare_context: None, } } @@ -348,6 +358,15 @@ impl Binder { Self::new_inner(session, BindFor::Ddl, vec![]) } + pub fn new_for_ddl_with_secure_compare( + session: &SessionImpl, + ctx: SecureCompareContext, + ) -> Binder { + let mut binder = Self::new_inner(session, BindFor::Ddl, vec![]); + binder.secure_compare_context = Some(ctx); + binder + } + pub fn new_for_system(session: &SessionImpl) -> Binder { Self::new_inner(session, BindFor::System, vec![]) } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index b533e6d95668..16833571878a 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -25,7 +25,7 @@ use risingwave_common::hash::{VnodeCount, VnodeCountCompat}; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; -use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; +use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable, PbWebhookSourceInfo}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::DefaultColumnDesc; @@ -182,6 +182,8 @@ pub struct TableCatalog { /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build pub vnode_count: VnodeCount, + + pub webhook_info: Option, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -452,6 +454,7 @@ impl TableCatalog { retention_seconds: self.retention_seconds, cdc_table_id: self.cdc_table_id.clone(), maybe_vnode_count: self.vnode_count.to_protobuf(), + webhook_info: self.webhook_info.clone(), } } @@ -635,6 +638,7 @@ impl From for TableCatalog { .collect_vec(), cdc_table_id: tb.cdc_table_id, vnode_count, + webhook_info: tb.webhook_info, } } } @@ -726,6 +730,7 @@ mod tests { version_column_index: None, cdc_table_id: None, maybe_vnode_count: VnodeCount::set(233).to_protobuf(), + webhook_info: None, } .into(); @@ -790,6 +795,7 @@ mod tests { version_column_index: None, cdc_table_id: None, vnode_count: VnodeCount::set(233), + webhook_info: None, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index 5e3bd968a46b..ac05af99138c 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -232,6 +232,7 @@ impl ExprVisitor for ImpureAnalyzer { | Type::Sha256 | Type::Sha384 | Type::Sha512 + | Type::Hmac | Type::Decrypt | Type::Encrypt | Type::Tand diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 9c37799422a5..c5f5017beb1e 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -21,7 +21,6 @@ use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; -use rand::Rng; use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ @@ -56,7 +55,7 @@ pub use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_connector::source::{ ConnectorProperties, AZBLOB_CONNECTOR, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, - POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, + POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, WEBHOOK_CONNECTOR, }; use risingwave_connector::WithPropertiesExt; use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc}; @@ -1137,6 +1136,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], ), + WEBHOOK_CONNECTOR => hashmap!( + Format::Native => vec![Encode::Native], + ), )) }); @@ -1466,14 +1468,6 @@ pub fn bind_connector_props( .to_string(), ); } - if with_properties.is_mysql_cdc_connector() { - // Generate a random server id for mysql cdc source if needed - // `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication - // group (that is, different from any other server id being used by any master or slave) - with_properties - .entry("server.id".to_string()) - .or_insert(rand::thread_rng().gen_range(1..u32::MAX).to_string()); - } Ok(with_properties) } @@ -1622,6 +1616,7 @@ pub async fn bind_create_source_or_table_with_connector( } else { Some(TableId::placeholder()) }; + let source = SourceCatalog { id: TableId::placeholder().table_id, name: source_name, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1e5dc489c1a0..7ef62c98b342 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -36,22 +36,25 @@ use risingwave_connector::source::cdc::external::{ }; use risingwave_connector::{source, WithOptionsSecResolved}; use risingwave_pb::catalog::source::OptionalAssociatedTableId; -use risingwave_pb::catalog::{PbSource, PbTable, Table, WatermarkDesc}; +use risingwave_pb::catalog::{PbSource, PbTable, PbWebhookSourceInfo, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::{ AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc, }; +use risingwave_pb::secret::secret_ref::PbRefAsType; +use risingwave_pb::secret::PbSecretRef; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_sqlparser::ast::{ - CdcTableInfo, ColumnDef, ColumnOption, DataType as AstDataType, ExplainOptions, Format, - FormatEncodeOptions, ObjectName, OnConflict, SourceWatermark, TableConstraint, + CdcTableInfo, ColumnDef, ColumnOption, DataType, DataType as AstDataType, ExplainOptions, + Format, FormatEncodeOptions, ObjectName, OnConflict, SecretRefAsType, SourceWatermark, + TableConstraint, WebhookSourceInfo, }; use risingwave_sqlparser::parser::IncludeOption; use thiserror_ext::AsReport; use super::RwPgResponse; -use crate::binder::{bind_data_type, bind_struct_field, Clause}; +use crate::binder::{bind_data_type, bind_struct_field, Clause, SecureCompareContext}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::table_catalog::TableVersion; @@ -544,6 +547,7 @@ pub(crate) fn gen_create_table_plan( append_only: bool, on_conflict: Option, with_version_column: Option, + webhook_info: Option, ) -> Result<(PlanRef, PbTable)> { let definition = context.normalized_sql().to_owned(); let mut columns = bind_sql_columns(&column_defs)?; @@ -568,6 +572,7 @@ pub(crate) fn gen_create_table_plan( on_conflict, with_version_column, Some(col_id_gen.into_version()), + webhook_info, ) } @@ -584,6 +589,7 @@ pub(crate) fn gen_create_table_plan_without_source( on_conflict: Option, with_version_column: Option, version: Option, + webhook_info: Option, ) -> Result<(PlanRef, PbTable)> { ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; @@ -626,6 +632,7 @@ pub(crate) fn gen_create_table_plan_without_source( None, database_id, schema_id, + webhook_info, ) } @@ -656,6 +663,7 @@ fn gen_table_plan_with_source( Some(cloned_source_catalog), database_id, schema_id, + None, ) } @@ -676,6 +684,7 @@ fn gen_table_plan_inner( source_catalog: Option, database_id: DatabaseId, schema_id: SchemaId, + webhook_info: Option, ) -> Result<(PlanRef, PbTable)> { let session = context.session_ctx().clone(); let retention_seconds = context.with_options().retention_seconds(); @@ -746,6 +755,7 @@ fn gen_table_plan_inner( is_external_source, retention_seconds, None, + webhook_info, )?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -871,6 +881,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( true, None, Some(cdc_table_id), + None, )?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -952,6 +963,7 @@ pub(super) async fn handle_create_table_plan( on_conflict: Option, with_version_column: Option, include_column_options: IncludeOption, + webhook_info: Option, ) -> Result<(PlanRef, Option, PbTable, TableJobType)> { let col_id_gen = ColumnIdGenerator::new_initial(); let format_encode = check_create_table_with_source( @@ -983,6 +995,10 @@ pub(super) async fn handle_create_table_plan( TableJobType::General, ), (None, None) => { + let webhook_info = webhook_info + .map(|info| bind_webhook_info(&handler_args.session, &column_defs, info)) + .transpose()?; + let context = OptimizerContext::new(handler_args, explain_options); let (plan, table) = gen_create_table_plan( context, @@ -994,6 +1010,7 @@ pub(super) async fn handle_create_table_plan( append_only, on_conflict, with_version_column, + webhook_info, )?; ((plan, None, table), TableJobType::General) @@ -1075,6 +1092,7 @@ pub(super) async fn handle_create_table_plan( ) .into()), }; + Ok((plan, source, table, job_type)) } @@ -1243,6 +1261,7 @@ pub async fn handle_create_table( with_version_column: Option, cdc_table_info: Option, include_column_options: IncludeOption, + webhook_info: Option, ) -> Result { let session = handler_args.session.clone(); @@ -1275,6 +1294,7 @@ pub async fn handle_create_table( on_conflict, with_version_column, include_column_options, + webhook_info, ) .await?; @@ -1374,6 +1394,7 @@ pub async fn generate_stream_graph_for_replace_table( append_only, on_conflict, with_version_column, + original_catalog.webhook_info.clone(), )?; ((plan, None, table), TableJobType::General) } @@ -1486,6 +1507,54 @@ fn get_source_and_resolved_table_name( Ok((source, resolved_table_name, database_id, schema_id)) } +fn bind_webhook_info( + session: &Arc, + columns_defs: &[ColumnDef], + webhook_info: WebhookSourceInfo, +) -> Result { + if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &DataType::Jsonb { + return Err(ErrorCode::InvalidInputSyntax( + "Table with webhook source should have exactly one JSONB column".to_owned(), + ) + .into()); + } + + let WebhookSourceInfo { + secret_ref, + header_key, + signature_expr, + } = webhook_info; + + let db_name = session.database(); + let (schema_name, secret_name) = + Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?; + let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?; + let pb_secret_ref = PbSecretRef { + secret_id: secret_catalog.id.secret_id(), + ref_as: match secret_ref.ref_as { + SecretRefAsType::Text => PbRefAsType::Text, + SecretRefAsType::File => PbRefAsType::File, + } + .into(), + }; + + // TODO(kexiang): use real column name + let secure_compare_context = SecureCompareContext { + column_name: columns_defs[0].name.real_value(), + secret_name: secret_name.clone(), + }; + let mut binder = Binder::new_for_ddl_with_secure_compare(session, secure_compare_context); + let expr = binder.bind_expr(signature_expr.clone())?; + + let pb_webhook_info = PbWebhookSourceInfo { + secret_ref: Some(pb_secret_ref), + header_key, + signature_expr: Some(expr.to_expr_proto()), + }; + + Ok(pb_webhook_info) +} + #[cfg(test)] mod tests { use risingwave_common::catalog::{Field, DEFAULT_DATABASE_NAME, ROWID_PREFIX}; diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 27c527969f9b..5bffd99a0874 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -108,6 +108,7 @@ pub async fn handle_create_as( on_conflict, with_version_column, Some(col_id_gen.into_version()), + None, )?; let graph = build_graph(plan)?; diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 1740c161c3fb..e580d71e8e7f 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -65,6 +65,7 @@ async fn do_handle_explain( cdc_table_info, include_column_options, wildcard_idx, + webhook_info, .. } => { let format_encode = format_encode.map(|s| s.into_v2_with_warning()); @@ -83,6 +84,7 @@ async fn do_handle_explain( on_conflict, with_version_column, include_column_options, + webhook_info, ) .await?; let context = plan.ctx(); diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 9cf94a37c65b..93061dc6e3e9 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -344,6 +344,7 @@ pub async fn handle( with_version_column, cdc_table_info, include_column_options, + webhook_info, } => { if or_replace { bail_not_implemented!("CREATE OR REPLACE TABLE"); @@ -379,6 +380,7 @@ pub async fn handle( with_version_column, cdc_table_info, include_column_options, + webhook_info, ) .await } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 5c006e191157..26b84258f445 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -69,6 +69,7 @@ pub(crate) mod error; mod meta_client; pub mod test_utils; mod user; +pub mod webhook; pub mod health_service; mod monitor; @@ -178,6 +179,7 @@ impl Default for FrontendOpts { } use std::future::Future; +use std::net::SocketAddr; use std::pin::Pin; use pgwire::pg_protocol::TlsConfig; @@ -208,6 +210,10 @@ pub fn start( .collect::>(), ); + let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let webhook_service = crate::webhook::WebhookService { webhook_addr: addr }; + let _task = tokio::spawn(webhook_service.serve()); + pg_serve( &listen_addr, tcp_keepalive, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index e08f6b2c4dd4..3f09d7c58dac 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -57,7 +57,7 @@ use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_connector::sink::catalog::SinkFormatDesc; -use risingwave_pb::catalog::WatermarkDesc; +use risingwave_pb::catalog::{WatermarkDesc, PbWebhookSourceInfo}; use risingwave_pb::stream_plan::StreamScanType; use self::heuristic_optimizer::ApplyOrder; @@ -639,6 +639,7 @@ impl PlanRoot { with_external_source: bool, retention_seconds: Option, cdc_table_id: Option, + webhook_info: Option, ) -> Result { assert_eq!(self.phase, PlanPhase::Logical); assert_eq!(self.plan.convention(), Convention::Logical); @@ -873,6 +874,7 @@ impl PlanRoot { version, retention_seconds, cdc_table_id, + webhook_info, ) } diff --git a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs index 890152f00e33..0f1f75b03b63 100644 --- a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs +++ b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs @@ -221,6 +221,7 @@ impl Strong { | ExprType::Sha256 | ExprType::Sha384 | ExprType::Sha512 + | ExprType::Hmac | ExprType::Left | ExprType::Right | ExprType::Format diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 342bfbedd182..c8f673f11b52 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -24,6 +24,7 @@ use risingwave_common::catalog::{ use risingwave_common::hash::VnodeCount; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; +use risingwave_pb::catalog::PbWebhookSourceInfo; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::derive::derive_columns; @@ -110,6 +111,7 @@ impl StreamMaterialize { cardinality, retention_seconds, create_type, + None, )?; Ok(Self::new(input, table)) @@ -135,6 +137,7 @@ impl StreamMaterialize { version: Option, retention_seconds: Option, cdc_table_id: Option, + webhook_info: Option, ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?; @@ -153,6 +156,7 @@ impl StreamMaterialize { Cardinality::unknown(), // unknown cardinality for tables retention_seconds, CreateType::Foreground, + webhook_info, )?; table.cdc_table_id = cdc_table_id; @@ -227,6 +231,7 @@ impl StreamMaterialize { cardinality: Cardinality, retention_seconds: Option, create_type: CreateType, + webhook_info: Option, ) -> Result { let input = rewritten_input; @@ -285,6 +290,7 @@ impl StreamMaterialize { retention_seconds: retention_seconds.map(|i| i.into()), cdc_table_id: None, vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later + webhook_info, }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 2433a659bad0..327380088f29 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -196,6 +196,7 @@ impl TableCatalogBuilder { retention_seconds: None, cdc_table_id: None, vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later + webhook_info: None, } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 2d40328bab40..4d5d7188e4fb 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -590,6 +590,7 @@ pub(crate) mod tests { created_at_cluster_version: None, cdc_table_id: None, vnode_count: VnodeCount::set(vnode_count), + webhook_info: None, }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), diff --git a/src/frontend/src/webhook/mod.rs b/src/frontend/src/webhook/mod.rs new file mode 100644 index 000000000000..0d77b6ef45a3 --- /dev/null +++ b/src/frontend/src/webhook/mod.rs @@ -0,0 +1,223 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; + +use anyhow::{anyhow, Context}; +use axum::body::Bytes; +use axum::extract::{Extension, Path}; +use axum::http::{HeaderMap, Method, StatusCode}; +use axum::routing::post; +use axum::Router; +use pgwire::net::Address; +use pgwire::pg_server::SessionManager; +use risingwave_common::secret::LocalSecretManager; +use risingwave_sqlparser::ast::{Expr, ObjectName}; +use tokio::net::TcpListener; +use tower::ServiceBuilder; +use tower_http::add_extension::AddExtensionLayer; +use tower_http::compression::CompressionLayer; +use tower_http::cors::{self, CorsLayer}; + +use crate::handler::handle; +use crate::webhook::utils::{err, Result}; +mod utils; + +#[derive(Clone)] +pub struct WebhookService { + pub webhook_addr: SocketAddr, + // pub compute_clients: ComputeClientPool, +} +pub type Service = Arc; + +pub(super) mod handlers { + + use std::net::Ipv4Addr; + + use risingwave_pb::catalog::WebhookSourceInfo; + use risingwave_sqlparser::ast::{Query, SetExpr, Statement, Value, Values}; + use utils::verify_signature; + + use super::*; + use crate::catalog::root_catalog::SchemaPath; + use crate::session::SESSION_MANAGER; + + pub async fn handle_post_request( + Extension(_srv): Extension, + headers: HeaderMap, + Path((user, database, schema, table)): Path<(String, String, String, String)>, + body: Bytes, + ) -> Result<()> { + let session_mgr = SESSION_MANAGER + .get() + .expect("session manager has been initialized"); + + let dummy_addr = Address::Tcp(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + 5691, // port of meta + )); + + // TODO(kexiang): optimize this + // get a session object for the corresponding user and database + let session = session_mgr + .connect(database.as_str(), user.as_str(), Arc::new(dummy_addr)) + .map_err(|e| { + err( + anyhow!(e).context(format!( + "failed to create session for database: {}, user: {}", + database, user + )), + StatusCode::UNAUTHORIZED, + ) + })?; + + let WebhookSourceInfo { + secret_ref, + header_key, + signature_expr, + } = { + let search_path = session.config().search_path(); + let user_name = user.as_str(); + let schema_path = SchemaPath::new(Some(schema.as_str()), &search_path, user_name); + + let reader = session.env().catalog_reader().read_guard(); + let (table, _schema) = reader + .get_any_table_by_name(database.as_str(), schema_path, &table) + .map_err(|e| err(e, StatusCode::NOT_FOUND))?; + + table + .webhook_info + .as_ref() + .ok_or_else(|| { + err( + anyhow!("Table {:?} is not connected to wehbook source", table), + StatusCode::METHOD_NOT_ALLOWED, + ) + })? + .clone() + }; + + let secret_string = LocalSecretManager::global() + .fill_secret(secret_ref.unwrap()) + .map_err(|e| err(e, StatusCode::NOT_FOUND))?; + + let signature = headers + .get(header_key) + .ok_or_else(|| { + err( + anyhow!("Signature not found in the header"), + StatusCode::BAD_REQUEST, + ) + })? + .as_bytes(); + + let is_valid = verify_signature( + secret_string.as_str(), + body.as_ref(), + signature_expr.unwrap(), + signature, + ) + .await?; + if !is_valid { + return Err(err( + anyhow!("Signature verification failed"), + StatusCode::UNAUTHORIZED, + )); + } + + let payload = String::from_utf8(body.to_vec()).map_err(|e| { + err( + anyhow!(e).context("Failed to parse body"), + StatusCode::BAD_REQUEST, + ) + })?; + + let insert_stmt = Statement::Insert { + table_name: ObjectName::from(vec![table.as_str().into()]), + columns: vec![], + source: Box::new(Query { + with: None, + body: SetExpr::Values(Values(vec![vec![Expr::Value(Value::SingleQuotedString( + payload, + ))]])), + order_by: vec![], + limit: None, + offset: None, + fetch: None, + }), + returning: vec![], + }; + + let _rsp = handle(session, insert_stmt, Arc::from(""), vec![]) + .await + .map_err(|e| anyhow!("failed to insert: {:?}", e))?; + + Ok(()) + } +} + +impl WebhookService { + pub async fn serve(self) -> anyhow::Result<()> { + use handlers::*; + let srv = Arc::new(self); + + // tracing_subscriber::registry() + // .with(tracing_subscriber::fmt::layer()) + // .init(); + + let cors_layer = CorsLayer::new() + .allow_origin(cors::Any) + .allow_methods(vec![Method::POST]); + + let api_router = Router::new() + .route("/:user/:database/:schema/:table", post(handle_post_request)) + .layer( + ServiceBuilder::new() + .layer(AddExtensionLayer::new(srv.clone())) + .into_inner(), + ) + .layer(cors_layer); + + // let dashboard_router = risingwave_meta_dashboard::router(); + + let app = Router::new() + // .fallback_service(dashboard_router) + .nest("/message", api_router) + .layer(CompressionLayer::new()); + + let listener = TcpListener::bind(&srv.webhook_addr) + .await + .context("failed to bind dashboard address")?; + axum::serve(listener, app) + .await + .context("failed to serve dashboard service")?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + + #[tokio::test] + #[ignore] + async fn test_exchange_client() -> anyhow::Result<()> { + let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let service = crate::webhook::WebhookService { webhook_addr: addr }; + service.serve().await?; + Ok(()) + } +} diff --git a/src/frontend/src/webhook/utils.rs b/src/frontend/src/webhook/utils.rs new file mode 100644 index 000000000000..2cda72f69d96 --- /dev/null +++ b/src/frontend/src/webhook/utils.rs @@ -0,0 +1,77 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::Json; +use risingwave_common::row::OwnedRow; +use risingwave_pb::expr::ExprNode; +use serde_json::json; +use thiserror_ext::AsReport; + +use crate::expr::ExprImpl; + +pub struct WebhookError { + err: anyhow::Error, + code: StatusCode, +} + +pub(crate) type Result = std::result::Result; + +pub(crate) fn err(err: impl Into, code: StatusCode) -> WebhookError { + WebhookError { + err: err.into(), + code, + } +} + +impl From for WebhookError { + fn from(value: anyhow::Error) -> Self { + WebhookError { + err: value.into(), + code: StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + +impl IntoResponse for WebhookError { + fn into_response(self) -> axum::response::Response { + let mut resp = Json(json!({ + "error": self.err.to_report_string(), + })) + .into_response(); + *resp.status_mut() = self.code; + resp + } +} + +pub async fn verify_signature( + secret: &str, + payload: &[u8], + signature_expr: ExprNode, + signature: &[u8], +) -> Result { + let row = OwnedRow::new(vec![Some(secret.into()), Some(payload.into())]); + + let signature_expr_impl = ExprImpl::from_expr_proto(&signature_expr) + .map_err(|e| err(e, StatusCode::INTERNAL_SERVER_ERROR))?; + + let result = signature_expr_impl + .eval_row(&row) + .await + .map_err(|e| err(e, StatusCode::INTERNAL_SERVER_ERROR))? + .unwrap(); + let computed_signature = result.as_bytea(); + Ok(**computed_signature == *signature) +} diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs index b84a29891eee..cdd2c95b5ae8 100644 --- a/src/meta/model/migration/src/lib.rs +++ b/src/meta/model/migration/src/lib.rs @@ -22,6 +22,8 @@ mod m20240726_063833_auto_schema_change; mod m20240806_143329_add_rate_limit_to_source_catalog; mod m20240820_081248_add_time_travel_per_table_epoch; mod m20240911_083152_variable_vnode_count; +mod m20241001_013810_webhook_source; + mod m20241016_065621_hummock_gc_history; mod m20241025_062548_singleton_vnode_count; mod utils; @@ -86,6 +88,7 @@ impl MigratorTrait for Migrator { Box::new(m20240911_083152_variable_vnode_count::Migration), Box::new(m20241016_065621_hummock_gc_history::Migration), Box::new(m20241025_062548_singleton_vnode_count::Migration), + Box::new(m20241001_013810_webhook_source::Migration), ] } } diff --git a/src/meta/model/migration/src/m20241001_013810_webhook_source.rs b/src/meta/model/migration/src/m20241001_013810_webhook_source.rs new file mode 100644 index 000000000000..b46c25366faa --- /dev/null +++ b/src/meta/model/migration/src/m20241001_013810_webhook_source.rs @@ -0,0 +1,35 @@ +use sea_orm_migration::prelude::{Table as MigrationTable, *}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Table::Table) + .add_column(ColumnDef::new(Table::WebhookInfo).integer()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Table::Table) + .drop_column(Table::WebhookInfo) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Table { + Table, + WebhookInfo, +} diff --git a/src/meta/model/src/lib.rs b/src/meta/model/src/lib.rs index 6610484a8918..5ef5b50d190b 100644 --- a/src/meta/model/src/lib.rs +++ b/src/meta/model/src/lib.rs @@ -369,6 +369,7 @@ derive_array_from_blob!( PbColumnCatalogArray ); derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo); +derive_from_blob!(WebhookSourceInfo, risingwave_pb::catalog::PbWebhookSourceInfo); derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc); derive_array_from_blob!( WatermarkDescArray, diff --git a/src/meta/model/src/table.rs b/src/meta/model/src/table.rs index b8ba38d438b7..20fae0c926b1 100644 --- a/src/meta/model/src/table.rs +++ b/src/meta/model/src/table.rs @@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize}; use crate::{ Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId, - TableId, TableVersion, + TableId, TableVersion, WebhookSourceInfo, }; #[derive( @@ -135,6 +135,7 @@ pub struct Model { pub incoming_sinks: I32Array, pub cdc_table_id: Option, pub vnode_count: i32, + pub webhook_info: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -267,6 +268,7 @@ impl From for ActiveModel { incoming_sinks: Set(pb_table.incoming_sinks.into()), cdc_table_id: Set(pb_table.cdc_table_id), vnode_count, + webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index c7cf45daad9e..51c75ada8a9a 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -166,6 +166,7 @@ impl From> for PbTable { retention_seconds: value.0.retention_seconds.map(|id| id as u32), cdc_table_id: value.0.cdc_table_id, maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(), + webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()), } } } diff --git a/src/prost/build.rs b/src/prost/build.rs index c4744e14c1b6..1dc7cd66c345 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -116,6 +116,7 @@ fn main() -> Result<(), Box> { // The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr .type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]") .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]") + .type_attribute("catalog.WebhookSourceInfo", "#[derive(Eq, Hash)]") .type_attribute("secret.SecretRef", "#[derive(Eq, Hash)]") .type_attribute("catalog.IndexColumnProperties", "#[derive(Eq, Hash)]") .type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]") diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index d94cf80cb9f2..2b6eb7da8361 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -22,7 +22,8 @@ use serde::{Deserialize, Serialize}; use super::FormatEncodeOptions; use crate::ast::{ - display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SetVariableValue, + display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SecretRef, + SetVariableValue, }; use crate::tokenizer::Token; @@ -802,3 +803,12 @@ impl fmt::Display for ReferentialAction { }) } } + +/// secure secret definition for webhook source +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct WebhookSourceInfo { + pub secret_ref: SecretRef, + pub header_key: String, + pub signature_expr: Expr, +} diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 563dc66be478..8134605fa0fa 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -39,7 +39,7 @@ pub use self::data_type::{DataType, StructField}; pub use self::ddl::{ AlterColumnOperation, AlterConnectionOperation, AlterDatabaseOperation, AlterFunctionOperation, AlterSchemaOperation, AlterTableOperation, ColumnDef, ColumnOption, ColumnOptionDef, - ReferentialAction, SourceWatermark, TableConstraint, + ReferentialAction, SourceWatermark, TableConstraint, WebhookSourceInfo, }; pub use self::legacy_source::{ get_delimiter, AvroSchema, CompatibleFormatEncode, DebeziumAvroSchema, ProtobufSchema, @@ -1302,6 +1302,8 @@ pub enum Statement { cdc_table_info: Option, /// `INCLUDE a AS b INCLUDE c` include_column_options: IncludeOption, + /// `VALIDATE SECRET secure_secret_name AS secure_compare ()` + webhook_info: Option, }, /// CREATE INDEX CreateIndex { @@ -1835,6 +1837,7 @@ impl fmt::Display for Statement { query, cdc_table_info, include_column_options, + webhook_info, } => { // We want to allow the following options // Empty column list, allowed by PostgreSQL: @@ -1884,6 +1887,10 @@ impl fmt::Display for Statement { write!(f, " FROM {}", info.source_name)?; write!(f, " TABLE '{}'", info.external_table_name)?; } + if let Some(info)= webhook_info { + write!(f, " VALIDATE SECRET {}", info.secret_ref.secret_name)?; + write!(f, " AS secure_compare ()")?; + } Ok(()) } Statement::CreateIndex { diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 93e47d7a6b11..53038fdcd08b 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -554,6 +554,7 @@ define_keywords!( USER, USING, UUID, + VALIDATE, VALUE, VALUES, VALUE_OF, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index d5582f31a64d..c83ca230d230 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -22,6 +22,7 @@ use alloc::{ }; use core::fmt; +use ddl::WebhookSourceInfo; use itertools::Itertools; use tracing::{debug, instrument}; use winnow::combinator::{alt, cut_err, dispatch, fail, opt, peek, preceded, repeat, separated}; @@ -2578,14 +2579,23 @@ impl Parser<'_> { let include_options = self.parse_include_options()?; // PostgreSQL supports `WITH ( options )`, before `AS` - let with_options = self.parse_with_properties()?; + let mut with_options = self.parse_with_properties()?; let option = with_options .iter() .find(|&opt| opt.name.real_value() == UPSTREAM_SOURCE_KEY); let connector = option.map(|opt| opt.value.to_string()); - - let format_encode = if let Some(connector) = connector { + let contain_webhook = if let Some(connector) = &connector + && connector.contains("webhook") + { + with_options.clear(); + true + } else { + false + }; + let format_encode = if let Some(connector) = connector + && !contain_webhook + { Some(self.parse_format_encode_with_connector(&connector, false)?) } else { None // Table is NOT created with an external connector. @@ -2612,6 +2622,53 @@ impl Parser<'_> { None }; + let webhook_info = if contain_webhook && self.parse_keyword(Keyword::VALIDATE) { + self.expect_keyword(Keyword::SECRET)?; + let secret_ref = SecretRef { + secret_name: self.parse_object_name()?, + ref_as: SecretRefAsType::Text, + }; + if self.parse_keywords(&[Keyword::AS, Keyword::FILE]) { + parser_err!("Secret for SECURE_COMPARE() does not support AS FILE"); + }; + self.expect_keyword(Keyword::AS)?; + let function_name = self.parse_identifier()?; + if function_name.real_value().to_uppercase() != String::from("SECURE_COMPARE") { + parser_err!( + "SECURE_COMPARE() is the only function supported for secret validation" + ); + } + self.expect_token(&Token::LParen)?; + let headers = self.parse_identifier()?; + if headers.real_value().to_uppercase() != String::from("HEADERS") { + parser_err!("The first argument of SECURE_COMPARE() should be like `HEADERS ->> {{header_key}}`"); + } + self.expect_token(&Token::LongArrow)?; + let header_key = self.parse_literal_string()?; + self.expect_token(&Token::Comma)?; + let checkpoint = *self; + let signature_expr = if let Ok(ident) = self.parse_identifier() + && !matches!(self.peek_token().token, Token::LParen) + { + // secret name + Expr::Identifier(ident) + } else { + // function to generate signature, e.g., HMAC(secret, payload, 'sha256') + *self = checkpoint; + self.parse_function()? + }; + + self.expect_token(&Token::RParen)?; + + Some(WebhookSourceInfo { + secret_ref, + header_key, + signature_expr, + }) + } else { + None + }; + Ok(Statement::CreateTable { name: table_name, temporary, @@ -2629,6 +2686,7 @@ impl Parser<'_> { query, cdc_table_info, include_column_options: include_options, + webhook_info, }) } @@ -5081,7 +5139,6 @@ impl Parser<'_> { let source = Box::new(self.parse_query()?); let returning = self.parse_returning(Optional)?; - Ok(Statement::Insert { table_name, columns, From 6fe4863efd0730d09e78987950fed1f194b052f4 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Tue, 5 Nov 2024 19:55:28 -0500 Subject: [PATCH 2/3] tmp --- src/expr/impl/src/scalar/hmac.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/expr/impl/src/scalar/hmac.rs b/src/expr/impl/src/scalar/hmac.rs index c8bc0f8dc80f..8d09d205e09d 100644 --- a/src/expr/impl/src/scalar/hmac.rs +++ b/src/expr/impl/src/scalar/hmac.rs @@ -1,7 +1,7 @@ // Copyright 2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance witmuth the License. +// you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 From bc9d79868eb9a9171c1ecb4b7a63c0648a5f78e0 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Tue, 5 Nov 2024 20:14:50 -0500 Subject: [PATCH 3/3] tmp --- src/frontend/src/webhook/mod.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/frontend/src/webhook/mod.rs b/src/frontend/src/webhook/mod.rs index 0d77b6ef45a3..d25e8e0ca91e 100644 --- a/src/frontend/src/webhook/mod.rs +++ b/src/frontend/src/webhook/mod.rs @@ -173,10 +173,6 @@ impl WebhookService { use handlers::*; let srv = Arc::new(self); - // tracing_subscriber::registry() - // .with(tracing_subscriber::fmt::layer()) - // .init(); - let cors_layer = CorsLayer::new() .allow_origin(cors::Any) .allow_methods(vec![Method::POST]); @@ -190,10 +186,7 @@ impl WebhookService { ) .layer(cors_layer); - // let dashboard_router = risingwave_meta_dashboard::router(); - let app = Router::new() - // .fallback_service(dashboard_router) .nest("/message", api_router) .layer(CompressionLayer::new());