diff --git a/logservice/eventstore/helper.go b/logservice/eventstore/helper.go
index 855bade10..7fe76caed 100644
--- a/logservice/eventstore/helper.go
+++ b/logservice/eventstore/helper.go
@@ -19,6 +19,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/logservice/logpuller"
 	"github.com/pingcap/ticdc/utils/dynstream"
+	"go.uber.org/zap"
 )
 
 const (
@@ -43,8 +44,9 @@ func (h *eventsHandler) Handle(subStat *subscriptionStat, events ...kvEvent) boo
 		subStat.resolvedTs.Store(events[0].raw.CRTs)
 		subStat.dispatchers.RLock()
 		defer subStat.dispatchers.RUnlock()
-		for _, notifier := range subStat.dispatchers.notifiers {
+		for dispatcherID, notifier := range subStat.dispatchers.notifiers {
 			notifier(events[0].raw.CRTs, subStat.maxEventCommitTs.Load())
+			log.Info("event store notify resolved ts", zap.Uint64("ts", events[0].raw.CRTs), zap.Stringer("dispatcherID", dispatcherID))
 		}
 		return false
 	}
diff --git a/logservice/logpuller/log_puller_multi_span.go b/logservice/logpuller/log_puller_multi_span.go
index 57dee7071..f160fd4f7 100644
--- a/logservice/logpuller/log_puller_multi_span.go
+++ b/logservice/logpuller/log_puller_multi_span.go
@@ -116,6 +116,9 @@ func (p *LogPullerMultiSpan) tryUpdatePendingResolvedTs(subID SubscriptionID, ne
 		log.Panic("unknown zubscriptionID, should not happen",
 			zap.Uint64("subID", uint64(subID)))
 	}
+	log.Info("schema store update pending resolved ts",
+		zap.Uint64("subID", uint64(subID)),
+		zap.Uint64("newResolvedTs", newResolvedTs))
 	if newResolvedTs < item.resolvedTs {
 		log.Panic("resolved ts should not fallback",
 			zap.Uint64("newResolvedTs", newResolvedTs),
diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index ea914fb10..b7ce8606a 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -15,7 +15,6 @@ package logpuller
 
 import (
 	"context"
-	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -147,7 +146,7 @@ type SubscriptionClientConfig struct {
 	// TODO: add a metric for busy ratio?
 	ChangeEventProcessorNum uint
 	// The time interval to advance resolvedTs for a region
-	AdvanceResolvedTsIntervalInMs uint
+	AdvanceResolvedTsIntervalInMs int64
 }
 
 type sharedClientMetrics struct {
@@ -818,11 +817,13 @@ func (s *SubscriptionClient) newSubscribedSpan(
 	rangeLock := regionlock.NewRangeLock(uint64(subID), span.StartKey, span.EndKey, startTs)
 	rt := &subscribedSpan{
-		subID:           subID,
-		span:            span,
-		startTs:         startTs,
-		rangeLock:       rangeLock,
-		advanceInterval: int64(s.config.AdvanceResolvedTsIntervalInMs)/4*3 + int64(rand.Intn(int(s.config.AdvanceResolvedTsIntervalInMs)/4)),
+		subID:     subID,
+		span:      span,
+		startTs:   startTs,
+		rangeLock: rangeLock,
+		// advanceInterval: int64(s.config.AdvanceResolvedTsIntervalInMs)/4*3 + int64(rand.Intn(int(s.config.AdvanceResolvedTsIntervalInMs)/4)),
+		// advanceInterval: s.config.AdvanceResolvedTsIntervalInMs,
+		advanceInterval: 0,
 	}
 	rt.resolvedTs.Store(startTs)
diff --git a/logservice/schemastore/ddl_job_fetcher.go b/logservice/schemastore/ddl_job_fetcher.go
index 75a69e40b..91c4b74aa 100644
--- a/logservice/schemastore/ddl_job_fetcher.go
+++ b/logservice/schemastore/ddl_job_fetcher.go
@@ -59,7 +59,7 @@ func newDDLJobFetcher(
 	clientConfig := &logpuller.SubscriptionClientConfig{
 		RegionRequestWorkerPerStore:   1,
 		ChangeEventProcessorNum:       1, // must be 1, because ddlJobFetcher.input cannot be called concurrently
-		AdvanceResolvedTsIntervalInMs: 100,
+		AdvanceResolvedTsIntervalInMs: 0,
 	}
 	client := logpuller.NewSubscriptionClient(
 		logpuller.ClientIDSchemaStore,
diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go
index 2263ba23f..764d960a0 100644
--- a/logservice/schemastore/schema_store.go
+++ b/logservice/schemastore/schema_store.go
@@ -2,6 +2,7 @@ package schemastore
 
 import (
 	"context"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -20,14 +21,16 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+type ResolveTsUpdateNotifier func()
+
 type SchemaStore interface {
 	common.SubModule
 
 	GetAllPhysicalTables(snapTs uint64, filter filter.Filter) ([]commonEvent.Table, error)
 
-	RegisterTable(tableID int64, startTs uint64) error
+	RegisterTable(dispatcherID common.DispatcherID, tableID int64, startTs uint64, notifier ResolveTsUpdateNotifier) error
 
-	UnregisterTable(tableID int64) error
+	UnregisterTable(dispatcherID common.DispatcherID, tableID int64) error
 
 	// return table info with largest version <= ts
 	GetTableInfo(tableID int64, ts uint64) (*common.TableInfo, error)
@@ -67,6 +70,10 @@ type schemaStore struct {
 	// max resolvedTs of all applied ddl events
 	resolvedTs atomic.Uint64
 
+	resolveTsUpdateNotifiers sync.Map
+
+	notifyDispatcherCh chan interface{}
+
 	// the following two fields are used to filter out duplicate ddl events
 	// they will just be updated and read by a single goroutine, so no lock is needed
 
@@ -88,12 +95,13 @@ func New(
 	upperBound := dataStorage.getUpperBound()
 
 	s := &schemaStore{
-		pdClock:       pdClock,
-		unsortedCache: newDDLCache(),
-		dataStorage:   dataStorage,
-		notifyCh:      make(chan interface{}, 4),
-		finishedDDLTs: upperBound.FinishedDDLTs,
-		schemaVersion: upperBound.SchemaVersion,
+		pdClock:            pdClock,
+		unsortedCache:      newDDLCache(),
+		dataStorage:        dataStorage,
+		notifyCh:           make(chan interface{}, 4),
+		finishedDDLTs:      upperBound.FinishedDDLTs,
+		schemaVersion:      upperBound.SchemaVersion,
+		notifyDispatcherCh: make(chan interface{}, 4),
 	}
 	s.pendingResolvedTs.Store(upperBound.ResolvedTs)
 	s.resolvedTs.Store(upperBound.ResolvedTs)
@@ -123,6 +131,9 @@ func (s *schemaStore) Run(ctx context.Context) error {
 	eg.Go(func() error {
 		return s.updateResolvedTsPeriodically(ctx)
 	})
+	eg.Go(func() error {
+		return s.notifyDispatcherPeriodically(ctx)
+	})
 	eg.Go(func() error {
 		return s.ddlJobFetcher.run(ctx)
 	})
@@ -147,6 +158,8 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 		if pendingTs <= s.resolvedTs.Load() {
 			return
 		}
+		log.Info("schema store advance resolve ts",
+			zap.Uint64("resolvedTs", pendingTs))
 		resolvedEvents := s.unsortedCache.fetchSortedDDLEventBeforeTS(pendingTs)
 		if len(resolvedEvents) != 0 {
 			log.Info("schema store begin to apply resolved ddl events",
@@ -187,6 +200,10 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 		// so we can only update resolved ts after all ddl jobs are written to disk
 		// Can we optimize it to update resolved ts more eagerly?
 		s.resolvedTs.Store(pendingTs)
+		select {
+		case s.notifyDispatcherCh <- struct{}{}:
+		default:
+		}
 		s.dataStorage.updateUpperBound(UpperBoundMeta{
 			FinishedDDLTs: s.finishedDDLTs,
 			SchemaVersion: s.schemaVersion,
@@ -206,13 +223,34 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 	}
 }
 
+func (s *schemaStore) notifyDispatcherPeriodically(ctx context.Context) error {
+	// ticker := time.NewTicker(50 * time.Millisecond)
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-s.notifyDispatcherCh:
+			s.resolveTsUpdateNotifiers.Range(func(_, value interface{}) bool {
+				value.(ResolveTsUpdateNotifier)()
+				return true
+			})
+		}
+	}
+}
+
 func (s *schemaStore) GetAllPhysicalTables(snapTs uint64, filter filter.Filter) ([]commonEvent.Table, error) {
 	s.waitResolvedTs(0, snapTs, 10*time.Second)
 	return s.dataStorage.getAllPhysicalTables(snapTs, filter)
 }
 
-func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error {
+func (s *schemaStore) RegisterTable(
+	dispatcherID common.DispatcherID,
+	tableID int64,
+	startTs uint64,
+	notifier ResolveTsUpdateNotifier,
+) error {
 	metrics.SchemaStoreResolvedRegisterTableGauge.Inc()
+	s.resolveTsUpdateNotifiers.Store(dispatcherID, notifier)
 	s.waitResolvedTs(tableID, startTs, 5*time.Second)
 	log.Info("register table",
 		zap.Int64("tableID", tableID),
@@ -221,8 +259,9 @@ func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error {
 	return s.dataStorage.registerTable(tableID, startTs)
 }
 
-func (s *schemaStore) UnregisterTable(tableID int64) error {
+func (s *schemaStore) UnregisterTable(dispatcherID common.DispatcherID, tableID int64) error {
 	metrics.SchemaStoreResolvedRegisterTableGauge.Dec()
+	s.resolveTsUpdateNotifiers.Delete(dispatcherID)
 	return s.dataStorage.unregisterTable(tableID)
 }
 
@@ -307,7 +346,7 @@ func (s *schemaStore) writeDDLEvent(ddlEvent DDLJobWithCommitTs) {
 }
 
 func (s *schemaStore) advanceResolvedTs(resolvedTs uint64) {
-	// log.Info("advance resolved ts", zap.Any("resolvedTS", resolvedTs))
+	log.Info("schema store update pending resolved ts", zap.Uint64("resolvedTs", resolvedTs))
 	if resolvedTs < s.pendingResolvedTs.Load() {
 		log.Panic("resolved ts should not fallback",
 			zap.Uint64("pendingResolveTs", s.pendingResolvedTs.Load()),
diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go
index 9fd50e92d..01d3681c5 100644
--- a/pkg/eventservice/event_broker.go
+++ b/pkg/eventservice/event_broker.go
@@ -361,6 +361,7 @@ func (c *eventBroker) checkNeedScan(task scanTask, mustCheck bool) (bool, common
 		// The dispatcher has no new events. In such case, we don't need to scan the event store.
 		// We just send the watermark to the dispatcher.
 		remoteID := node.ID(task.info.GetServerID())
+		log.Info("send watermark", zap.Uint64("watermark", dataRange.EndTs), zap.Stringer("dispatcher", task.id))
 		c.sendWatermark(remoteID, task, dataRange.EndTs, task.metricEventServiceSendResolvedTsCount)
 		task.watermark.Store(dataRange.EndTs)
 		return false, dataRange
@@ -672,6 +673,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 		case <-ticker.C:
 			receivedMinResolvedTs := uint64(0)
 			sentMinWaterMark := uint64(0)
+			var dispatcherID common.DispatcherID
 			c.dispatchers.Range(func(key, value interface{}) bool {
 				dispatcher := value.(*dispatcherStat)
 				resolvedTs := dispatcher.resolvedTs.Load()
@@ -681,6 +683,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 				watermark := dispatcher.watermark.Load()
 				if sentMinWaterMark == 0 || watermark < sentMinWaterMark {
 					sentMinWaterMark = watermark
+					dispatcherID = dispatcher.info.GetID()
 				}
 				return true
 			})
@@ -693,6 +696,10 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 			c.metricEventServiceResolvedTsLag.Set(lag)
 			lag = float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(sentMinWaterMark)) / 1e3
 			c.metricEventServiceSentResolvedTs.Set(lag)
+			log.Info("eventBroker update metrics",
+				zap.Stringer("dispatcherID", dispatcherID),
+				zap.Float64("lag(s)", lag),
+				zap.Uint64("sentResolvedTs", sentMinWaterMark))
 
 			metricEventBrokerPendingScanTaskCount.Set(float64(len(c.taskQueue)))
 
@@ -745,6 +752,15 @@ func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64, latestCommi
 	}
 }
 
+func (c *eventBroker) onDDLResolveTsUpdate(d *dispatcherStat) {
+	log.Info("resolve ts update", zap.Stringer("dispatcher", d.id))
+	needScan, _ := c.checkNeedScan(d, false)
+	if needScan {
+		d.scanning.Store(true)
+		c.taskQueue <- d
+	}
+}
+
 func (c *eventBroker) getDispatcher(id common.DispatcherID) (*dispatcherStat, bool) {
 	stat, ok := c.dispatchers.Load(id)
 	if !ok {
@@ -797,7 +813,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) {
 		return
 	}
 
-	err = c.schemaStore.RegisterTable(span.GetTableID(), info.GetStartTs())
+	err = c.schemaStore.RegisterTable(id, span.GetTableID(), info.GetStartTs(), func() { c.onDDLResolveTsUpdate(dispatcher) })
 	if err != nil {
 		log.Panic("register table to schemaStore failed", zap.Error(err), zap.Int64("tableID", span.TableID), zap.Uint64("startTs", info.GetStartTs()))
 	}
@@ -824,7 +840,7 @@ func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) {
 		return
 	}
 	c.eventStore.UnregisterDispatcher(id)
-	c.schemaStore.UnregisterTable(dispatcherInfo.GetTableSpan().TableID)
+	c.schemaStore.UnregisterTable(id, dispatcherInfo.GetTableSpan().TableID)
 	c.dispatchers.Delete(id)
 	log.Info("deregister acceptor", zap.Uint64("clusterID", c.tidbClusterID), zap.Any("acceptorID", id))
 }
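As an aside for reviewers: the schema_store.go and event_broker.go hunks above wire up a callback-based notification path (RegisterTable now takes a ResolveTsUpdateNotifier, resolved-ts advances are coalesced through notifyDispatcherCh, and notifyDispatcherPeriodically fans out to every registered callback). The sketch below is a minimal, self-contained illustration of that pattern only; it is not ticdc code, and the names in it (store, register, advance, run) are made up for the example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ResolveTsUpdateNotifier mirrors the callback type introduced in schema_store.go.
type ResolveTsUpdateNotifier func()

type store struct {
	notifiers sync.Map      // dispatcher ID (a plain string here) -> ResolveTsUpdateNotifier
	notifyCh  chan struct{} // coalesced "resolved ts advanced" signal
}

func newStore() *store { return &store{notifyCh: make(chan struct{}, 4)} }

func (s *store) register(id string, n ResolveTsUpdateNotifier) { s.notifiers.Store(id, n) }
func (s *store) unregister(id string)                          { s.notifiers.Delete(id) }

// advance stands in for the resolved-ts update; the non-blocking send means
// repeated advances collapse into a single pending signal.
func (s *store) advance() {
	select {
	case s.notifyCh <- struct{}{}:
	default:
	}
}

// run plays the role of notifyDispatcherPeriodically: drain the signal and
// invoke every registered callback.
func (s *store) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-s.notifyCh:
			s.notifiers.Range(func(_, v interface{}) bool {
				v.(ResolveTsUpdateNotifier)()
				return true
			})
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	s := newStore()
	s.register("dispatcher-1", func() { fmt.Println("dispatcher-1: resolved ts updated") })
	go s.run(ctx)

	s.advance()
	<-ctx.Done()
	s.unregister("dispatcher-1")
}
```

The non-blocking send is the same trick the patch uses in updateResolvedTsPeriodically, so advancing the resolved ts never blocks on a slow notifier consumer.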