Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat concurrent chunk data #1398

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant
### binlogsyncer-max-reconnect-attempts
`--binlogsyncer-max-reconnect-attempts=0`, the maximum number of attempts to re-establish a broken inspector connection for sync binlog. `0` or `negative number` means infinite retry, default `0`

### chunk-concurrent-size
`--chunk-concurrent-size=1`, The number of goroutines to execute chunks concurrently in each copy time slot, default `1`, allowed range `0`-`100`.

### conf

`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
golang.org/x/net v0.17.0
golang.org/x/term v0.13.0
golang.org/x/text v0.13.0
golang.org/x/sync v0.1.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
99 changes: 60 additions & 39 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea
}
}

type IterationRangeValues struct {
Min *sql.ColumnValues
Max *sql.ColumnValues
Size int64
IsIncludeMinValues bool
HasFurtherRange bool
}

// MigrationContext has the general, global state of migration. It is used by
// all components throughout the migration process.
type MigrationContext struct {
Expand Down Expand Up @@ -119,6 +127,7 @@ type MigrationContext struct {
HeartbeatIntervalMilliseconds int64
defaultNumRetries int64
ChunkSize int64
ChunkConcurrentSize int64
niceRatio float64
MaxLagMillisecondsThrottleThreshold int64
throttleControlReplicaKeys *mysql.InstanceKeyMap
Expand Down Expand Up @@ -210,25 +219,26 @@ type MigrationContext struct {
InCutOverCriticalSectionFlag int64
PanicAbort chan error

OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
OriginalTableAutoIncrement uint64
GhostTableColumns *sql.ColumnList
GhostTableVirtualColumns *sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList
ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool
MappedSharedColumns *sql.ColumnList
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
ForceTmpTableName string
OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
OriginalTableAutoIncrement uint64
GhostTableColumns *sql.ColumnList
GhostTableVirtualColumns *sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList
ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool
MappedSharedColumns *sql.ColumnList
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
CalculateNextIterationRangeEndValuesLock *sync.Mutex
ForceTmpTableName string

recentBinlogCoordinates mysql.BinlogCoordinates

Expand Down Expand Up @@ -269,26 +279,27 @@ type ContextConfig struct {

func NewMigrationContext() *MigrationContext {
return &MigrationContext{
Uuid: uuid.NewString(),
defaultNumRetries: 60,
ChunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
throttleHTTPMutex: &sync.Mutex{},
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
Log: NewDefaultLogger(),
Uuid: uuid.NewString(),
defaultNumRetries: 60,
ChunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
throttleHTTPMutex: &sync.Mutex{},
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
CalculateNextIterationRangeEndValuesLock: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
Log: NewDefaultLogger(),
}
}

Expand Down Expand Up @@ -616,6 +627,16 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
atomic.StoreInt64(&this.ChunkSize, chunkSize)
}

func (this *MigrationContext) SetChunkConcurrentSize(chunkConcurrentSize int64) {
if chunkConcurrentSize < 1 {
chunkConcurrentSize = 1
}
if chunkConcurrentSize > 100 {
chunkConcurrentSize = 100
}
atomic.StoreInt64(&this.ChunkConcurrentSize, chunkConcurrentSize)
}

func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
if batchSize < 1 {
batchSize = 1
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func main() {
flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').")
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
chunkConcurrentSize := flag.Int64("chunk-concurrent-size", 1, "The number of goroutines to execute chunks concurrently in each copy time slot, default 1 (allowed range: 1-100)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
Expand Down Expand Up @@ -294,6 +295,7 @@ func main() {
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
migrationContext.SetNiceRatio(*niceRatio)
migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetChunkConcurrentSize(*chunkConcurrentSize)
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
migrationContext.SetThrottleQuery(*throttleQuery)
Expand Down
57 changes: 35 additions & 22 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (this *Applier) InitDBConnections() (err error) {
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
return err
}
concurrentSize := atomic.LoadInt64(&this.migrationContext.ChunkConcurrentSize)
if concurrentSize > mysql.MaxDBPoolConnections {
this.db.SetMaxOpenConns(int(concurrentSize))
}
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
Expand Down Expand Up @@ -561,11 +565,18 @@ func (this *Applier) ReadMigrationRangeValues() error {
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
// iterating the range (and this done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
func (this *Applier) CalculateNextIterationRangeEndValues() (values *base.IterationRangeValues, err error) {
this.migrationContext.CalculateNextIterationRangeEndValuesLock.Lock()
defer this.migrationContext.CalculateNextIterationRangeEndValuesLock.Unlock()

iterationRangeValues := &base.IterationRangeValues{}

iterationRangeValues.Min = this.migrationContext.MigrationIterationRangeMaxValues
if iterationRangeValues.Min == nil {
iterationRangeValues.Min = this.migrationContext.MigrationRangeMinValues
iterationRangeValues.IsIncludeMinValues = true
}

for i := 0; i < 2; i++ {
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
if i == 1 {
Expand All @@ -575,46 +586,48 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
iterationRangeValues.Min.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetIteration() == 0,
iterationRangeValues.IsIncludeMinValues,
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, err
return iterationRangeValues, err
}

rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, err
return iterationRangeValues, err
}
defer rows.Close()

iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
return hasFurtherRange, err
return iterationRangeValues, err
}
hasFurtherRange = true
iterationRangeValues.HasFurtherRange = true
}
if err = rows.Err(); err != nil {
return hasFurtherRange, err
return iterationRangeValues, err
}
if hasFurtherRange {
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
if iterationRangeValues.HasFurtherRange {
iterationRangeValues.Max = iterationRangeMaxValues
this.migrationContext.MigrationIterationRangeMinValues = iterationRangeValues.Min
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeValues.Max
return iterationRangeValues, nil
}
}
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, nil
return iterationRangeValues, nil
}

// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
// data actually gets copied from original table.
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
func (this *Applier) ApplyIterationInsertQuery(iterationRangeValues *base.IterationRangeValues) (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
chunkSize = iterationRangeValues.Size

query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
this.migrationContext.DatabaseName,
Expand All @@ -624,9 +637,9 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.MappedSharedColumns.Names(),
this.migrationContext.UniqueKey.Name,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.GetIteration() == 0,
iterationRangeValues.Min.AbstractValues(),
iterationRangeValues.Max.AbstractValues(),
iterationRangeValues.IsIncludeMinValues,
this.migrationContext.IsTransactionalTable(),
)
if err != nil {
Expand Down Expand Up @@ -663,8 +676,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
duration = time.Since(startTime)
this.migrationContext.Log.Debugf(
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
this.migrationContext.MigrationIterationRangeMinValues,
this.migrationContext.MigrationIterationRangeMaxValues,
iterationRangeValues.Min,
iterationRangeValues.Max,
this.migrationContext.GetIteration(),
chunkSize)
return chunkSize, rowsAffected, duration, nil
Expand Down
Loading