diff --git a/plugin/kafka/src/kafka.go b/plugin/kafka/src/kafka.go index 0697fdd0..e1b7f668 100644 --- a/plugin/kafka/src/kafka.go +++ b/plugin/kafka/src/kafka.go @@ -212,6 +212,11 @@ func (This *Conn) getMsg(data *pluginDriver.PluginDataType) (*sarama.ProducerMes Key := fmt.Sprint(pluginDriver.TransfeResult(This.p.Key,data,len(data.Rows)-1)) msg.Key = sarama.StringEncoder(Key) } + + // 适配StarRocks里的kafka格式,将更新前的数据被去掉了,只留下更新后的数据。 + if len(data.Rows) == 2 && data.EventType == "update" { + data.Rows = data.Rows[1:] + } c,err := json.Marshal(data) if err != nil{ @@ -226,6 +231,12 @@ func (This *Conn) sendToList(data *pluginDriver.PluginDataType,retry bool,isComm LastSuccessCommitData,err = This.sendToKafkaByBatch() goto endErr } + + // 如果是sql或者commit类型,则不加入dataList发送到kafka中,兼容StarRocks,对端无法识别这类数据 + if data.EventType == "sql" || data.EventType == "commit" { + goto endErr + } + if This.p.BatchSize > 1 { if retry == false { var msg *sarama.ProducerMessage @@ -333,4 +344,4 @@ func (This *Conn) sendToKafkaByBatch() (*pluginDriver.PluginDataType, error) { func (This *Conn) TimeOutCommit() (*pluginDriver.PluginDataType, *pluginDriver.PluginDataType,error) { return This.sendToList(nil,true,false) -} \ No newline at end of file +}