Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan committed Aug 21, 2024
1 parent 29d3f94 commit a13a409
Show file tree
Hide file tree
Showing 13 changed files with 638 additions and 547 deletions.
283 changes: 44 additions & 239 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
package dispatcher

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/flowbehappy/tigate/downstreamadapter/sink"
"github.com/flowbehappy/tigate/downstreamadapter/sink/types"
Expand Down Expand Up @@ -67,8 +64,6 @@ type Dispatcher struct {
tableSpan *common.TableSpan
sink sink.Sink

ddlActions chan *heartbeatpb.DispatcherAction
acks chan *heartbeatpb.ACK
tableSpanStatusesChan chan *heartbeatpb.TableSpanStatus

//SyncPointInfo *SyncPointInfo
Expand All @@ -81,75 +76,37 @@ type Dispatcher struct {

resolvedTs *TsWithMutex // 用来记 eventChan 中目前收到的 event 中收到的最大的 commitTs - 1,不代表 dispatcher 的 checkpointTs

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

ddlPendingEvent *common.TxnEvent
ddlFinishCh chan struct{}
isRemoving atomic.Bool

tableProgress *types.TableProgress

resendTask *ResendTask
checkTableProgressEmptyTask *CheckTableProgressEmptyTask
}

// NewDispatcher creates a Dispatcher for tableSpan that writes to sink, and
// starts one background goroutine running HandleDDLActions.
//
// startTs is used as the initial resolvedTs. tableSpanStatusesChan and filter
// are stored on the dispatcher as given. The dispatcher's context is derived
// from context.Background and is cancelled through the stored cancel function.
func NewDispatcher(tableSpan *common.TableSpan, sink sink.Sink, startTs uint64, tableSpanStatusesChan chan *heartbeatpb.TableSpanStatus, filter filter.Filter) *Dispatcher {
ctx, cancel := context.WithCancel(context.Background())
dispatcher := &Dispatcher{
id: common.NewDispatcherID(),
// eventCh, ddlActions and acks are buffered channels with capacity 16.
eventCh: make(chan *common.TxnEvent, 16),
tableSpan: tableSpan,
sink: sink,
ddlActions: make(chan *heartbeatpb.DispatcherAction, 16),
acks: make(chan *heartbeatpb.ACK, 16),
tableSpanStatusesChan: tableSpanStatusesChan,
//SyncPointInfo: syncPointInfo,
//MemoryUsage: NewMemoryUsage(),
// A new dispatcher starts in the Working state.
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Working),
resolvedTs: newTsWithMutex(startTs),
ctx: ctx,
cancel: cancel,
filter: filter,
// ddlFinishCh is unbuffered, so a send on it blocks until a receiver is ready.
ddlFinishCh: make(chan struct{}),
isRemoving: atomic.Bool{},
ddlPendingEvent: nil,
tableProgress: types.NewTableProgress(),
}

// dispatcher.wg.Add(1)
// go dispatcher.DispatcherEvents(ctx)

// Register the goroutine with wg before launching it; HandleDDLActions
// calls Done on the same WaitGroup when it exits.
dispatcher.wg.Add(1)
go dispatcher.HandleDDLActions(ctx)

log.Info("dispatcher created", zap.Any("DispatcherID", dispatcher.id))

return dispatcher
}

// DispatcherEventsHandler is a stateless handler for TxnEvents: Path maps an
// event to a dispatcher ID and Handle applies the event to a Dispatcher.
type DispatcherEventsHandler struct {
}

// Path returns the dispatcher ID carried by event.
// NOTE(review): presumably used to route the event to its dispatcher — confirm against the caller.
func (h *DispatcherEventsHandler) Path(event *common.TxnEvent) common.DispatcherID {
return event.GetDispatcherID()
}

// Handle applies a single event to dispatcher according to the event's kind:
//   - DML events are added to the dispatcher's sink together with its tableProgress;
//   - DDL events go through AddDDLEventToSinkWhenAvailable;
//   - any other event only updates the dispatcher's resolvedTs from event.ResolvedTs.
//
// It always returns false.
// NOTE(review): the return value presumably tells the caller whether to wait
// before delivering the next event — confirm against the caller.
//
// TODO: events should later be batched at a coarser granularity.
func (h *DispatcherEventsHandler) Handle(event *common.TxnEvent, dispatcher *Dispatcher) bool {
	sink := dispatcher.GetSink()

	switch {
	case event.IsDMLEvent():
		sink.AddDMLEvent(event, dispatcher.tableProgress)
	case event.IsDDLEvent():
		dispatcher.AddDDLEventToSinkWhenAvailable(event)
		// Waking is only allowed after the DDL has been fully flushed.
	default:
		dispatcher.resolvedTs.Set(event.ResolvedTs)
	}
	return false
}

// 1. 如果是单表内的 ddl,达到下推的条件为: sink 中没有还没执行完的当前表的 event
// 2. 如果是多表内的 ddl 或者是表间的 ddl,则需要满足的条件为:
// 2.1 sink 中没有还没执行完的当前表的 event
Expand All @@ -175,74 +132,31 @@ func (d *Dispatcher) AddDDLEventToSinkWhenAvailable(event *common.TxnEvent) bool
sink.AddDDLAndSyncPointEvent(event, d.tableProgress)
return true // 不能直接在内部调用写入,需要用异步操作
} else {
//
// // TODO:先写一个 定时 check 的逻辑,后面用 dynamic stream 改造
// timer := time.NewTimer(time.Millisecond * 50)
// for {
// select {
// case <-timer.C:
// if d.tableProgress.Empty() {
// sink.AddDDLAndSyncPointEvent(event, d.tableProgress)
// }
// }
// }
d.SetDDLPendingEvent(event)
d.checkTableProgressEmptyTask = newCheckTableProgressEmptyTask(d)
}
}

d.SetDDLPendingEvent(event)

// TODO:消息需要保证发送后收到 ack 才可以停止重发,具体重发时间需要调整
message := &heartbeatpb.TableSpanStatus{
Span: tableSpan.TableSpan,
ComponentStatus: heartbeatpb.ComponentState_Working,
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: event.CommitTs,
BlockTableSpan: event.GetBlockedTableSpan(), // 这个包含自己的 span 是不是也无所谓,不然就要剔除掉
NeedDroppedTableSpan: event.GetNeedDroppedTableSpan(),
NeedAddedTableSpan: event.GetNeedAddedTableSpan(),
},
}
d.GetTableSpanStatusesChan() <- message
timer := time.NewTimer(time.Millisecond * 100)
loop:
for {
select {
case <-timer.C:
// 重发消息
d.GetTableSpanStatusesChan() <- message
case ack := <-d.GetACKs():
if ack.CommitTs == event.CommitTs {
break loop
} else {
// cross ddl 也需要前序 dml 写完了才能开始判断是否可以下推
d.SetDDLPendingEvent(event)
if d.tableProgress.Empty() {
message := &heartbeatpb.TableSpanStatus{
Span: tableSpan.TableSpan,
ComponentStatus: heartbeatpb.ComponentState_Working,
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: event.CommitTs,
BlockTableSpan: event.GetBlockedTableSpan(), // 这个包含自己的 span 是不是也无所谓,不然就要剔除掉
NeedDroppedTableSpan: event.GetNeedDroppedTableSpan(),
NeedAddedTableSpan: event.GetNeedAddedTableSpan(),
},
}
d.GetTableSpanStatusesChan() <- message
d.SetResendTask(newResendTask(message, d))
} else {
d.checkTableProgressEmptyTask = newCheckTableProgressEmptyTask(d)
}
}

// 收到 ack 以后可以开始等 actions 来进行处理,等待 finish 信号
<-d.GetDDLFinishCh()
}

// func (d *Dispatcher) DispatcherEvents(ctx context.Context) {
// defer d.wg.Done()
// sink := d.GetSink()
// for {
// select {
// case <-ctx.Done():
// return
// case event := <-d.GetEventChan():
// if event.IsDMLEvent() {
// sink.AddDMLEvent(event, d.tableProgress)
// } else if event.IsDDLEvent() {
// d.AddDDLEventToSinkWhenAvailable(event)
// } else {
// d.resolvedTs.Set(event.ResolvedTs)
// }
// }
// }
// }

func (d *Dispatcher) GetCtx() context.Context {
return d.ctx
return true
}

func (d *Dispatcher) GetSink() sink.Sink {
Expand Down Expand Up @@ -282,20 +196,21 @@ func (d *Dispatcher) GetId() common.DispatcherID {
return d.id
}

// func (d *Dispatcher) GetDispatcherType() DispatcherType {
// return TableEventDispatcherType
// }

func (d *Dispatcher) GetDDLActions() chan *heartbeatpb.DispatcherAction {
return d.ddlActions
// GetTableSpanStatusesChan returns the channel on which this dispatcher
// publishes TableSpanStatus messages.
func (d *Dispatcher) GetTableSpanStatusesChan() chan *heartbeatpb.TableSpanStatus {
return d.tableSpanStatusesChan
}

func (d *Dispatcher) GetACKs() chan *heartbeatpb.ACK {
return d.acks
// CancelResendTask cancels the dispatcher's current resend task and clears
// the reference to it. When no task is set it only logs a warning.
func (d *Dispatcher) CancelResendTask() {
	if d.resendTask == nil {
		log.Warn("try to cancel a nil resend task")
		return
	}
	d.resendTask.Cancel()
	d.resendTask = nil
}

func (d *Dispatcher) GetTableSpanStatusesChan() chan *heartbeatpb.TableSpanStatus {
return d.tableSpanStatusesChan
// SetResendTask stores task as the dispatcher's current resend task,
// replacing whatever was stored before without cancelling it.
func (d *Dispatcher) SetResendTask(task *ResendTask) {
d.resendTask = task
}

//func (d *Dispatcher) GetSyncPointInfo() *SyncPointInfo {
Expand All @@ -312,30 +227,22 @@ func (d *Dispatcher) PushTxnEvent(event *common.TxnEvent) {

// Remove cancels the dispatcher's context, logs the transition, and sets the
// isRemoving flag. It does not change componentStatus itself.
func (d *Dispatcher) Remove() {
// TODO: change this dispatcher's status to removing
d.cancel()
log.Info("table event dispatcher component status changed to stopping", zap.String("table", d.tableSpan.String()))
d.isRemoving.Store(true)
}

// TryClose reports whether the dispatcher can be closed now.
//
// It is intended to be called on every heartbeat collection once the
// dispatcher is being removed. If the dispatcher cannot be closed yet it
// returns a zero Watermark and false. Otherwise it fills the Watermark with
// the current checkpointTs and resolvedTs, marks the component as Stopped and
// returns true.
//
// NOTE(review): the relationship between startTs and checkpointTs still has
// to be aligned, per the original author's note.
func (d *Dispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) {
	// The dispatcher may only close once its table progress is empty. An
	// earlier unconditional "return w, true" made this check unreachable, so
	// the dispatcher was always reported as closable.
	if !d.tableProgress.Empty() {
		return w, false
	}

	w.CheckpointTs = d.GetCheckpointTs()
	w.ResolvedTs = d.GetResolvedTs()

	//d.MemoryUsage.Clear()
	d.componentStatus.Set(heartbeatpb.ComponentState_Stopped)
	return w, true
}

func (d *Dispatcher) GetComponentStatus() heartbeatpb.ComponentState {
Expand All @@ -346,10 +253,6 @@ func (d *Dispatcher) GetFilter() filter.Filter {
return d.filter
}

// GetWG returns a pointer to the dispatcher's WaitGroup.
func (d *Dispatcher) GetWG() *sync.WaitGroup {
return &d.wg
}

// GetDDLPendingEvent returns the DDL event currently stored as pending, or
// nil when none is stored.
func (d *Dispatcher) GetDDLPendingEvent() *common.TxnEvent {
return d.ddlPendingEvent
}
Expand All @@ -362,108 +265,10 @@ func (d *Dispatcher) SetDDLPendingEvent(event *common.TxnEvent) {
d.ddlPendingEvent = event
}

// GetDDLFinishCh returns the channel used to signal that handling of the
// pending DDL has finished.
func (d *Dispatcher) GetDDLFinishCh() chan struct{} {
return d.ddlFinishCh
}
// GetRemovingStatus reports the current value of the isRemoving flag.
func (d *Dispatcher) GetRemovingStatus() bool {
return d.isRemoving.Load()
}

// DDLActionWithDispatcherID pairs a DispatcherAction with a dispatcher ID.
type DDLActionWithDispatcherID struct {
// dispatcherAction is the action message itself.
dispatcherAction *heartbeatpb.DispatcherAction
// dispatcherID is the ID stored alongside the action.
dispatcherID common.DispatcherID
}

// NewDDLActionWithDispatcherID wraps dispatcherAction together with
// dispatcherID. The action pointer is stored as-is, not copied.
func NewDDLActionWithDispatcherID(dispatcherAction *heartbeatpb.DispatcherAction, dispatcherID common.DispatcherID) *DDLActionWithDispatcherID {
return &DDLActionWithDispatcherID{
dispatcherAction: dispatcherAction,
dispatcherID: dispatcherID,
}
}

// GetDispatcherAction returns the wrapped DispatcherAction.
func (d *DDLActionWithDispatcherID) GetDispatcherAction() *heartbeatpb.DispatcherAction {
return d.dispatcherAction
}

// GetDispatcherID returns the dispatcher ID stored with the action.
func (d *DDLActionWithDispatcherID) GetDispatcherID() common.DispatcherID {
return d.dispatcherID
}

// DDLActionsHandler is a stateless handler for DDL actions: Path maps an
// action to a dispatcher ID and Handle applies the action to a Dispatcher.
type DDLActionsHandler struct {
}

// Path returns the dispatcher ID stored in event.
// NOTE(review): presumably used to route the action to its dispatcher — confirm against the caller.
func (h *DDLActionsHandler) Path(event DDLActionWithDispatcherID) common.DispatcherID {
return event.GetDispatcherID()
}

// Handle applies one DDL action to dispatcher.
//
//   - If the dispatcher has no pending DDL event, it only re-sends a status
//     message carrying the current checkpointTs.
//   - If the action's CommitTs equals the pending event's CommitTs, the
//     pending event is written to the sink (Action_Write) or passed (any other
//     action), and then a status message with the checkpointTs is sent.
//   - An action with a different CommitTs is ignored.
//
// It always returns false.
func (h *DDLActionsHandler) Handle(event DDLActionWithDispatcherID, dispatcher *Dispatcher) (await bool) {
	sink := dispatcher.GetSink()
	span := dispatcher.GetTableSpan()
	pending := dispatcher.GetDDLPendingEvent()
	action := event.GetDispatcherAction()

	// reportCheckpoint publishes a Working status with the current checkpointTs.
	reportCheckpoint := func() {
		dispatcher.GetTableSpanStatusesChan() <- &heartbeatpb.TableSpanStatus{
			Span:            span.TableSpan,
			ComponentStatus: heartbeatpb.ComponentState_Working,
			CheckpointTs:    dispatcher.GetCheckpointTs(),
		}
	}

	switch {
	case pending == nil:
		// This can only happen when the event has already been advanced but a
		// duplicate action message arrives; resend a heartbeat that carries
		// the checkpointTs.
		reportCheckpoint()
	case action.CommitTs == pending.CommitTs:
		if action.Action == heartbeatpb.Action_Write {
			// This is a synchronous write, so the sink is available again
			// once it returns.
			sink.AddDDLAndSyncPointEvent(pending, dispatcher.tableProgress)
		} else {
			// Done to update tableProgress and avoid a corner case in the
			// checkpointTs calculation.
			sink.PassDDLAndSyncPointEvent(pending, dispatcher.tableProgress)
			// TODO: wake the neighboring dynamic stream
		}
		reportCheckpoint()
	}
	return false
}

// HandleDDLActions is the dispatcher's background loop that consumes
// DispatcherAction messages from the ddlActions channel until ctx is done.
//
// For each action:
//   - if there is no pending DDL event, it re-sends a status message carrying
//     the current checkpointTs and keeps looping;
//   - if the action's CommitTs equals the pending event's CommitTs, it writes
//     (Action_Write) or passes (any other action) the pending event to the sink,
//     sends a status message with the checkpointTs, signals ddlFinishCh and returns;
//   - actions with a different CommitTs are ignored.
//
// It calls Done on the dispatcher's WaitGroup when it exits.
func (d *Dispatcher) HandleDDLActions(ctx context.Context) {
defer d.GetWG().Done()
sink := d.GetSink()
tableSpan := d.GetTableSpan()
for {
select {
case <-ctx.Done():
return
case dispatcherAction := <-d.GetDDLActions():
event := d.GetDDLPendingEvent()
if event == nil {
// This can only happen when the event has already been advanced but a
// duplicate action message arrives; resend a heartbeat that carries the checkpointTs.
d.GetTableSpanStatusesChan() <- &heartbeatpb.TableSpanStatus{
Span: tableSpan.TableSpan,
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: d.GetCheckpointTs(),
}
continue
}
if dispatcherAction.CommitTs == event.CommitTs {
if dispatcherAction.Action == heartbeatpb.Action_Write {
sink.AddDDLAndSyncPointEvent(event, d.tableProgress) // This is a synchronous write, so the sink is available again once it returns.
} else {
sink.PassDDLAndSyncPointEvent(event, d.tableProgress) // Done to update tableProgress and avoid a corner case in the checkpointTs calculation.
}
d.GetTableSpanStatusesChan() <- &heartbeatpb.TableSpanStatus{
Span: tableSpan.TableSpan,
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: d.GetCheckpointTs(),
}
// NOTE(review): this send is not guarded by ctx.Done(), so it blocks
// until some receiver reads from ddlFinishCh — confirm a receiver always exists.
d.GetDDLFinishCh() <- struct{}{}
// NOTE(review): the loop exits after the first matching action, so later
// actions are no longer consumed by this goroutine — confirm this is intended.
return
}
}
}
}

func (d *Dispatcher) CollectDispatcherHeartBeatInfo(h *HeartBeatInfo) {
// use checkpointTs to release memory usage
//d.GetMemoryUsage().Release(checkpointTs)
Expand Down
Loading

0 comments on commit a13a409

Please sign in to comment.