Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: various reader failover fixes #1227

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ protected List<HostSpec> waitTillTopologyGetsUpdated(final long timeoutMs) throw
}

final long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);

// Note that we are checking reference equality instead of value equality here. We will break out of the loop if
// there is a new entry in the topology map, even if the value of the hosts in latestHosts is the same as
// currentHosts.
while (currentHosts == (latestHosts = this.topologyMap.get(this.clusterId))
&& System.nanoTime() < end) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,24 +152,14 @@ private Future<ReaderFailoverResult> submitInternalFailoverTask(
}

// need to ensure that new connection is a connection to a reader node

pluginService.forceRefreshHostList(result.getConnection());
topology = pluginService.getAllHosts();
for (final HostSpec node : topology) {
if (node.getUrl().equals(result.getHost().getUrl())) {
// found new connection host in the latest topology
if (node.getRole() == HostRole.READER) {
return result;
}
try {
if (HostRole.READER.equals(this.pluginService.getHostRole(result.getConnection()))) {
return result;
}
} catch (SQLException e) {
LOGGER.fine(Messages.get("ClusterAwareReaderFailoverHandler.errorGettingHostRole", new Object[]{e}));
}

// The new node was not confirmed as a reader in the latest topology. There are a few possible reasons for that:
// - The node is not yet present in the topology because a failover is still in progress.
// - The node is in the topology but its role isn't
// READER (that is not an acceptable option due to the this.strictReader setting).
// We need to continue this loop and make another attempt to connect to a reader.

try {
result.getConnection().close();
} catch (final SQLException ex) {
Expand Down Expand Up @@ -244,8 +234,9 @@ public List<HostSpec> getHostsByPriority(final List<HostSpec> hosts) {

final List<HostSpec> hostsByPriority = new ArrayList<>(activeReaders);
final int numOfReaders = activeReaders.size() + downHostList.size();
if (writerHost != null
&& (!this.enableFailoverStrictReader || numOfReaders == 0)) {
// Since the writer instance may change during failover, the original writer is likely now a reader. We will include
// it and then verify the role once connected if using "strict-reader".
if (writerHost != null || numOfReaders == 0) {
hostsByPriority.add(writerHost);
}
hostsByPriority.addAll(downHostList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsHelper;
import software.amazon.jdbc.targetdriverdialect.TargetDriverDialect;
import software.amazon.jdbc.util.ConnectionUrlParser;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.RdsUrlType;
import software.amazon.jdbc.util.RdsUtils;
Expand Down Expand Up @@ -289,10 +290,11 @@ void initHostProvider(
return;
}

initHostProviderFunc.call();
initFailoverMode(initialUrl);

this.readerFailoverHandler = readerFailoverHandlerSupplier.get();
this.writerFailoverHandler = writerFailoverHandlerSupplier.get();

initHostProviderFunc.call();
}

@Override
Expand Down Expand Up @@ -364,12 +366,10 @@ private void initSettings() {
TELEMETRY_FAILOVER_ADDITIONAL_TOP_TRACE.getBoolean(this.properties);
}

protected void initFailoverMode() {
protected void initFailoverMode(final String initialUrl) {
if (this.rdsUrlType == null) {
this.failoverMode = FailoverMode.fromValue(FAILOVER_MODE.getString(this.properties));
final HostSpec initialHostSpec = this.hostListProviderService.getInitialConnectionHostSpec();
this.rdsUrlType = this.rdsHelper.identifyRdsType(initialHostSpec.getHost());

this.rdsUrlType = this.rdsHelper.identifyRdsType(ConnectionUrlParser.parseHost(initialUrl));
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved
if (this.failoverMode == null) {
this.failoverMode = this.rdsUrlType == RdsUrlType.RDS_READER_CLUSTER
? FailoverMode.READER_OR_WRITER
Expand Down Expand Up @@ -812,7 +812,6 @@ public Connection connect(
private Connection connectInternal(String driverProtocol, HostSpec hostSpec, Properties props,
boolean isInitialConnection, JdbcCallable<Connection, SQLException> connectFunc, boolean isForceConnect)
throws SQLException {
this.initFailoverMode();
Connection conn = null;
try {
conn =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package software.amazon.jdbc.plugin.failover2;

import static software.amazon.jdbc.plugin.failover.FailoverMode.STRICT_READER;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
Expand All @@ -26,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import software.amazon.jdbc.AwsWrapperProperty;
import software.amazon.jdbc.HostListProviderService;
import software.amazon.jdbc.HostRole;
Expand Down Expand Up @@ -342,85 +345,110 @@

try {
LOGGER.fine(() -> Messages.get("Failover.startReaderFailover"));

// It's expected that this method synchronously returns when topology is stabilized,
// i.e. when cluster control plane has already chosen a new writer.
if (!this.pluginService.forceRefreshHostList(false, 0)) {
// "Unable to establish SQL connection to reader instance"
this.failoverReaderFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToConnectToReader"));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToReader"));
LOGGER.severe(Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverReader]"}));
throw new FailoverFailedSQLException(
Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverReader]"}));
}

final Properties copyProp = PropertyUtils.copyProperties(this.properties);
copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true");

// The host roles in this host list may or may not be accurate, depending on whether the new topology has become
// available or not yet.
final List<HostSpec> hosts = this.pluginService.getHosts();
Connection readerCandidateConn = null;
Connection candidateConn = null;
HostSpec readerCandidate = null;
final Set<HostSpec> remainingHosts = new HashSet<>(hosts);
HostSpec verifiedWriter = null;
final HostSpec originalWriter = hosts.stream()
.filter(hostSpec -> HostRole.WRITER.equals(hostSpec.getRole()))
.findFirst()
.orElse(null);
final Set<HostSpec> originalReaders = hosts.stream()
.filter(hostSpec -> HostRole.READER.equals(hostSpec.getRole()))
.collect(Collectors.toSet());
while (candidateConn == null && System.nanoTime() < failoverEndTimeNano) {
final Set<HostSpec> remainingHosts = new HashSet<>(originalReaders);
if (FailoverMode.STRICT_READER.equals(this.failoverMode) && verifiedWriter != null) {
remainingHosts.remove(verifiedWriter);
}

while (!remainingHosts.isEmpty()
&& readerCandidateConn == null
&& System.nanoTime() < failoverEndTimeNano) {
try {
readerCandidate =
this.pluginService.getHostSpecByStrategy(
new ArrayList<>(remainingHosts),
HostRole.READER,
this.failoverReaderHostSelectorStrategySetting);
} catch (UnsupportedOperationException | SQLException ex) {
// can't use selected strategy to get a reader host
LOGGER.finest("Error: " + ex.getMessage());
break;
while (!remainingHosts.isEmpty()
&& candidateConn == null
&& System.nanoTime() < failoverEndTimeNano) {
try {
readerCandidate =
this.pluginService.getHostSpecByStrategy(
new ArrayList<>(remainingHosts),
HostRole.READER,
this.failoverReaderHostSelectorStrategySetting);
} catch (UnsupportedOperationException | SQLException ex) {
// can't use selected strategy to get a reader host
LOGGER.finest("Error: " + ex.getMessage());
break;
}

if (readerCandidate == null) {
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved
break;
}

try {
candidateConn = this.pluginService.connect(readerCandidate, copyProp);
// Since the roles in the host list might not be accurate, we execute a query to check the instance's role.
if (this.failoverMode == STRICT_READER
&& this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) {
verifiedWriter = readerCandidate;
candidateConn.close();
candidateConn = null;
remainingHosts.remove(readerCandidate);
readerCandidate = null;
}
} catch (SQLException ex) {
remainingHosts.remove(readerCandidate);
candidateConn = null;
readerCandidate = null;
}
}

if (readerCandidate != null
|| candidateConn != null

Check warning on line 415 in wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java

View workflow job for this annotation

GitHub Actions / Qodana Community for JVM

Constant values

Condition `candidateConn != null` is always `false`
|| originalWriter == null
|| System.nanoTime() > failoverEndTimeNano) {
continue;
}

if (readerCandidate == null) {
break;
if (STRICT_READER.equals(this.failoverMode) && originalWriter.equals(verifiedWriter)) {
continue;
}

// Try the original writer. The role may be inaccurate, so we will try connecting to it even if failover mode
// is set to STRICT_READER.
readerCandidate = originalWriter;
try {
readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp);
if (this.pluginService.getHostRole(readerCandidateConn) != HostRole.READER) {
readerCandidateConn.close();
readerCandidateConn = null;
candidateConn = this.pluginService.connect(readerCandidate, copyProp);
if (this.failoverMode == STRICT_READER
&& this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) {
verifiedWriter = readerCandidate;
candidateConn.close();
candidateConn = null;
remainingHosts.remove(readerCandidate);
readerCandidate = null;
}
} catch (SQLException ex) {
remainingHosts.remove(readerCandidate);
readerCandidateConn = null;
readerCandidate = null;
candidateConn = null;
}
}

if (readerCandidate == null
&& readerCandidateConn == null
&& this.failoverMode == FailoverMode.READER_OR_WRITER) {
// As a last resort, let's try to connect to a writer
try {
readerCandidate = this.pluginService.getHostSpecByStrategy(HostRole.WRITER,
this.failoverReaderHostSelectorStrategySetting);
if (readerCandidate != null) {
try {
readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp);
} catch (SQLException ex) {
readerCandidate = null;
}
}
} catch (UnsupportedOperationException ex) {
// can't use selected strategy to get a reader host
}
}

if (readerCandidateConn == null) {
if (candidateConn == null) {
// "Unable to establish SQL connection to reader instance"
this.failoverReaderFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToConnectToReader"));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToReader"));
}

this.pluginService.setCurrentConnection(readerCandidateConn, readerCandidate);
this.pluginService.setCurrentConnection(candidateConn, readerCandidate);

LOGGER.info(
() -> Messages.get(
Expand Down Expand Up @@ -466,8 +494,9 @@
if (!this.pluginService.forceRefreshHostList(true, this.failoverTimeoutMsSetting)) {
// "Unable to establish SQL connection to writer node"
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToRefreshHostList"));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToRefreshHostList"));
LOGGER.severe(Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverWriter]"}));
throw new FailoverFailedSQLException(
Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverWriter]"}));
}

final List<HostSpec> updatedHosts = this.pluginService.getAllHosts();
Expand Down Expand Up @@ -503,8 +532,8 @@
} catch (SQLException ex) {
this.failoverWriterFailedCounter.inc();
LOGGER.severe(
Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost(), ex}));
throw new FailoverFailedSQLException(Messages.get("Failover.exceptionConnectingToWriter"));
Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost()}));
throw new FailoverFailedSQLException(Messages.get("Failover.exceptionConnectingToWriter"), ex);
}

HostRole role = this.pluginService.getHostRole(writerCandidateConn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,27 @@ public static HostSpec parseHostPortPair(final String url, final HostRole role,
return getHostSpec(hostPortPair, role, hostSpecBuilderSupplier.get());
}

/**
 * Extracts the host name of the first host listed in a connection string.
 *
 * <p>The string is matched against {@code CONNECTION_STRING_PATTERN}. The captured {@code hosts} group is
 * trimmed and split on {@code HOSTS_SEPARATOR}; the first entry is then split on {@code HOST_PORT_SEPARATOR}
 * (at most once) and the part preceding the separator is returned. The returned value is not trimmed or
 * validated any further, so it may be an empty string when the first host entry itself is empty.
 *
 * @param connString the connection string to parse; a {@code null} value yields {@code null}
 * @return the host portion of the first host entry, or {@code null} if the connection string is {@code null},
 *     does not match the expected pattern, has no {@code hosts} group, or the hosts group splits into no entries
 */
public static @Nullable String parseHost(final String connString) {
  // Guard against null input: Pattern.matcher(null) would otherwise throw a NullPointerException.
  if (connString == null) {
    return null;
  }

  final Matcher matcher = CONNECTION_STRING_PATTERN.matcher(connString);
  if (!matcher.matches()) {
    return null;
  }

  // Look the named group up once instead of twice.
  final String hostsGroup = matcher.group("hosts");
  if (hostsGroup == null) {
    return null;
  }

  // String.split with the default limit drops trailing empty strings, so a hosts group consisting solely of
  // separators produces an empty array.
  final String[] hostArray = hostsGroup.trim().split(HOSTS_SEPARATOR);
  if (hostArray.length == 0) {
    return null;
  }

  // With a positive limit, String.split always returns at least one element (the whole input when there is no
  // match), so index 0 is always present and no emptiness check is required here.
  return hostArray[0].split(HOST_PORT_SEPARATOR, 2)[0];
}

private static HostSpec getHostSpec(final String[] hostPortPair, final HostRole hostRole,
final HostSpecBuilder hostSpecBuilder) {
String hostId = rdsUtils.getRdsInstanceId(hostPortPair[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ AwsWrapperDataSource.configurationProfileNotFound=Configuration profile ''{0}''
# Cluster Aware Reader Failover Handler
ClusterAwareReaderFailoverHandler.interruptedThread=Thread was interrupted.
ClusterAwareReaderFailoverHandler.attemptingReaderConnection=Trying to connect to reader: ''{0}'', with properties ''{1}''
ClusterAwareReaderFailoverHandler.errorGettingHostRole=An error occurred while trying to determine the role of the reader candidate: {0}
ClusterAwareReaderFailoverHandler.successfulReaderConnection=Connected to reader: ''{0}''
ClusterAwareReaderFailoverHandler.failedReaderConnection=Failed to connect to reader: ''{0}''
ClusterAwareReaderFailoverHandler.invalidTopology=''{0}'' was called with an invalid (null or empty) topology.
Expand Down Expand Up @@ -171,12 +172,12 @@ ExecutionTimeConnectionPlugin.executionTime=Executed {0} in {1} nanos.
# Failover Connection Plugin
Failover.transactionResolutionUnknownError=Transaction resolution unknown. Please re-configure session state if required and try restarting the transaction.
Failover.connectionChangedError=The active SQL connection has changed due to a connection failure. Please re-configure session state if required.
Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to the new writer ''{0}''. Exception: {1}
Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to the new writer ''{0}''.
Failover.parameterValue={0}={1}
Failover.unableToConnect=Unable to establish a SQL connection due to an unexpected error.
Failover.unableToConnectToWriter=Unable to establish SQL connection to the writer instance.
Failover.unableToConnectToReader=Unable to establish SQL connection to the reader instance.
Failover.unableToRefreshHostList=The request to discover the new topology timed out or was unsuccessful.
Failover.unableToRefreshHostList={0} The request to discover the new topology timed out or was unsuccessful.
Failover.unexpectedReaderRole=The new writer was identified to be ''{0}'', but querying the instance for its role returned a role of {1}.
Failover.detectedException=Detected an exception while executing a command: {0}
Failover.failoverDisabled=Cluster-aware failover is disabled.
Expand Down
Loading
Loading