[Test] notify ddl #651

Open · wants to merge 8 commits into base: master
4 changes: 3 additions & 1 deletion logservice/eventstore/helper.go
@@ -19,6 +19,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/logservice/logpuller"
 	"github.com/pingcap/ticdc/utils/dynstream"
+	"go.uber.org/zap"
 )

 const (
@@ -43,8 +44,9 @@ func (h *eventsHandler) Handle(subStat *subscriptionStat, events ...kvEvent) boo
 	subStat.resolvedTs.Store(events[0].raw.CRTs)
 	subStat.dispatchers.RLock()
 	defer subStat.dispatchers.RUnlock()
-	for _, notifier := range subStat.dispatchers.notifiers {
+	for dispatcherID, notifier := range subStat.dispatchers.notifiers {
 		notifier(events[0].raw.CRTs, subStat.maxEventCommitTs.Load())
+		log.Info("event store notify resolved ts", zap.Uint64("ts", events[0].raw.CRTs), zap.Stringer("dispatcherID", dispatcherID))
 	}
 	return false
 }
3 changes: 3 additions & 0 deletions logservice/logpuller/log_puller_multi_span.go
@@ -116,6 +116,9 @@ func (p *LogPullerMultiSpan) tryUpdatePendingResolvedTs(subID SubscriptionID, ne
 		log.Panic("unknown subscriptionID, should not happen",
 			zap.Uint64("subID", uint64(subID)))
 	}
+	log.Info("schema store update pending resolved ts",
+		zap.Uint64("subID", uint64(subID)),
+		zap.Uint64("newResolvedTs", newResolvedTs))
 	if newResolvedTs < item.resolvedTs {
 		log.Panic("resolved ts should not fallback",
 			zap.Uint64("newResolvedTs", newResolvedTs),
15 changes: 8 additions & 7 deletions logservice/logpuller/subscription_client.go
@@ -15,7 +15,6 @@ package logpuller

 import (
 	"context"
-	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -147,7 +146,7 @@ type SubscriptionClientConfig struct {
 	// TODO: add a metric for busy ratio?
 	ChangeEventProcessorNum uint
 	// The time interval to advance resolvedTs for a region
-	AdvanceResolvedTsIntervalInMs uint
+	AdvanceResolvedTsIntervalInMs int64
 }

 type sharedClientMetrics struct {
@@ -818,11 +817,13 @@ func (s *SubscriptionClient) newSubscribedSpan(
 	rangeLock := regionlock.NewRangeLock(uint64(subID), span.StartKey, span.EndKey, startTs)

 	rt := &subscribedSpan{
-		subID:           subID,
-		span:            span,
-		startTs:         startTs,
-		rangeLock:       rangeLock,
-		advanceInterval: int64(s.config.AdvanceResolvedTsIntervalInMs)/4*3 + int64(rand.Intn(int(s.config.AdvanceResolvedTsIntervalInMs)/4)),
+		subID:     subID,
+		span:      span,
+		startTs:   startTs,
+		rangeLock: rangeLock,
+		// advanceInterval: int64(s.config.AdvanceResolvedTsIntervalInMs)/4*3 + int64(rand.Intn(int(s.config.AdvanceResolvedTsIntervalInMs)/4)),
+		// advanceInterval: s.config.AdvanceResolvedTsIntervalInMs,
+		advanceInterval: 0,
 	}
 	rt.resolvedTs.Store(startTs)

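Editor's note, not part of the diff: the commented-out line above computed a per-span advance interval jittered into the range [3T/4, T) for a configured interval T, presumably so that spans do not all try to advance their resolved ts at the same instant. Below is a minimal runnable Go sketch of that formula, using the pre-change default of 100ms as an illustrative input. Note that rand.Intn panics for arguments <= 0, so the original expression cannot be evaluated once AdvanceResolvedTsIntervalInMs is set to 0, which may be why this test change hard-codes advanceInterval to 0 instead.

// jitter_sketch.go: illustrative only; mirrors the formula from the removed line.
package main

import (
	"fmt"
	"math/rand"
)

// jitteredAdvanceInterval reproduces the removed expression:
// interval/4*3 + rand.Intn(interval/4), i.e. a value in [3T/4, T).
// rand.Intn panics when its argument is <= 0, so T must be at least 4.
func jitteredAdvanceInterval(intervalMs int64) int64 {
	return intervalMs/4*3 + int64(rand.Intn(int(intervalMs)/4))
}

func main() {
	// 100 was the value newDDLJobFetcher used before this change.
	fmt.Println(jitteredAdvanceInterval(100)) // prints something in 75..99
}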
2 changes: 1 addition & 1 deletion logservice/schemastore/ddl_job_fetcher.go
@@ -59,7 +59,7 @@ func newDDLJobFetcher(
 	clientConfig := &logpuller.SubscriptionClientConfig{
 		RegionRequestWorkerPerStore:   1,
 		ChangeEventProcessorNum:       1, // must be 1, because ddlJobFetcher.input cannot be called concurrently
-		AdvanceResolvedTsIntervalInMs: 100,
+		AdvanceResolvedTsIntervalInMs: 0,
 	}
 	client := logpuller.NewSubscriptionClient(
 		logpuller.ClientIDSchemaStore,
61 changes: 50 additions & 11 deletions logservice/schemastore/schema_store.go
@@ -2,6 +2,7 @@ package schemastore

 import (
 	"context"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -20,14 +21,16 @@ import (
 	"golang.org/x/sync/errgroup"
 )

+type ResolveTsUpdateNotifier func()
+
 type SchemaStore interface {
 	common.SubModule

 	GetAllPhysicalTables(snapTs uint64, filter filter.Filter) ([]commonEvent.Table, error)

-	RegisterTable(tableID int64, startTs uint64) error
+	RegisterTable(dispatcherID common.DispatcherID, tableID int64, startTs uint64, notifier ResolveTsUpdateNotifier) error

-	UnregisterTable(tableID int64) error
+	UnregisterTable(dispatcherID common.DispatcherID, tableID int64) error

 	// return table info with largest version <= ts
 	GetTableInfo(tableID int64, ts uint64) (*common.TableInfo, error)
@@ -67,6 +70,10 @@ type schemaStore struct {
 	// max resolvedTs of all applied ddl events
 	resolvedTs atomic.Uint64

+	resolveTsUpdateNotifiers sync.Map
+
+	notifyDispatcherCh chan interface{}
+
 	// the following two fields are used to filter out duplicate ddl events
 	// they will just be updated and read by a single goroutine, so no lock is needed

@@ -88,12 +95,13 @@ func New(
 	upperBound := dataStorage.getUpperBound()

 	s := &schemaStore{
-		pdClock:       pdClock,
-		unsortedCache: newDDLCache(),
-		dataStorage:   dataStorage,
-		notifyCh:      make(chan interface{}, 4),
-		finishedDDLTs: upperBound.FinishedDDLTs,
-		schemaVersion: upperBound.SchemaVersion,
+		pdClock:            pdClock,
+		unsortedCache:      newDDLCache(),
+		dataStorage:        dataStorage,
+		notifyCh:           make(chan interface{}, 4),
+		finishedDDLTs:      upperBound.FinishedDDLTs,
+		schemaVersion:      upperBound.SchemaVersion,
+		notifyDispatcherCh: make(chan interface{}, 4),
 	}
 	s.pendingResolvedTs.Store(upperBound.ResolvedTs)
 	s.resolvedTs.Store(upperBound.ResolvedTs)
@@ -123,6 +131,9 @@ func (s *schemaStore) Run(ctx context.Context) error {
 	eg.Go(func() error {
 		return s.updateResolvedTsPeriodically(ctx)
 	})
+	eg.Go(func() error {
+		return s.notifyDispatcherPeriodically(ctx)
+	})
 	eg.Go(func() error {
 		return s.ddlJobFetcher.run(ctx)
 	})
@@ -147,6 +158,8 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 			if pendingTs <= s.resolvedTs.Load() {
 				return
 			}
+			log.Info("schema store advance resolved ts",
+				zap.Uint64("resolvedTs", pendingTs))
 			resolvedEvents := s.unsortedCache.fetchSortedDDLEventBeforeTS(pendingTs)
 			if len(resolvedEvents) != 0 {
 				log.Info("schema store begin to apply resolved ddl events",
@@ -187,6 +200,10 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 			// so we can only update resolved ts after all ddl jobs are written to disk
 			// Can we optimize it to update resolved ts more eagerly?
 			s.resolvedTs.Store(pendingTs)
+			select {
+			case s.notifyDispatcherCh <- struct{}{}:
+			default:
+			}
 			s.dataStorage.updateUpperBound(UpperBoundMeta{
 				FinishedDDLTs: s.finishedDDLTs,
 				SchemaVersion: s.schemaVersion,
@@ -206,13 +223,34 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
 	}
 }

+func (s *schemaStore) notifyDispatcherPeriodically(ctx context.Context) error {
+	// ticker := time.NewTicker(50 * time.Millisecond)
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-s.notifyDispatcherCh:
+			s.resolveTsUpdateNotifiers.Range(func(_, value interface{}) bool {
+				value.(ResolveTsUpdateNotifier)()
+				return true
+			})
+		}
+	}
+}
+
 func (s *schemaStore) GetAllPhysicalTables(snapTs uint64, filter filter.Filter) ([]commonEvent.Table, error) {
 	s.waitResolvedTs(0, snapTs, 10*time.Second)
 	return s.dataStorage.getAllPhysicalTables(snapTs, filter)
 }

-func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error {
+func (s *schemaStore) RegisterTable(
+	dispatcherID common.DispatcherID,
+	tableID int64,
+	startTs uint64,
+	notifier ResolveTsUpdateNotifier,
+) error {
 	metrics.SchemaStoreResolvedRegisterTableGauge.Inc()
+	s.resolveTsUpdateNotifiers.Store(dispatcherID, notifier)
 	s.waitResolvedTs(tableID, startTs, 5*time.Second)
 	log.Info("register table",
 		zap.Int64("tableID", tableID),
@@ -221,8 +259,9 @@ func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error {
 	return s.dataStorage.registerTable(tableID, startTs)
 }

-func (s *schemaStore) UnregisterTable(tableID int64) error {
+func (s *schemaStore) UnregisterTable(dispatcherID common.DispatcherID, tableID int64) error {
 	metrics.SchemaStoreResolvedRegisterTableGauge.Dec()
+	s.resolveTsUpdateNotifiers.Delete(dispatcherID)
 	return s.dataStorage.unregisterTable(tableID)
 }

@@ -307,7 +346,7 @@ func (s *schemaStore) writeDDLEvent(ddlEvent DDLJobWithCommitTs) {
 }

 func (s *schemaStore) advanceResolvedTs(resolvedTs uint64) {
-	// log.Info("advance resolved ts", zap.Any("resolvedTS", resolvedTs))
+	log.Info("schema store update pending resolved ts", zap.Uint64("resolvedTs", resolvedTs))
 	if resolvedTs < s.pendingResolvedTs.Load() {
 		log.Panic("resolved ts should not fallback",
 			zap.Uint64("pendingResolveTs", s.pendingResolvedTs.Load()),
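For orientation, here is a minimal, self-contained sketch of the notification pattern schema_store.go now implements (editor's illustration, not code from this PR; notifierHub is an invented name and string keys stand in for common.DispatcherID): callbacks live in a sync.Map keyed by dispatcher, advancing the resolved ts does a non-blocking send on a small buffered channel so the writer never stalls, and a single goroutine drains the channel and fans the wake-up out to every registered callback.

package main

import (
	"fmt"
	"sync"
)

// notifierHub mirrors the fields added to schemaStore: resolveTsUpdateNotifiers
// (a sync.Map of callbacks) and notifyDispatcherCh (a buffered signal channel).
type notifierHub struct {
	notifiers sync.Map      // dispatcher id -> func()
	notifyCh  chan struct{} // buffered; repeated signals coalesce
}

func newNotifierHub() *notifierHub {
	return &notifierHub{notifyCh: make(chan struct{}, 4)}
}

func (h *notifierHub) Register(id string, cb func()) { h.notifiers.Store(id, cb) }
func (h *notifierHub) Unregister(id string)          { h.notifiers.Delete(id) }

// Advance corresponds to the new select in updateResolvedTsPeriodically:
// the non-blocking send drops the signal when one is already pending.
func (h *notifierHub) Advance() {
	select {
	case h.notifyCh <- struct{}{}:
	default:
	}
}

// Run corresponds to notifyDispatcherPeriodically: each signal wakes every
// registered callback once.
func (h *notifierHub) Run(done <-chan struct{}) {
	for {
		select {
		case <-done:
			return
		case <-h.notifyCh:
			h.notifiers.Range(func(_, v interface{}) bool {
				v.(func())()
				return true
			})
		}
	}
}

func main() {
	hub := newNotifierHub()
	var wg sync.WaitGroup
	wg.Add(1)
	hub.Register("dispatcher-1", func() { fmt.Println("dispatcher-1 notified"); wg.Done() })

	done := make(chan struct{})
	go hub.Run(done)

	hub.Advance() // in the real code this runs right after resolvedTs is stored
	wg.Wait()
	close(done)
}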
20 changes: 18 additions & 2 deletions pkg/eventservice/event_broker.go
@@ -361,6 +361,7 @@ func (c *eventBroker) checkNeedScan(task scanTask, mustCheck bool) (bool, common
 		// The dispatcher has no new events. In such case, we don't need to scan the event store.
 		// We just send the watermark to the dispatcher.
 		remoteID := node.ID(task.info.GetServerID())
+		log.Info("send watermark", zap.Uint64("watermark", dataRange.EndTs), zap.Stringer("dispatcher", task.id))
 		c.sendWatermark(remoteID, task, dataRange.EndTs, task.metricEventServiceSendResolvedTsCount)
 		task.watermark.Store(dataRange.EndTs)
 		return false, dataRange
@@ -672,6 +673,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 		case <-ticker.C:
 			receivedMinResolvedTs := uint64(0)
 			sentMinWaterMark := uint64(0)
+			var dispatcherID common.DispatcherID
 			c.dispatchers.Range(func(key, value interface{}) bool {
 				dispatcher := value.(*dispatcherStat)
 				resolvedTs := dispatcher.resolvedTs.Load()
@@ -681,6 +683,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 				watermark := dispatcher.watermark.Load()
 				if sentMinWaterMark == 0 || watermark < sentMinWaterMark {
 					sentMinWaterMark = watermark
+					dispatcherID = dispatcher.info.GetID()
 				}
 				return true
 			})
@@ -693,6 +696,10 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 			c.metricEventServiceResolvedTsLag.Set(lag)
 			lag = float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(sentMinWaterMark)) / 1e3
 			c.metricEventServiceSentResolvedTs.Set(lag)
+			log.Info("eventBroker update metrics",
+				zap.Stringer("dispatcherID", dispatcherID),
+				zap.Float64("lag(s)", lag),
+				zap.Uint64("sentResolvedTs", sentMinWaterMark))

 			metricEventBrokerPendingScanTaskCount.Set(float64(len(c.taskQueue)))

@@ -745,6 +752,15 @@ func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64, latestCommi
 	}
 }

+func (c *eventBroker) onDDLResolveTsUpdate(d *dispatcherStat) {
+	log.Info("resolve ts update", zap.Stringer("dispatcher", d.id))
+	needScan, _ := c.checkNeedScan(d, false)
+	if needScan {
+		d.scanning.Store(true)
+		c.taskQueue <- d
+	}
+}
+
 func (c *eventBroker) getDispatcher(id common.DispatcherID) (*dispatcherStat, bool) {
 	stat, ok := c.dispatchers.Load(id)
 	if !ok {
@@ -797,7 +813,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) {
 		return
 	}

-	err = c.schemaStore.RegisterTable(span.GetTableID(), info.GetStartTs())
+	err = c.schemaStore.RegisterTable(id, span.GetTableID(), info.GetStartTs(), func() { c.onDDLResolveTsUpdate(dispatcher) })
 	if err != nil {
 		log.Panic("register table to schemaStore failed", zap.Error(err), zap.Int64("tableID", span.TableID), zap.Uint64("startTs", info.GetStartTs()))
 	}
@@ -824,7 +840,7 @@ func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) {
 		return
 	}
 	c.eventStore.UnregisterDispatcher(id)
-	c.schemaStore.UnregisterTable(dispatcherInfo.GetTableSpan().TableID)
+	c.schemaStore.UnregisterTable(id, dispatcherInfo.GetTableSpan().TableID)
 	c.dispatchers.Delete(id)
 	log.Info("deregister acceptor", zap.Uint64("clusterID", c.tidbClusterID), zap.Any("acceptorID", id))
 }
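And a compact sketch of the consuming side in event_broker.go (again an editor's illustration with invented stand-in types, not the real ticdc structures): addDispatcher passes a closure over the dispatcher as the ResolveTsUpdateNotifier; when it fires, the broker re-checks whether a scan is needed (the stand-in checkNeedScan below trivially returns true), marks the dispatcher as scanning, and pushes it onto the scan task queue so a worker picks it up without waiting for the next kv event.

package main

import (
	"fmt"
	"sync/atomic"
)

// dispatcherStat and eventBroker are simplified stand-ins for the real types.
type dispatcherStat struct {
	id       string
	scanning atomic.Bool
}

type eventBroker struct {
	taskQueue chan *dispatcherStat
}

// checkNeedScan stands in for the real data-range check; here it always says yes.
func (b *eventBroker) checkNeedScan(d *dispatcherStat) bool { return true }

// onDDLResolveTsUpdate mirrors the new method added in this PR.
func (b *eventBroker) onDDLResolveTsUpdate(d *dispatcherStat) {
	if b.checkNeedScan(d) {
		d.scanning.Store(true)
		b.taskQueue <- d
	}
}

func main() {
	b := &eventBroker{taskQueue: make(chan *dispatcherStat, 1)}
	d := &dispatcherStat{id: "dispatcher-1"}

	// This closure is what addDispatcher now hands to schemaStore.RegisterTable.
	notifier := func() { b.onDDLResolveTsUpdate(d) }

	notifier() // the schema store calls this after its ddl resolved ts advances
	fmt.Println("queued:", (<-b.taskQueue).id, "scanning:", d.scanning.Load())
}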