Skip to content

Commit

Permalink
Make dispatcher to write ddl in sync (#491)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Nov 8, 2024
1 parent daf56c3 commit 23df85e
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 49 deletions.
20 changes: 15 additions & 5 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,16 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
// deal with the dispatcher action
action := dispatcherStatus.GetAction()
if action != nil {
pendingEvent, _ := d.blockStatus.getEventAndStage()
if pendingEvent != nil && action.CommitTs == pendingEvent.GetCommitTs() {
pendingEvent, blockStatus := d.blockStatus.getEventAndStage()
if pendingEvent != nil && action.CommitTs == pendingEvent.GetCommitTs() && blockStatus == heartbeatpb.BlockStage_WAITING {
d.blockStatus.updateBlockStage(heartbeatpb.BlockStage_WRITING)
if action.Action == heartbeatpb.Action_Write {
d.sink.AddBlockEvent(pendingEvent, d.tableProgress)
err := d.sink.WriteBlockEvent(pendingEvent, d.tableProgress)
if err != nil {
// TODO: handle error
log.Error("write block event failed", zap.Error(err))
return
}
} else {
d.sink.PassBlockEvent(pendingEvent, d.tableProgress)
dispatcherEventDynamicStream := GetEventDynamicStream()
Expand Down Expand Up @@ -344,8 +349,9 @@ func (d *Dispatcher) checkHandshakeEvents(dispatcherEvents []DispatcherEvent) (b
}

func isCompleteSpan(tableSpan *heartbeatpb.TableSpan) bool {
spanz.TableIDToComparableSpan(tableSpan.TableID)
startKey, endKey := spanz.GetTableRange(tableSpan.TableID)
if spanz.StartCompare(startKey, tableSpan.StartKey) == 0 && spanz.EndCompare(endKey, tableSpan.EndKey) == 0 {
if spanz.StartCompare(spanz.ToComparableKey(startKey), tableSpan.StartKey) == 0 && spanz.EndCompare(spanz.ToComparableKey(endKey), tableSpan.EndKey) == 0 {
return true
}
return false
Expand Down Expand Up @@ -397,7 +403,11 @@ func (d *Dispatcher) reset() {
// 2. If the event is a multi-table DDL / sync point Event, it will generate a TableSpanBlockStatus message with ddl info to send to maintainer.
func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
if !d.shouldBlock(event) {
d.sink.AddBlockEvent(event, d.tableProgress)
err := d.sink.WriteBlockEvent(event, d.tableProgress)
if err != nil {
// TODO: handle error
log.Error("write block event failed", zap.Error(err))
}
if event.GetNeedAddedTables() != nil || event.GetNeedDroppedTables() != nil {
message := &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
Expand Down
5 changes: 3 additions & 2 deletions downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *KafkaSink) PassBlockEvent(event commonEvent.BlockEvent, tableProgress *
tableProgress.Pass(event)
}

func (s *KafkaSink) AddBlockEvent(event commonEvent.BlockEvent, tableProgress *types.TableProgress) {
func (s *KafkaSink) WriteBlockEvent(event commonEvent.BlockEvent, tableProgress *types.TableProgress) error {
tableProgress.Add(event)
switch event := event.(type) {
case *commonEvent.DDLEvent:
Expand All @@ -195,7 +195,7 @@ func (s *KafkaSink) AddBlockEvent(event commonEvent.BlockEvent, tableProgress *t
for _, cb := range event.PostTxnFlushed {
cb()
}
return
return nil
}
s.ddlWorker.GetDDLEventChan() <- event
case *commonEvent.SyncPointEvent:
Expand All @@ -209,6 +209,7 @@ func (s *KafkaSink) AddBlockEvent(event commonEvent.BlockEvent, tableProgress *t
zap.String("changefeed", s.changefeedID.Name()),
zap.Any("event type", event.GetType()))
}
return nil
}

func (s *KafkaSink) AddCheckpointTs(ts uint64) {
Expand Down
5 changes: 2 additions & 3 deletions downstreamadapter/sink/mysql_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func NewMysqlSink(ctx context.Context, changefeedID common.ChangeFeedID, workerC
}

func (s *MysqlSink) Run() error {
s.ddlWorker.Run()
for i := 0; i < s.workerCount; i++ {
s.dmlWorker[i].Run()
}
Expand Down Expand Up @@ -108,9 +107,9 @@ func (s *MysqlSink) PassBlockEvent(event commonEvent.BlockEvent, tableProgress *
tableProgress.Pass(event)
}

func (s *MysqlSink) AddBlockEvent(event commonEvent.BlockEvent, tableProgress *types.TableProgress) {
func (s *MysqlSink) WriteBlockEvent(event commonEvent.BlockEvent, tableProgress *types.TableProgress) error {
tableProgress.Add(event)
s.ddlWorker.GetDDLEventChan() <- event
return s.ddlWorker.WriteBlockEvent(event)
}

func (s *MysqlSink) AddCheckpointTs(ts uint64) {}
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (

type Sink interface {
AddDMLEvent(event *commonEvent.DMLEvent, tableProgress *types.TableProgress)
AddBlockEvent(event commonEvent.BlockEvent, tableProgress *types.TableProgress)
WriteBlockEvent(event commonEvent.BlockEvent, tableProgress *types.TableProgress) error
PassBlockEvent(event commonEvent.BlockEvent, tableProgress *types.TableProgress)
AddCheckpointTs(ts uint64)
SetTableSchemaStore(tableSchemaStore *sinkutil.TableSchemaStore)
Expand Down
56 changes: 19 additions & 37 deletions downstreamadapter/worker/mysql_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ type MysqlDDLWorker struct {
ctx context.Context
changefeedID common.ChangeFeedID
mysqlWriter *mysql.MysqlWriter
ddlEventChan chan commonEvent.BlockEvent
errgroup *errgroup.Group
}

Expand All @@ -163,44 +162,10 @@ func NewMysqlDDLWorker(
ctx: ctx,
changefeedID: changefeedID,
mysqlWriter: mysql.NewMysqlWriter(ctx, db, config, changefeedID, statistics),
ddlEventChan: make(chan commonEvent.BlockEvent, 16),
errgroup: errGroup,
}
}

func (w *MysqlDDLWorker) Run() {
w.errgroup.Go(func() error {
for {
select {
case <-w.ctx.Done():
return errors.Trace(w.ctx.Err())
case event := <-w.ddlEventChan:
switch event.GetType() {
case commonEvent.TypeDDLEvent:
err := w.mysqlWriter.FlushDDLEvent(event.(*commonEvent.DDLEvent))
if err != nil {
return errors.Trace(err)
}
case commonEvent.TypeSyncPointEvent:
err := w.mysqlWriter.FlushSyncPointEvent(event.(*commonEvent.SyncPointEvent))
if err != nil {
log.Error("Failed to flush sync point event",
zap.String("namespace", w.changefeedID.Namespace()),
zap.String("changefeed", w.changefeedID.Name()),
zap.Any("event", event),
zap.Error(err))
}
default:
log.Error("unknown event type",
zap.String("namespace", w.changefeedID.Namespace()),
zap.String("changefeed", w.changefeedID.Name()),
zap.Any("event", event))
}
}
}
})
}

func (w *MysqlDDLWorker) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
w.mysqlWriter.SetTableSchemaStore(tableSchemaStore)
}
Expand All @@ -216,8 +181,25 @@ func (w *MysqlDDLWorker) CheckStartTs(tableId int64, startTs uint64) (int64, err
return max(ddlTs, int64(startTs)), nil
}

func (w *MysqlDDLWorker) GetDDLEventChan() chan commonEvent.BlockEvent {
return w.ddlEventChan
func (w *MysqlDDLWorker) WriteBlockEvent(event commonEvent.BlockEvent) error {
switch event.GetType() {
case commonEvent.TypeDDLEvent:
err := w.mysqlWriter.FlushDDLEvent(event.(*commonEvent.DDLEvent))
if err != nil {
return errors.Trace(err)
}
case commonEvent.TypeSyncPointEvent:
err := w.mysqlWriter.FlushSyncPointEvent(event.(*commonEvent.SyncPointEvent))
if err != nil {
return errors.Trace(err)
}
default:
log.Error("unknown event type",
zap.String("namespace", w.changefeedID.Namespace()),
zap.String("changefeed", w.changefeedID.Name()),
zap.Any("event", event))
}
return nil
}

func (w *MysqlDDLWorker) RemoveDDLTsItem() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/mysql/mysql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ func (w *MysqlWriter) execDDL(event *commonEvent.DDLEvent) error {
}

func (w *MysqlWriter) execDDLWithMaxRetries(event *commonEvent.DDLEvent) error {
return retry.Do(context.Background(), func() error {
return retry.Do(w.ctx, func() error {
err := w.statistics.RecordDDLExecution(func() error { return w.execDDL(event) })
if err != nil {
if apperror.IsIgnorableMySQLDDLError(err) {
Expand Down

0 comments on commit 23df85e

Please sign in to comment.