Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Bifrost针对kafka插件输出格式,做了一定的变化,主要是只保存更新后的数据,去掉DDL语句与COMMIT语句,让StarRocks Routine可以识别消费。
MySQL原表:
CREATE TABLE
a
(id
int(11) NOT NULL,PRIMARY KEY (
id
)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
StarRocks主键表创建及routine示例:
CREATE TABLE
a
(id
int NOT NULL) PRIMARY KEY (
id
)DISTRIBUTED BY HASH(
id
) BUCKETS 32PROPERTIES('replication_num' = '1');
--必须要先创建一个kafka topic才可以
create routine load routine_a on
a
COLUMNS TERMINATED BY ",",
COLUMNS(id,EventType, __op=case when EventType="insert" then 0 when EventType="update" then 0 when EventType="delete" then 1 else null end) PROPERTIES
(
"format" = "json",
"jsonpaths"= "["$.Rows[0].id","$.EventType"]",
"desired_concurrent_number"="1",
"strict_mode" = "false",
"strip_outer_array" ="false",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" = "xxx.xxx.xxx.xxx:9092",
"kafka_topic" = "a",
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING"
);