cli: fully and properly drain target node of decommission #141411

Draft · wants to merge 1 commit into master
41 changes: 17 additions & 24 deletions pkg/cli/node.go
@@ -8,7 +8,6 @@ package cli
 import (
 	"context"
 	"fmt"
-	"io"
 	"math"
 	"os"
 	"reflect"
@@ -610,29 +609,23 @@ func runDecommissionNodeImpl(
 			)
 			continue
 		}
-		drainReq := &serverpb.DrainRequest{
-			Shutdown: false,
-			DoDrain:  true,
-			NodeId:   targetNode.String(),
-		}
-		stream, err := c.Drain(ctx, drainReq)
-		if err != nil {
-			fmt.Fprintln(stderr)
-			return errors.Wrapf(err, "while trying to drain n%d", targetNode)
-		}
-
-		// Consume responses until the stream ends (which signals drain
-		// completion).
-		for {
-			_, err := stream.Recv()
-			if err == io.EOF {
-				// Stream gracefully closed by other side.
-				break
-			}
-			if err != nil {
-				fmt.Fprintln(stderr)
-				return errors.Wrapf(err, "while trying to drain n%d", targetNode)
-			}
-		}
+		_, _ = fmt.Fprintf(stderr, "draining node n%d\n", targetNode)
+
+		if _, _, err := doDrain(ctx, c, targetNode.String()); err != nil {
+			// NB: doDrain already prints to stderr.
+			//
+			// Defense in depth: in decommission invocations that don't have to
+			// do much work, if the target node was _just_ shut down prior to
+			// starting `node decommission`, the node may be absent but the
+			// liveness status may have sent us here anyway. We don't fail out
+			// on the drain step, to make the decommission command more robust.
+			_, _ = fmt.Fprintf(stderr,
+				"drain step for node n%d failed; decommissioning anyway\n", targetNode,
+			)
+			_ = err // discard intentionally
+		} else {
+			// NB: this output is matched on in the decommission/drains roachtest.
+			_, _ = fmt.Fprintf(stderr, "node n%d drained successfully\n", targetNode)
+		}
 	}
 
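The hunk above swaps the hand-rolled Drain stream loop for the shared doDrain helper and downgrades drain failures to a warning. For reference, here is a minimal sketch of the drain loop that the deleted lines implemented, and which doDrain presumably subsumes along with retry and progress reporting. The standalone drainNode function and its signature are illustrative assumptions, not the actual helper; it relies on the imports already used by the deleted code (context, io, serverpb, cockroachdb/errors).

// drainNode is an illustrative sketch only: it drains one node by
// issuing the Drain RPC and consuming the response stream until io.EOF,
// mirroring the deleted block above. The real doDrain in pkg/cli may
// differ in signature and behavior.
func drainNode(ctx context.Context, c serverpb.AdminClient, targetNode string) error {
	stream, err := c.Drain(ctx, &serverpb.DrainRequest{
		Shutdown: false, // drain only; leave the process running
		DoDrain:  true,
		NodeId:   targetNode,
	})
	if err != nil {
		return errors.Wrapf(err, "while trying to drain n%s", targetNode)
	}
	// Consume responses until the stream ends, which signals drain completion.
	for {
		if _, err := stream.Recv(); err != nil {
			if err == io.EOF {
				return nil // stream gracefully closed by the other side
			}
			return errors.Wrapf(err, "while trying to drain n%s", targetNode)
		}
	}
}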
36 changes: 31 additions & 5 deletions pkg/cmd/roachtest/tests/decommission.go
@@ -1056,7 +1056,7 @@ func runDecommissionDrains(ctx context.Context, t test.Test, c cluster.Cluster,
 	}
 	t.Status(fmt.Sprintf("decommissioning node %d", decommNodeID))
 	e := retry.WithMaxAttempts(ctx, retryOpts, maxAttempts, func() error {
-		o, err := h.decommission(ctx, decommNode, pinnedNodeID, "--wait=none", "--format=csv")
+		o, e, err := h.decommissionExt(ctx, decommNode, pinnedNodeID, "--wait=none", "--format=csv")
 		require.NoError(t, errors.Wrapf(err, "decommission failed"))
 
 		// Check if all the replicas have been transferred.
@@ -1072,6 +1072,15 @@
 		return nil
 	}
 
+	// Check that the cli printed the message below. We do this because the
+	// drain step does not fail the decommission command on error (to make
+	// that command more resilient to situations in which a node is
+	// terminated right around the time we try to drain it).
+	// The message is emitted in `cli.runDecommissionNodeImpl`, which has a
+	// back-referencing comment noting that this roachtest matches on it.
+	require.Contains(t, e, "drained successfully")
+
 	// Check to see if the node has been drained or decommissioned.
 	// If not, queries should not fail.
 	// Connect to node 4 (the target node of the decommission).
@@ -1235,11 +1244,21 @@ func (h *decommTestHelper) getLogicalNodeID(ctx context.Context, nodeIdx int) (int, error) {
 	return nodeID, nil
 }
 
 // decommission decommissions the given targetNodes, running the process
 // through the specified runNode.
 func (h *decommTestHelper) decommission(
 	ctx context.Context, targetNodes option.NodeListOption, runNode int, verbs ...string,
 ) (string, error) {
+	o, _, err := h.decommissionExt(ctx, targetNodes, runNode, verbs...)
+	return o, err
+}
+
+// decommissionExt decommissions the given targetNodes, running the process
+// through the specified runNode.
+// Returns stdout, stderr, error.
+// Stdout has the tabular decommission process; stderr contains informational
+// updates.
+func (h *decommTestHelper) decommissionExt(
+	ctx context.Context, targetNodes option.NodeListOption, runNode int, verbs ...string,
+) (string, string, error) {
 	args := []string{"node", "decommission"}
 	args = append(args, verbs...)
 
@@ -1250,7 +1269,7 @@
 			args = append(args, strconv.Itoa(target))
 		}
 	}
-	return execCLI(ctx, h.t, h.c, runNode, args...)
+	return execCLIExt(ctx, h.t, h.c, runNode, args...)
 }
 
 // recommission recommissions the given targetNodes, running the process
Expand Down Expand Up @@ -1479,13 +1498,20 @@ func (h *decommTestHelper) getRandNodeOtherThan(ids ...int) int {
 func execCLI(
 	ctx context.Context, t test.Test, c cluster.Cluster, runNode int, extraArgs ...string,
 ) (string, error) {
+	out, _, err := execCLIExt(ctx, t, c, runNode, extraArgs...)
+	return out, err
+}
+
+func execCLIExt(
+	ctx context.Context, t test.Test, c cluster.Cluster, runNode int, extraArgs ...string,
+) (string, string, error) {
 	args := []string{"./cockroach"}
 	args = append(args, extraArgs...)
 	args = append(args, fmt.Sprintf("--port={pgport:%d}", runNode))
 	args = append(args, fmt.Sprintf("--certs-dir=%s", install.CockroachNodeCertsDir))
 	result, err := c.RunWithDetailsSingleNode(ctx, t.L(), option.WithNodes(c.Node(runNode)), args...)
 	t.L().Printf("%s\n", result.Stdout)
-	return result.Stdout, err
+	return result.Stdout, result.Stderr, err
 }
 
 // Increase the logging verbosity for decommission tests to make life easier
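With both *Ext variants in place, existing call sites keep their two-value signatures while the drains roachtest can assert on stderr. A usage sketch under the assumptions of this file (the helper h, testify's require, and the node variables from runDecommissionDrains); this mirrors the call added in the first hunk rather than introducing anything new:

// Run a decommission and inspect both output streams. stdout carries the
// tabular decommission progress (--format=csv); stderr carries the
// informational lines, including the drain-status message matched above.
o, e, err := h.decommissionExt(ctx, decommNode, pinnedNodeID, "--wait=none", "--format=csv")
require.NoError(t, err)
require.Contains(t, e, "drained successfully") // emitted by cli.runDecommissionNodeImpl
_ = o // parse the CSV rows for replica counts, as before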