Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: various reader failover fixes #1227

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ protected List<HostSpec> waitTillTopologyGetsUpdated(final long timeoutMs) throw
}

final long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);

// Note that we are checking reference equality instead of value equality here. We will break out of the loop if
// there is a new entry in the topology map, even if the value of the hosts in latestHosts is the same as
// currentHosts.
while (currentHosts == (latestHosts = this.topologyMap.get(this.clusterId))
&& System.nanoTime() < end) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,24 +152,14 @@ private Future<ReaderFailoverResult> submitInternalFailoverTask(
}

// need to ensure that new connection is a connection to a reader node

pluginService.forceRefreshHostList(result.getConnection());
topology = pluginService.getAllHosts();
for (final HostSpec node : topology) {
if (node.getUrl().equals(result.getHost().getUrl())) {
// found new connection host in the latest topology
if (node.getRole() == HostRole.READER) {
return result;
}
try {
if (HostRole.READER.equals(this.pluginService.getHostRole(result.getConnection()))) {
return result;
}
} catch (SQLException e) {
LOGGER.fine(Messages.get("ClusterAwareReaderFailoverHandler.errorGettingHostRole", new Object[]{e}));
}

// New node is not found in the latest topology. There are few possible reasons for that.
// - Node is not yet presented in the topology due to failover process in progress
// - Node is in the topology but its role isn't a
// READER (that is not acceptable option due to this.strictReader setting)
// Need to continue this loop and to make another try to connect to a reader.

try {
result.getConnection().close();
} catch (final SQLException ex) {
Expand Down Expand Up @@ -244,8 +234,9 @@ public List<HostSpec> getHostsByPriority(final List<HostSpec> hosts) {

final List<HostSpec> hostsByPriority = new ArrayList<>(activeReaders);
final int numOfReaders = activeReaders.size() + downHostList.size();
if (writerHost != null
&& (!this.enableFailoverStrictReader || numOfReaders == 0)) {
// Since the writer instance may change during failover, the original writer is likely now a reader. We will include
// it and then verify the role once connected if using "strict-reader".
if (writerHost != null || numOfReaders == 0) {
hostsByPriority.add(writerHost);
}
hostsByPriority.addAll(downHostList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import software.amazon.jdbc.HostSpec;
import software.amazon.jdbc.JdbcCallable;
import software.amazon.jdbc.NodeChangeOptions;
import software.amazon.jdbc.OldConnectionSuggestedAction;
import software.amazon.jdbc.PluginManagerService;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.PropertyDefinition;
Expand Down Expand Up @@ -107,6 +106,8 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin {
private RdsUrlType rdsUrlType = null;
private HostListProviderService hostListProviderService;
private final AuroraStaleDnsHelper staleDnsHelper;
private Supplier<WriterFailoverHandler> writerFailoverHandlerSupplier;
private Supplier<ReaderFailoverHandler> readerFailoverHandlerSupplier;

public static final AwsWrapperProperty FAILOVER_CLUSTER_TOPOLOGY_REFRESH_RATE_MS =
new AwsWrapperProperty(
Expand Down Expand Up @@ -257,7 +258,6 @@ public void initHostProvider(
final JdbcCallable<Void, SQLException> initHostProviderFunc)
throws SQLException {
initHostProvider(
initialUrl,
hostListProviderService,
initHostProviderFunc,
() ->
Expand All @@ -278,28 +278,22 @@ public void initHostProvider(
}

void initHostProvider(
final String initialUrl,
final HostListProviderService hostListProviderService,
final JdbcCallable<Void, SQLException> initHostProviderFunc,
final Supplier<ClusterAwareReaderFailoverHandler> readerFailoverHandlerSupplier,
final Supplier<ClusterAwareWriterFailoverHandler> writerFailoverHandlerSupplier)
final Supplier<ReaderFailoverHandler> readerFailoverHandlerSupplier,
final Supplier<WriterFailoverHandler> writerFailoverHandlerSupplier)
throws SQLException {
this.readerFailoverHandlerSupplier = readerFailoverHandlerSupplier;
this.writerFailoverHandlerSupplier = writerFailoverHandlerSupplier;

this.hostListProviderService = hostListProviderService;
if (!this.enableFailoverSetting) {
return;
}

this.readerFailoverHandler = readerFailoverHandlerSupplier.get();
this.writerFailoverHandler = writerFailoverHandlerSupplier.get();

initHostProviderFunc.call();
}

@Override
public OldConnectionSuggestedAction notifyConnectionChanged(final EnumSet<NodeChangeOptions> changes) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function has the same implementation in the superclass, so I removed it.

return OldConnectionSuggestedAction.NO_OPINION;
}

@Override
public void notifyNodeListChanged(final Map<String, EnumSet<NodeChangeOptions>> changes) {

Expand Down Expand Up @@ -341,11 +335,6 @@ private boolean isNodeStillValid(final String node, final Map<String, EnumSet<No
return true;
}

// For testing purposes
void setRdsUrlType(final RdsUrlType rdsUrlType) {
this.rdsUrlType = rdsUrlType;
}

public boolean isFailoverEnabled() {
return this.enableFailoverSetting
&& !RdsUrlType.RDS_PROXY.equals(this.rdsUrlType)
Expand Down Expand Up @@ -494,7 +483,7 @@ private boolean isWriter(final HostSpec hostSpec) {
return hostSpec.getRole() == HostRole.WRITER;
}

private void processFailoverFailure(final String message) throws SQLException {
private void throwFailoverFailedException(final String message) throws SQLException {
LOGGER.severe(message);
throw new FailoverFailedSQLException(message);
}
Expand Down Expand Up @@ -578,22 +567,6 @@ protected void failover(final HostSpec failedHost) throws SQLException {
} else {
failoverReader(failedHost);
}

if (isInTransaction || this.pluginService.isInTransaction()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved to throwFailoverSuccessException so that the exception is thrown in failoverReader/failoverWriter instead. This was needed because failoverReader/failoverWriter catch the exception and then update the telemetry context, but there was a bug where the exception was thrown here instead of in failoverReader/failoverWriter.

if (this.pluginManagerService != null) {
this.pluginManagerService.setInTransaction(false);
}
// "Transaction resolution unknown. Please re-configure session state if required and try
// restarting transaction."
final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError");
LOGGER.info(errorMessage);
throw new TransactionStateUnknownSQLException();
} else {
// "The active SQL connection has changed due to a connection failure. Please re-configure
// session state if required. "
LOGGER.severe(() -> Messages.get("Failover.connectionChangedError"));
throw new FailoverSuccessSQLException();
}
}

protected void failoverReader(final HostSpec failedHostSpec) throws SQLException {
Expand All @@ -610,9 +583,8 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException
if (failedHostSpec != null && failedHostSpec.getRawAvailability() == HostAvailability.AVAILABLE) {
failedHost = failedHostSpec;
}
final ReaderFailoverResult result =
readerFailoverHandler.failover(this.pluginService.getHosts(), failedHost);

final ReaderFailoverResult result = readerFailoverHandler.failover(this.pluginService.getHosts(), failedHost);
if (result != null) {
final SQLException exception = result.getException();
if (exception != null) {
Expand All @@ -621,28 +593,24 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException
}

if (result == null || !result.isConnected()) {
// "Unable to establish SQL connection to reader instance"
processFailoverFailure(Messages.get("Failover.unableToConnectToReader"));
this.failoverReaderFailedCounter.inc();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a bug here and in other places where the failover-failed counter was incorrectly incremented twice: once here and once in the catch section for this block.

throwFailoverFailedException(Messages.get("Failover.unableToConnectToReader"));
return;
}

this.pluginService.setCurrentConnection(result.getConnection(), result.getHost());

this.pluginService.getCurrentHostSpec().removeAlias(oldAliases.toArray(new String[]{}));
this.pluginService.getCurrentHostSpec().removeAlias(oldAliases.toArray(new String[] {}));
updateTopology(true);

LOGGER.info(
() -> Messages.get(
"Failover.establishedConnection",
new Object[]{this.pluginService.getCurrentHostSpec()}));

this.failoverReaderSuccessCounter.inc();

new Object[] {this.pluginService.getCurrentHostSpec()}));
throwFailoverSuccessException();
} catch (FailoverSuccessSQLException ex) {
this.failoverReaderSuccessCounter.inc();
telemetryContext.setSuccess(true);
telemetryContext.setException(ex);
this.failoverReaderSuccessCounter.inc();
throw ex;
} catch (Exception ex) {
telemetryContext.setSuccess(false);
Expand All @@ -657,6 +625,24 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException
}
}

/**
 * Reports a finished failover to the caller by throwing; this method never returns normally.
 *
 * <p>When a transaction was open (according to this plugin's {@code isInTransaction} flag or the
 * plugin service), the in-transaction flag is cleared on the plugin manager service, if one is
 * set, and a {@code TransactionStateUnknownSQLException} is thrown. Otherwise a
 * {@code FailoverSuccessSQLException} is thrown.
 *
 * @throws SQLException always; the concrete type depends on the transaction state described above
 */
protected void throwFailoverSuccessException() throws SQLException {
  final boolean transactionWasOpen = isInTransaction || this.pluginService.isInTransaction();

  if (!transactionWasOpen) {
    // "The active SQL connection has changed due to a connection failure. Please re-configure
    // session state if required. "
    LOGGER.severe(() -> Messages.get("Failover.connectionChangedError"));
    throw new FailoverSuccessSQLException();
  }

  // The interrupted transaction cannot be resumed on the new connection, so reset the flag
  // before telling the caller that its outcome is unknown.
  if (this.pluginManagerService != null) {
    this.pluginManagerService.setInTransaction(false);
  }

  // "Transaction resolution unknown. Please re-configure session state if required and try
  // restarting transaction."
  LOGGER.info(Messages.get("Failover.transactionResolutionUnknownError"));
  throw new TransactionStateUnknownSQLException();
}

protected void failoverWriter() throws SQLException {
TelemetryFactory telemetryFactory = this.pluginService.getTelemetryFactory();
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
Expand All @@ -665,8 +651,7 @@ protected void failoverWriter() throws SQLException {

try {
LOGGER.info(() -> Messages.get("Failover.startWriterFailover"));
final WriterFailoverResult failoverResult =
this.writerFailoverHandler.failover(this.pluginService.getAllHosts());
final WriterFailoverResult failoverResult = this.writerFailoverHandler.failover(this.pluginService.getAllHosts());
if (failoverResult != null) {
final SQLException exception = failoverResult.getException();
if (exception != null) {
Expand All @@ -675,23 +660,25 @@ protected void failoverWriter() throws SQLException {
}

if (failoverResult == null || !failoverResult.isConnected()) {
// "Unable to establish SQL connection to writer node"
processFailoverFailure(Messages.get("Failover.unableToConnectToWriter"));
this.failoverWriterFailedCounter.inc();
throwFailoverFailedException(Messages.get("Failover.unableToConnectToWriter"));
return;
}

List<HostSpec> hosts = failoverResult.getTopology();
final HostSpec writerHostSpec = getWriter(hosts);
if (writerHostSpec == null) {
throwFailoverFailedException(
Messages.get(
"Failover.noWriterHostAfterReconnecting",
new Object[]{Utils.logTopology(hosts, "")}));
return;
}

// successfully re-connected to a writer node
final HostSpec writerHostSpec = getWriter(failoverResult.getTopology());
final List<HostSpec> allowedHosts = this.pluginService.getHosts();
if (!allowedHosts.contains(writerHostSpec)) {
this.failoverWriterFailedCounter.inc();
processFailoverFailure(
throwFailoverFailedException(
Messages.get("Failover.newWriterNotAllowed",
new Object[] {
writerHostSpec == null ? "<null>" : writerHostSpec.getHost(),
Utils.logTopology(allowedHosts, "")
}));
new Object[] {writerHostSpec.getHost(), Utils.logTopology(allowedHosts, "")}));
return;
}

Expand All @@ -703,8 +690,7 @@ protected void failoverWriter() throws SQLException {
new Object[]{this.pluginService.getCurrentHostSpec()}));

this.pluginService.refreshHostList();

this.failoverWriterSuccessCounter.inc();
throwFailoverSuccessException();
} catch (FailoverSuccessSQLException ex) {
telemetryContext.setSuccess(true);
telemetryContext.setException(ex);
Expand Down Expand Up @@ -813,6 +799,20 @@ private Connection connectInternal(String driverProtocol, HostSpec hostSpec, Pro
boolean isInitialConnection, JdbcCallable<Connection, SQLException> connectFunc, boolean isForceConnect)
throws SQLException {
this.initFailoverMode();
if (this.readerFailoverHandler == null) {
if (this.readerFailoverHandlerSupplier == null) {
throw new SQLException(Messages.get("Failover.nullReaderFailoverHandlerSupplier"));
}
this.readerFailoverHandler = this.readerFailoverHandlerSupplier.get();
}

if (this.writerFailoverHandler == null) {
if (this.writerFailoverHandlerSupplier == null) {
throw new SQLException(Messages.get("Failover.nullWriterFailoverHandlerSupplier"));
}
this.writerFailoverHandler = this.writerFailoverHandlerSupplier.get();
}

Connection conn = null;
try {
conn =
Expand Down Expand Up @@ -852,4 +852,17 @@ public Connection forceConnect(
throws SQLException {
return connectInternal(driverProtocol, hostSpec, props, isInitialConnection, forceConnectFunc, true);
}

// The below methods are for testing purposes
/**
 * Test hook: replaces the RDS URL type stored on this plugin.
 *
 * @param rdsUrlType the URL type to store in the {@code rdsUrlType} field
 */
void setRdsUrlType(final RdsUrlType rdsUrlType) {
this.rdsUrlType = rdsUrlType;
}

/**
 * Test hook: injects the writer failover handler used by this plugin.
 *
 * <p>{@code connectInternal} only creates a handler from the supplier while this field is still
 * {@code null}, so a non-null handler set here is kept.
 *
 * @param writerFailoverHandler the handler to store in the {@code writerFailoverHandler} field
 */
void setWriterFailoverHandler(final WriterFailoverHandler writerFailoverHandler) {
this.writerFailoverHandler = writerFailoverHandler;
}

/**
 * Test hook: injects the reader failover handler used by this plugin.
 *
 * <p>{@code connectInternal} only creates a handler from the supplier while this field is still
 * {@code null}, so a non-null handler set here is kept.
 *
 * @param readerFailoverHandler the handler to store in the {@code readerFailoverHandler} field
 */
void setReaderFailoverHandler(final ReaderFailoverHandler readerFailoverHandler) {
this.readerFailoverHandler = readerFailoverHandler;
}
}
Loading
Loading