Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions go/mysql/replication/replication_position.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ func (rp Position) IsZero() bool {
return rp.GTIDSet == nil
}

// ComparePositions compares two Positions, returning:
// 0 if both a anb b are equal positions.
// 1 if a is > than b.
// -1 if a is < than b.
// This can be used as a sort function via
// slices.SortFunc and slices.SortFuncStable.
func ComparePositions(a, b Position) int {
if a.Equal(b) {
return 0
}
if a.AtLeast(b) {
return -1
}
return 1
}

// AppendGTID returns a new Position that represents the position
// after the given GTID is replicated.
func AppendGTID(rp Position, gtid GTID) Position {
Expand Down
31 changes: 31 additions & 0 deletions go/mysql/replication/replication_position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package replication

import (
"encoding/json"
"slices"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPositionEqual(t *testing.T) {
Expand Down Expand Up @@ -407,3 +409,32 @@ func TestJsonUnmarshalPositionZero(t *testing.T) {
assert.True(t, got.Equal(want), "json.Unmarshal(%#v) = %#v, want %#v", input, got, want)

}

func TestComparePositionsSortStable(t *testing.T) {
sid, _ := ParseSID("3e11fa47-71ca-11e1-9e33-c80aa9429562")
positions := []Position{
{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 5}}}},
{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 5}}}},
{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 6}}}},
{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 2}}}},
{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 7}}}},
{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 6}}}},
}

wantedStrings := []string{
"3e11fa47-71ca-11e1-9e33-c80aa9429562:1-7",
"3e11fa47-71ca-11e1-9e33-c80aa9429562:1-6",
"3e11fa47-71ca-11e1-9e33-c80aa9429562:1-6",
"3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5",
"3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5",
"3e11fa47-71ca-11e1-9e33-c80aa9429562:1-2",
}

slices.SortStableFunc(positions, func(a, b Position) int {
return ComparePositions(a, b)
})

for i, wanted := range wantedStrings {
require.Equal(t, wanted, positions[i].String())
}
}
1 change: 1 addition & 0 deletions go/vt/vtctl/reparentutil/emergency_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *eve
return err
}
// Restrict the valid candidates list. We remove any tablet which is of the type DRAINED, RESTORE or BACKUP.
// The remaining candidates are reduced to a majority with the most advanced relay log GTIDs.
validCandidates, err = restrictValidCandidates(validCandidates, tabletMap)
if err != nil {
return err
Expand Down
29 changes: 29 additions & 0 deletions go/vt/vtctl/reparentutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package reparentutil
import (
"context"
"fmt"
"math"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -313,9 +315,20 @@ func getValidCandidatesAndPositionsAsList(validCandidates map[string]replication
return validTablets, tabletPositions, nil
}

// getValidCandidatesMajorityCount returns a number equal to a majority of candidates. If
// there are fewer than 3 candidates, all provided candidates are the majority.
func getValidCandidatesMajorityCount(validCandidates map[string]replication.Position) int {
totalCandidates := len(validCandidates)
if totalCandidates < 3 {
return totalCandidates
}
return int(math.Floor(float64(totalCandidates)/2) + 1)
}

// restrictValidCandidates is used to restrict some candidates from being considered eligible for becoming the intermediate source or the final promotion candidate
func restrictValidCandidates(validCandidates map[string]replication.Position, tabletMap map[string]*topo.TabletInfo) (map[string]replication.Position, error) {
restrictedValidCandidates := make(map[string]replication.Position)
validPositions := make([]replication.Position, 0, len(validCandidates))
for candidate, position := range validCandidates {
candidateInfo, ok := tabletMap[candidate]
if !ok {
Expand All @@ -326,6 +339,22 @@ func restrictValidCandidates(validCandidates map[string]replication.Position, ta
continue
}
restrictedValidCandidates[candidate] = position
validPositions = append(validPositions, position)
}

// sort by replication positions with greatest GTID set first, then remove
// replicas that are not part of a majority of the most-advanced replicas.
slices.SortStableFunc(validPositions, func(a, b replication.Position) int {
return replication.ComparePositions(a, b)
})
majorityCandidatesCount := getValidCandidatesMajorityCount(restrictedValidCandidates)
validPositions = validPositions[:majorityCandidatesCount]
for tabletAlias, position := range restrictedValidCandidates {
if !slices.ContainsFunc(validPositions, func(rp replication.Position) bool {
return position.Equal(rp)
}) {
delete(restrictedValidCandidates, tabletAlias)
}
}
return restrictedValidCandidates, nil
}
Expand Down
36 changes: 27 additions & 9 deletions go/vt/vtctl/reparentutil/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,10 @@ func TestWaitForCatchUp(t *testing.T) {
}

func TestRestrictValidCandidates(t *testing.T) {
gtidSet1, _ := replication.ParseMysql56GTIDSet("3e11fa47-71ca-11e1-9e33-c80aa9429562:1-6")
gtidSet2, _ := replication.ParseMysql56GTIDSet("3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5")
gtidSet3, _ := replication.ParseMysql56GTIDSet("3e11fa47-71ca-11e1-9e33-c80aa9429562:1-3")
gtidSet4, _ := replication.ParseMysql56GTIDSet("3e11fa47-71ca-11e1-9e33-c80aa9429562:1-2")
tests := []struct {
name string
validCandidates map[string]replication.Position
Expand All @@ -1731,12 +1735,12 @@ func TestRestrictValidCandidates(t *testing.T) {
{
name: "remove invalid tablets",
validCandidates: map[string]replication.Position{
"zone1-0000000100": {},
"zone1-0000000101": {},
"zone1-0000000102": {},
"zone1-0000000103": {},
"zone1-0000000104": {},
"zone1-0000000105": {},
"zone1-0000000100": {GTIDSet: gtidSet1},
"zone1-0000000101": {GTIDSet: gtidSet2},
"zone1-0000000102": {GTIDSet: gtidSet2},
"zone1-0000000103": {GTIDSet: gtidSet3},
"zone1-0000000104": {GTIDSet: gtidSet3},
"zone1-0000000105": {GTIDSet: gtidSet4},
},
tabletMap: map[string]*topo.TabletInfo{
"zone1-0000000100": {
Expand Down Expand Up @@ -1795,9 +1799,8 @@ func TestRestrictValidCandidates(t *testing.T) {
},
},
result: map[string]replication.Position{
"zone1-0000000100": {},
"zone1-0000000101": {},
"zone1-0000000104": {},
"zone1-0000000100": {GTIDSet: gtidSet1},
"zone1-0000000101": {GTIDSet: gtidSet2},
},
},
}
Expand Down Expand Up @@ -2074,3 +2077,18 @@ func TestGetBackupCandidates(t *testing.T) {
})
}
}

func TestGetValidCandidatesMajorityCount(t *testing.T) {
buildCandidatesFunc := func(length int) map[string]replication.Position {
candidates := make(map[string]replication.Position, length)
for i := 1; i <= length; i++ {
candidates[fmt.Sprintf("candidate-%d", i)] = replication.Position{}
}
return candidates
}
require.Equal(t, 1, getValidCandidatesMajorityCount(buildCandidatesFunc(1)))
require.Equal(t, 2, getValidCandidatesMajorityCount(buildCandidatesFunc(2)))
require.Equal(t, 2, getValidCandidatesMajorityCount(buildCandidatesFunc(3)))
require.Equal(t, 3, getValidCandidatesMajorityCount(buildCandidatesFunc(5)))
require.Equal(t, 5, getValidCandidatesMajorityCount(buildCandidatesFunc(9)))
}
Loading