From 5d9d7160601ee1e523f0ff4dffb1ac1f88bee15a Mon Sep 17 00:00:00 2001 From: releng Date: Fri, 8 Nov 2024 15:48:19 -0500 Subject: [PATCH] Sync from server repo (c51565346e9) --- commands/cmd_create_db.go | 11 +- commands/vcluster_config.go | 4 +- vclusterops/https_get_up_nodes_op.go | 5 +- vclusterops/https_mark_nodes_ephemeral_op.go | 9 + .../https_poll_compute_node_state_op.go | 227 ---------------- .../https_poll_node_state_indirect_op.go | 256 ++++++++++++++++++ vclusterops/https_poll_node_state_op.go | 27 +- .../https_poll_subcluster_node_state_op.go | 2 +- .../https_poll_subscription_state_op.go | 2 +- vclusterops/nma_signal_vertica_op.go | 117 ++++++++ vclusterops/remove_node.go | 46 +++- vclusterops/start_node.go | 19 +- vclusterops/stop_node.go | 56 +++- vclusterops/util/util.go | 9 - 14 files changed, 528 insertions(+), 262 deletions(-) delete mode 100644 vclusterops/https_poll_compute_node_state_op.go create mode 100644 vclusterops/https_poll_node_state_indirect_op.go create mode 100644 vclusterops/nma_signal_vertica_op.go diff --git a/commands/cmd_create_db.go b/commands/cmd_create_db.go index f2972cd..a505ff3 100644 --- a/commands/cmd_create_db.go +++ b/commands/cmd_create_db.go @@ -284,8 +284,17 @@ func (c *CmdCreateDB) Run(vcc vclusterops.ClusterCommands) error { // write db info to vcluster config file err := writeConfig(&vdb, c.createDBOptions.ForceOverwriteFile) if err != nil { - vcc.DisplayWarning("Failed to write the configuration file: %s\n", err) + vcc.DisplayWarning("Failed to write the configuration file: %s", err) + if dbOptions.ConfigPath != defaultConfigFilePath { + vcc.DisplayWarning("Attempting writing to default config file path: %s", defaultConfigFilePath) + dbOptions.ConfigPath = defaultConfigFilePath + err = writeConfig(&vdb, c.createDBOptions.ForceOverwriteFile) + if err != nil { + vcc.DisplayWarning("Failed to write the configuration file to default path: %s", err) + } + } } + // write config parameters to vcluster config param file err = c.writeConfigParam(c.createDBOptions.ConfigurationParameters, c.createDBOptions.ForceOverwriteFile) if err != nil { diff --git a/commands/vcluster_config.go b/commands/vcluster_config.go index 00004b9..4547b36 100644 --- a/commands/vcluster_config.go +++ b/commands/vcluster_config.go @@ -37,6 +37,7 @@ const ( currentConfigFileVersion = "1.0" configFilePerm = 0644 rpmConfDir = "/opt/vertica/config" + defaultConfigFilePath = rpmConfDir + "/" + defConfigFileName ) // Config is the struct of vertica_cluster.yaml @@ -124,7 +125,7 @@ func initConfigImpl(vclusterExePath string, ensureOptVerticaConfigExists, ensure } cobra.CheckErr(err) } else { - dbOptions.ConfigPath = fmt.Sprintf("%s/%s", rpmConfDir, defConfigFileName) + dbOptions.ConfigPath = defaultConfigFilePath return } } @@ -360,6 +361,7 @@ func (c *DatabaseConfig) write(configFilePath string, forceOverwrite bool) error if err != nil { return fmt.Errorf("fail to marshal configuration data, details: %w", err) } + err = os.WriteFile(configFilePath, configBytes, configFilePerm) if err != nil { return fmt.Errorf("fail to write configuration file, details: %w", err) diff --git a/vclusterops/https_get_up_nodes_op.go b/vclusterops/https_get_up_nodes_op.go index 0da6eba..1aca1a8 100644 --- a/vclusterops/https_get_up_nodes_op.go +++ b/vclusterops/https_get_up_nodes_op.go @@ -210,7 +210,10 @@ func isCompleteScanRequired(cmdType CmdType) bool { cmdType == ManageConnectionDrainingCmd || cmdType == SetConfigurationParameterCmd || cmdType == GetConfigurationParameterCmd || - cmdType == GetDrainingStatusCmd + cmdType == GetDrainingStatusCmd || + // need to find an up node from the sandbox if we're starting sandbox + // nodes, to handle identifying compute nodes in the sandbox + cmdType == StartNodeCmd } func (op *httpsGetUpNodesOp) finalize(_ *opEngineExecContext) error { diff --git a/vclusterops/https_mark_nodes_ephemeral_op.go b/vclusterops/https_mark_nodes_ephemeral_op.go index aece2eb..0cb7fec 100644 --- a/vclusterops/https_mark_nodes_ephemeral_op.go +++ b/vclusterops/https_mark_nodes_ephemeral_op.go @@ -17,6 +17,7 @@ package vclusterops import ( "errors" + "strings" "github.com/vertica/vcluster/vclusterops/util" ) @@ -76,11 +77,19 @@ func (op *httpsMarkEphemeralNodeOp) execute(execContext *opEngineExecContext) er func (op *httpsMarkEphemeralNodeOp) processResult(_ *opEngineExecContext) error { var allErrs error + const errComputeNodeMsg = "cannot change node type from compute to ephemeral" for host, result := range op.clusterHTTPRequest.ResultCollection { op.logResponse(host, result) if !result.isSuccess() { + if result.isInternalError() { + errLower := strings.ToLower(result.err.Error()) + if strings.Contains(errLower, errComputeNodeMsg) { + // down compute nodes can be skipped + continue + } + } allErrs = errors.Join(allErrs, result.err) continue } diff --git a/vclusterops/https_poll_compute_node_state_op.go b/vclusterops/https_poll_compute_node_state_op.go deleted file mode 100644 index 6944a16..0000000 --- a/vclusterops/https_poll_compute_node_state_op.go +++ /dev/null @@ -1,227 +0,0 @@ -/* - (c) Copyright [2023-2024] Open Text. - Licensed under the Apache License, Version 2.0 (the "License"); - You may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package vclusterops - -import ( - "errors" - "fmt" - "strings" - - "github.com/vertica/vcluster/vclusterops/util" -) - -type httpsPollComputeNodeStateOp struct { - opBase - opHTTPSBase - // Map of compute hosts to be added to whether or not they are the desired status yet - computeHostStatus map[string]bool - // The timeout for the entire operation (polling) - timeout int - // The timeout for each http request. Requests will be repeated if timeout hasn't been exceeded. - httpRequestTimeout int - // poll for nodes down: Set to true if nodes need to be polled to be down - checkDown bool -} - -func makeHTTPSPollComputeNodeStateOpHelper(hosts, computeHosts []string, - useHTTPPassword bool, userName string, httpsPassword *string) (httpsPollComputeNodeStateOp, error) { - op := httpsPollComputeNodeStateOp{} - op.name = "HTTPSPollComputeNodeStateOp" - op.hosts = hosts // should be 1+ hosts capable of retrieving accurate node states, e.g. primary up hosts - if len(op.hosts) < 1 { - return op, errors.New("polling compute node state requires at least one primary up host") - } - op.computeHostStatus = make(map[string]bool, len(computeHosts)) - for _, computeHost := range computeHosts { - op.computeHostStatus[computeHost] = false - } - op.useHTTPPassword = useHTTPPassword - op.httpRequestTimeout = defaultHTTPSRequestTimeoutSeconds - err := util.ValidateUsernameAndPassword(op.name, useHTTPPassword, userName) - if err != nil { - return op, err - } - op.userName = userName - op.httpsPassword = httpsPassword - return op, nil -} - -func makeHTTPSPollComputeNodeStateOp(hosts, computeHosts []string, - useHTTPPassword bool, userName string, - httpsPassword *string, timeout int) (httpsPollComputeNodeStateOp, error) { - op, err := makeHTTPSPollComputeNodeStateOpHelper(hosts, computeHosts, useHTTPPassword, userName, httpsPassword) - if err != nil { - return op, err - } - if timeout == 0 { - // using default value - op.timeout = util.GetEnvInt("NODE_STATE_POLLING_TIMEOUT", StartupPollingTimeout) - } else { - op.timeout = timeout - } - op.checkDown = false // poll for COMPUTE state (UP equivalent) - op.description = fmt.Sprintf("Wait for %d compute node(s) to reach COMPUTE state", len(computeHosts)) - return op, err -} - -//nolint:unused // for NYI stop node -func makeHTTPSPollComputeNodeStateDownOp(hosts, computeHosts []string, - useHTTPPassword bool, userName string, - httpsPassword *string) (httpsPollComputeNodeStateOp, error) { - op, err := makeHTTPSPollComputeNodeStateOpHelper(hosts, computeHosts, useHTTPPassword, userName, httpsPassword) - if err != nil { - return op, err - } - op.timeout = util.GetEnvInt("NODE_STATE_POLLING_TIMEOUT", StartupPollingTimeout) - op.checkDown = true - op.description = fmt.Sprintf("Wait for %d compute node(s) to go DOWN", len(hosts)) - return op, nil -} - -func (op *httpsPollComputeNodeStateOp) getPollingTimeout() int { - return util.Max(op.timeout, 0) -} - -func (op *httpsPollComputeNodeStateOp) setupClusterHTTPRequest(hosts []string) error { - for _, host := range hosts { - httpRequest := hostHTTPRequest{} - httpRequest.Method = GetMethod - httpRequest.Timeout = op.httpRequestTimeout - httpRequest.buildHTTPSEndpoint(util.NodesEndpoint) - if op.useHTTPPassword { - httpRequest.Password = op.httpsPassword - httpRequest.Username = op.userName - } - - op.clusterHTTPRequest.RequestCollection[host] = httpRequest - } - - return nil -} - -func (op *httpsPollComputeNodeStateOp) prepare(execContext *opEngineExecContext) error { - execContext.dispatcher.setup(op.hosts) - - return op.setupClusterHTTPRequest(op.hosts) -} - -func (op *httpsPollComputeNodeStateOp) execute(execContext *opEngineExecContext) error { - if err := op.runExecute(execContext); err != nil { - return err - } - - return op.processResult(execContext) -} - -func (op *httpsPollComputeNodeStateOp) finalize(_ *opEngineExecContext) error { - return nil -} - -func (op *httpsPollComputeNodeStateOp) checkStatusToString() string { - if op.checkDown { - return strings.ToLower(util.NodeDownState) - } - return "up (compute)" -} - -func (op *httpsPollComputeNodeStateOp) getRemainingHostsString() string { - var remainingHosts []string - for host, statusOk := range op.computeHostStatus { - if statusOk { - remainingHosts = append(remainingHosts, host) - } - } - return strings.Join(remainingHosts, ",") -} - -func (op *httpsPollComputeNodeStateOp) processResult(execContext *opEngineExecContext) error { - op.logger.PrintInfo("[%s] expecting %d %s host(s)", op.name, len(op.hosts), op.checkStatusToString()) - - err := pollState(op, execContext) - if err != nil { - // show the hosts that are not COMPUTE or DOWN - msg := fmt.Sprintf("the hosts [%s] are not in %s state after %d seconds, details: %s", - op.getRemainingHostsString(), op.checkStatusToString(), op.timeout, err) - op.logger.PrintError(msg) - return errors.New(msg) - } - return nil -} - -func (op *httpsPollComputeNodeStateOp) shouldStopPolling() (bool, error) { - if op.checkDown { - return op.shouldStopPollingForDown() - } - - for host, result := range op.clusterHTTPRequest.ResultCollection { - // when we get timeout error, we know that the host is unreachable/dead - if result.isTimeout() { - return true, fmt.Errorf("[%s] cannot connect to host %s, please check if the host is still alive", op.name, host) - } - - // We don't need to wait until timeout to determine if all nodes are up or not. - // If we find the wrong password for the HTTPS service on any hosts, we should fail immediately. - // We also need to let user know to wait until all nodes are up - if result.isPasswordAndCertificateError(op.logger) { - op.logger.PrintError("[%s] The credentials are incorrect. 'Catalog Sync' will not be executed.", - op.name) - return false, makePollNodeStateAuthenticationError(op.name, host) - } - if result.isPassing() { - // parse the /nodes endpoint response for all nodes, then look for the new ones - nodesInformation := nodesInfo{} - err := op.parseAndCheckResponse(host, result.content, &nodesInformation) - if err != nil { - op.logger.PrintError("[%s] fail to parse result on host %s, details: %s", - op.name, host, err) - return true, err - } - - // check which nodes have COMPUTE status - upNodeCount := 0 - for _, nodeInfo := range nodesInformation.NodeList { - _, ok := op.computeHostStatus[nodeInfo.Address] - if !ok { - // skip unrelated nodes - continue - } - if nodeInfo.State == util.NodeComputeState { - upNodeCount++ - op.computeHostStatus[nodeInfo.Address] = true - } else { - // it would be weird for a previously COMPUTE node to change status while we're still - // polling, but no reason not to use the updated value in case it differs. - op.computeHostStatus[nodeInfo.Address] = false - } - } - if upNodeCount == len(op.computeHostStatus) { - op.logger.PrintInfo("[%s] All nodes are %s", op.name, op.checkStatusToString()) - op.updateSpinnerStopMessage("all nodes are %s", op.checkStatusToString()) - return true, nil - } - // try the next host's result - op.logger.PrintInfo("[%s] %d host(s) up (compute)", op.name, upNodeCount) - op.updateSpinnerMessage("%d host(s) up (compute), expecting %d up (compute) host(s)", upNodeCount, len(op.computeHostStatus)) - } - } - // no host returned all new compute nodes as status COMPUTE, so keep polling - return false, nil -} - -func (op *httpsPollComputeNodeStateOp) shouldStopPollingForDown() (bool, error) { - // for NYI stop node - return true, fmt.Errorf("NYI") -} diff --git a/vclusterops/https_poll_node_state_indirect_op.go b/vclusterops/https_poll_node_state_indirect_op.go new file mode 100644 index 0000000..26962f5 --- /dev/null +++ b/vclusterops/https_poll_node_state_indirect_op.go @@ -0,0 +1,256 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "errors" + "fmt" + "slices" + "strings" + + "github.com/vertica/vcluster/vclusterops/util" +) + +type httpsPollNodeStateIndirectOp struct { + opBase + opHTTPSBase + // Map of hosts to state to compute permanent and/or non-up nodes as needed + checkedHostsToState map[string]string + // The timeout for the entire operation (polling) + timeout int + // The timeout for each http request. Requests will be repeated if timeout hasn't been exceeded. + httpRequestTimeout int + // Node states considered final and ok when polling + allowedStates []string + // Pointer to output slice of permanent UP nodes identified by this op + permanentNodes *[]string +} + +var errNoUpNodesForPolling = errors.New("polling node state indirectly requires at least one primary up host") + +func makeHTTPSPollNodeStateIndirectOpHelper(hosts, hostsToCheck []string, + useHTTPPassword bool, userName string, httpsPassword *string, + timeout int) (httpsPollNodeStateIndirectOp, error) { + op := httpsPollNodeStateIndirectOp{} + op.name = "HTTPSPollNodeStateIndirectOp" + op.hosts = hosts // should be 1+ hosts capable of retrieving accurate node states, e.g. primary up hosts + if op.hosts != nil && len(op.hosts) < 1 { + return op, errNoUpNodesForPolling + } + op.checkedHostsToState = make(map[string]string, len(hostsToCheck)) + for _, host := range hostsToCheck { + op.checkedHostsToState[host] = "" + } + if timeout == 0 { + // using default value + op.timeout = util.GetEnvInt("NODE_STATE_POLLING_TIMEOUT", StartupPollingTimeout) + } else { + op.timeout = timeout + } + op.useHTTPPassword = useHTTPPassword + op.httpRequestTimeout = defaultHTTPSRequestTimeoutSeconds + err := util.ValidateUsernameAndPassword(op.name, useHTTPPassword, userName) + if err != nil { + return op, err + } + op.userName = userName + op.httpsPassword = httpsPassword + return op, nil +} + +// makeHTTPSPollComputeNodeStateOp constructs a httpsPollNodeStateIndirectOp to poll +// until a known set of compute nodes are up (COMPUTE). +func makeHTTPSPollComputeNodeStateOp(hosts, computeHosts []string, + useHTTPPassword bool, userName string, + httpsPassword *string, timeout int) (httpsPollNodeStateIndirectOp, error) { + op, err := makeHTTPSPollNodeStateIndirectOpHelper(hosts, computeHosts, useHTTPPassword, + userName, httpsPassword, timeout) + if err != nil { + return op, err + } + op.allowedStates = []string{util.NodeComputeState} // poll for COMPUTE state (UP equivalent) + op.description = fmt.Sprintf("Wait for %d compute node(s) to reach COMPUTE state", len(computeHosts)) + return op, err +} + +// makeHTTPSPollUnknownNodeStateOp constructs a httpsPollNodeStateIndirectOp for polling +// until a set of nodes are up (COMPUTE or UP). It also identifies the non-compute nodes for +// further operations. +func makeHTTPSPollUnknownNodeStateOp(hostsToCheck []string, + permanentNodes *[]string, useHTTPPassword bool, userName string, + httpsPassword *string, timeout int) (httpsPollNodeStateIndirectOp, error) { + // get hosts from execContext later + op, err := makeHTTPSPollNodeStateIndirectOpHelper(nil, hostsToCheck, useHTTPPassword, + userName, httpsPassword, timeout) + if err != nil { + return op, err + } + op.permanentNodes = permanentNodes + op.allowedStates = []string{util.NodeComputeState, util.NodeUpState} // poll for any valid up state + op.description = "Wait for permanent and/or compute node(s) to come up" + return op, nil +} + +func (op *httpsPollNodeStateIndirectOp) getPollingTimeout() int { + return max(op.timeout, 0) +} + +func (op *httpsPollNodeStateIndirectOp) setupClusterHTTPRequest(hosts []string) error { + for _, host := range hosts { + httpRequest := hostHTTPRequest{} + httpRequest.Method = GetMethod + httpRequest.Timeout = op.httpRequestTimeout + httpRequest.buildHTTPSEndpoint(util.NodesEndpoint) + if op.useHTTPPassword { + httpRequest.Password = op.httpsPassword + httpRequest.Username = op.userName + } + + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + + return nil +} + +func (op *httpsPollNodeStateIndirectOp) prepare(execContext *opEngineExecContext) error { + if op.hosts == nil { + if len(execContext.upHosts) < 1 { + return errNoUpNodesForPolling + } + op.hosts = execContext.upHosts + } + execContext.dispatcher.setup(op.hosts) + + return op.setupClusterHTTPRequest(op.hosts) +} + +func (op *httpsPollNodeStateIndirectOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *httpsPollNodeStateIndirectOp) finalize(_ *opEngineExecContext) error { + return nil +} + +// checkStatusToString formats the node state set in a user-readable way +func (op *httpsPollNodeStateIndirectOp) checkStatusToString() string { + // only a short slice, no need for a string builder + statusStr := "" + for i, status := range op.allowedStates { + if i > 0 { + statusStr += " or " + } + if status == util.NodeComputeState { + statusStr += "up (compute)" + } else { + statusStr += strings.ToLower(status) + } + } + return statusStr +} + +func (op *httpsPollNodeStateIndirectOp) getRemainingHostsString() string { + var remainingHosts []string + for host, state := range op.checkedHostsToState { + if !slices.Contains(op.allowedStates, state) { + remainingHosts = append(remainingHosts, host) + } + } + return strings.Join(remainingHosts, ",") +} + +func (op *httpsPollNodeStateIndirectOp) processResult(execContext *opEngineExecContext) error { + op.logger.PrintInfo("[%s] expecting %d %s host(s)", op.name, len(op.checkedHostsToState), op.checkStatusToString()) + + err := pollState(op, execContext) + if err != nil { + // show the hosts that are not up + msg := fmt.Sprintf("the hosts [%s] are not in %s state after %d seconds, details: %s", + op.getRemainingHostsString(), op.checkStatusToString(), op.timeout, err) + op.logger.PrintError(msg) + return errors.New(msg) + } + // if the permanent nodes list is needed, extract it + if op.permanentNodes != nil { + allowedPermanentStates := util.SliceDiff(op.allowedStates, []string{util.NodeComputeState}) + for host, state := range op.checkedHostsToState { + if slices.Contains(allowedPermanentStates, state) { + *op.permanentNodes = append(*op.permanentNodes, host) + } + } + } + return nil +} + +func (op *httpsPollNodeStateIndirectOp) shouldStopPolling() (bool, error) { + for host, result := range op.clusterHTTPRequest.ResultCollection { + // when we get timeout error, we know that the host is unreachable/dead + if result.isTimeout() { + return true, fmt.Errorf("[%s] cannot connect to host %s, please check if the host is still alive", op.name, host) + } + + // We don't need to wait until timeout to determine if all nodes are up or not. + // If we find the wrong password for the HTTPS service on any hosts, we should fail immediately. + // We also need to let user know to wait until all nodes are up + if result.isPasswordAndCertificateError(op.logger) { + op.logger.PrintError("[%s] The credentials are incorrect. 'Catalog Sync' will not be executed.", + op.name) + return false, makePollNodeStateAuthenticationError(op.name, host) + } + if result.isPassing() { + // parse the /nodes endpoint response for all nodes, then look for the specified ones + nodesInformation := nodesInfo{} + err := op.parseAndCheckResponse(host, result.content, &nodesInformation) + if err != nil { + op.logger.PrintError("[%s] fail to parse result on host %s, details: %s", + op.name, host, err) + return true, err + } + + // check which nodes have COMPUTE status + upNodeCount := 0 + for _, nodeInfo := range nodesInformation.NodeList { + _, ok := op.checkedHostsToState[nodeInfo.Address] + if !ok { + // skip unrelated nodes + continue + } + if slices.Contains(op.allowedStates, nodeInfo.State) { + upNodeCount++ + } + // stash state regardless of up/down/compute/etc. it would be weird for a + // previously up node to change status while we're still polling, but no + // reason not to use the updated value in case it differs. + op.checkedHostsToState[nodeInfo.Address] = nodeInfo.State + } + if upNodeCount == len(op.checkedHostsToState) { + op.logger.PrintInfo("[%s] All nodes are %s", op.name, op.checkStatusToString()) + op.updateSpinnerStopMessage("all nodes are %s", op.checkStatusToString()) + return true, nil + } + // try the next host's result + op.logger.PrintInfo("[%s] %d host(s) %s", op.name, upNodeCount, op.checkStatusToString()) + op.updateSpinnerMessage("%d host(s) %s, expecting %d %s host(s)", + upNodeCount, op.checkStatusToString(), len(op.checkedHostsToState), op.checkStatusToString()) + } + } + // no host returned all new nodes as acceptable states, so keep polling + return false, nil +} diff --git a/vclusterops/https_poll_node_state_op.go b/vclusterops/https_poll_node_state_op.go index f6e5a39..cd73ad9 100644 --- a/vclusterops/https_poll_node_state_op.go +++ b/vclusterops/https_poll_node_state_op.go @@ -38,6 +38,8 @@ type httpsPollNodeStateOp struct { cmdType CmdType // poll for nodes down: Set to true if nodes need to be polled to be down checkDown bool + // Pointer to list of permanent hosts, as identified by a preceding op + permanentHosts *[]string } func makeHTTPSPollNodeStateOpHelper(hosts []string, @@ -88,8 +90,21 @@ func makeHTTPSPollNodeStateOp(hosts []string, return op, err } +// makeHTTPSPollPermanentNodeStateOp will filter out non-permanent hosts from +// polling, as identified dynamically by a previous op +func makeHTTPSPollPermanentNodeStateOp(hosts []string, + permanentHosts *[]string, useHTTPPassword bool, userName string, + httpsPassword *string, timeout int) (httpsPollNodeStateOp, error) { + op, err := makeHTTPSPollNodeStateOp(hosts, useHTTPPassword, userName, httpsPassword, timeout) + if err != nil { + return op, err + } + op.permanentHosts = permanentHosts + return op, nil +} + func (op *httpsPollNodeStateOp) getPollingTimeout() int { - return util.Max(op.timeout, 0) + return max(op.timeout, 0) } func (op *httpsPollNodeStateOp) setupClusterHTTPRequest(hosts []string) error { @@ -110,6 +125,16 @@ func (op *httpsPollNodeStateOp) setupClusterHTTPRequest(hosts []string) error { } func (op *httpsPollNodeStateOp) prepare(execContext *opEngineExecContext) error { + // if needed, filter out hosts that can't be polled + if op.permanentHosts != nil { + op.hosts = util.SliceCommon(op.hosts, *op.permanentHosts) + // if all hosts started were compute nodes, nothing to do here + if len(op.hosts) == 0 { + op.skipExecute = true + return nil + } + } + execContext.dispatcher.setup(op.hosts) return op.setupClusterHTTPRequest(op.hosts) diff --git a/vclusterops/https_poll_subcluster_node_state_op.go b/vclusterops/https_poll_subcluster_node_state_op.go index 488e391..5a3d3f6 100644 --- a/vclusterops/https_poll_subcluster_node_state_op.go +++ b/vclusterops/https_poll_subcluster_node_state_op.go @@ -84,7 +84,7 @@ func makeHTTPSPollSubclusterNodeStateDownOp(hosts []string, scName string, func (op *httpsPollSubclusterNodeStateOp) getPollingTimeout() int { // a negative value indicates no timeout and should never be used for this op - return util.Max(op.timeout, 0) + return max(op.timeout, 0) } func (op *httpsPollSubclusterNodeStateOp) setupClusterHTTPRequest(hosts []string) error { diff --git a/vclusterops/https_poll_subscription_state_op.go b/vclusterops/https_poll_subscription_state_op.go index 88b5872..e818f1e 100644 --- a/vclusterops/https_poll_subscription_state_op.go +++ b/vclusterops/https_poll_subscription_state_op.go @@ -61,7 +61,7 @@ func makeHTTPSPollSubscriptionStateOp(hosts []string, useHTTPPassword bool, user func (op *httpsPollSubscriptionStateOp) getPollingTimeout() int { // a negative value indicates no timeout and should never be used for this op - return util.Max(op.timeout, 0) + return max(op.timeout, 0) } func (op *httpsPollSubscriptionStateOp) setupClusterHTTPRequest(hosts []string) error { diff --git a/vclusterops/nma_signal_vertica_op.go b/vclusterops/nma_signal_vertica_op.go new file mode 100644 index 0000000..d6c1cf9 --- /dev/null +++ b/vclusterops/nma_signal_vertica_op.go @@ -0,0 +1,117 @@ +/* + (c) Copyright [2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "errors" + "fmt" + "strings" +) + +type nmaSignalVerticaOp struct { + opBase + signal string // signal type ("term" or "kill" or "" for endpoint default) + hostCatPathMap map[string]string // map of hosts to catalog paths. may be nil to allow backend to auto-detect vertica pids. +} + +func makeNMASignalVerticaOpHelper(hosts []string, hostCatPathMap map[string]string) (op nmaSignalVerticaOp, err error) { + op = nmaSignalVerticaOp{} + op.name = "NMASignalVerticaOp" + op.description = "Terminate applicable nodes via signal" + op.hosts = hosts + op.hostCatPathMap = hostCatPathMap + if op.hostCatPathMap != nil { + // the caller is responsible for making sure hosts and maps match up exactly + err = validateHostMaps(hosts, hostCatPathMap) + } + return op, err +} + +func makeNMASigTermVerticaOp(hosts []string, hostCatPathMap map[string]string) (op nmaSignalVerticaOp, err error) { + op, err = makeNMASignalVerticaOpHelper(hosts, hostCatPathMap) + op.signal = "term" + return op, err +} + +// setupClusterHTTPRequest works as the module setup in Admintools +func (op *nmaSignalVerticaOp) setupClusterHTTPRequest(hosts []string) error { + for _, host := range hosts { + httpRequest := hostHTTPRequest{} + httpRequest.Method = PostMethod + httpRequest.buildNMAEndpoint("vertica-processes/signal") + + // signal vertica endpoint uses query params despite being POST + httpRequest.QueryParams = map[string]string{"signal_type": op.signal} + + // Passing the catalog path allows the backend to find the vertica pid directly. + // The catalog path for the signal endpoint is the parent dir containing pid, log, etc. + // If we can't figure out the dir, rely on auto-detecting processes by skipping the arg. + if op.hostCatPathMap != nil && strings.HasSuffix(op.hostCatPathMap[host], "/Catalog") { + httpRequest.QueryParams["catalog_path"] = strings.TrimSuffix(op.hostCatPathMap[host], "/Catalog") + } + + op.clusterHTTPRequest.RequestCollection[host] = httpRequest + } + + return nil +} + +func (op *nmaSignalVerticaOp) prepare(execContext *opEngineExecContext) error { + execContext.dispatcher.setup(op.hosts) + + return op.setupClusterHTTPRequest(op.hosts) +} + +func (op *nmaSignalVerticaOp) execute(execContext *opEngineExecContext) error { + if err := op.runExecute(execContext); err != nil { + return err + } + + return op.processResult(execContext) +} + +func (op *nmaSignalVerticaOp) finalize(_ *opEngineExecContext) error { + return nil +} + +func (op *nmaSignalVerticaOp) processResult(_ *opEngineExecContext) error { + var allErrs error + var errorHosts []string + for host, result := range op.clusterHTTPRequest.ResultCollection { + op.logResponse(host, result) + var err error + + if result.isPassing() { + _, err = op.parseAndCheckMapResponse(host, result.content) + if err == nil { + // Note that a passing result means the signal was sent successfully, but + // not that the process has successfully terminated yet (or at all). + continue + } + } else { + err = result.err + } + errorHosts = append(errorHosts, host) + allErrs = errors.Join(allErrs, err) + } + + if allErrs != nil { + err := fmt.Errorf("Error terminating Vertica via signal on hosts %v", errorHosts) + allErrs = errors.Join(err, allErrs) + } + + return allErrs +} diff --git a/vclusterops/remove_node.go b/vclusterops/remove_node.go index d73fd87..c5d4e07 100644 --- a/vclusterops/remove_node.go +++ b/vclusterops/remove_node.go @@ -382,6 +382,8 @@ func getSortedHosts(hostsToRemove []string, hostNodeMap vHostNodeMap) []string { // - Poll subscription state, wait for all subscrptions ACTIVE for Eon mode // - Remove secondary nodes from spread // - Drop Nodes +// - Kill live compute nodes +// - Poll compute node status // - Reload spread // - Delete catalog and data directories // - Sync catalog (eon only) @@ -434,13 +436,20 @@ func (vcc VClusterCommands) produceRemoveNodeInstructions(vdb *VCoordinationData } sortedHosts := getSortedHosts(options.HostsToRemove, vdb.HostNodeMap) - err = vcc.produceDropNodeOps(&instructions, sortedHosts, initiatorHost, usePassword, username, password, vdb.HostNodeMap, vdb.IsEon, options.IsSubcluster) if err != nil { return instructions, err } + // compute nodes don't get the distcall to stop upon drop, so terminate them directly + computeHostsToRemove := util.SliceCommon(options.HostsToRemove, vdb.ComputeNodes) + err = vcc.produceStopAndPollComputeNodeOps(&instructions, computeHostsToRemove, vdb.HostNodeMap, + usePassword, username, password) + if err != nil { + return instructions, nil + } + httpsReloadSpreadOp, err := makeHTTPSReloadSpreadOpWithInitiator(initiatorHost, usePassword, username, password) if err != nil { return instructions, err @@ -535,12 +544,14 @@ func (vcc VClusterCommands) produceRebalanceClusterOps(instructions *[]clusterOp func (vcc VClusterCommands) produceRebalanceSubclusterShardsOps(instructions *[]clusterOp, initiatorHost, scNames []string, useHTTPPassword bool, userName string, httpsPassword *string) error { for _, scName := range scNames { - op, err := makeHTTPSRebalanceSubclusterShardsOp( - initiatorHost, useHTTPPassword, userName, httpsPassword, scName) - if err != nil { - return err + if scName != "" { + op, err := makeHTTPSRebalanceSubclusterShardsOp( + initiatorHost, useHTTPPassword, userName, httpsPassword, scName) + if err != nil { + return err + } + *instructions = append(*instructions, &op) } - *instructions = append(*instructions, &op) } return nil @@ -564,6 +575,29 @@ func (vcc VClusterCommands) produceDropNodeOps(instructions *[]clusterOp, target return nil } +// produceStopAndPollComputeNodeOps produces the instructions to stop compute nodes +// and poll only the compute nodes for DOWN state. +func (vcc VClusterCommands) produceStopAndPollComputeNodeOps(instructions *[]clusterOp, + computeHostsToStop []string, + hostNodeMap vHostNodeMap, + usePassword bool, username string, password *string) error { + if len(computeHostsToStop) > 0 { + err := vcc.produceStopComputeNodeOps(instructions, computeHostsToStop, hostNodeMap) + if err != nil { + return err + } + + // Poll for compute nodes down + httpsPollNodesDownOp, err := makeHTTPSPollNodeStateDownOp(computeHostsToStop, + usePassword, username, password) + if err != nil { + return err + } + *instructions = append(*instructions, &httpsPollNodesDownOp) + } + return nil +} + // produceSpreadRemoveNodeOp calls HTTPSSpreadRemoveNodeOp // when there is at least one secondary node to remove func (vcc VClusterCommands) produceSpreadRemoveNodeOp(instructions *[]clusterOp, hostsToRemove []string, diff --git a/vclusterops/start_node.go b/vclusterops/start_node.go index b08ea4b..f19976a 100644 --- a/vclusterops/start_node.go +++ b/vclusterops/start_node.go @@ -353,7 +353,8 @@ func (options *VStartNodesOptions) checkQuorum(vdb *VCoordinationDatabase, resta // - Sync the confs to the nodes to be started // - Call https /v1/startup/command to get start command of the nodes to be started // - start nodes -// - Poll node start up +// - Poll all node start up indirectly +// - Poll permanent node start up directly // - sync catalog func (vcc VClusterCommands) produceStartNodesInstructions(startNodeInfo *VStartNodesInfo, options *VStartNodesOptions, vdb *VCoordinationDatabase) ([]clusterOp, error) { @@ -403,8 +404,16 @@ func (vcc VClusterCommands) produceStartNodesInstructions(startNodeInfo *VStartN return instructions, err } nmaStartNewNodesOp := makeNMAStartNodeOpWithVDB(startNodeInfo.HostsToStart, options.StartUpConf, vdb) - httpsPollNodeStateOp, err := makeHTTPSPollNodeStateOp(startNodeInfo.HostsToStart, - options.usePassword, options.UserName, options.Password, options.StatePollingTimeout) + permanentNodes := make([]string, 0, len(startNodeInfo.HostsToStart)) + httpsPollNodeStateIndirectOp, err := makeHTTPSPollUnknownNodeStateOp(startNodeInfo.HostsToStart, + &permanentNodes, options.usePassword, options.UserName, options.Password, + options.StatePollingTimeout) + if err != nil { + return instructions, err + } + httpsPollNodeStateOp, err := makeHTTPSPollPermanentNodeStateOp(startNodeInfo.HostsToStart, + &permanentNodes, options.usePassword, options.UserName, options.Password, + options.StatePollingTimeout) if err != nil { return instructions, err } @@ -412,6 +421,7 @@ func (vcc VClusterCommands) produceStartNodesInstructions(startNodeInfo *VStartN instructions = append(instructions, &httpsRestartUpCommandOp, &nmaStartNewNodesOp, + &httpsPollNodeStateIndirectOp, &httpsPollNodeStateOp, ) if vdb.IsEon { @@ -546,6 +556,9 @@ func (options *VStartNodesOptions) separateHostsBasedOnReIPNeed( vnode, ok := vdb.HostNodeMap[newIP] if ok && vnode.State == util.NodeDownState { startNodeInfo.hasDownNodeNoNeedToReIP = true + } else if ok && (vnode.State == util.NodeUpState || vnode.State == util.NodeComputeState) { + // skip UP or COMPUTE nodes with no re-ip need + continue } } diff --git a/vclusterops/stop_node.go b/vclusterops/stop_node.go index 94751f8..61a4f60 100644 --- a/vclusterops/stop_node.go +++ b/vclusterops/stop_node.go @@ -163,15 +163,30 @@ func (vcc VClusterCommands) produceStopNodeInstructions(vdb *VCoordinationDataba username := options.UserName usePassword := options.usePassword password := options.Password - stopHostNodeNameMap := make(map[string]string) - stopHostNodeMap := vdb.copyHostNodeMap(options.StopHosts) - for h, vnode := range stopHostNodeMap { - stopHostNodeNameMap[vnode.Name] = h + + // most node types can be stopped via HTTPS service + regularHostsToStop := util.SliceDiff(options.StopHosts, vdb.ComputeNodes) + if len(regularHostsToStop) > 0 { + regularStopHostNodeNameMap := make(map[string]string) + regularStopHostNodeMap := vdb.copyHostNodeMap(regularHostsToStop) + for h, vnode := range regularStopHostNodeMap { + regularStopHostNodeNameMap[vnode.Name] = h + } + + httpsStopNodeOp, err := makeHTTPSStopInputNodesOp(regularStopHostNodeNameMap, usePassword, username, password, nil) + if err != nil { + return instructions, err + } + instructions = append(instructions, &httpsStopNodeOp) } - httpsStopNodeOp, err := makeHTTPSStopInputNodesOp(stopHostNodeNameMap, usePassword, username, password, nil) - if err != nil { - return instructions, err + // compute nodes currently don't support distcalls, so need to kill by signal via NMA + computeHostsToStop := util.SliceCommon(options.StopHosts, vdb.ComputeNodes) + if len(computeHostsToStop) > 0 { + err := vcc.produceStopComputeNodeOps(&instructions, computeHostsToStop, vdb.HostNodeMap) + if err != nil { + return instructions, err + } } // Poll for nodes down @@ -181,9 +196,28 @@ func (vcc VClusterCommands) produceStopNodeInstructions(vdb *VCoordinationDataba return instructions, err } - instructions = append(instructions, - &httpsStopNodeOp, - &httpsPollNodesDown, - ) + instructions = append(instructions, &httpsPollNodesDown) return instructions, nil } + +// produceStopComputeNodeOps creates the instructions required to terminate compute nodes. +// Since compute nodes lack distcall support, they must be stopped via signal, including when +// dropped. +func (vcc VClusterCommands) produceStopComputeNodeOps(instructions *[]clusterOp, + computeHostsToStop []string, + hostNodeMap vHostNodeMap) error { + nmaHealthOp := makeNMAHealthOp(computeHostsToStop) + computeHostCatPathMap := make(map[string]string, len(computeHostsToStop)) + for _, host := range computeHostsToStop { + computeHostCatPathMap[host] = hostNodeMap[host].CatalogPath + } + nmaSigTermNodeOp, err := makeNMASigTermVerticaOp(computeHostsToStop, computeHostCatPathMap) + if err != nil { + return err + } + *instructions = append(*instructions, + &nmaHealthOp, + &nmaSigTermNodeOp, + ) + return nil +} diff --git a/vclusterops/util/util.go b/vclusterops/util/util.go index 50c3311..12d2f32 100644 --- a/vclusterops/util/util.go +++ b/vclusterops/util/util.go @@ -682,15 +682,6 @@ func ValidateCommunalStorageLocation(location string) error { return nil } -// Max works on all sane types, not just float64 like the math package funcs. -// Can be removed after upgrade to go 1.21 (VER-90410) as min/max become builtins. -func Max[T constraints.Ordered](a, b T) T { - if a > b { - return a - } - return b -} - // GetPathPrefix returns a path prefix for a (catalog/data/depot) path of a node func GetPathPrefix(path string) string { if path == "" {