-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: various reader failover fixes #1227
base: main
Are you sure you want to change the base?
Changes from all commits
ab110c3
0906502
90fbc09
2696be9
bfd9631
9316886
9d69dc1
710e13e
f1b4835
4ec5a6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,6 @@ | |
import software.amazon.jdbc.HostSpec; | ||
import software.amazon.jdbc.JdbcCallable; | ||
import software.amazon.jdbc.NodeChangeOptions; | ||
import software.amazon.jdbc.OldConnectionSuggestedAction; | ||
import software.amazon.jdbc.PluginManagerService; | ||
import software.amazon.jdbc.PluginService; | ||
import software.amazon.jdbc.PropertyDefinition; | ||
|
@@ -107,6 +106,8 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin { | |
private RdsUrlType rdsUrlType = null; | ||
private HostListProviderService hostListProviderService; | ||
private final AuroraStaleDnsHelper staleDnsHelper; | ||
private Supplier<WriterFailoverHandler> writerFailoverHandlerSupplier; | ||
private Supplier<ReaderFailoverHandler> readerFailoverHandlerSupplier; | ||
|
||
public static final AwsWrapperProperty FAILOVER_CLUSTER_TOPOLOGY_REFRESH_RATE_MS = | ||
new AwsWrapperProperty( | ||
|
@@ -257,7 +258,6 @@ public void initHostProvider( | |
final JdbcCallable<Void, SQLException> initHostProviderFunc) | ||
throws SQLException { | ||
initHostProvider( | ||
initialUrl, | ||
hostListProviderService, | ||
initHostProviderFunc, | ||
() -> | ||
|
@@ -278,28 +278,22 @@ public void initHostProvider( | |
} | ||
|
||
void initHostProvider( | ||
final String initialUrl, | ||
final HostListProviderService hostListProviderService, | ||
final JdbcCallable<Void, SQLException> initHostProviderFunc, | ||
final Supplier<ClusterAwareReaderFailoverHandler> readerFailoverHandlerSupplier, | ||
final Supplier<ClusterAwareWriterFailoverHandler> writerFailoverHandlerSupplier) | ||
final Supplier<ReaderFailoverHandler> readerFailoverHandlerSupplier, | ||
final Supplier<WriterFailoverHandler> writerFailoverHandlerSupplier) | ||
throws SQLException { | ||
this.readerFailoverHandlerSupplier = readerFailoverHandlerSupplier; | ||
this.writerFailoverHandlerSupplier = writerFailoverHandlerSupplier; | ||
|
||
this.hostListProviderService = hostListProviderService; | ||
if (!this.enableFailoverSetting) { | ||
return; | ||
} | ||
|
||
this.readerFailoverHandler = readerFailoverHandlerSupplier.get(); | ||
this.writerFailoverHandler = writerFailoverHandlerSupplier.get(); | ||
|
||
initHostProviderFunc.call(); | ||
} | ||
|
||
@Override | ||
public OldConnectionSuggestedAction notifyConnectionChanged(final EnumSet<NodeChangeOptions> changes) { | ||
return OldConnectionSuggestedAction.NO_OPINION; | ||
} | ||
|
||
@Override | ||
public void notifyNodeListChanged(final Map<String, EnumSet<NodeChangeOptions>> changes) { | ||
|
||
|
@@ -341,11 +335,6 @@ private boolean isNodeStillValid(final String node, final Map<String, EnumSet<No | |
return true; | ||
} | ||
|
||
// For testing purposes | ||
void setRdsUrlType(final RdsUrlType rdsUrlType) { | ||
this.rdsUrlType = rdsUrlType; | ||
} | ||
|
||
public boolean isFailoverEnabled() { | ||
return this.enableFailoverSetting | ||
&& !RdsUrlType.RDS_PROXY.equals(this.rdsUrlType) | ||
|
@@ -494,7 +483,7 @@ private boolean isWriter(final HostSpec hostSpec) { | |
return hostSpec.getRole() == HostRole.WRITER; | ||
} | ||
|
||
private void processFailoverFailure(final String message) throws SQLException { | ||
private void throwFailoverFailedException(final String message) throws SQLException { | ||
LOGGER.severe(message); | ||
throw new FailoverFailedSQLException(message); | ||
} | ||
|
@@ -578,22 +567,6 @@ protected void failover(final HostSpec failedHost) throws SQLException { | |
} else { | ||
failoverReader(failedHost); | ||
} | ||
|
||
if (isInTransaction || this.pluginService.isInTransaction()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was moved to throwFailoverSuccessException so that the exception is thrown in failoverReader/failoverWriter instead. This was needed because failoverReader/failoverWriter catch the exception and then update the telemetry context, but there was a bug where the exception was thrown here instead of in failoverReader/failoverWriter. |
||
if (this.pluginManagerService != null) { | ||
this.pluginManagerService.setInTransaction(false); | ||
} | ||
// "Transaction resolution unknown. Please re-configure session state if required and try | ||
// restarting transaction." | ||
final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); | ||
LOGGER.info(errorMessage); | ||
throw new TransactionStateUnknownSQLException(); | ||
} else { | ||
// "The active SQL connection has changed due to a connection failure. Please re-configure | ||
// session state if required. " | ||
LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); | ||
throw new FailoverSuccessSQLException(); | ||
} | ||
} | ||
|
||
protected void failoverReader(final HostSpec failedHostSpec) throws SQLException { | ||
|
@@ -610,9 +583,8 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException | |
if (failedHostSpec != null && failedHostSpec.getRawAvailability() == HostAvailability.AVAILABLE) { | ||
failedHost = failedHostSpec; | ||
} | ||
final ReaderFailoverResult result = | ||
readerFailoverHandler.failover(this.pluginService.getHosts(), failedHost); | ||
|
||
final ReaderFailoverResult result = readerFailoverHandler.failover(this.pluginService.getHosts(), failedHost); | ||
if (result != null) { | ||
final SQLException exception = result.getException(); | ||
if (exception != null) { | ||
|
@@ -621,28 +593,24 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException | |
} | ||
|
||
if (result == null || !result.isConnected()) { | ||
// "Unable to establish SQL connection to reader instance" | ||
processFailoverFailure(Messages.get("Failover.unableToConnectToReader")); | ||
this.failoverReaderFailedCounter.inc(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a bug here and in other places where the failover-failed counter was incorrectly incremented twice: once here and once in the catch block for this section. |
||
throwFailoverFailedException(Messages.get("Failover.unableToConnectToReader")); | ||
return; | ||
} | ||
|
||
this.pluginService.setCurrentConnection(result.getConnection(), result.getHost()); | ||
|
||
this.pluginService.getCurrentHostSpec().removeAlias(oldAliases.toArray(new String[]{})); | ||
this.pluginService.getCurrentHostSpec().removeAlias(oldAliases.toArray(new String[] {})); | ||
updateTopology(true); | ||
|
||
LOGGER.info( | ||
() -> Messages.get( | ||
"Failover.establishedConnection", | ||
new Object[]{this.pluginService.getCurrentHostSpec()})); | ||
|
||
this.failoverReaderSuccessCounter.inc(); | ||
|
||
new Object[] {this.pluginService.getCurrentHostSpec()})); | ||
throwFailoverSuccessException(); | ||
} catch (FailoverSuccessSQLException ex) { | ||
this.failoverReaderSuccessCounter.inc(); | ||
telemetryContext.setSuccess(true); | ||
telemetryContext.setException(ex); | ||
this.failoverReaderSuccessCounter.inc(); | ||
throw ex; | ||
} catch (Exception ex) { | ||
telemetryContext.setSuccess(false); | ||
|
@@ -657,6 +625,24 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException | |
} | ||
} | ||
|
||
protected void throwFailoverSuccessException() throws SQLException { | ||
if (isInTransaction || this.pluginService.isInTransaction()) { | ||
if (this.pluginManagerService != null) { | ||
this.pluginManagerService.setInTransaction(false); | ||
} | ||
// "Transaction resolution unknown. Please re-configure session state if required and try | ||
// restarting transaction." | ||
final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); | ||
LOGGER.info(errorMessage); | ||
throw new TransactionStateUnknownSQLException(); | ||
} else { | ||
// "The active SQL connection has changed due to a connection failure. Please re-configure | ||
// session state if required. " | ||
LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); | ||
throw new FailoverSuccessSQLException(); | ||
} | ||
} | ||
|
||
protected void failoverWriter() throws SQLException { | ||
TelemetryFactory telemetryFactory = this.pluginService.getTelemetryFactory(); | ||
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext( | ||
|
@@ -665,8 +651,7 @@ protected void failoverWriter() throws SQLException { | |
|
||
try { | ||
LOGGER.info(() -> Messages.get("Failover.startWriterFailover")); | ||
final WriterFailoverResult failoverResult = | ||
this.writerFailoverHandler.failover(this.pluginService.getAllHosts()); | ||
final WriterFailoverResult failoverResult = this.writerFailoverHandler.failover(this.pluginService.getAllHosts()); | ||
if (failoverResult != null) { | ||
final SQLException exception = failoverResult.getException(); | ||
if (exception != null) { | ||
|
@@ -675,23 +660,25 @@ protected void failoverWriter() throws SQLException { | |
} | ||
|
||
if (failoverResult == null || !failoverResult.isConnected()) { | ||
// "Unable to establish SQL connection to writer node" | ||
processFailoverFailure(Messages.get("Failover.unableToConnectToWriter")); | ||
this.failoverWriterFailedCounter.inc(); | ||
throwFailoverFailedException(Messages.get("Failover.unableToConnectToWriter")); | ||
return; | ||
} | ||
|
||
List<HostSpec> hosts = failoverResult.getTopology(); | ||
final HostSpec writerHostSpec = getWriter(hosts); | ||
if (writerHostSpec == null) { | ||
throwFailoverFailedException( | ||
Messages.get( | ||
"Failover.noWriterHostAfterReconnecting", | ||
new Object[]{Utils.logTopology(hosts, "")})); | ||
return; | ||
} | ||
|
||
// successfully re-connected to a writer node | ||
final HostSpec writerHostSpec = getWriter(failoverResult.getTopology()); | ||
final List<HostSpec> allowedHosts = this.pluginService.getHosts(); | ||
if (!allowedHosts.contains(writerHostSpec)) { | ||
this.failoverWriterFailedCounter.inc(); | ||
processFailoverFailure( | ||
throwFailoverFailedException( | ||
Messages.get("Failover.newWriterNotAllowed", | ||
new Object[] { | ||
writerHostSpec == null ? "<null>" : writerHostSpec.getHost(), | ||
Utils.logTopology(allowedHosts, "") | ||
})); | ||
new Object[] {writerHostSpec.getHost(), Utils.logTopology(allowedHosts, "")})); | ||
return; | ||
} | ||
|
||
|
@@ -703,8 +690,7 @@ protected void failoverWriter() throws SQLException { | |
new Object[]{this.pluginService.getCurrentHostSpec()})); | ||
|
||
this.pluginService.refreshHostList(); | ||
|
||
this.failoverWriterSuccessCounter.inc(); | ||
throwFailoverSuccessException(); | ||
} catch (FailoverSuccessSQLException ex) { | ||
telemetryContext.setSuccess(true); | ||
telemetryContext.setException(ex); | ||
|
@@ -813,6 +799,20 @@ private Connection connectInternal(String driverProtocol, HostSpec hostSpec, Pro | |
boolean isInitialConnection, JdbcCallable<Connection, SQLException> connectFunc, boolean isForceConnect) | ||
throws SQLException { | ||
this.initFailoverMode(); | ||
if (this.readerFailoverHandler == null) { | ||
if (this.readerFailoverHandlerSupplier == null) { | ||
throw new SQLException(Messages.get("Failover.nullReaderFailoverHandlerSupplier")); | ||
} | ||
this.readerFailoverHandler = this.readerFailoverHandlerSupplier.get(); | ||
} | ||
|
||
if (this.writerFailoverHandler == null) { | ||
if (this.writerFailoverHandlerSupplier == null) { | ||
throw new SQLException(Messages.get("Failover.nullWriterFailoverHandlerSupplier")); | ||
} | ||
this.writerFailoverHandler = this.writerFailoverHandlerSupplier.get(); | ||
} | ||
|
||
Connection conn = null; | ||
try { | ||
conn = | ||
|
@@ -852,4 +852,17 @@ public Connection forceConnect( | |
throws SQLException { | ||
return connectInternal(driverProtocol, hostSpec, props, isInitialConnection, forceConnectFunc, true); | ||
} | ||
|
||
// The below methods are for testing purposes | ||
void setRdsUrlType(final RdsUrlType rdsUrlType) { | ||
this.rdsUrlType = rdsUrlType; | ||
} | ||
|
||
void setWriterFailoverHandler(final WriterFailoverHandler writerFailoverHandler) { | ||
this.writerFailoverHandler = writerFailoverHandler; | ||
} | ||
|
||
void setReaderFailoverHandler(final ReaderFailoverHandler readerFailoverHandler) { | ||
this.readerFailoverHandler = readerFailoverHandler; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function has the same implementation in the superclass, so I removed it.