Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.Shell;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,19 +67,41 @@ protected Logger getLogger() {
return LOG;
}

/**
 * Checks that every region server resumed so far is still reported as live, and starts the ones
 * that are not.
 * <p>
 * The addresses of {@code resumedServers} are compared with the addresses of the live servers
 * returned by the cluster metrics. Every resumed server whose address is missing from the live
 * set is logged and started again through {@code startRs}.
 * <p>
 * This is a best-effort check: an {@link IOException} raised while doing so is logged together
 * with its stack trace and is not propagated to the caller.
 * @param resumedServers the region servers that have been resumed; nothing is done if it is empty
 */
private void confirmResumed(Set<ServerName> resumedServers) {
  if (resumedServers.isEmpty()) {
    return;
  }
  try {
    // Collect into an explicit HashSet: the set is mutated below, and Collectors.toSet() makes no
    // guarantee that the returned set supports removal.
    Set<Address> addrs = resumedServers.stream().map(ServerName::getAddress)
      .collect(Collectors.toCollection(HashSet::new));
    // Drop every address that the cluster still reports as live; what remains is considered dead.
    cluster.getClusterMetrics().getLiveServerMetrics().keySet().stream()
      .map(ServerName::getAddress).forEach(addrs::remove);
    for (Address addr : addrs) {
      LOG.warn("Region server {} is crashed after resuming, starting", addr);
      // NOTE(review): -1 is presumably a placeholder start code for the new process - confirm.
      startRs(ServerName.valueOf(addr, -1));
    }
  } catch (IOException e) {
    // Pass the exception as the last argument so the cause and stack trace are not lost.
    LOG.warn("Failed to check liveness for region servers {}", resumedServers, e);
  }
}

@Override
public void perform() throws Exception {
getLogger().info("Performing action: Rolling batch suspending {}% of region servers",
(int) (ratio * 100));
List<ServerName> selectedServers = selectServers();
Queue<ServerName> serversToBeSuspended = new ArrayDeque<>(selectedServers);
Queue<ServerName> suspendedServers = new ArrayDeque<>();
// After being resumed, a region server usually crashes soon because its session has expired, and
// if the region server was not started by 'autostart', it will stay down forever. So here we
// record these region servers and make sure that they are all alive before exiting this action.
// See HBASE-29206 for more details.
Set<ServerName> resumedServers = new HashSet<>();
Random rand = ThreadLocalRandom.current();
// loop while there are servers to be suspended or suspended servers to be resumed
while (
(!serversToBeSuspended.isEmpty() || !suspendedServers.isEmpty()) && !context.isStopping()
) {

final SuspendOrResume action;
if (serversToBeSuspended.isEmpty()) { // no more servers to suspend
action = SuspendOrResume.RESUME;
Expand All @@ -88,7 +114,6 @@ public void perform() throws Exception {
// do a coin toss
action = rand.nextBoolean() ? SuspendOrResume.SUSPEND : SuspendOrResume.RESUME;
}

ServerName server;
switch (action) {
case SUSPEND:
Expand All @@ -107,11 +132,13 @@ public void perform() throws Exception {
} catch (Shell.ExitCodeException e) {
LOG.info("Problem resuming, will retry; code={}", e.getExitCode(), e);
}
resumedServers.add(server);
break;
}

getLogger().info("Sleeping for:{}", sleepTime);
Threads.sleep(sleepTime);
confirmResumed(resumedServers);
}
}

Expand Down