Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): support webhook source table #19272

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ message StreamSourceInfo {
map<string, secret.SecretRef> format_encode_secret_refs = 16;
}

message WebhookSourceInfo {
secret.SecretRef secret_ref = 1;
string header_key = 2;
expr.ExprNode signature_expr = 3;
}

message Source {
// For shared source, this is the same as the job id.
// For non-shared source and table with connector, this is a different oid.
Expand Down Expand Up @@ -433,6 +439,8 @@ message Table {
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
optional uint32 maybe_vnode_count = 40;

optional WebhookSourceInfo webhook_info = 41;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
48 changes: 29 additions & 19 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ message ExprNode {
message NowRexNode {}
// TODO: move this into `FunctionCall`.
enum Type {
// `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the viriant of `rex_node`.
// Their types are therefore deprecated and should be `UNSPECIFIED` instead.
// `InputRef`, `Constant`, and `UserDefinedFunction` are indicated by the
// viriant of `rex_node`. Their types are therefore deprecated and should be
// `UNSPECIFIED` instead.
reserved 1, 2, 3000;
reserved "INPUT_REF", "CONSTANT_VALUE", "UDF";

Expand Down Expand Up @@ -127,7 +128,7 @@ message ExprNode {
LPAD = 238;
RPAD = 239;
REVERSE = 240;
STRPOS = 241 [deprecated = true]; // duplicated with POSITION
STRPOS = 241 [ deprecated = true ]; // duplicated with POSITION
TO_ASCII = 242;
TO_HEX = 243;
QUOTE_IDENT = 244;
Expand Down Expand Up @@ -197,6 +198,7 @@ message ExprNode {
INET_NTOA = 329;
QUOTE_LITERAL = 330;
QUOTE_NULLABLE = 331;
HMAC = 332;

// Unary operators
NEG = 401;
Expand Down Expand Up @@ -329,7 +331,8 @@ message ExprNode {
// EXTERNAL
ICEBERG_TRANSFORM = 2201;
}
// Only use this field for function call. For other types of expression, it should be UNSPECIFIED.
// Only use this field for function call. For other types of expression, it
// should be UNSPECIFIED.
Type function_type = 1;
data.DataType return_type = 3;
oneof rex_node {
Expand Down Expand Up @@ -390,8 +393,8 @@ message Constant {

// The items which can occur in the select list of `ProjectSet` operator.
//
// When there are table functions in the SQL query `SELECT ...`, it will be planned as `ProjectSet`.
// Otherwise it will be planned as `Project`.
// When there are table functions in the SQL query `SELECT ...`, it will be
// planned as `ProjectSet`. Otherwise it will be planned as `Project`.
//
// # Examples
//
Expand Down Expand Up @@ -421,9 +424,7 @@ message ProjectSetSelectItem {
}
}

message FunctionCall {
repeated ExprNode children = 1;
}
message FunctionCall { repeated ExprNode children = 1; }

// Aggregate Function Calls for Aggregation
message AggCall {
Expand Down Expand Up @@ -460,7 +461,8 @@ message AggCall {

// user defined aggregate function
USER_DEFINED = 100;
// wraps a scalar function that takes a list as input as an aggregate function.
// wraps a scalar function that takes a list as input as an aggregate
// function.
WRAP_SCALAR = 101;
}
Kind kind = 1;
Expand Down Expand Up @@ -494,7 +496,8 @@ message WindowFrame {
enum Type {
TYPE_UNSPECIFIED = 0;

TYPE_ROWS_LEGACY = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame.
TYPE_ROWS_LEGACY = 2
[ deprecated = true ]; // Deprecated since we introduced `RANGE` frame.

TYPE_ROWS = 5;
TYPE_RANGE = 10;
Expand Down Expand Up @@ -554,8 +557,10 @@ message WindowFrame {

Type type = 1;

Bound start = 2 [deprecated = true]; // Deprecated since we introduced `RANGE` frame.
Bound end = 3 [deprecated = true]; // Deprecated since we introduced `RANGE` frame.
Bound start = 2
[ deprecated = true ]; // Deprecated since we introduced `RANGE` frame.
Bound end = 3
[ deprecated = true ]; // Deprecated since we introduced `RANGE` frame.

Exclusion exclusion = 4;

Expand Down Expand Up @@ -589,8 +594,9 @@ message WindowFunction {
WindowFrame frame = 5;
}

// Note: due to historic reasons, UserDefinedFunction is a oneof variant parallel to FunctionCall,
// while UserDefinedFunctionMetadata is embedded as a field in TableFunction and AggCall.
// Note: due to historic reasons, UserDefinedFunction is a oneof variant
// parallel to FunctionCall, while UserDefinedFunctionMetadata is embedded as a
// field in TableFunction and AggCall.

message UserDefinedFunction {
repeated ExprNode children = 1;
Expand All @@ -601,18 +607,22 @@ message UserDefinedFunction {
// The link to the external function service.
optional string link = 5;
// An unique identifier to the function.
// - If `link` is not empty, the name of the function in the external function service.
// - If `language` is `rust` or `wasm`, the name of the function in the wasm binary file.
// - If `link` is not empty, the name of the function in the external function
// service.
// - If `language` is `rust` or `wasm`, the name of the function in the wasm
// binary file.
// - If `language` is `javascript`, the name of the function.
optional string identifier = 6;
// - If `language` is `javascript`, the source code of the function.
optional string body = 7;
// - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary.
optional bytes compressed_binary = 10;
bool always_retry_on_network_error = 9;
// The runtime used when javascript is used as the language. Could be "quickjs" or "deno".
// The runtime used when javascript is used as the language. Could be
// "quickjs" or "deno".
optional string runtime = 11;
// The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator"
// The function type, which is used to execute the function. Could be "sync",
// "async", "generator" or "async_generator"
optional string function_type = 12;
}

Expand Down
24 changes: 24 additions & 0 deletions src/common/secret/src/secret_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@ impl LocalSecretManager {
Ok(options)
}

pub fn fill_secret(&self, secret_ref: PbSecretRef) -> SecretResult<String> {
let secret_guard = self.secrets.read();
let secret_id = secret_ref.secret_id;
let pb_secret_bytes = secret_guard
.get(&secret_id)
.ok_or(SecretError::ItemNotFound(secret_id))?;
let secret_value_bytes = Self::get_secret_value(pb_secret_bytes)?;
match secret_ref.ref_as() {
RefAsType::Text => {
// We converted the secret string from sql to bytes using `as_bytes` in frontend.
// So use `from_utf8` here to convert it back to string.
return Ok(String::from_utf8(secret_value_bytes.clone())?);
}
RefAsType::File => {
let path_str =
self.get_or_init_secret_file(secret_id, secret_value_bytes.clone())?;
return Ok(path_str);
}
RefAsType::Unspecified => {
return Err(SecretError::UnspecifiedRefType(secret_id));
}
}
}

/// Get the secret file for the given secret id and return the path string. If the file does not exist, create it.
/// WARNING: This method should be called only when the secret manager is locked.
fn get_or_init_secret_file(
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub use crate::source::filesystem::S3_CONNECTOR;
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
pub use crate::source::pulsar::PULSAR_CONNECTOR;

pub const WEBHOOK_CONNECTOR: &str = "webhook";

pub fn should_copy_to_format_encode_options(key: &str, connector: &str) -> bool {
const PREFIXES: &[&str] = &[
"schema.registry",
Expand Down
1 change: 1 addition & 0 deletions src/expr/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ futures-async-stream = { workspace = true }
futures-util = "0.3"
ginepro = "0.8"
hex = "0.4"
hmac = "0.12"
icelake = { workspace = true }
itertools = { workspace = true }
jsonbb = { workspace = true }
Expand Down
78 changes: 78 additions & 0 deletions src/expr/impl/src/scalar/hmac.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 RisingWave Labs
KeXiangWang marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use hex::encode;
use hmac::{Hmac, Mac};
use risingwave_expr::function;
use sha1::Sha1;
use sha2::Sha256;

#[function("hmac(varchar, bytea, varchar) -> bytea")]
pub fn hmac<'a>(secret: &str, payload: &[u8], sha_type: &str) -> Box<[u8]> {
if sha_type == "sha1" {
sha1_hmac(secret, payload)
} else if sha_type == "sha256" {
sha256_hmac(secret, payload)
} else {
panic!("Unsupported SHA type: {}", sha_type)
}
}

fn sha256_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> {
let mut mac =
Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");

mac.update(payload);

let result = mac.finalize();
let code_bytes = result.into_bytes();
let computed_signature = format!("sha256={}", encode(code_bytes));
computed_signature.as_bytes().into()
}

fn sha1_hmac(secret: &str, payload: &[u8]) -> Box<[u8]> {
let mut mac =
Hmac::<Sha1>::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size");

mac.update(payload);

let result = mac.finalize();
let code_bytes = result.into_bytes();
let computed_signature = format!("sha1={}", encode(code_bytes));
computed_signature.as_bytes().into()
}

#[cfg(test)]
mod tests {

use super::*;

#[tokio::test]
async fn test_verify_signature_hmac_sha256() -> anyhow::Result<()> {
let secret = "your_secret_key";
let payload = b"your_webhook_payload";
let signature = b"sha256=cef8b98a91902c492b85d97f049aa4bfc5e7e3f9b8b7bf7cb49c5f829d2dac85"; // 替换为
assert!(*sha256_hmac(secret, payload) == *signature);
Ok(())
}

#[tokio::test]
async fn test_verify_signature_hmac_sha1() -> anyhow::Result<()> {
let secret = "your_secret_key";
let payload = b"your_webhook_payload";
let signature = b"sha1=65cb920a4b8c6ab8e2eab861a096a7bc2c05d8ba"; // 替换为
assert!(*sha1_hmac(secret, payload) == *signature);
Ok(())
}
}
1 change: 1 addition & 0 deletions src/expr/impl/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod extract;
mod field;
mod format;
mod format_type;
mod hmac;
mod in_;
mod int256;
mod jsonb_access;
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-recursion = "1.1.0"
async-trait = "0.1"
auto_enums = { workspace = true }
auto_impl = "1"
axum = { workspace = true }
base64 = "0.22"
bk-tree = "0.5.0"
bytes = "1"
Expand All @@ -36,6 +37,8 @@ fancy-regex = "0.14.0"
fixedbitset = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hex = "0.4"
hmac = "0.12"
iana-time-zone = "0.1"
iceberg = { workspace = true }
icelake = { workspace = true }
Expand Down Expand Up @@ -79,6 +82,7 @@ risingwave_variables = { workspace = true }
rw_futures_util = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha1 = "0.10.6"
sha2 = "0.10.7"
smallvec = { version = "1.13.1", features = ["serde"] }
speedate = "0.15.0"
Expand All @@ -97,10 +101,17 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
tokio-postgres = "0.7"
tokio-stream = { workspace = true }
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
tower-http = { version = "0.6", features = [
"add-extension",
"cors",
"compression-gzip",
] }
tracing = "0.1"
uuid = "1"
zstd = { version = "0.13", default-features = false }


[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ impl TestCase {
cdc_table_info,
include_column_options,
wildcard_idx,
webhook_info,
..
} => {
let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());
Expand All @@ -453,6 +454,7 @@ impl TestCase {
with_version_column,
cdc_table_info,
include_column_options,
webhook_info,
)
.await?;
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/expr/function/builtin_scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ impl Binder {
("sha512", raw_call(ExprType::Sha512)),
("encrypt", raw_call(ExprType::Encrypt)),
("decrypt", raw_call(ExprType::Decrypt)),
("hmac", raw_call(ExprType::Hmac)),
("left", raw_call(ExprType::Left)),
("right", raw_call(ExprType::Right)),
("inet_aton", raw_call(ExprType::InetAton)),
Expand Down
Loading
Loading