diff --git a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java index 34934e27c..ee2d46511 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java @@ -236,6 +236,10 @@ protected List waitTillTopologyGetsUpdated(final long timeoutMs) throw } final long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs); + + // Note that we are checking reference equality instead of value equality here. We will break out of the loop if + // there is a new entry in the topology map, even if the value of the hosts in latestHosts is the same as + // currentHosts. while (currentHosts == (latestHosts = this.topologyMap.get(this.clusterId)) && System.nanoTime() < end) { try { diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandler.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandler.java index cc012d676..c6c9bedbe 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandler.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandler.java @@ -152,24 +152,14 @@ private Future submitInternalFailoverTask( } // need to ensure that new connection is a connection to a reader node - - pluginService.forceRefreshHostList(result.getConnection()); - topology = pluginService.getAllHosts(); - for (final HostSpec node : topology) { - if (node.getUrl().equals(result.getHost().getUrl())) { - // found new connection host in the latest topology - if (node.getRole() == HostRole.READER) { - return result; - } + try { + if (HostRole.READER.equals(this.pluginService.getHostRole(result.getConnection()))) { + return result; } + } catch (SQLException e) { + LOGGER.fine(Messages.get("ClusterAwareReaderFailoverHandler.errorGettingHostRole", new Object[]{e})); } - // New node is not found in the latest topology. There are few possible reasons for that. - // - Node is not yet presented in the topology due to failover process in progress - // - Node is in the topology but its role isn't a - // READER (that is not acceptable option due to this.strictReader setting) - // Need to continue this loop and to make another try to connect to a reader. - try { result.getConnection().close(); } catch (final SQLException ex) { @@ -244,8 +234,9 @@ public List getHostsByPriority(final List hosts) { final List hostsByPriority = new ArrayList<>(activeReaders); final int numOfReaders = activeReaders.size() + downHostList.size(); - if (writerHost != null - && (!this.enableFailoverStrictReader || numOfReaders == 0)) { + // Since the writer instance may change during failover, the original writer is likely now a reader. We will include + // it and then verify the role once connected if using "strict-reader". 
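+    // When strict-reader is enabled, that verification happens in submitInternalFailoverTask() above, via pluginService.getHostRole() on the new connection.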
+ if (writerHost != null || numOfReaders == 0) { hostsByPriority.add(writerHost); } hostsByPriority.addAll(downHostList); diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java index 38a7ea39b..423ce7006 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java @@ -36,7 +36,6 @@ import software.amazon.jdbc.HostSpec; import software.amazon.jdbc.JdbcCallable; import software.amazon.jdbc.NodeChangeOptions; -import software.amazon.jdbc.OldConnectionSuggestedAction; import software.amazon.jdbc.PluginManagerService; import software.amazon.jdbc.PluginService; import software.amazon.jdbc.PropertyDefinition; @@ -107,6 +106,8 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin { private RdsUrlType rdsUrlType = null; private HostListProviderService hostListProviderService; private final AuroraStaleDnsHelper staleDnsHelper; + private Supplier writerFailoverHandlerSupplier; + private Supplier readerFailoverHandlerSupplier; public static final AwsWrapperProperty FAILOVER_CLUSTER_TOPOLOGY_REFRESH_RATE_MS = new AwsWrapperProperty( @@ -257,7 +258,6 @@ public void initHostProvider( final JdbcCallable initHostProviderFunc) throws SQLException { initHostProvider( - initialUrl, hostListProviderService, initHostProviderFunc, () -> @@ -278,28 +278,22 @@ public void initHostProvider( } void initHostProvider( - final String initialUrl, final HostListProviderService hostListProviderService, final JdbcCallable initHostProviderFunc, - final Supplier readerFailoverHandlerSupplier, - final Supplier writerFailoverHandlerSupplier) + final Supplier readerFailoverHandlerSupplier, + final Supplier writerFailoverHandlerSupplier) throws SQLException { + this.readerFailoverHandlerSupplier = readerFailoverHandlerSupplier; + this.writerFailoverHandlerSupplier = writerFailoverHandlerSupplier; + this.hostListProviderService = hostListProviderService; if (!this.enableFailoverSetting) { return; } - this.readerFailoverHandler = readerFailoverHandlerSupplier.get(); - this.writerFailoverHandler = writerFailoverHandlerSupplier.get(); - initHostProviderFunc.call(); } - @Override - public OldConnectionSuggestedAction notifyConnectionChanged(final EnumSet changes) { - return OldConnectionSuggestedAction.NO_OPINION; - } - @Override public void notifyNodeListChanged(final Map> changes) { @@ -341,11 +335,6 @@ private boolean isNodeStillValid(final String node, final Map Messages.get("Failover.connectionChangedError")); - throw new FailoverSuccessSQLException(); - } } protected void failoverReader(final HostSpec failedHostSpec) throws SQLException { @@ -610,9 +583,8 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException if (failedHostSpec != null && failedHostSpec.getRawAvailability() == HostAvailability.AVAILABLE) { failedHost = failedHostSpec; } - final ReaderFailoverResult result = - readerFailoverHandler.failover(this.pluginService.getHosts(), failedHost); + final ReaderFailoverResult result = readerFailoverHandler.failover(this.pluginService.getHosts(), failedHost); if (result != null) { final SQLException exception = result.getException(); if (exception != null) { @@ -621,28 +593,24 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException } if (result == null || !result.isConnected()) { - 
// "Unable to establish SQL connection to reader instance" - processFailoverFailure(Messages.get("Failover.unableToConnectToReader")); - this.failoverReaderFailedCounter.inc(); + throwFailoverFailedException(Messages.get("Failover.unableToConnectToReader")); return; } this.pluginService.setCurrentConnection(result.getConnection(), result.getHost()); - this.pluginService.getCurrentHostSpec().removeAlias(oldAliases.toArray(new String[]{})); + this.pluginService.getCurrentHostSpec().removeAlias(oldAliases.toArray(new String[] {})); updateTopology(true); LOGGER.info( () -> Messages.get( "Failover.establishedConnection", - new Object[]{this.pluginService.getCurrentHostSpec()})); - - this.failoverReaderSuccessCounter.inc(); - + new Object[] {this.pluginService.getCurrentHostSpec()})); + throwFailoverSuccessException(); } catch (FailoverSuccessSQLException ex) { + this.failoverReaderSuccessCounter.inc(); telemetryContext.setSuccess(true); telemetryContext.setException(ex); - this.failoverReaderSuccessCounter.inc(); throw ex; } catch (Exception ex) { telemetryContext.setSuccess(false); @@ -657,6 +625,24 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException } } + protected void throwFailoverSuccessException() throws SQLException { + if (isInTransaction || this.pluginService.isInTransaction()) { + if (this.pluginManagerService != null) { + this.pluginManagerService.setInTransaction(false); + } + // "Transaction resolution unknown. Please re-configure session state if required and try + // restarting transaction." + final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); + LOGGER.info(errorMessage); + throw new TransactionStateUnknownSQLException(); + } else { + // "The active SQL connection has changed due to a connection failure. Please re-configure + // session state if required. 
" + LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); + throw new FailoverSuccessSQLException(); + } + } + protected void failoverWriter() throws SQLException { TelemetryFactory telemetryFactory = this.pluginService.getTelemetryFactory(); TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext( @@ -665,8 +651,7 @@ protected void failoverWriter() throws SQLException { try { LOGGER.info(() -> Messages.get("Failover.startWriterFailover")); - final WriterFailoverResult failoverResult = - this.writerFailoverHandler.failover(this.pluginService.getAllHosts()); + final WriterFailoverResult failoverResult = this.writerFailoverHandler.failover(this.pluginService.getAllHosts()); if (failoverResult != null) { final SQLException exception = failoverResult.getException(); if (exception != null) { @@ -675,23 +660,25 @@ protected void failoverWriter() throws SQLException { } if (failoverResult == null || !failoverResult.isConnected()) { - // "Unable to establish SQL connection to writer node" - processFailoverFailure(Messages.get("Failover.unableToConnectToWriter")); - this.failoverWriterFailedCounter.inc(); + throwFailoverFailedException(Messages.get("Failover.unableToConnectToWriter")); + return; + } + + List hosts = failoverResult.getTopology(); + final HostSpec writerHostSpec = getWriter(hosts); + if (writerHostSpec == null) { + throwFailoverFailedException( + Messages.get( + "Failover.noWriterHostAfterReconnecting", + new Object[]{Utils.logTopology(hosts, "")})); return; } - // successfully re-connected to a writer node - final HostSpec writerHostSpec = getWriter(failoverResult.getTopology()); final List allowedHosts = this.pluginService.getHosts(); if (!allowedHosts.contains(writerHostSpec)) { - this.failoverWriterFailedCounter.inc(); - processFailoverFailure( + throwFailoverFailedException( Messages.get("Failover.newWriterNotAllowed", - new Object[] { - writerHostSpec == null ? 
"" : writerHostSpec.getHost(), - Utils.logTopology(allowedHosts, "") - })); + new Object[] {writerHostSpec.getHost(), Utils.logTopology(allowedHosts, "")})); return; } @@ -703,8 +690,7 @@ protected void failoverWriter() throws SQLException { new Object[]{this.pluginService.getCurrentHostSpec()})); this.pluginService.refreshHostList(); - - this.failoverWriterSuccessCounter.inc(); + throwFailoverSuccessException(); } catch (FailoverSuccessSQLException ex) { telemetryContext.setSuccess(true); telemetryContext.setException(ex); @@ -813,6 +799,20 @@ private Connection connectInternal(String driverProtocol, HostSpec hostSpec, Pro boolean isInitialConnection, JdbcCallable connectFunc, boolean isForceConnect) throws SQLException { this.initFailoverMode(); + if (this.readerFailoverHandler == null) { + if (this.readerFailoverHandlerSupplier == null) { + throw new SQLException(Messages.get("Failover.nullReaderFailoverHandlerSupplier")); + } + this.readerFailoverHandler = this.readerFailoverHandlerSupplier.get(); + } + + if (this.writerFailoverHandler == null) { + if (this.writerFailoverHandlerSupplier == null) { + throw new SQLException(Messages.get("Failover.nullWriterFailoverHandlerSupplier")); + } + this.writerFailoverHandler = this.writerFailoverHandlerSupplier.get(); + } + Connection conn = null; try { conn = @@ -852,4 +852,17 @@ public Connection forceConnect( throws SQLException { return connectInternal(driverProtocol, hostSpec, props, isInitialConnection, forceConnectFunc, true); } + + // The below methods are for testing purposes + void setRdsUrlType(final RdsUrlType rdsUrlType) { + this.rdsUrlType = rdsUrlType; + } + + void setWriterFailoverHandler(final WriterFailoverHandler writerFailoverHandler) { + this.writerFailoverHandler = writerFailoverHandler; + } + + void setReaderFailoverHandler(final ReaderFailoverHandler readerFailoverHandler) { + this.readerFailoverHandler = readerFailoverHandler; + } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java index fd11ea225..beafb4179 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java @@ -16,6 +16,8 @@ package software.amazon.jdbc.plugin.failover2; +import static software.amazon.jdbc.plugin.failover.FailoverMode.STRICT_READER; + import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; @@ -25,7 +27,9 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Logger; +import java.util.stream.Collectors; import software.amazon.jdbc.AwsWrapperProperty; import software.amazon.jdbc.HostListProviderService; import software.amazon.jdbc.HostRole; @@ -124,7 +128,6 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin { protected boolean isInTransaction = false; protected RdsUrlType rdsUrlType; protected HostListProviderService hostListProviderService; - protected long failoverStartTimeNano = 0; protected final AuroraStaleDnsHelper staleDnsHelper; protected final TelemetryCounter failoverWriterTriggeredCounter; protected final TelemetryCounter failoverWriterSuccessCounter; @@ -306,28 +309,11 @@ protected void dealWithIllegalStateException( * @throws SQLException if an error occurs */ protected void failover() throws 
SQLException { - if (this.failoverMode == FailoverMode.STRICT_WRITER) { failoverWriter(); } else { failoverReader(); } - - if (isInTransaction || this.pluginService.isInTransaction()) { - if (this.pluginManagerService != null) { - this.pluginManagerService.setInTransaction(false); - } - // "Transaction resolution unknown. Please re-configure session state if required and try - // restarting transaction." - final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); - LOGGER.info(errorMessage); - throw new TransactionStateUnknownSQLException(); - } else { - // "The active SQL connection has changed due to a connection failure. Please re-configure - // session state if required. " - LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); - throw new FailoverSuccessSQLException(); - } } protected void failoverReader() throws SQLException { @@ -336,117 +322,171 @@ protected void failoverReader() throws SQLException { TELEMETRY_READER_FAILOVER, TelemetryTraceLevel.NESTED); this.failoverReaderTriggeredCounter.inc(); - this.failoverStartTimeNano = System.nanoTime(); - final long failoverEndTimeNano = this.failoverStartTimeNano - + TimeUnit.MILLISECONDS.toNanos(this.failoverTimeoutMsSetting); + final long failoverStartNano = System.nanoTime(); + final long failoverEndNano = failoverStartNano + TimeUnit.MILLISECONDS.toNanos(this.failoverTimeoutMsSetting); try { LOGGER.fine(() -> Messages.get("Failover.startReaderFailover")); - - // It's expected that this method synchronously returns when topology is stabilized, - // i.e. when cluster control plane has already chosen a new writer. + // When we pass a timeout of 0, we inform the plugin service that it should update its topology without waiting + // for it to get updated, since we do not need updated topology to establish a reader connection. 
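+      // (Contrast with failoverWriter() below, which passes true and failoverTimeoutMsSetting so that the refresh waits until the cluster has chosen a new writer.)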
if (!this.pluginService.forceRefreshHostList(false, 0)) { - // "Unable to establish SQL connection to reader instance" - this.failoverReaderFailedCounter.inc(); + LOGGER.severe(Messages.get("Failover.failoverReaderUnableToRefreshHostList")); + throw new FailoverFailedSQLException(Messages.get("Failover.failoverReaderUnableToRefreshHostList")); + } + + try { + ReaderFailoverResult result = getReaderFailoverConnection(failoverEndNano); + this.pluginService.setCurrentConnection(result.getConnection(), result.getHostSpec()); + } catch (TimeoutException e) { LOGGER.severe(Messages.get("Failover.unableToConnectToReader")); throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToReader")); } - final Properties copyProp = PropertyUtils.copyProperties(this.properties); - copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true"); + LOGGER.info( + () -> Messages.get( + "Failover.establishedConnection", + new Object[] {this.pluginService.getCurrentHostSpec()})); + throwFailoverSuccessException(); + } catch (FailoverSuccessSQLException ex) { + this.failoverReaderSuccessCounter.inc(); + telemetryContext.setSuccess(true); + telemetryContext.setException(ex); + throw ex; + } catch (Exception ex) { + telemetryContext.setSuccess(false); + telemetryContext.setException(ex); + this.failoverReaderFailedCounter.inc(); + throw ex; + } finally { + LOGGER.finest(() -> Messages.get( + "Failover.readerFailoverElapsed", + new Object[]{TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - failoverStartNano)})); + telemetryContext.closeContext(); + if (this.telemetryFailoverAdditionalTopTraceSetting) { + telemetryFactory.postCopy(telemetryContext, TelemetryTraceLevel.FORCE_TOP_LEVEL); + } + } + } - final List hosts = this.pluginService.getHosts(); - Connection readerCandidateConn = null; - HostSpec readerCandidate = null; - final Set remainingHosts = new HashSet<>(hosts); + protected ReaderFailoverResult getReaderFailoverConnection(long failoverEndTimeNano) throws TimeoutException { + final Properties copyProp = PropertyUtils.copyProperties(this.properties); + copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true"); + + // The roles in this list might not be accurate, depending on whether the new topology has become available yet. 
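+    // Readers are attempted first; the original writer (which may have been demoted to a reader) is set aside and only tried after the readers.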
+ final List hosts = this.pluginService.getHosts(); + final Set readerCandidates = hosts.stream() + .filter(hostSpec -> HostRole.READER.equals(hostSpec.getRole())) + .collect(Collectors.toSet()); + final HostSpec originalWriter = hosts.stream() + .filter(hostSpec -> HostRole.WRITER.equals(hostSpec.getRole())) + .findFirst() + .orElse(null); + boolean isOriginalWriterStillWriter = false; - while (!remainingHosts.isEmpty() - && readerCandidateConn == null - && System.nanoTime() < failoverEndTimeNano) { + do { + // First, try all original readers + final Set remainingReaders = new HashSet<>(readerCandidates); + while (!remainingReaders.isEmpty() && System.nanoTime() < failoverEndTimeNano) { + HostSpec readerCandidate; try { readerCandidate = this.pluginService.getHostSpecByStrategy( - new ArrayList<>(remainingHosts), + new ArrayList<>(remainingReaders), HostRole.READER, this.failoverReaderHostSelectorStrategySetting); } catch (UnsupportedOperationException | SQLException ex) { - // can't use selected strategy to get a reader host - LOGGER.finest("Error: " + ex.getMessage()); + LOGGER.finest( + Utils.logTopology( + new ArrayList<>(remainingReaders), + Messages.get("Failover.errorSelectingReaderHost", new Object[]{ex.getMessage()}))); break; } if (readerCandidate == null) { + LOGGER.finest( + Utils.logTopology(new ArrayList<>(remainingReaders), Messages.get("Failover.readerCandidateNull"))); break; } try { - readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp); - if (this.pluginService.getHostRole(readerCandidateConn) != HostRole.READER) { - readerCandidateConn.close(); - readerCandidateConn = null; - remainingHosts.remove(readerCandidate); - readerCandidate = null; + Connection candidateConn = this.pluginService.connect(readerCandidate, copyProp); + // Since the roles in the host list might not be accurate, we execute a query to check the instance's role. + HostRole role = this.pluginService.getHostRole(candidateConn); + if (role == HostRole.READER || this.failoverMode != STRICT_READER) { + HostSpec updatedHostSpec = new HostSpec(readerCandidate, role); + return new ReaderFailoverResult(candidateConn, updatedHostSpec); + } + + // The role is WRITER or UNKNOWN, and we are in STRICT_READER mode, so the connection is not valid. + remainingReaders.remove(readerCandidate); + candidateConn.close(); + + if (role == HostRole.WRITER) { + // The reader candidate is actually a writer, which is not valid when failoverMode is STRICT_READER. + // We will remove it from the list of reader candidates to avoid retrying it in future iterations. 
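+              // Hosts whose role comes back UNKNOWN are handled in the else branch below and stay in readerCandidates, so they can be retried on the next pass of the outer loop.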
+ readerCandidates.remove(readerCandidate); + } else { + LOGGER.fine( + Messages.get("Failover.strictReaderUnknownHostRole", new Object[]{readerCandidate.getUrl()})); } } catch (SQLException ex) { - remainingHosts.remove(readerCandidate); - readerCandidateConn = null; - readerCandidate = null; + remainingReaders.remove(readerCandidate); } } - if (readerCandidate == null - && readerCandidateConn == null - && this.failoverMode == FailoverMode.READER_OR_WRITER) { - // As a last resort, let's try to connect to a writer - try { - readerCandidate = this.pluginService.getHostSpecByStrategy(HostRole.WRITER, - this.failoverReaderHostSelectorStrategySetting); - if (readerCandidate != null) { - try { - readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp); - } catch (SQLException ex) { - readerCandidate = null; - } - } - } catch (UnsupportedOperationException ex) { - // can't use selected strategy to get a reader host - } + // We were not able to connect to any of the original readers. We will try connecting to the original writer, + // which may have been demoted to a reader. + + if (originalWriter == null || System.nanoTime() > failoverEndTimeNano) { + // No writer was found in the original topology, or we have timed out. + continue; } - if (readerCandidateConn == null) { - // "Unable to establish SQL connection to reader instance" - this.failoverReaderFailedCounter.inc(); - LOGGER.severe(Messages.get("Failover.unableToConnectToReader")); - throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToReader")); + if (this.failoverMode == STRICT_READER && isOriginalWriterStillWriter) { + // The original writer has been verified, so it is not valid when in STRICT_READER mode. + continue; } - this.pluginService.setCurrentConnection(readerCandidateConn, readerCandidate); + // Try the original writer, which may have been demoted to a reader. + try { + Connection candidateConn = this.pluginService.connect(originalWriter, copyProp); + HostRole role = this.pluginService.getHostRole(candidateConn); + if (role == HostRole.READER || this.failoverMode != STRICT_READER) { + HostSpec updatedHostSpec = new HostSpec(originalWriter, role); + return new ReaderFailoverResult(candidateConn, updatedHostSpec); + } - LOGGER.info( - () -> Messages.get( - "Failover.establishedConnection", - new Object[]{this.pluginService.getCurrentHostSpec()})); + // The role is WRITER or UNKNOWN, and we are in STRICT_READER mode, so the connection is not valid. + candidateConn.close(); - this.failoverReaderSuccessCounter.inc(); + if (role == HostRole.WRITER) { + isOriginalWriterStillWriter = true; + } else { + LOGGER.fine(Messages.get("Failover.strictReaderUnknownHostRole", new Object[]{originalWriter.getUrl()})); + } + } catch (SQLException ex) { + LOGGER.fine(Messages.get("Failover.failedReaderConnection", new Object[]{originalWriter.getUrl()})); + } + } while (System.nanoTime() < failoverEndTimeNano); // All hosts failed. Keep trying until we hit the timeout. 
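+    // If the loop exits without returning a connection, the failover timeout has elapsed and the TimeoutException below is thrown.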
- } catch (FailoverSuccessSQLException ex) { - telemetryContext.setSuccess(true); - telemetryContext.setException(ex); - this.failoverReaderSuccessCounter.inc(); - throw ex; - } catch (Exception ex) { - telemetryContext.setSuccess(false); - telemetryContext.setException(ex); - this.failoverReaderFailedCounter.inc(); - throw ex; - } finally { - LOGGER.finest(() -> Messages.get( - "Failover.readerFailoverElapsed", - new Object[]{TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.failoverStartTimeNano)})); - telemetryContext.closeContext(); - if (this.telemetryFailoverAdditionalTopTraceSetting) { - telemetryFactory.postCopy(telemetryContext, TelemetryTraceLevel.FORCE_TOP_LEVEL); + throw new TimeoutException(Messages.get("Failover.failoverReaderTimeout")); + } + + protected void throwFailoverSuccessException() throws SQLException { + if (isInTransaction || this.pluginService.isInTransaction()) { + if (this.pluginManagerService != null) { + this.pluginManagerService.setInTransaction(false); } + // "Transaction resolution unknown. Please re-configure session state if required and try + // restarting transaction." + final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); + LOGGER.info(errorMessage); + throw new TransactionStateUnknownSQLException(); + } else { + // "The active SQL connection has changed due to a connection failure. Please re-configure + // session state if required. " + LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); + throw new FailoverSuccessSQLException(); } } @@ -456,7 +496,7 @@ protected void failoverWriter() throws SQLException { TELEMETRY_WRITER_FAILOVER, TelemetryTraceLevel.NESTED); this.failoverWriterTriggeredCounter.inc(); - this.failoverStartTimeNano = System.nanoTime(); + long failoverStartTimeNano = System.nanoTime(); try { LOGGER.info(() -> Messages.get("Failover.startWriterFailover")); @@ -464,7 +504,6 @@ protected void failoverWriter() throws SQLException { // It's expected that this method synchronously returns when topology is stabilized, // i.e. when cluster control plane has already chosen a new writer. 
if (!this.pluginService.forceRefreshHostList(true, this.failoverTimeoutMsSetting)) { - // "Unable to establish SQL connection to writer node" this.failoverWriterFailedCounter.inc(); LOGGER.severe(Messages.get("Failover.unableToRefreshHostList")); throw new FailoverFailedSQLException(Messages.get("Failover.unableToRefreshHostList")); @@ -503,8 +542,8 @@ protected void failoverWriter() throws SQLException { } catch (SQLException ex) { this.failoverWriterFailedCounter.inc(); LOGGER.severe( - Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost(), ex})); - throw new FailoverFailedSQLException(Messages.get("Failover.exceptionConnectingToWriter")); + Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost()})); + throw new FailoverFailedSQLException(Messages.get("Failover.exceptionConnectingToWriter"), ex); } HostRole role = this.pluginService.getHostRole(writerCandidateConn); @@ -515,7 +554,8 @@ protected void failoverWriter() throws SQLException { // do nothing } this.failoverWriterFailedCounter.inc(); - LOGGER.severe(Messages.get("Failover.unexpectedReaderRole", new Object[]{writerCandidate.getHost(), role})); + LOGGER.severe( + Messages.get("Failover.unexpectedReaderRole", new Object[]{writerCandidate.getHost(), role})); throw new FailoverFailedSQLException(Messages.get("Failover.unexpectedReaderRole")); } @@ -525,13 +565,11 @@ protected void failoverWriter() throws SQLException { () -> Messages.get( "Failover.establishedConnection", new Object[]{this.pluginService.getCurrentHostSpec()})); - - this.failoverWriterSuccessCounter.inc(); - + throwFailoverSuccessException(); } catch (FailoverSuccessSQLException ex) { + this.failoverWriterSuccessCounter.inc(); telemetryContext.setSuccess(true); telemetryContext.setException(ex); - this.failoverWriterSuccessCounter.inc(); throw ex; } catch (Exception ex) { telemetryContext.setSuccess(false); @@ -541,7 +579,7 @@ protected void failoverWriter() throws SQLException { } finally { LOGGER.finest(() -> Messages.get( "Failover.writerFailoverElapsed", - new Object[]{TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.failoverStartTimeNano)})); + new Object[]{TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - failoverStartTimeNano)})); telemetryContext.closeContext(); if (this.telemetryFailoverAdditionalTopTraceSetting) { telemetryFactory.postCopy(telemetryContext, TelemetryTraceLevel.FORCE_TOP_LEVEL); diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/ReaderFailoverResult.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/ReaderFailoverResult.java new file mode 100644 index 000000000..53b417208 --- /dev/null +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/ReaderFailoverResult.java @@ -0,0 +1,39 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.jdbc.plugin.failover2; + +import java.sql.Connection; +import org.checkerframework.checker.nullness.qual.NonNull; +import software.amazon.jdbc.HostSpec; + +public class ReaderFailoverResult { + private final Connection connection; + private final HostSpec hostSpec; + + public ReaderFailoverResult(@NonNull Connection connection, @NonNull HostSpec hostSpec) { + this.connection = connection; + this.hostSpec = hostSpec; + } + + public @NonNull Connection getConnection() { + return connection; + } + + public @NonNull HostSpec getHostSpec() { + return hostSpec; + } +} diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties index 99eb6f93e..8beaef8ad 100644 --- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties +++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties @@ -62,6 +62,7 @@ AwsWrapperDataSource.configurationProfileNotFound=Configuration profile ''{0}'' # Cluster Aware Reader Failover Handler ClusterAwareReaderFailoverHandler.interruptedThread=Thread was interrupted. ClusterAwareReaderFailoverHandler.attemptingReaderConnection=Trying to connect to reader: ''{0}'', with properties ''{1}'' +ClusterAwareReaderFailoverHandler.errorGettingHostRole=An error occurred while trying to determine the role of the reader candidate: {0} ClusterAwareReaderFailoverHandler.successfulReaderConnection=Connected to reader: ''{0}'' ClusterAwareReaderFailoverHandler.failedReaderConnection=Failed to connect to reader: ''{0}'' ClusterAwareReaderFailoverHandler.invalidTopology=''{0}'' was called with an invalid (null or empty) topology. @@ -171,13 +172,19 @@ ExecutionTimeConnectionPlugin.executionTime=Executed {0} in {1} nanos. # Failover Connection Plugin Failover.transactionResolutionUnknownError=Transaction resolution unknown. Please re-configure session state if required and try restarting the transaction. Failover.connectionChangedError=The active SQL connection has changed due to a connection failure. Please re-configure session state if required. -Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to the new writer ''{0}''. Exception: {1} +Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to the new writer ''{0}''. +Failover.failoverReaderTimeout=The reader failover process was not able to establish a connection before timing out. +Failover.failoverReaderUnableToRefreshHostList=The request to discover the new topology was unsuccessful. +Failover.noWriterHostAfterReconnecting=The writer failover process successfully reconnected, but no writer was found in the updated topology: {0} +Failover.nullReaderFailoverHandlerSupplier=The failover plugin was unable to create a reader failover handler because the supplier was unexpectedly null. +Failover.nullWriterFailoverHandlerSupplier=The failover plugin was unable to create a writer failover handler because the supplier was unexpectedly null. Failover.parameterValue={0}={1} Failover.unableToConnect=Unable to establish a SQL connection due to an unexpected error. Failover.unableToConnectToWriter=Unable to establish SQL connection to the writer instance. Failover.unableToConnectToReader=Unable to establish SQL connection to the reader instance. Failover.unableToRefreshHostList=The request to discover the new topology timed out or was unsuccessful. 
Failover.unexpectedReaderRole=The new writer was identified to be ''{0}'', but querying the instance for its role returned a role of {1}. +Failover.strictReaderUnknownHostRole=Unable to determine host role for ''{0}''. Since failover mode is set to STRICT_READER and the host may be a writer, it will not be selected for reader failover. Failover.detectedException=Detected an exception while executing a command: {0} Failover.failoverDisabled=Cluster-aware failover is disabled. Failover.establishedConnection=Connected to: {0} @@ -189,6 +196,8 @@ Failover.noOperationsAfterConnectionClosed=No operations allowed after connectio Failover.noWriterHost=Unable to find writer in updated host list: Failover.readerFailoverElapsed=Reader failover elapsed in {0} ms. Failover.writerFailoverElapsed=Writer failover elapsed in {0} ms. +Failover.failedReaderConnection=[Reader Failover] Failed to connect to host: ''{0}'' +Failover.errorSelectingReaderHost=An error occurred while attempting to select a reader host candidate: ''{0}''. Candidates: # Federated Auth Plugin FederatedAuthPlugin.unableToDetermineRegion=Unable to determine connection region. If you are using a non-standard RDS URL, please set the ''{0}'' property. diff --git a/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java b/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java index 49569ebbe..6313f8a2e 100644 --- a/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java +++ b/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java @@ -640,6 +640,110 @@ public void test_writerFailover_writerReelected() throws SQLException, Interrupt } } + @TestTemplate + @EnableOnNumOfInstances(min = 2) + public void test_readerFailover_readerOrWriter() throws SQLException, InterruptedException { + final String initialWriterId = this.currentWriter; + TestInstanceInfo initialWriterInstanceInfo = + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getInstance(initialWriterId); + + final Properties props = initDefaultProps(); + props.setProperty("failoverMode", "reader-or-writer"); + + try (final Connection conn = + DriverManager.getConnection( + ConnectionStringHelper.getWrapperUrl( + initialWriterInstanceInfo.getHost(), + initialWriterInstanceInfo.getPort(), + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getDefaultDbName()), + props)) { + + // Crash Instance1 and nominate a new writer + auroraUtil.failoverClusterAndWaitUntilWriterChanged(); + + // Failure occurs on Connection invocation + auroraUtil.assertFirstQueryThrows(conn, FailoverSuccessSQLException.class); + } + } + + @TestTemplate + @EnableOnNumOfInstances(min = 2) + public void test_readerFailover_strictReader() throws SQLException, InterruptedException { + final String initialWriterId = this.currentWriter; + TestInstanceInfo initialWriterInstanceInfo = + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getInstance(initialWriterId); + + final Properties props = initDefaultProps(); + props.setProperty("failoverMode", "strict-reader"); + + try (final Connection conn = + DriverManager.getConnection( + ConnectionStringHelper.getWrapperUrl( + initialWriterInstanceInfo.getHost(), + initialWriterInstanceInfo.getPort(), + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getDefaultDbName()), + props)) { + + // Crash Instance1 and nominate a new writer + auroraUtil.failoverClusterAndWaitUntilWriterChanged(); + + // Failure occurs on Connection invocation + auroraUtil.assertFirstQueryThrows(conn, 
FailoverSuccessSQLException.class); + + String currentConnectionId = auroraUtil.queryInstanceId(conn); + assertFalse(auroraUtil.isDBInstanceWriter(currentConnectionId)); + } + } + + @TestTemplate + @EnableOnTestFeature(TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED) + public void test_readerFailover_writerReelected() throws SQLException, InterruptedException { + final String initialWriterId = this.currentWriter; + TestInstanceInfo initialWriterInstanceInfo = + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstance(initialWriterId); + + final Properties props = initDefaultProxiedProps(); + PropertyDefinition.SOCKET_TIMEOUT.set(props, "2000"); + props.setProperty("failoverMode", "reader-or-writer"); + + try (final Connection conn = + DriverManager.getConnection( + ConnectionStringHelper.getWrapperUrl( + initialWriterInstanceInfo.getHost(), + initialWriterInstanceInfo.getPort(), + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getDefaultDbName()), + props)) { + + ExecutorService executor = Executors.newFixedThreadPool(1, r -> { + final Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + }); + + try { + // Failover usually changes the writer instance, but we want to test re-election of the same writer, so we will + // simulate this by temporarily disabling connectivity to the writer. + executor.submit(() -> { + try { + ProxyHelper.disableConnectivity(initialWriterId); + TimeUnit.SECONDS.sleep(5); + ProxyHelper.enableConnectivity(initialWriterId); + } catch (InterruptedException e) { + fail("The disable connectivity thread was unexpectedly interrupted."); + } + }); + + // Leave some time for the other thread to start up + TimeUnit.MILLISECONDS.sleep(500); + + // Failure occurs on Connection invocation + auroraUtil.assertFirstQueryThrows(conn, FailoverSuccessSQLException.class); + } finally { + executor.shutdownNow(); + } + } + } + // Helper methods below protected String getFailoverPlugin() { diff --git a/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandlerTest.java b/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandlerTest.java index 89c9c3919..31d58f7b0 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandlerTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandlerTest.java @@ -24,7 +24,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.mockitoSession; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -384,17 +383,18 @@ public void testHostFailoverStrictReaderEnabled() { DEFAULT_READER_CONNECT_TIMEOUT, true); - // We expect only reader nodes to be chosen. - List expectedReaderHost = Collections.singletonList(reader); + // The writer is included because the original writer has likely become a reader. + List expectedHostsByPriority = Arrays.asList(reader, writer); List hostsByPriority = target.getHostsByPriority(hosts); - assertEquals(expectedReaderHost, hostsByPriority); + assertEquals(expectedHostsByPriority, hostsByPriority); - // Should pick the reader even if unavailable. + // Should pick the reader even if unavailable. The unavailable reader will be lower priority than the writer. 
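+    // getHostsByPriority orders active readers first, then the (possibly demoted) writer, then unavailable hosts, so the expected order becomes [writer, reader].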
reader.setAvailability(HostAvailability.NOT_AVAILABLE); + expectedHostsByPriority = Arrays.asList(writer, reader); hostsByPriority = target.getHostsByPriority(hosts); - assertEquals(expectedReaderHost, hostsByPriority); + assertEquals(expectedHostsByPriority, hostsByPriority); // Writer node will only be picked if it is the only node in topology; List expectedWriterHost = Collections.singletonList(writer); diff --git a/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPluginTest.java b/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPluginTest.java index f1075d9fd..2123848d1 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPluginTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPluginTest.java @@ -34,7 +34,6 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -50,7 +49,6 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import software.amazon.jdbc.HostListProvider; import software.amazon.jdbc.HostListProviderService; import software.amazon.jdbc.HostRole; import software.amazon.jdbc.HostSpec; @@ -61,7 +59,6 @@ import software.amazon.jdbc.hostavailability.HostAvailability; import software.amazon.jdbc.hostavailability.SimpleHostAvailabilityStrategy; import software.amazon.jdbc.hostlistprovider.AuroraHostListProvider; -import software.amazon.jdbc.hostlistprovider.DynamicHostListProvider; import software.amazon.jdbc.util.RdsUrlType; import software.amazon.jdbc.util.SqlState; import software.amazon.jdbc.util.telemetry.GaugeCallable; @@ -75,6 +72,11 @@ class FailoverConnectionPluginTest { private static final Class MONITOR_METHOD_INVOKE_ON = Connection.class; private static final String MONITOR_METHOD_NAME = "Connection.executeQuery"; private static final Object[] EMPTY_ARGS = {}; + private final List defaultHosts = Arrays.asList( + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("writer").port(1234).role(HostRole.WRITER).build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("reader1").port(1234).role(HostRole.READER).build()); @Mock PluginService mockPluginService; @Mock Connection mockConnection; @@ -82,8 +84,8 @@ class FailoverConnectionPluginTest { @Mock HostListProviderService mockHostListProviderService; @Mock AuroraHostListProvider mockHostListProvider; @Mock JdbcCallable mockInitHostProviderFunc; - @Mock ClusterAwareReaderFailoverHandler mockReaderFailoverHandler; - @Mock ClusterAwareWriterFailoverHandler mockWriterFailoverHandler; + @Mock ReaderFailoverHandler mockReaderFailoverHandler; + @Mock WriterFailoverHandler mockWriterFailoverHandler; @Mock ReaderFailoverResult mockReaderResult; @Mock WriterFailoverResult mockWriterResult; @Mock JdbcCallable mockSqlFunction; @@ -111,8 +113,13 @@ void init() throws SQLException { when(mockPluginService.getCurrentHostSpec()).thenReturn(mockHostSpec); when(mockPluginService.connect(any(HostSpec.class), eq(properties))).thenReturn(mockConnection); when(mockPluginService.getTelemetryFactory()).thenReturn(mockTelemetryFactory); + when(mockPluginService.getHosts()).thenReturn(defaultHosts); + when(mockPluginService.getAllHosts()).thenReturn(defaultHosts); when(mockReaderFailoverHandler.failover(any(), any())).thenReturn(mockReaderResult); 
when(mockWriterFailoverHandler.failover(any())).thenReturn(mockWriterResult); + when(mockWriterResult.isConnected()).thenReturn(true); + when(mockWriterResult.getTopology()).thenReturn(defaultHosts); + when(mockReaderResult.isConnected()).thenReturn(true); when(mockPluginService.getTelemetryFactory()).thenReturn(mockTelemetryFactory); when(mockTelemetryFactory.openTelemetryContext(anyString(), any())).thenReturn(mockTelemetryContext); @@ -157,7 +164,7 @@ void test_updateTopology() throws SQLException { initializePlugin(); // Test updateTopology with failover disabled - when(mockHostListProvider.getRdsUrlType()).thenReturn(RdsUrlType.RDS_PROXY); + plugin.setRdsUrlType(RdsUrlType.RDS_PROXY); plugin.updateTopology(false); verify(mockPluginService, never()).forceRefreshHostList(); verify(mockPluginService, never()).refreshHostList(); @@ -196,50 +203,41 @@ void test_updateTopology_withForceUpdate(final boolean forceUpdate) throws SQLEx } @Test - void test_failover_failoverReader() throws SQLException { + void test_failover_failoverWriter() throws SQLException { when(mockPluginService.isInTransaction()).thenReturn(true); initializePlugin(); final FailoverConnectionPlugin spyPlugin = spy(plugin); - doNothing().when(spyPlugin).failoverWriter(); + doThrow(FailoverSuccessSQLException.class).when(spyPlugin).failoverWriter(); spyPlugin.failoverMode = FailoverMode.STRICT_WRITER; - SQLException exception = assertThrows(SQLException.class, () -> spyPlugin.failover(mockHostSpec)); - assertEquals(SqlState.CONNECTION_FAILURE_DURING_TRANSACTION.getState(), exception.getSQLState()); + assertThrows(FailoverSuccessSQLException.class, () -> spyPlugin.failover(mockHostSpec)); verify(spyPlugin).failoverWriter(); } @Test - void test_failover_failoverWriter() throws SQLException { + void test_failover_failoverReader() throws SQLException { when(mockPluginService.isInTransaction()).thenReturn(false); initializePlugin(); final FailoverConnectionPlugin spyPlugin = spy(plugin); - doNothing().when(spyPlugin).failoverReader(eq(mockHostSpec)); + doThrow(FailoverSuccessSQLException.class).when(spyPlugin).failoverReader(eq(mockHostSpec)); spyPlugin.failoverMode = FailoverMode.READER_OR_WRITER; - SQLException exception = assertThrows(SQLException.class, () -> spyPlugin.failover(mockHostSpec)); - assertEquals(SqlState.COMMUNICATION_LINK_CHANGED.getState(), exception.getSQLState()); + assertThrows(FailoverSuccessSQLException.class, () -> spyPlugin.failover(mockHostSpec)); verify(spyPlugin).failoverReader(eq(mockHostSpec)); } @Test void test_failoverReader_withValidFailedHostSpec_successFailover() throws SQLException { - final HostSpec hostSpec = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("hostA") - .build(); - final List hosts = Collections.singletonList(hostSpec); - when(mockHostSpec.getAliases()).thenReturn(new HashSet<>(Arrays.asList("alias1", "alias2"))); when(mockHostSpec.getRawAvailability()).thenReturn(HostAvailability.AVAILABLE); - when(mockPluginService.getAllHosts()).thenReturn(hosts); - when(mockPluginService.getHosts()).thenReturn(hosts); when(mockReaderResult.isConnected()).thenReturn(true); when(mockReaderResult.getConnection()).thenReturn(mockConnection); - when(mockReaderResult.getHost()).thenReturn(hostSpec); + when(mockReaderResult.getHost()).thenReturn(defaultHosts.get(1)); initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, @@ -248,14 +246,14 @@ void 
test_failoverReader_withValidFailedHostSpec_successFailover() throws SQLExc final FailoverConnectionPlugin spyPlugin = spy(plugin); doNothing().when(spyPlugin).updateTopology(true); - spyPlugin.failoverReader(mockHostSpec); + assertThrows(FailoverSuccessSQLException.class, () -> spyPlugin.failoverReader(mockHostSpec)); - verify(mockReaderFailoverHandler).failover(eq(hosts), eq(mockHostSpec)); - verify(mockPluginService).setCurrentConnection(eq(mockConnection), eq(hostSpec)); + verify(mockReaderFailoverHandler).failover(eq(defaultHosts), eq(mockHostSpec)); + verify(mockPluginService).setCurrentConnection(eq(mockConnection), eq(defaultHosts.get(1))); } @Test - void test_failoverReader_withVNoFailedHostSpec_withException() throws SQLException { + void test_failoverReader_withNoFailedHostSpec_withException() throws SQLException { final HostSpec hostSpec = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("hostA") .build(); final List hosts = Collections.singletonList(hostSpec); @@ -269,7 +267,6 @@ void test_failoverReader_withVNoFailedHostSpec_withException() throws SQLExcepti initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, @@ -292,7 +289,6 @@ void test_failoverWriter_failedFailover_throwsException() throws SQLException { initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, @@ -315,7 +311,6 @@ void test_failoverWriter_failedFailover_withNoResult() throws SQLException { initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, @@ -331,26 +326,19 @@ void test_failoverWriter_failedFailover_withNoResult() throws SQLException { @Test void test_failoverWriter_successFailover() throws SQLException { - final HostSpec hostSpec = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("hostA") - .build(); - final List hosts = Collections.singletonList(hostSpec); - when(mockHostSpec.getAliases()).thenReturn(new HashSet<>(Arrays.asList("alias1", "alias2"))); - when(mockPluginService.getAllHosts()).thenReturn(hosts); - when(mockPluginService.getHosts()).thenReturn(hosts); initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, () -> mockWriterFailoverHandler); - final SQLException exception = assertThrows(SQLException.class, () -> plugin.failoverWriter()); - assertEquals(SqlState.CONNECTION_UNABLE_TO_CONNECT.getState(), exception.getSQLState()); + final SQLException exception = assertThrows(FailoverSuccessSQLException.class, () -> plugin.failoverWriter()); + assertEquals(SqlState.COMMUNICATION_LINK_CHANGED.getState(), exception.getSQLState()); - verify(mockWriterFailoverHandler).failover(eq(hosts)); + verify(mockWriterFailoverHandler).failover(eq(defaultHosts)); } @Test @@ -442,43 +430,7 @@ void test_execute_withDirectExecute() throws SQLException { private void initializePlugin() { plugin = new FailoverConnectionPlugin(mockPluginService, properties); - } - - private static class FooHostListProvider implements HostListProvider, DynamicHostListProvider { - - @Override - public List refresh() { - return new ArrayList<>(); - } - - @Override - public List refresh(Connection connection) { - return new ArrayList<>(); - } - - @Override - public List forceRefresh() { - return new ArrayList<>(); - } - - @Override - public 
List forceRefresh(Connection connection) { - return new ArrayList<>(); - } - - @Override - public HostRole getHostRole(Connection conn) { - return HostRole.WRITER; - } - - @Override - public HostSpec identifyConnection(Connection connection) throws SQLException { - return new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("foo").build(); - } - - @Override - public String getClusterId() throws UnsupportedOperationException { - throw new UnsupportedOperationException(); - } + plugin.setWriterFailoverHandler(mockWriterFailoverHandler); + plugin.setReaderFailoverHandler(mockReaderFailoverHandler); } }