From a4edd19e524cb04e1c0fe9523181f0d75bfa0570 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 3 Dec 2024 17:14:28 +0800
Subject: [PATCH 1/8] schemastore: notify dispatcher when ddl resolve ts update

---
 logservice/schemastore/schema_store.go | 24 ++++++++++++++++++++----
 pkg/eventservice/event_broker.go       | 12 ++++++++++--
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go
index 2263ba23f..6ddb3ed45 100644
--- a/logservice/schemastore/schema_store.go
+++ b/logservice/schemastore/schema_store.go
@@ -2,6 +2,7 @@ package schemastore
 
 import (
 	"context"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -20,14 +21,16 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+type ResolveTsUpdateNotifier func()
+
 type SchemaStore interface {
 	common.SubModule
 
 	GetAllPhysicalTables(snapTs uint64, filter filter.Filter) ([]commonEvent.Table, error)
 
-	RegisterTable(tableID int64, startTs uint64) error
+	RegisterTable(dispatcherID common.DispatcherID, tableID int64, startTs uint64, notifier ResolveTsUpdateNotifier) error
 
-	UnregisterTable(tableID int64) error
+	UnregisterTable(dispatcherID common.DispatcherID, tableID int64) error
 
 	// return table info with largest version <= ts
 	GetTableInfo(tableID int64, ts uint64) (*common.TableInfo, error)
@@ -67,6 +70,8 @@ type schemaStore struct {
 	// max resolvedTs of all applied ddl events
 	resolvedTs atomic.Uint64
 
+	resolveTsUpdateNotifiers sync.Map
+
 	// the following two fields are used to filter out duplicate ddl events
 	// they will just be updated and read by a single goroutine, so no lock is needed
 
@@ -187,6 +192,10 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 			// so we can only update resolved ts after all ddl jobs are written to disk
 			// Can we optimize it to update resolved ts more eagerly?
 			s.resolvedTs.Store(pendingTs)
+			s.resolveTsUpdateNotifiers.Range(func(_, value interface{}) bool {
+				value.(ResolveTsUpdateNotifier)()
+				return true
+			})
 			s.dataStorage.updateUpperBound(UpperBoundMeta{
 				FinishedDDLTs: s.finishedDDLTs,
 				SchemaVersion: s.schemaVersion,
@@ -211,8 +220,14 @@ func (s *schemaStore) GetAllPhysicalTables(snapTs uint64, filter filter.Filter)
 	return s.dataStorage.getAllPhysicalTables(snapTs, filter)
 }
 
-func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error {
+func (s *schemaStore) RegisterTable(
+	dispatcherID common.DispatcherID,
+	tableID int64,
+	startTs uint64,
+	notifier ResolveTsUpdateNotifier,
+) error {
 	metrics.SchemaStoreResolvedRegisterTableGauge.Inc()
+	s.resolveTsUpdateNotifiers.Store(dispatcherID, notifier)
 	s.waitResolvedTs(tableID, startTs, 5*time.Second)
 	log.Info("register table",
 		zap.Int64("tableID", tableID),
@@ -221,8 +236,9 @@ func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error {
 	return s.dataStorage.registerTable(tableID, startTs)
 }
 
-func (s *schemaStore) UnregisterTable(tableID int64) error {
+func (s *schemaStore) UnregisterTable(dispatcherID common.DispatcherID, tableID int64) error {
 	metrics.SchemaStoreResolvedRegisterTableGauge.Dec()
+	s.resolveTsUpdateNotifiers.Delete(dispatcherID)
 	return s.dataStorage.unregisterTable(tableID)
 }
 
diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go
index 9fd50e92d..ce7ebf48c 100644
--- a/pkg/eventservice/event_broker.go
+++ b/pkg/eventservice/event_broker.go
@@ -745,6 +745,14 @@ func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64, latestCommi
 	}
 }
 
+func (c *eventBroker) onDDLResolveTsUpdate(d *dispatcherStat) {
+	needScan, _ := c.checkNeedScan(d, false)
+	if needScan {
+		d.scanning.Store(true)
+		c.taskQueue <- d
+	}
+}
+
 func (c *eventBroker) getDispatcher(id common.DispatcherID) (*dispatcherStat, bool) {
 	stat, ok := c.dispatchers.Load(id)
 	if !ok {
@@ -797,7 +805,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) {
 		return
 	}
 
-	err = c.schemaStore.RegisterTable(span.GetTableID(), info.GetStartTs())
+	err = c.schemaStore.RegisterTable(id, span.GetTableID(), info.GetStartTs(), func() { c.onDDLResolveTsUpdate(dispatcher) })
 	if err != nil {
 		log.Panic("register table to schemaStore failed", zap.Error(err), zap.Int64("tableID", span.TableID), zap.Uint64("startTs", info.GetStartTs()))
 	}
@@ -824,7 +832,7 @@ func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) {
 		return
 	}
 	c.eventStore.UnregisterDispatcher(id)
-	c.schemaStore.UnregisterTable(dispatcherInfo.GetTableSpan().TableID)
+	c.schemaStore.UnregisterTable(id, dispatcherInfo.GetTableSpan().TableID)
 	c.dispatchers.Delete(id)
 	log.Info("deregister acceptor", zap.Uint64("clusterID", c.tidbClusterID), zap.Any("acceptorID", id))
 }

From 19febc2dd285d6c23d0abc052ffcaf6498fde785 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 3 Dec 2024 17:47:05 +0800
Subject: [PATCH 2/8] add log

---
 pkg/eventservice/event_broker.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go
index ce7ebf48c..b482cb5e7 100644
--- a/pkg/eventservice/event_broker.go
+++ b/pkg/eventservice/event_broker.go
@@ -746,6 +746,7 @@ func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64, latestCommi
 }
 
 func (c *eventBroker) onDDLResolveTsUpdate(d *dispatcherStat) {
+	log.Info("resolve ts update")
 	needScan, _ := c.checkNeedScan(d, false)
 	if needScan {
 		d.scanning.Store(true)
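The interface change in PATCH 1 above is the core of the series: each dispatcher now hands the schema store a callback and that callback is invoked whenever the DDL resolved ts advances. Below is a minimal, self-contained Go sketch of that register/notify pattern; the store, notifier, and dispatcher-ID names are illustrative stand-ins, not the actual TiCDC types.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// notifier mirrors ResolveTsUpdateNotifier in the patch: a no-argument
// callback the dispatcher hands to the store (illustrative types only).
type notifier func()

type store struct {
	resolvedTs atomic.Uint64
	notifiers  sync.Map // dispatcher id -> notifier
}

func (s *store) register(id string, n notifier) { s.notifiers.Store(id, n) }
func (s *store) unregister(id string)           { s.notifiers.Delete(id) }

// advance mimics updateResolvedTsPeriodically in PATCH 1: store the new ts,
// then wake every registered dispatcher.
func (s *store) advance(ts uint64) {
	s.resolvedTs.Store(ts)
	s.notifiers.Range(func(_, v interface{}) bool {
		v.(notifier)()
		return true
	})
}

func main() {
	s := &store{}
	s.register("dispatcher-1", func() {
		fmt.Println("resolved ts advanced to", s.resolvedTs.Load())
	})
	s.advance(100) // dispatcher-1 is notified synchronously here
	s.unregister("dispatcher-1")
	s.advance(200) // no one left to notify
}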
From 815e194444e2a762410bb12de5fa2cc1542e0132 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Wed, 4 Dec 2024 11:55:36 +0800
Subject: [PATCH 3/8] use a separate goroutine to notify

---
 logservice/schemastore/schema_store.go | 41 +++++++++++++++++++-------
 1 file changed, 31 insertions(+), 10 deletions(-)

diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go
index 6ddb3ed45..905273d96 100644
--- a/logservice/schemastore/schema_store.go
+++ b/logservice/schemastore/schema_store.go
@@ -72,6 +72,8 @@ type schemaStore struct {
 
 	resolveTsUpdateNotifiers sync.Map
 
+	notifyDispatcherCh chan interface{}
+
 	// the following two fields are used to filter out duplicate ddl events
 	// they will just be updated and read by a single goroutine, so no lock is needed
 
@@ -93,12 +95,13 @@ func New(
 	upperBound := dataStorage.getUpperBound()
 
 	s := &schemaStore{
-		pdClock:       pdClock,
-		unsortedCache: newDDLCache(),
-		dataStorage:   dataStorage,
-		notifyCh:      make(chan interface{}, 4),
-		finishedDDLTs: upperBound.FinishedDDLTs,
-		schemaVersion: upperBound.SchemaVersion,
+		pdClock:            pdClock,
+		unsortedCache:      newDDLCache(),
+		dataStorage:        dataStorage,
+		notifyCh:           make(chan interface{}, 4),
+		finishedDDLTs:      upperBound.FinishedDDLTs,
+		schemaVersion:      upperBound.SchemaVersion,
+		notifyDispatcherCh: make(chan interface{}, 4),
 	}
 	s.pendingResolvedTs.Store(upperBound.ResolvedTs)
 	s.resolvedTs.Store(upperBound.ResolvedTs)
@@ -128,6 +131,9 @@ func (s *schemaStore) Run(ctx context.Context) error {
 	eg.Go(func() error {
 		return s.updateResolvedTsPeriodically(ctx)
 	})
+	eg.Go(func() error {
+		return s.notifyDispatcherPeriodically(ctx)
+	})
 	eg.Go(func() error {
 		return s.ddlJobFetcher.run(ctx)
 	})
@@ -192,10 +198,10 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 			// so we can only update resolved ts after all ddl jobs are written to disk
 			// Can we optimize it to update resolved ts more eagerly?
 			s.resolvedTs.Store(pendingTs)
-			s.resolveTsUpdateNotifiers.Range(func(_, value interface{}) bool {
-				value.(ResolveTsUpdateNotifier)()
-				return true
-			})
+			select {
+			case s.notifyDispatcherCh <- struct{}{}:
+			default:
+			}
 			s.dataStorage.updateUpperBound(UpperBoundMeta{
 				FinishedDDLTs: s.finishedDDLTs,
 				SchemaVersion: s.schemaVersion,
@@ -215,6 +221,21 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 	}
 }
 
+func (s *schemaStore) notifyDispatcherPeriodically(ctx context.Context) error {
+	// ticker := time.NewTicker(50 * time.Millisecond)
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-s.notifyDispatcherCh:
+			s.resolveTsUpdateNotifiers.Range(func(_, value interface{}) bool {
+				value.(ResolveTsUpdateNotifier)()
+				return true
+			})
+		}
+	}
+}
+
 func (s *schemaStore) GetAllPhysicalTables(snapTs uint64, filter filter.Filter) ([]commonEvent.Table, error) {
 	s.waitResolvedTs(0, snapTs, 10*time.Second)
 	return s.dataStorage.getAllPhysicalTables(snapTs, filter)
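PATCH 3 above takes the callback fan-out off the resolved-ts update path: updateResolvedTsPeriodically only does a non-blocking send on notifyDispatcherCh, and a dedicated goroutine drains the channel and calls the notifiers, so a burst of updates coalesces into a few wake-ups. A standalone sketch of that coalescing pattern, with illustrative names only:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	// Small buffer: a burst of signals collapses into at most a few wake-ups.
	wakeCh := make(chan struct{}, 4)

	// Producer side: never block the hot path; drop the signal if the
	// buffer is already full, because one pending wake-up is enough.
	signal := func() {
		select {
		case wakeCh <- struct{}{}:
		default:
		}
	}

	// Consumer side: a single goroutine does the (possibly slow) fan-out.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-wakeCh:
				fmt.Println("notify all dispatchers")
			}
		}
	}()

	for i := 0; i < 10; i++ {
		signal() // ten signals, but far fewer notifications are printed
	}
	time.Sleep(100 * time.Millisecond)
}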
From 3347bb22795bf08a27b81cbc6531420700870ed5 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Mon, 9 Dec 2024 13:47:10 +0800
Subject: [PATCH 4/8] change advance interval

---
 logservice/logpuller/subscription_client.go | 14 +++++++-------
 logservice/schemastore/ddl_job_fetcher.go   |  2 +-
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index ea914fb10..361547dd5 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -15,7 +15,6 @@ package logpuller
 
 import (
 	"context"
-	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -147,7 +146,7 @@ type SubscriptionClientConfig struct {
 	// TODO: add a metric for busy ratio?
 	ChangeEventProcessorNum uint
 	// The time interval to advance resolvedTs for a region
-	AdvanceResolvedTsIntervalInMs uint
+	AdvanceResolvedTsIntervalInMs int64
 }
 
 type sharedClientMetrics struct {
@@ -818,11 +817,12 @@ func (s *SubscriptionClient) newSubscribedSpan(
 	rangeLock := regionlock.NewRangeLock(uint64(subID), span.StartKey, span.EndKey, startTs)
 	rt := &subscribedSpan{
-		subID:           subID,
-		span:            span,
-		startTs:         startTs,
-		rangeLock:       rangeLock,
-		advanceInterval: int64(s.config.AdvanceResolvedTsIntervalInMs)/4*3 + int64(rand.Intn(int(s.config.AdvanceResolvedTsIntervalInMs)/4)),
+		subID:     subID,
+		span:      span,
+		startTs:   startTs,
+		rangeLock: rangeLock,
+		// advanceInterval: int64(s.config.AdvanceResolvedTsIntervalInMs)/4*3 + int64(rand.Intn(int(s.config.AdvanceResolvedTsIntervalInMs)/4)),
+		advanceInterval: s.config.AdvanceResolvedTsIntervalInMs,
 	}
 	rt.resolvedTs.Store(startTs)
 
diff --git a/logservice/schemastore/ddl_job_fetcher.go b/logservice/schemastore/ddl_job_fetcher.go
index 75a69e40b..91c4b74aa 100644
--- a/logservice/schemastore/ddl_job_fetcher.go
+++ b/logservice/schemastore/ddl_job_fetcher.go
@@ -59,7 +59,7 @@ func newDDLJobFetcher(
 	clientConfig := &logpuller.SubscriptionClientConfig{
 		RegionRequestWorkerPerStore: 1,
 		ChangeEventProcessorNum:     1, // must be 1, because ddlJobFetcher.input cannot be called concurrently
-		AdvanceResolvedTsIntervalInMs: 100,
+		AdvanceResolvedTsIntervalInMs: 0,
 	}
 	client := logpuller.NewSubscriptionClient(
 		logpuller.ClientIDSchemaStore,
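For context on PATCH 4 above: the removed expression jittered each span's advanceInterval into the range [3T/4, T) so that regions do not all advance their resolved ts at the same instant; the patch (and the follow-up hack in PATCH 5) trades that spreading for the fastest possible DDL resolved-ts advance. A small sketch of the original jitter arithmetic, assuming an interval of 100 ms:

package main

import (
	"fmt"
	"math/rand"
)

// jitteredInterval reproduces the formula removed in PATCH 4:
// interval*3/4 + rand(interval/4), i.e. a value in [3T/4, T).
func jitteredInterval(intervalMs int64) int64 {
	return intervalMs/4*3 + int64(rand.Intn(int(intervalMs)/4))
}

func main() {
	const intervalMs = 100
	for i := 0; i < 3; i++ {
		// e.g. 83, 91, 76 ... each span gets its own offset within [75, 100).
		fmt.Println(jitteredInterval(intervalMs))
	}
}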
From 96c3ecfb4d5f6186e8689db2fc2b7b9f017fb43 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Mon, 9 Dec 2024 14:10:37 +0800
Subject: [PATCH 5/8] hack interval

---
 logservice/logpuller/subscription_client.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index 361547dd5..b7ce8606a 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -822,7 +822,8 @@ func (s *SubscriptionClient) newSubscribedSpan(
 		startTs:   startTs,
 		rangeLock: rangeLock,
 		// advanceInterval: int64(s.config.AdvanceResolvedTsIntervalInMs)/4*3 + int64(rand.Intn(int(s.config.AdvanceResolvedTsIntervalInMs)/4)),
-		advanceInterval: s.config.AdvanceResolvedTsIntervalInMs,
+		// advanceInterval: s.config.AdvanceResolvedTsIntervalInMs,
+		advanceInterval: 0,
 	}
 	rt.resolvedTs.Store(startTs)

From d3359b3c8d0a9f752b8080ea87598eb5b2768baa Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Mon, 9 Dec 2024 15:01:41 +0800
Subject: [PATCH 6/8] add more log

---
 pkg/eventservice/event_broker.go | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go
index b482cb5e7..01d3681c5 100644
--- a/pkg/eventservice/event_broker.go
+++ b/pkg/eventservice/event_broker.go
@@ -361,6 +361,7 @@ func (c *eventBroker) checkNeedScan(task scanTask, mustCheck bool) (bool, common
 		// The dispatcher has no new events. In such case, we don't need to scan the event store.
 		// We just send the watermark to the dispatcher.
 		remoteID := node.ID(task.info.GetServerID())
+		log.Info("send watermark", zap.Uint64("watermark", dataRange.EndTs), zap.Stringer("dispatcher", task.id))
 		c.sendWatermark(remoteID, task, dataRange.EndTs, task.metricEventServiceSendResolvedTsCount)
 		task.watermark.Store(dataRange.EndTs)
 		return false, dataRange
@@ -672,6 +673,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 		case <-ticker.C:
 			receivedMinResolvedTs := uint64(0)
 			sentMinWaterMark := uint64(0)
+			var dispatcherID common.DispatcherID
 			c.dispatchers.Range(func(key, value interface{}) bool {
 				dispatcher := value.(*dispatcherStat)
 				resolvedTs := dispatcher.resolvedTs.Load()
@@ -681,6 +683,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 				watermark := dispatcher.watermark.Load()
 				if sentMinWaterMark == 0 || watermark < sentMinWaterMark {
 					sentMinWaterMark = watermark
+					dispatcherID = dispatcher.info.GetID()
 				}
 				return true
 			})
@@ -693,6 +696,10 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 			c.metricEventServiceResolvedTsLag.Set(lag)
 			lag = float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(sentMinWaterMark)) / 1e3
 			c.metricEventServiceSentResolvedTs.Set(lag)
+			log.Info("eventBroker update metrics",
+				zap.Stringer("dispatcherID", dispatcherID),
+				zap.Float64("lag(s)", lag),
+				zap.Uint64("sentResolvedTs", sentMinWaterMark))
 
 			metricEventBrokerPendingScanTaskCount.Set(float64(len(c.taskQueue)))
 
@@ -746,7 +753,7 @@ func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64, latestCommi
 }
 
 func (c *eventBroker) onDDLResolveTsUpdate(d *dispatcherStat) {
-	log.Info("resolve ts update")
+	log.Info("resolve ts update", zap.Stringer("dispatcher", d.id))
 	needScan, _ := c.checkNeedScan(d, false)
 	if needScan {
 		d.scanning.Store(true)

From 866be7a199a79b55ae3cebe522c091c030c33692 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Mon, 9 Dec 2024 15:56:23 +0800
Subject: [PATCH 7/8] add more log

---
 logservice/eventstore/helper.go        | 4 +++-
 logservice/schemastore/schema_store.go | 2 ++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/logservice/eventstore/helper.go b/logservice/eventstore/helper.go
index 855bade10..7fe76caed 100644
--- a/logservice/eventstore/helper.go
+++ b/logservice/eventstore/helper.go
@@ -19,6 +19,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/logservice/logpuller"
 	"github.com/pingcap/ticdc/utils/dynstream"
+	"go.uber.org/zap"
 )
 
 const (
@@ -43,8 +44,9 @@ func (h *eventsHandler) Handle(subStat *subscriptionStat, events ...kvEvent) boo
 	subStat.resolvedTs.Store(events[0].raw.CRTs)
 	subStat.dispatchers.RLock()
 	defer subStat.dispatchers.RUnlock()
-	for _, notifier := range subStat.dispatchers.notifiers {
+	for dispatcherID, notifier := range subStat.dispatchers.notifiers {
 		notifier(events[0].raw.CRTs, subStat.maxEventCommitTs.Load())
+		log.Info("event store notify resolved ts", zap.Uint64("ts", events[0].raw.CRTs), zap.Stringer("dispatcherID", dispatcherID))
 	}
 	return false
 }

diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go
index 905273d96..92bb7d9ef 100644
--- a/logservice/schemastore/schema_store.go
+++ b/logservice/schemastore/schema_store.go
@@ -158,6 +158,8 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 	if pendingTs <= s.resolvedTs.Load() {
 		return
 	}
+	log.Info("schema store advance resolved ts",
+		zap.Uint64("resolvedTs", pendingTs))
 	resolvedEvents := s.unsortedCache.fetchSortedDDLEventBeforeTS(pendingTs)
 	if len(resolvedEvents) != 0 {
 		log.Info("schema store begin to apply resolved ddl events",

From f987936270b78b20bcbbb274ac880bacd6140f0a Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Mon, 9 Dec 2024 17:25:02 +0800
Subject: [PATCH 8/8] add some log

---
 logservice/logpuller/log_puller_multi_span.go | 3 +++
 logservice/schemastore/schema_store.go        | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/logservice/logpuller/log_puller_multi_span.go b/logservice/logpuller/log_puller_multi_span.go
index 57dee7071..f160fd4f7 100644
--- a/logservice/logpuller/log_puller_multi_span.go
+++ b/logservice/logpuller/log_puller_multi_span.go
@@ -116,6 +116,9 @@ func (p *LogPullerMultiSpan) tryUpdatePendingResolvedTs(subID SubscriptionID, ne
 		log.Panic("unknown zubscriptionID, should not happen",
 			zap.Uint64("subID", uint64(subID)))
 	}
+	log.Info("schema store update pending resolved ts",
+		zap.Uint64("subID", uint64(subID)),
+		zap.Uint64("newResolvedTs", newResolvedTs))
 	if newResolvedTs < item.resolvedTs {
 		log.Panic("resolved ts should not fallback",
 			zap.Uint64("newResolvedTs", newResolvedTs),
diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go
index 92bb7d9ef..764d960a0 100644
--- a/logservice/schemastore/schema_store.go
+++ b/logservice/schemastore/schema_store.go
@@ -346,7 +346,7 @@ func (s *schemaStore) writeDDLEvent(ddlEvent DDLJobWithCommitTs) {
 }
 
 func (s *schemaStore) advanceResolvedTs(resolvedTs uint64) {
-	// log.Info("advance resolved ts", zap.Any("resolvedTS", resolvedTs))
+	log.Info("schema store update pending resolved ts", zap.Uint64("resolvedTs", resolvedTs))
 	if resolvedTs < s.pendingResolvedTs.Load() {
 		log.Panic("resolved ts should not fallback",
 			zap.Uint64("pendingResolveTs", s.pendingResolvedTs.Load()),