Skip to content

Commit

Permalink
Support split table for ddl in dispatchers (#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Nov 8, 2024
1 parent 1050ee7 commit daf56c3
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/sink/util"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/tiflow/pkg/spanz"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -342,7 +343,15 @@ func (d *Dispatcher) checkHandshakeEvents(dispatcherEvents []DispatcherEvent) (b
return false, dispatcherEvents[index:]
}

func shouldBlock(event commonEvent.BlockEvent) bool {
func isCompleteSpan(tableSpan *heartbeatpb.TableSpan) bool {
startKey, endKey := spanz.GetTableRange(tableSpan.TableID)
if spanz.StartCompare(startKey, tableSpan.StartKey) == 0 && spanz.EndCompare(endKey, tableSpan.EndKey) == 0 {
return true
}
return false
}

func (d *Dispatcher) shouldBlock(event commonEvent.BlockEvent) bool {
switch event.GetType() {
case commonEvent.TypeDDLEvent:
ddlEvent := event.(*commonEvent.DDLEvent)
Expand All @@ -351,9 +360,11 @@ func shouldBlock(event commonEvent.BlockEvent) bool {
case commonEvent.InfluenceTypeNormal:
if len(ddlEvent.GetBlockedTables().TableIDs) > 1 {
return true
} else {
return false
} else if !isCompleteSpan(d.tableSpan) {
// if the table is split, even the blockTable only itself, it should block
return true
}
return false
case commonEvent.InfluenceTypeDB, commonEvent.InfluenceTypeAll:
return true
}
Expand Down Expand Up @@ -385,7 +396,7 @@ func (d *Dispatcher) reset() {
// If the ddl leads to add new tables or drop tables, it should send heartbeat to maintainer
// 2. If the event is a multi-table DDL / sync point Event, it will generate a TableSpanBlockStatus message with ddl info to send to maintainer.
func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
if !shouldBlock(event) {
if !d.shouldBlock(event) {
d.sink.AddBlockEvent(event, d.tableProgress)
if event.GetNeedAddedTables() != nil || event.GetNeedDroppedTables() != nil {
message := &heartbeatpb.TableSpanBlockStatus{
Expand Down Expand Up @@ -582,7 +593,7 @@ func (d *Dispatcher) GetBlockStatus() *heartbeatpb.State {
pendingEvent, blockStage := d.blockStatus.getEventAndStage()

// we only need to report the block status for the ddl that block others and not finished.
if pendingEvent == nil || !shouldBlock(pendingEvent) {
if pendingEvent == nil || !d.shouldBlock(pendingEvent) {
return nil
}

Expand Down

0 comments on commit daf56c3

Please sign in to comment.