From 9388c46da679c6fa0ccc4dbb2826e855e35af516 Mon Sep 17 00:00:00 2001 From: xuty <55723017+Lxuty@users.noreply.github.com> Date: Mon, 14 Mar 2022 09:33:34 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9kafka=E8=BE=93=E5=87=BA?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=EF=BC=8C=E9=80=82=E9=85=8Dstarrocks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bifrost针对kafka格式,做了一定的变化,主要是只保存更新后的数据,去掉DDL语句与COMMIT语句,让StarRocks Routine可以识别消费。 MySQL原表: CREATE TABLE `a` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 StarRocks主键表创建及routine示例: CREATE TABLE `a`( `id` int NOT NULL ) PRIMARY KEY (`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 32 PROPERTIES('replication_num' = '1'); --必须要先创建一个kafka topic才可以 create routine load routine_a on `a` COLUMNS TERMINATED BY ",", COLUMNS(id,EventType, __op=case when EventType="insert" then 0 when EventType="update" then 0 when EventType="delete" then 1 else null end) PROPERTIES ( "format" = "json", "jsonpaths"= "[\"$.Rows[0].id\",\"$.EventType\"]", "desired_concurrent_number"="1", "strict_mode" = "false", "strip_outer_array" ="false", "max_error_number" = "1000" ) FROM KAFKA ( "kafka_broker_list" = "xxx.xxx.xxx.xxx:9092", "kafka_topic" = "a", "kafka_partitions" = "0", "kafka_offsets" = "OFFSET_BEGINNING" ); --- plugin/kafka/src/kafka.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/plugin/kafka/src/kafka.go b/plugin/kafka/src/kafka.go index 0697fdd0..e1b7f668 100644 --- a/plugin/kafka/src/kafka.go +++ b/plugin/kafka/src/kafka.go @@ -212,6 +212,11 @@ func (This *Conn) getMsg(data *pluginDriver.PluginDataType) (*sarama.ProducerMes Key := fmt.Sprint(pluginDriver.TransfeResult(This.p.Key,data,len(data.Rows)-1)) msg.Key = sarama.StringEncoder(Key) } + + // 适配StarRocks里的kafka格式,将更新前的数据去掉了,只留下更新后的数据。 + if len(data.Rows) == 2 && data.EventType == "update" { + data.Rows = data.Rows[1:] + } c,err := json.Marshal(data) if err != nil{ @@ -226,6 +231,12 @@ 
func (This *Conn) sendToList(data *pluginDriver.PluginDataType,retry bool,isComm LastSuccessCommitData,err = This.sendToKafkaByBatch() goto endErr } + + // 如果是sql或者commit类型,则不加入dataList发送到kafka中,兼容StarRocks,对端无法识别这类数据 + if data.EventType == "sql" || data.EventType == "commit" { + goto endErr + } + if This.p.BatchSize > 1 { if retry == false { var msg *sarama.ProducerMessage @@ -333,4 +344,4 @@ func (This *Conn) sendToKafkaByBatch() (*pluginDriver.PluginDataType, error) { func (This *Conn) TimeOutCommit() (*pluginDriver.PluginDataType, *pluginDriver.PluginDataType,error) { return This.sendToList(nil,true,false) -} \ No newline at end of file +}