Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take replication lag into account while selecting primary candidate #14634

Merged
merged 7 commits into from
Dec 5, 2023
Merged
19 changes: 11 additions & 8 deletions go/cmd/vtctldclient/command/reparents.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,10 @@ func commandInitShardPrimary(cmd *cobra.Command, args []string) error {
}

var plannedReparentShardOptions = struct {
NewPrimaryAliasStr string
AvoidPrimaryAliasStr string
WaitReplicasTimeout time.Duration
NewPrimaryAliasStr string
AvoidPrimaryAliasStr string
WaitReplicasTimeout time.Duration
TolerableReplicationLag time.Duration
}{}

func commandPlannedReparentShard(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -216,11 +217,12 @@ func commandPlannedReparentShard(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

resp, err := client.PlannedReparentShard(commandCtx, &vtctldatapb.PlannedReparentShardRequest{
Keyspace: keyspace,
Shard: shard,
NewPrimary: newPrimaryAlias,
AvoidPrimary: avoidPrimaryAlias,
WaitReplicasTimeout: protoutil.DurationToProto(plannedReparentShardOptions.WaitReplicasTimeout),
Keyspace: keyspace,
Shard: shard,
NewPrimary: newPrimaryAlias,
AvoidPrimary: avoidPrimaryAlias,
WaitReplicasTimeout: protoutil.DurationToProto(plannedReparentShardOptions.WaitReplicasTimeout),
TolerableReplicationLag: protoutil.DurationToProto(plannedReparentShardOptions.TolerableReplicationLag),
})
if err != nil {
return err
Expand Down Expand Up @@ -292,6 +294,7 @@ func init() {
Root.AddCommand(InitShardPrimary)

PlannedReparentShard.Flags().DurationVar(&plannedReparentShardOptions.WaitReplicasTimeout, "wait-replicas-timeout", topo.RemoteOperationTimeout, "Time to wait for replicas to catch up on replication both before and after reparenting.")
PlannedReparentShard.Flags().DurationVar(&plannedReparentShardOptions.TolerableReplicationLag, "tolerable-replication-lag", 0, "Amount of replication lag that is considered acceptable for a tablet to be considered when Vitess makes the choice of a new primary.")
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.NewPrimaryAliasStr, "new-primary", "", "Alias of a tablet that should be the new primary.")
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.AvoidPrimaryAliasStr, "avoid-primary", "", "Alias of a tablet that should not be the primary; i.e. \"reparent to any other tablet if this one is the primary\".")
Root.AddCommand(PlannedReparentShard)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Flags:
--tablet_manager_grpc_key string the key to use to connect
--tablet_manager_grpc_server_name string the server name to use to validate server certificate
--tablet_manager_protocol string Protocol to use to make tabletmanager RPCs to vttablets. (default "grpc")
--tolerable-replication-lag duration Amount of replication lag that is considered acceptable for a tablet to be considered when Vitess makes the choice of a new primary in PRS
--topo-information-refresh-duration duration Timer duration on which VTOrc refreshes the keyspace and vttablet records from the topology server (default 15s)
--topo_consul_lock_delay duration LockDelay for consul session. (default 15s)
--topo_consul_lock_session_checks string List of checks for consul session. (default "serfHealth")
Expand Down
1,875 changes: 946 additions & 929 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

61 changes: 56 additions & 5 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2705,6 +2705,10 @@ func (s *VtctldServer) PlannedReparentShard(ctx context.Context, req *vtctldatap
} else if !ok {
waitReplicasTimeout = time.Second * 30
}
tolerableReplLag, _, err := protoutil.DurationFromProto(req.TolerableReplicationLag)
if err != nil {
return nil, err
}

span.Annotate("keyspace", req.Keyspace)
span.Annotate("shard", req.Shard)
Expand Down Expand Up @@ -2734,6 +2738,7 @@ func (s *VtctldServer) PlannedReparentShard(ctx context.Context, req *vtctldatap
AvoidPrimaryAlias: req.AvoidPrimary,
NewPrimaryAlias: req.NewPrimary,
WaitReplicasTimeout: waitReplicasTimeout,
TolerableReplLag: tolerableReplLag,
},
)

Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtctl/reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func commandPlannedReparentShard(ctx context.Context, wr *wrangler.Wrangler, sub
}

waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", topo.RemoteOperationTimeout, "time to wait for replicas to catch up on replication before and after reparenting")
tolerableReplicationLag := subFlags.Duration("tolerable-replication-lag", 0, "amount of replication lag that is considered acceptable for a tablet to be considered when Vitess makes the choice of a new primary")
keyspaceShard := subFlags.String("keyspace_shard", "", "keyspace/shard of the shard that needs to be reparented")
newPrimary := subFlags.String("new_primary", "", "alias of a tablet that should be the new primary")
avoidTablet := subFlags.String("avoid_tablet", "", "alias of a tablet that should not be the primary, i.e. reparent to any other tablet if this one is the primary")
Expand Down Expand Up @@ -149,7 +150,7 @@ func commandPlannedReparentShard(ctx context.Context, wr *wrangler.Wrangler, sub
return err
}
}
return wr.PlannedReparentShard(ctx, keyspace, shard, newPrimaryAlias, avoidTabletAlias, *waitReplicasTimeout)
return wr.PlannedReparentShard(ctx, keyspace, shard, newPrimaryAlias, avoidTabletAlias, *waitReplicasTimeout, *tolerableReplicationLag)
}

func commandEmergencyReparentShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *pflag.FlagSet, args []string) error {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtctl/reparentutil/planned_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type PlannedReparentOptions struct {
NewPrimaryAlias *topodatapb.TabletAlias
AvoidPrimaryAlias *topodatapb.TabletAlias
WaitReplicasTimeout time.Duration
TolerableReplLag time.Duration

// Private options managed internally. We use value-passing semantics to
// set these options inside a PlannedReparent without leaking these details
Expand Down Expand Up @@ -179,7 +180,7 @@ func (pr *PlannedReparenter) preflightChecks(

event.DispatchUpdate(ev, "searching for primary candidate")

opts.NewPrimaryAlias, err = ChooseNewPrimary(ctx, pr.tmc, &ev.ShardInfo, tabletMap, opts.AvoidPrimaryAlias, opts.WaitReplicasTimeout, opts.durability, pr.logger)
opts.NewPrimaryAlias, err = ChooseNewPrimary(ctx, pr.tmc, &ev.ShardInfo, tabletMap, opts.AvoidPrimaryAlias, opts.WaitReplicasTimeout, opts.TolerableReplLag, opts.durability, pr.logger)
if err != nil {
return true, err
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtctl/reparentutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func ChooseNewPrimary(
tabletMap map[string]*topo.TabletInfo,
avoidPrimaryAlias *topodatapb.TabletAlias,
waitReplicasTimeout time.Duration,
tolerableReplLag time.Duration,
durability Durabler,
// (TODO:@ajm188) it's a little gross we need to pass this, maybe embed in the context?
logger logutil.Logger,
Expand Down Expand Up @@ -100,7 +101,7 @@ func ChooseNewPrimary(
pos, replLag, err := findPositionAndLagForTablet(groupCtx, tb, logger, tmc, waitReplicasTimeout)
mu.Lock()
defer mu.Unlock()
if err == nil && waitReplicasTimeout >= replLag {
if err == nil && (tolerableReplLag == 0 || tolerableReplLag >= replLag) {
validTablets = append(validTablets, tb)
tabletPositions = append(tabletPositions, pos)
}
Expand Down
70 changes: 67 additions & 3 deletions go/vt/vtctl/reparentutil/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,85 @@ func TestChooseNewPrimary(t *testing.T) {
shardInfo *topo.ShardInfo
tabletMap map[string]*topo.TabletInfo
avoidPrimaryAlias *topodatapb.TabletAlias
tolerableReplLag time.Duration
expected *topodatapb.TabletAlias
shouldErr bool
}{
{
name: "found a replica",
tmc: &chooseNewPrimaryTestTMClient{
// zone1-101 is behind zone1-102
// zone1-101 is behind zone1-102 and zone1-102 has a tolerable replication lag
replicationStatuses: map[string]*replicationdatapb.Status{
"zone1-0000000101": {
Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1",
},
"zone1-0000000102": {
Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5",
Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5",
ReplicationLagSeconds: 20,
},
},
},
tolerableReplLag: 50 * time.Second,
shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{
PrimaryAlias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
}, nil),
tabletMap: map[string]*topo.TabletInfo{
"primary": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Type: topodatapb.TabletType_PRIMARY,
},
},
"replica1": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 101,
},
Type: topodatapb.TabletType_REPLICA,
},
},
"replica2": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 102,
},
Type: topodatapb.TabletType_REPLICA,
},
},
},
avoidPrimaryAlias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 0,
},
expected: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 102,
},
shouldErr: false,
},
{
name: "found a replica ignoring replica lag",
tmc: &chooseNewPrimaryTestTMClient{
// zone1-101 is behind zone1-102 and we don't care about the replication lag
replicationStatuses: map[string]*replicationdatapb.Status{
"zone1-0000000101": {
Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1",
},
"zone1-0000000102": {
Position: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5",
ReplicationLagSeconds: 230,
},
},
},
tolerableReplLag: 0,
shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{
PrimaryAlias: &topodatapb.TabletAlias{
Cell: "zone1",
Expand Down Expand Up @@ -147,6 +210,7 @@ func TestChooseNewPrimary(t *testing.T) {
},
},
},
tolerableReplLag: 50 * time.Second,
shardInfo: topo.NewShardInfo("testkeyspace", "-", &topodatapb.Shard{
PrimaryAlias: &topodatapb.TabletAlias{
Cell: "zone1",
Expand Down Expand Up @@ -502,7 +566,7 @@ func TestChooseNewPrimary(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

actual, err := ChooseNewPrimary(ctx, tt.tmc, tt.shardInfo, tt.tabletMap, tt.avoidPrimaryAlias, time.Millisecond*50, durability, logger)
actual, err := ChooseNewPrimary(ctx, tt.tmc, tt.shardInfo, tt.tabletMap, tt.avoidPrimaryAlias, time.Millisecond*50, tt.tolerableReplLag, durability, logger)
if tt.shouldErr {
assert.Error(t, err)
return
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtorc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var (
recoveryPeriodBlockDuration = 30 * time.Second
preventCrossCellFailover = false
waitReplicasTimeout = 30 * time.Second
tolerableReplicationLag = 0 * time.Second
topoInformationRefreshDuration = 15 * time.Second
recoveryPollDuration = 1 * time.Second
ersEnabled = true
Expand All @@ -78,6 +79,7 @@ func RegisterFlags(fs *pflag.FlagSet) {
fs.DurationVar(&recoveryPeriodBlockDuration, "recovery-period-block-duration", recoveryPeriodBlockDuration, "Duration for which a new recovery is blocked on an instance after running a recovery")
fs.BoolVar(&preventCrossCellFailover, "prevent-cross-cell-failover", preventCrossCellFailover, "Prevent VTOrc from promoting a primary in a different cell than the current primary in case of a failover")
fs.DurationVar(&waitReplicasTimeout, "wait-replicas-timeout", waitReplicasTimeout, "Duration for which to wait for replica's to respond when issuing RPCs")
fs.DurationVar(&tolerableReplicationLag, "tolerable-replication-lag", tolerableReplicationLag, "Amount of replication lag that is considered acceptable for a tablet to be considered when Vitess makes the choice of a new primary in PRS")
fs.DurationVar(&topoInformationRefreshDuration, "topo-information-refresh-duration", topoInformationRefreshDuration, "Timer duration on which VTOrc refreshes the keyspace and vttablet records from the topology server")
fs.DurationVar(&recoveryPollDuration, "recovery-poll-duration", recoveryPollDuration, "Timer duration on which VTOrc polls its database to run a recovery")
fs.BoolVar(&ersEnabled, "allow-emergency-reparent", ersEnabled, "Whether VTOrc should be allowed to run emergency reparent operation when it detects a dead primary")
Expand All @@ -100,6 +102,7 @@ type Configuration struct {
RecoveryPeriodBlockSeconds int // (overrides `RecoveryPeriodBlockMinutes`) The time for which an instance's recovery is kept "active", so as to avoid concurrent recoveries on the same instance as well as flapping
PreventCrossDataCenterPrimaryFailover bool // When true (default: false), cross-DC primary failovers are not allowed; vtorc will do all it can to only fail over within the same DC, or else not fail over at all.
WaitReplicasTimeoutSeconds int // Timeout on amount of time to wait for the replicas in case of ERS. Should be a small value because we should fail-fast. Should not be larger than LockTimeout since that is the total time we use for an ERS.
TolerableReplicationLagSeconds int // Amount of replication lag that is considered acceptable for a tablet to be considered when Vitess makes the choice of a new primary in PRS.
TopoInformationRefreshSeconds int // Timer duration on which VTOrc refreshes the keyspace and vttablet records from the topo-server.
RecoveryPollSeconds int // Timer duration on which VTOrc recovery analysis runs
}
Expand Down Expand Up @@ -129,6 +132,7 @@ func UpdateConfigValuesFromFlags() {
Config.RecoveryPeriodBlockSeconds = int(recoveryPeriodBlockDuration / time.Second)
Config.PreventCrossDataCenterPrimaryFailover = preventCrossCellFailover
Config.WaitReplicasTimeoutSeconds = int(waitReplicasTimeout / time.Second)
Config.TolerableReplicationLagSeconds = int(tolerableReplicationLag / time.Second)
Config.TopoInformationRefreshSeconds = int(topoInformationRefreshDuration / time.Second)
Config.RecoveryPollSeconds = int(recoveryPollDuration / time.Second)
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtorc/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,7 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi
analyzedTablet.Shard,
reparentutil.PlannedReparentOptions{
WaitReplicasTimeout: time.Duration(config.Config.WaitReplicasTimeoutSeconds) * time.Second,
TolerableReplLag: time.Duration(config.Config.TolerableReplicationLagSeconds) * time.Second,
},
)

Expand Down
8 changes: 7 additions & 1 deletion go/vt/wrangler/reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ func (wr *Wrangler) InitShardPrimary(ctx context.Context, keyspace, shard string

// PlannedReparentShard will make the provided tablet the primary for the shard,
// when both the current and new primary are reachable and in good shape.
func (wr *Wrangler) PlannedReparentShard(ctx context.Context, keyspace, shard string, primaryElectTabletAlias, avoidTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration) (err error) {
func (wr *Wrangler) PlannedReparentShard(
ctx context.Context,
keyspace, shard string,
primaryElectTabletAlias, avoidTabletAlias *topodatapb.TabletAlias,
waitReplicasTimeout, tolerableReplicationLag time.Duration,
) (err error) {
_, err = reparentutil.NewPlannedReparenter(wr.ts, wr.tmc, wr.logger).ReparentShard(
ctx,
keyspace,
Expand All @@ -85,6 +90,7 @@ func (wr *Wrangler) PlannedReparentShard(ctx context.Context, keyspace, shard st
AvoidPrimaryAlias: avoidTabletAlias,
NewPrimaryAlias: primaryElectTabletAlias,
WaitReplicasTimeout: waitReplicasTimeout,
TolerableReplLag: tolerableReplicationLag,
},
)

Expand Down
4 changes: 4 additions & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,10 @@ message PlannedReparentShardRequest {
// WaitReplicasTimeout time to catch up before the reparent, and an additional
// WaitReplicasTimeout time to catch up after the reparent.
vttime.Duration wait_replicas_timeout = 5;
// TolerableReplicationLag is the amount of replication lag that is considered
// acceptable for a tablet to be considered when Vitess makes the choice of a new primary.
// A value of 0 indicates that Vitess shouldn't consider the replication lag at all.
vttime.Duration tolerable_replication_lag = 6;
}

message PlannedReparentShardResponse {
Expand Down
Loading