From ab110c3f952b5219e68cd6a65d90ac2bfb08ef2e Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Wed, 18 Dec 2024 10:20:25 -0800 Subject: [PATCH 01/10] fix: reader failover logic --- .../ClusterTopologyMonitorImpl.java | 4 + .../failover2/FailoverConnectionPlugin.java | 131 ++++++++++-------- ..._advanced_jdbc_wrapper_messages.properties | 4 +- .../container/tests/AuroraFailoverTest.java | 104 ++++++++++++++ 4 files changed, 187 insertions(+), 56 deletions(-) diff --git a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java index 34934e27c..ee2d46511 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java @@ -236,6 +236,10 @@ protected List waitTillTopologyGetsUpdated(final long timeoutMs) throw } final long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs); + + // Note that we are checking reference equality instead of value equality here. We will break out of the loop if + // there is a new entry in the topology map, even if the value of the hosts in latestHosts is the same as + // currentHosts. while (currentHosts == (latestHosts = this.topologyMap.get(this.clusterId)) && System.nanoTime() < end) { try { diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java index fd11ea225..a28465ec8 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java @@ -16,6 +16,8 @@ package software.amazon.jdbc.plugin.failover2; +import static software.amazon.jdbc.plugin.failover.FailoverMode.STRICT_READER; + import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; @@ -26,6 +28,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; +import java.util.stream.Collectors; import software.amazon.jdbc.AwsWrapperProperty; import software.amazon.jdbc.HostListProviderService; import software.amazon.jdbc.HostRole; @@ -342,85 +345,104 @@ protected void failoverReader() throws SQLException { try { LOGGER.fine(() -> Messages.get("Failover.startReaderFailover")); - - // It's expected that this method synchronously returns when topology is stabilized, - // i.e. when cluster control plane has already chosen a new writer. if (!this.pluginService.forceRefreshHostList(false, 0)) { // "Unable to establish SQL connection to reader instance" this.failoverReaderFailedCounter.inc(); - LOGGER.severe(Messages.get("Failover.unableToConnectToReader")); - throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToReader")); + LOGGER.severe(Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverReader]"})); + throw new FailoverFailedSQLException( + Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverReader]"})); } final Properties copyProp = PropertyUtils.copyProperties(this.properties); copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true"); - + // The host roles in this host list may or may not be accurate, depending on whether the new topology has become + // available or not yet. final List hosts = this.pluginService.getHosts(); - Connection readerCandidateConn = null; + Connection candidateConn = null; HostSpec readerCandidate = null; - final Set remainingHosts = new HashSet<>(hosts); - - while (!remainingHosts.isEmpty() - && readerCandidateConn == null - && System.nanoTime() < failoverEndTimeNano) { - try { - readerCandidate = - this.pluginService.getHostSpecByStrategy( - new ArrayList<>(remainingHosts), - HostRole.READER, - this.failoverReaderHostSelectorStrategySetting); - } catch (UnsupportedOperationException | SQLException ex) { - // can't use selected strategy to get a reader host - LOGGER.finest("Error: " + ex.getMessage()); - break; + HostSpec verifiedWriter = null; + final HostSpec originalWriter = hosts.stream() + .filter(hostSpec -> HostRole.WRITER.equals(hostSpec.getRole())) + .findFirst() + .orElse(null); + final Set originalReaders = hosts.stream() + .filter(hostSpec -> HostRole.READER.equals(hostSpec.getRole())) + .collect(Collectors.toSet()); + while (candidateConn == null && System.nanoTime() < failoverEndTimeNano) { + final Set remainingHosts = new HashSet<>(originalReaders); + if (FailoverMode.STRICT_READER.equals(this.failoverMode) && verifiedWriter != null) { + remainingHosts.remove(verifiedWriter); } - if (readerCandidate == null) { - break; - } + while (!remainingHosts.isEmpty() + && candidateConn == null + && System.nanoTime() < failoverEndTimeNano) { + try { + readerCandidate = + this.pluginService.getHostSpecByStrategy( + new ArrayList<>(remainingHosts), + HostRole.READER, + this.failoverReaderHostSelectorStrategySetting); + } catch (UnsupportedOperationException | SQLException ex) { + // can't use selected strategy to get a reader host + LOGGER.finest("Error: " + ex.getMessage()); + break; + } - try { - readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp); - if (this.pluginService.getHostRole(readerCandidateConn) != HostRole.READER) { - readerCandidateConn.close(); - readerCandidateConn = null; + if (readerCandidate == null) { + break; + } + + try { + candidateConn = this.pluginService.connect(readerCandidate, copyProp); + // Since the roles in the host list might not be accurate, we execute a query to check the instance's role. + if (this.failoverMode == STRICT_READER + && this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) { + verifiedWriter = readerCandidate; + candidateConn.close(); + candidateConn = null; + remainingHosts.remove(readerCandidate); + readerCandidate = null; + } + } catch (SQLException ex) { remainingHosts.remove(readerCandidate); + candidateConn = null; readerCandidate = null; } - } catch (SQLException ex) { - remainingHosts.remove(readerCandidate); - readerCandidateConn = null; - readerCandidate = null; } - } - if (readerCandidate == null - && readerCandidateConn == null - && this.failoverMode == FailoverMode.READER_OR_WRITER) { - // As a last resort, let's try to connect to a writer - try { - readerCandidate = this.pluginService.getHostSpecByStrategy(HostRole.WRITER, - this.failoverReaderHostSelectorStrategySetting); - if (readerCandidate != null) { - try { - readerCandidateConn = this.pluginService.connect(readerCandidate, copyProp); - } catch (SQLException ex) { + if (originalWriter != null + && readerCandidate == null + && candidateConn == null + && System.nanoTime() < failoverEndTimeNano) { + // Try the original writer. The role may be inaccurate, so we will try connecting to it even if failover mode + // is set to STRICT_READER. + readerCandidate = originalWriter; + try { + candidateConn = this.pluginService.connect(readerCandidate, copyProp); + if (this.failoverMode == STRICT_READER + && this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) { + verifiedWriter = readerCandidate; + candidateConn.close(); + candidateConn = null; + remainingHosts.remove(readerCandidate); readerCandidate = null; } + } catch (SQLException ex) { + readerCandidate = null; + candidateConn = null; } - } catch (UnsupportedOperationException ex) { - // can't use selected strategy to get a reader host } } - if (readerCandidateConn == null) { + if (candidateConn == null) { // "Unable to establish SQL connection to reader instance" this.failoverReaderFailedCounter.inc(); LOGGER.severe(Messages.get("Failover.unableToConnectToReader")); throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToReader")); } - this.pluginService.setCurrentConnection(readerCandidateConn, readerCandidate); + this.pluginService.setCurrentConnection(candidateConn, readerCandidate); LOGGER.info( () -> Messages.get( @@ -466,8 +488,9 @@ protected void failoverWriter() throws SQLException { if (!this.pluginService.forceRefreshHostList(true, this.failoverTimeoutMsSetting)) { // "Unable to establish SQL connection to writer node" this.failoverWriterFailedCounter.inc(); - LOGGER.severe(Messages.get("Failover.unableToRefreshHostList")); - throw new FailoverFailedSQLException(Messages.get("Failover.unableToRefreshHostList")); + LOGGER.severe(Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverWriter]"})); + throw new FailoverFailedSQLException( + Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverWriter]"})); } final List updatedHosts = this.pluginService.getAllHosts(); @@ -503,8 +526,8 @@ protected void failoverWriter() throws SQLException { } catch (SQLException ex) { this.failoverWriterFailedCounter.inc(); LOGGER.severe( - Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost(), ex})); - throw new FailoverFailedSQLException(Messages.get("Failover.exceptionConnectingToWriter")); + Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost()})); + throw new FailoverFailedSQLException(Messages.get("Failover.exceptionConnectingToWriter"), ex); } HostRole role = this.pluginService.getHostRole(writerCandidateConn); diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties index 99eb6f93e..a953c80f4 100644 --- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties +++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties @@ -171,12 +171,12 @@ ExecutionTimeConnectionPlugin.executionTime=Executed {0} in {1} nanos. # Failover Connection Plugin Failover.transactionResolutionUnknownError=Transaction resolution unknown. Please re-configure session state if required and try restarting the transaction. Failover.connectionChangedError=The active SQL connection has changed due to a connection failure. Please re-configure session state if required. -Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to the new writer ''{0}''. Exception: {1} +Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to the new writer ''{0}''. Failover.parameterValue={0}={1} Failover.unableToConnect=Unable to establish a SQL connection due to an unexpected error. Failover.unableToConnectToWriter=Unable to establish SQL connection to the writer instance. Failover.unableToConnectToReader=Unable to establish SQL connection to the reader instance. -Failover.unableToRefreshHostList=The request to discover the new topology timed out or was unsuccessful. +Failover.unableToRefreshHostList={0} The request to discover the new topology timed out or was unsuccessful. Failover.unexpectedReaderRole=The new writer was identified to be ''{0}'', but querying the instance for its role returned a role of {1}. Failover.detectedException=Detected an exception while executing a command: {0} Failover.failoverDisabled=Cluster-aware failover is disabled. diff --git a/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java b/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java index 49569ebbe..6313f8a2e 100644 --- a/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java +++ b/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java @@ -640,6 +640,110 @@ public void test_writerFailover_writerReelected() throws SQLException, Interrupt } } + @TestTemplate + @EnableOnNumOfInstances(min = 2) + public void test_readerFailover_readerOrWriter() throws SQLException, InterruptedException { + final String initialWriterId = this.currentWriter; + TestInstanceInfo initialWriterInstanceInfo = + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getInstance(initialWriterId); + + final Properties props = initDefaultProps(); + props.setProperty("failoverMode", "reader-or-writer"); + + try (final Connection conn = + DriverManager.getConnection( + ConnectionStringHelper.getWrapperUrl( + initialWriterInstanceInfo.getHost(), + initialWriterInstanceInfo.getPort(), + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getDefaultDbName()), + props)) { + + // Crash Instance1 and nominate a new writer + auroraUtil.failoverClusterAndWaitUntilWriterChanged(); + + // Failure occurs on Connection invocation + auroraUtil.assertFirstQueryThrows(conn, FailoverSuccessSQLException.class); + } + } + + @TestTemplate + @EnableOnNumOfInstances(min = 2) + public void test_readerFailover_strictReader() throws SQLException, InterruptedException { + final String initialWriterId = this.currentWriter; + TestInstanceInfo initialWriterInstanceInfo = + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getInstance(initialWriterId); + + final Properties props = initDefaultProps(); + props.setProperty("failoverMode", "strict-reader"); + + try (final Connection conn = + DriverManager.getConnection( + ConnectionStringHelper.getWrapperUrl( + initialWriterInstanceInfo.getHost(), + initialWriterInstanceInfo.getPort(), + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getDefaultDbName()), + props)) { + + // Crash Instance1 and nominate a new writer + auroraUtil.failoverClusterAndWaitUntilWriterChanged(); + + // Failure occurs on Connection invocation + auroraUtil.assertFirstQueryThrows(conn, FailoverSuccessSQLException.class); + + String currentConnectionId = auroraUtil.queryInstanceId(conn); + assertFalse(auroraUtil.isDBInstanceWriter(currentConnectionId)); + } + } + + @TestTemplate + @EnableOnTestFeature(TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED) + public void test_readerFailover_writerReelected() throws SQLException, InterruptedException { + final String initialWriterId = this.currentWriter; + TestInstanceInfo initialWriterInstanceInfo = + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstance(initialWriterId); + + final Properties props = initDefaultProxiedProps(); + PropertyDefinition.SOCKET_TIMEOUT.set(props, "2000"); + props.setProperty("failoverMode", "reader-or-writer"); + + try (final Connection conn = + DriverManager.getConnection( + ConnectionStringHelper.getWrapperUrl( + initialWriterInstanceInfo.getHost(), + initialWriterInstanceInfo.getPort(), + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getDefaultDbName()), + props)) { + + ExecutorService executor = Executors.newFixedThreadPool(1, r -> { + final Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + }); + + try { + // Failover usually changes the writer instance, but we want to test re-election of the same writer, so we will + // simulate this by temporarily disabling connectivity to the writer. + executor.submit(() -> { + try { + ProxyHelper.disableConnectivity(initialWriterId); + TimeUnit.SECONDS.sleep(5); + ProxyHelper.enableConnectivity(initialWriterId); + } catch (InterruptedException e) { + fail("The disable connectivity thread was unexpectedly interrupted."); + } + }); + + // Leave some time for the other thread to start up + TimeUnit.MILLISECONDS.sleep(500); + + // Failure occurs on Connection invocation + auroraUtil.assertFirstQueryThrows(conn, FailoverSuccessSQLException.class); + } finally { + executor.shutdownNow(); + } + } + } + // Helper methods below protected String getFailoverPlugin() { From 090650218551d4a9062c889a91cabd9f6520b6a1 Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Wed, 18 Dec 2024 16:55:46 -0800 Subject: [PATCH 02/10] Fix bug in reader failover for failover1 plugin --- .../ClusterAwareReaderFailoverHandler.java | 25 +++++-------- .../failover/FailoverConnectionPlugin.java | 13 ++++--- .../amazon/jdbc/util/ConnectionUrlParser.java | 21 +++++++++++ ..._advanced_jdbc_wrapper_messages.properties | 1 + .../jdbc/util/ConnectionUrlParserTest.java | 35 +++++++++++++++++++ 5 files changed, 71 insertions(+), 24 deletions(-) diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandler.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandler.java index cc012d676..c6c9bedbe 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandler.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandler.java @@ -152,24 +152,14 @@ private Future submitInternalFailoverTask( } // need to ensure that new connection is a connection to a reader node - - pluginService.forceRefreshHostList(result.getConnection()); - topology = pluginService.getAllHosts(); - for (final HostSpec node : topology) { - if (node.getUrl().equals(result.getHost().getUrl())) { - // found new connection host in the latest topology - if (node.getRole() == HostRole.READER) { - return result; - } + try { + if (HostRole.READER.equals(this.pluginService.getHostRole(result.getConnection()))) { + return result; } + } catch (SQLException e) { + LOGGER.fine(Messages.get("ClusterAwareReaderFailoverHandler.errorGettingHostRole", new Object[]{e})); } - // New node is not found in the latest topology. There are few possible reasons for that. - // - Node is not yet presented in the topology due to failover process in progress - // - Node is in the topology but its role isn't a - // READER (that is not acceptable option due to this.strictReader setting) - // Need to continue this loop and to make another try to connect to a reader. - try { result.getConnection().close(); } catch (final SQLException ex) { @@ -244,8 +234,9 @@ public List getHostsByPriority(final List hosts) { final List hostsByPriority = new ArrayList<>(activeReaders); final int numOfReaders = activeReaders.size() + downHostList.size(); - if (writerHost != null - && (!this.enableFailoverStrictReader || numOfReaders == 0)) { + // Since the writer instance may change during failover, the original writer is likely now a reader. We will include + // it and then verify the role once connected if using "strict-reader". + if (writerHost != null || numOfReaders == 0) { hostsByPriority.add(writerHost); } hostsByPriority.addAll(downHostList); diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java index 38a7ea39b..039ab60fd 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java @@ -44,6 +44,7 @@ import software.amazon.jdbc.plugin.AbstractConnectionPlugin; import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsHelper; import software.amazon.jdbc.targetdriverdialect.TargetDriverDialect; +import software.amazon.jdbc.util.ConnectionUrlParser; import software.amazon.jdbc.util.Messages; import software.amazon.jdbc.util.RdsUrlType; import software.amazon.jdbc.util.RdsUtils; @@ -289,10 +290,11 @@ void initHostProvider( return; } + initHostProviderFunc.call(); + initFailoverMode(initialUrl); + this.readerFailoverHandler = readerFailoverHandlerSupplier.get(); this.writerFailoverHandler = writerFailoverHandlerSupplier.get(); - - initHostProviderFunc.call(); } @Override @@ -364,12 +366,10 @@ private void initSettings() { TELEMETRY_FAILOVER_ADDITIONAL_TOP_TRACE.getBoolean(this.properties); } - protected void initFailoverMode() { + protected void initFailoverMode(final String initialUrl) { if (this.rdsUrlType == null) { this.failoverMode = FailoverMode.fromValue(FAILOVER_MODE.getString(this.properties)); - final HostSpec initialHostSpec = this.hostListProviderService.getInitialConnectionHostSpec(); - this.rdsUrlType = this.rdsHelper.identifyRdsType(initialHostSpec.getHost()); - + this.rdsUrlType = this.rdsHelper.identifyRdsType(ConnectionUrlParser.parseHost(initialUrl)); if (this.failoverMode == null) { this.failoverMode = this.rdsUrlType == RdsUrlType.RDS_READER_CLUSTER ? FailoverMode.READER_OR_WRITER @@ -812,7 +812,6 @@ public Connection connect( private Connection connectInternal(String driverProtocol, HostSpec hostSpec, Properties props, boolean isInitialConnection, JdbcCallable connectFunc, boolean isForceConnect) throws SQLException { - this.initFailoverMode(); Connection conn = null; try { conn = diff --git a/wrapper/src/main/java/software/amazon/jdbc/util/ConnectionUrlParser.java b/wrapper/src/main/java/software/amazon/jdbc/util/ConnectionUrlParser.java index 9fa9e0afa..be812991b 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/util/ConnectionUrlParser.java +++ b/wrapper/src/main/java/software/amazon/jdbc/util/ConnectionUrlParser.java @@ -87,6 +87,27 @@ public static HostSpec parseHostPortPair(final String url, final HostRole role, return getHostSpec(hostPortPair, role, hostSpecBuilderSupplier.get()); } + public static @Nullable String parseHost(final String connString) { + final Matcher matcher = CONNECTION_STRING_PATTERN.matcher(connString); + if (!matcher.matches()) { + return null; + } + + final String hosts = matcher.group("hosts") == null ? null : matcher.group("hosts").trim(); + if (hosts == null) { + return null; + } + + final String[] hostArray = hosts.split(HOSTS_SEPARATOR); + if (hostArray.length == 0) { + return null; + } + + final String url = hostArray[0]; + final String[] hostPortPair = url.split(HOST_PORT_SEPARATOR, 2); + return hostPortPair.length == 0 ? null : hostPortPair[0]; + } + private static HostSpec getHostSpec(final String[] hostPortPair, final HostRole hostRole, final HostSpecBuilder hostSpecBuilder) { String hostId = rdsUtils.getRdsInstanceId(hostPortPair[0]); diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties index a953c80f4..42e04e772 100644 --- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties +++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties @@ -62,6 +62,7 @@ AwsWrapperDataSource.configurationProfileNotFound=Configuration profile ''{0}'' # Cluster Aware Reader Failover Handler ClusterAwareReaderFailoverHandler.interruptedThread=Thread was interrupted. ClusterAwareReaderFailoverHandler.attemptingReaderConnection=Trying to connect to reader: ''{0}'', with properties ''{1}'' +ClusterAwareReaderFailoverHandler.errorGettingHostRole=An error occurred while trying to determine the role of the reader candidate: {0} ClusterAwareReaderFailoverHandler.successfulReaderConnection=Connected to reader: ''{0}'' ClusterAwareReaderFailoverHandler.failedReaderConnection=Failed to connect to reader: ''{0}'' ClusterAwareReaderFailoverHandler.invalidTopology=''{0}'' was called with an invalid (null or empty) topology. diff --git a/wrapper/src/test/java/software/amazon/jdbc/util/ConnectionUrlParserTest.java b/wrapper/src/test/java/software/amazon/jdbc/util/ConnectionUrlParserTest.java index 50875bda0..6f08c2fa9 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/util/ConnectionUrlParserTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/util/ConnectionUrlParserTest.java @@ -110,6 +110,41 @@ void testParsingUrlWithNestedParams(final String url, final String expected) { assertEquals(expected, props.getProperty("param")); } + @ParameterizedTest + @MethodSource("getTestParseHostArgs") + void testParseHost(final String connString, final String expected) { + assertEquals(expected, ConnectionUrlParser.parseHost(connString)); + } + + private static Stream getTestParseHostArgs() { + return Stream.of( + Arguments.of( + "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com", + "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), + Arguments.of( + "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com:5432", + "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), + Arguments.of( + "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com:5432?connectTimeout=30000", + "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), + Arguments.of( + "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com/test", + "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), + Arguments.of( + "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com:5432/test", + "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), + Arguments.of( + "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com:5432?connectTimeout=30000/test", + "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), + Arguments.of( + "jdbc:postgresql://mydb.cluster-ro-XYZ.us-east-2.rds.amazonaws.com:5432?connectTimeout=30000/test", + "mydb.cluster-ro-XYZ.us-east-2.rds.amazonaws.com"), + Arguments.of( + "jdbc:postgresql://mydb.cluster-XYZ.rds.cn-northwest-1.amazonaws.com.cn:5432/test", + "mydb.cluster-XYZ.rds.cn-northwest-1.amazonaws.com.cn") + ); + } + private static Stream testGetHostsFromConnectionUrlArguments() { return Stream.of( Arguments.of("protocol//", new ArrayList()), From 90fbc0930a16f1df28e2e3816129d6d7631ea426 Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Wed, 18 Dec 2024 18:14:07 -0800 Subject: [PATCH 03/10] Fix unit test --- .../ClusterAwareReaderFailoverHandlerTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandlerTest.java b/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandlerTest.java index 89c9c3919..31d58f7b0 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandlerTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/ClusterAwareReaderFailoverHandlerTest.java @@ -24,7 +24,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.mockitoSession; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -384,17 +383,18 @@ public void testHostFailoverStrictReaderEnabled() { DEFAULT_READER_CONNECT_TIMEOUT, true); - // We expect only reader nodes to be chosen. - List expectedReaderHost = Collections.singletonList(reader); + // The writer is included because the original writer has likely become a reader. + List expectedHostsByPriority = Arrays.asList(reader, writer); List hostsByPriority = target.getHostsByPriority(hosts); - assertEquals(expectedReaderHost, hostsByPriority); + assertEquals(expectedHostsByPriority, hostsByPriority); - // Should pick the reader even if unavailable. + // Should pick the reader even if unavailable. The unavailable reader will be lower priority than the writer. reader.setAvailability(HostAvailability.NOT_AVAILABLE); + expectedHostsByPriority = Arrays.asList(writer, reader); hostsByPriority = target.getHostsByPriority(hosts); - assertEquals(expectedReaderHost, hostsByPriority); + assertEquals(expectedHostsByPriority, hostsByPriority); // Writer node will only be picked if it is the only node in topology; List expectedWriterHost = Collections.singletonList(writer); From 2696be910233cf8ff6171e5c26c6a67d1b987c5c Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Thu, 19 Dec 2024 10:23:00 -0800 Subject: [PATCH 04/10] Don't attempt original writer if it is the verified writer and failoverMode=strict-reader --- .../failover2/FailoverConnectionPlugin.java | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java index a28465ec8..cb4382982 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java @@ -411,27 +411,33 @@ protected void failoverReader() throws SQLException { } } - if (originalWriter != null - && readerCandidate == null - && candidateConn == null - && System.nanoTime() < failoverEndTimeNano) { - // Try the original writer. The role may be inaccurate, so we will try connecting to it even if failover mode - // is set to STRICT_READER. - readerCandidate = originalWriter; - try { - candidateConn = this.pluginService.connect(readerCandidate, copyProp); - if (this.failoverMode == STRICT_READER - && this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) { - verifiedWriter = readerCandidate; - candidateConn.close(); - candidateConn = null; - remainingHosts.remove(readerCandidate); - readerCandidate = null; - } - } catch (SQLException ex) { - readerCandidate = null; + if (readerCandidate != null + || candidateConn != null + || originalWriter == null + || System.nanoTime() > failoverEndTimeNano) { + continue; + } + + if (STRICT_READER.equals(this.failoverMode) && originalWriter.equals(verifiedWriter)) { + continue; + } + + // Try the original writer. The role may be inaccurate, so we will try connecting to it even if failover mode + // is set to STRICT_READER. + readerCandidate = originalWriter; + try { + candidateConn = this.pluginService.connect(readerCandidate, copyProp); + if (this.failoverMode == STRICT_READER + && this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) { + verifiedWriter = readerCandidate; + candidateConn.close(); candidateConn = null; + remainingHosts.remove(readerCandidate); + readerCandidate = null; } + } catch (SQLException ex) { + readerCandidate = null; + candidateConn = null; } } From bfd9631fe0d9b92f491fa6393eb131409cdc5bd4 Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Thu, 19 Dec 2024 17:22:06 -0800 Subject: [PATCH 05/10] failover2: cleanup reader failover, fix telemetry --- .../failover2/FailoverConnectionPlugin.java | 280 +++++++++--------- .../failover2/ReaderFailoverResult.java | 39 +++ ..._advanced_jdbc_wrapper_messages.properties | 6 +- 3 files changed, 188 insertions(+), 137 deletions(-) create mode 100644 wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/ReaderFailoverResult.java diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java index cb4382982..070c87052 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java @@ -27,6 +27,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Logger; import java.util.stream.Collectors; import software.amazon.jdbc.AwsWrapperProperty; @@ -127,7 +128,6 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin { protected boolean isInTransaction = false; protected RdsUrlType rdsUrlType; protected HostListProviderService hostListProviderService; - protected long failoverStartTimeNano = 0; protected final AuroraStaleDnsHelper staleDnsHelper; protected final TelemetryCounter failoverWriterTriggeredCounter; protected final TelemetryCounter failoverWriterSuccessCounter; @@ -309,28 +309,11 @@ protected void dealWithIllegalStateException( * @throws SQLException if an error occurs */ protected void failover() throws SQLException { - if (this.failoverMode == FailoverMode.STRICT_WRITER) { failoverWriter(); } else { failoverReader(); } - - if (isInTransaction || this.pluginService.isInTransaction()) { - if (this.pluginManagerService != null) { - this.pluginManagerService.setInTransaction(false); - } - // "Transaction resolution unknown. Please re-configure session state if required and try - // restarting transaction." - final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); - LOGGER.info(errorMessage); - throw new TransactionStateUnknownSQLException(); - } else { - // "The active SQL connection has changed due to a connection failure. Please re-configure - // session state if required. " - LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); - throw new FailoverSuccessSQLException(); - } } protected void failoverReader() throws SQLException { @@ -339,129 +322,35 @@ protected void failoverReader() throws SQLException { TELEMETRY_READER_FAILOVER, TelemetryTraceLevel.NESTED); this.failoverReaderTriggeredCounter.inc(); - this.failoverStartTimeNano = System.nanoTime(); - final long failoverEndTimeNano = this.failoverStartTimeNano - + TimeUnit.MILLISECONDS.toNanos(this.failoverTimeoutMsSetting); + final long failoverStartNano = System.nanoTime(); + final long failoverEndNano = failoverStartNano + TimeUnit.MILLISECONDS.toNanos(this.failoverTimeoutMsSetting); try { LOGGER.fine(() -> Messages.get("Failover.startReaderFailover")); + // When we pass a timeout of 0, we inform the plugin service that it should update its topology without waiting + // for it to get updated, since we do not need updated topology to establish a reader connection. if (!this.pluginService.forceRefreshHostList(false, 0)) { - // "Unable to establish SQL connection to reader instance" - this.failoverReaderFailedCounter.inc(); - LOGGER.severe(Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverReader]"})); - throw new FailoverFailedSQLException( - Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverReader]"})); - } - - final Properties copyProp = PropertyUtils.copyProperties(this.properties); - copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true"); - // The host roles in this host list may or may not be accurate, depending on whether the new topology has become - // available or not yet. - final List hosts = this.pluginService.getHosts(); - Connection candidateConn = null; - HostSpec readerCandidate = null; - HostSpec verifiedWriter = null; - final HostSpec originalWriter = hosts.stream() - .filter(hostSpec -> HostRole.WRITER.equals(hostSpec.getRole())) - .findFirst() - .orElse(null); - final Set originalReaders = hosts.stream() - .filter(hostSpec -> HostRole.READER.equals(hostSpec.getRole())) - .collect(Collectors.toSet()); - while (candidateConn == null && System.nanoTime() < failoverEndTimeNano) { - final Set remainingHosts = new HashSet<>(originalReaders); - if (FailoverMode.STRICT_READER.equals(this.failoverMode) && verifiedWriter != null) { - remainingHosts.remove(verifiedWriter); - } - - while (!remainingHosts.isEmpty() - && candidateConn == null - && System.nanoTime() < failoverEndTimeNano) { - try { - readerCandidate = - this.pluginService.getHostSpecByStrategy( - new ArrayList<>(remainingHosts), - HostRole.READER, - this.failoverReaderHostSelectorStrategySetting); - } catch (UnsupportedOperationException | SQLException ex) { - // can't use selected strategy to get a reader host - LOGGER.finest("Error: " + ex.getMessage()); - break; - } - - if (readerCandidate == null) { - break; - } - - try { - candidateConn = this.pluginService.connect(readerCandidate, copyProp); - // Since the roles in the host list might not be accurate, we execute a query to check the instance's role. - if (this.failoverMode == STRICT_READER - && this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) { - verifiedWriter = readerCandidate; - candidateConn.close(); - candidateConn = null; - remainingHosts.remove(readerCandidate); - readerCandidate = null; - } - } catch (SQLException ex) { - remainingHosts.remove(readerCandidate); - candidateConn = null; - readerCandidate = null; - } - } - - if (readerCandidate != null - || candidateConn != null - || originalWriter == null - || System.nanoTime() > failoverEndTimeNano) { - continue; - } - - if (STRICT_READER.equals(this.failoverMode) && originalWriter.equals(verifiedWriter)) { - continue; - } - - // Try the original writer. The role may be inaccurate, so we will try connecting to it even if failover mode - // is set to STRICT_READER. - readerCandidate = originalWriter; - try { - candidateConn = this.pluginService.connect(readerCandidate, copyProp); - if (this.failoverMode == STRICT_READER - && this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) { - verifiedWriter = readerCandidate; - candidateConn.close(); - candidateConn = null; - remainingHosts.remove(readerCandidate); - readerCandidate = null; - } - } catch (SQLException ex) { - readerCandidate = null; - candidateConn = null; - } + LOGGER.severe(Messages.get("Failover.failoverReaderUnableToRefreshHostList")); + throw new FailoverFailedSQLException(Messages.get("Failover.failoverReaderUnableToRefreshHostList")); } - if (candidateConn == null) { - // "Unable to establish SQL connection to reader instance" - this.failoverReaderFailedCounter.inc(); + try { + ReaderFailoverResult result = getReaderFailoverConnection(failoverEndNano); + this.pluginService.setCurrentConnection(result.getConnection(), result.getHostSpec()); + } catch (TimeoutException e) { LOGGER.severe(Messages.get("Failover.unableToConnectToReader")); throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToReader")); } - this.pluginService.setCurrentConnection(candidateConn, readerCandidate); - LOGGER.info( () -> Messages.get( "Failover.establishedConnection", new Object[]{this.pluginService.getCurrentHostSpec()})); - this.failoverReaderSuccessCounter.inc(); - - } catch (FailoverSuccessSQLException ex) { telemetryContext.setSuccess(true); - telemetryContext.setException(ex); - this.failoverReaderSuccessCounter.inc(); - throw ex; + SQLException failoverSuccessException = getFailoverSuccessException(); + telemetryContext.setException(failoverSuccessException); + throw failoverSuccessException; } catch (Exception ex) { telemetryContext.setSuccess(false); telemetryContext.setException(ex); @@ -470,7 +359,7 @@ protected void failoverReader() throws SQLException { } finally { LOGGER.finest(() -> Messages.get( "Failover.readerFailoverElapsed", - new Object[]{TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.failoverStartTimeNano)})); + new Object[]{TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - failoverStartNano)})); telemetryContext.closeContext(); if (this.telemetryFailoverAdditionalTopTraceSetting) { telemetryFactory.postCopy(telemetryContext, TelemetryTraceLevel.FORCE_TOP_LEVEL); @@ -478,13 +367,134 @@ protected void failoverReader() throws SQLException { } } + protected ReaderFailoverResult getReaderFailoverConnection(long failoverEndTimeNano) throws TimeoutException { + final Properties copyProp = PropertyUtils.copyProperties(this.properties); + copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true"); + + // The roles in this list might not be accurate, depending on whether the new topology has become available yet. + final List hosts = this.pluginService.getHosts(); + final Set readerCandidates = hosts.stream() + .filter(hostSpec -> HostRole.READER.equals(hostSpec.getRole())) + .collect(Collectors.toSet()); + final HostSpec originalWriter = hosts.stream() + .filter(hostSpec -> HostRole.WRITER.equals(hostSpec.getRole())) + .findFirst() + .orElse(null); + boolean isOriginalWriterStillWriter = false; + + do { + final Set remainingReaders = new HashSet<>(readerCandidates); + while (!remainingReaders.isEmpty() && System.nanoTime() < failoverEndTimeNano) { + HostSpec readerCandidate; + try { + readerCandidate = + this.pluginService.getHostSpecByStrategy( + new ArrayList<>(remainingReaders), + HostRole.READER, + this.failoverReaderHostSelectorStrategySetting); + } catch (UnsupportedOperationException | SQLException ex) { + LOGGER.finest( + Messages.get( + "Failover.errorSelectingReaderHost", + new Object[]{ + ex.getMessage(), + Utils.logTopology(new ArrayList<>(remainingReaders), "") + })); + break; + } + + if (readerCandidate == null) { + LOGGER.finest( + Messages.get("Failover.readerCandidateNull", + new Object[]{Utils.logTopology(new ArrayList<>(remainingReaders), "")})); + break; + } + + try { + Connection candidateConn = this.pluginService.connect(readerCandidate, copyProp); + if (this.failoverMode != STRICT_READER) { + return new ReaderFailoverResult(candidateConn, readerCandidate); + } + + // Since the roles in the host list might not be accurate, we execute a query to check the instance's role. + if (this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) { + // The reader candidate is actually a writer, which is not valid when failoverMode is STRICT_READER. + // We will remove it from the list of reader candidates and remaining readers to avoid retrying it. + readerCandidates.remove(readerCandidate); + remainingReaders.remove(readerCandidate); + candidateConn.close(); + } + } catch (SQLException ex) { + remainingReaders.remove(readerCandidate); + } + } + + // We were not able to connect to any of the original readers. We will try connecting to the original writer, + // which may have been demoted to a reader. + + if (originalWriter == null || System.nanoTime() > failoverEndTimeNano) { + // No writer was found in the original topology, or we have timed out. + continue; + } + + if (this.failoverMode == STRICT_READER && isOriginalWriterStillWriter) { + // The original writer has been verified, so it is not valid when in STRICT_READER mode. + continue; + } + + // Try the original writer, which may have been demoted to a reader. + try { + Connection candidateConn = this.pluginService.connect(originalWriter, copyProp); + if (this.failoverMode != STRICT_READER) { + return new ReaderFailoverResult(candidateConn, originalWriter); + } + + // We are in STRICT_READER mode, so we need to verify the host's role. + HostRole role = this.pluginService.getHostRole(candidateConn); + if (role == HostRole.READER) { + return new ReaderFailoverResult(candidateConn, originalWriter); + } + + // We are in STRICT_READER mode and the connection is not a reader, so it is not valid. + + if (role == HostRole.WRITER) { + isOriginalWriterStillWriter = true; + } + + candidateConn.close(); + } catch (SQLException ex) { + LOGGER.fine(Messages.get("Failover.failedReaderConnection", new Object[]{originalWriter.getUrl()})); + } + } while (System.nanoTime() < failoverEndTimeNano); // All hosts failed. Keep trying until we hit the timeout. + + throw new TimeoutException(Messages.get("Failover.failoverReaderTimeout")); + } + + protected SQLException getFailoverSuccessException() { + if (isInTransaction || this.pluginService.isInTransaction()) { + if (this.pluginManagerService != null) { + this.pluginManagerService.setInTransaction(false); + } + // "Transaction resolution unknown. Please re-configure session state if required and try + // restarting transaction." + final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); + LOGGER.info(errorMessage); + return new TransactionStateUnknownSQLException(); + } else { + // "The active SQL connection has changed due to a connection failure. Please re-configure + // session state if required. " + LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); + return new FailoverSuccessSQLException(); + } + } + protected void failoverWriter() throws SQLException { TelemetryFactory telemetryFactory = this.pluginService.getTelemetryFactory(); TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext( TELEMETRY_WRITER_FAILOVER, TelemetryTraceLevel.NESTED); this.failoverWriterTriggeredCounter.inc(); - this.failoverStartTimeNano = System.nanoTime(); + long failoverStartTimeNano = System.nanoTime(); try { LOGGER.info(() -> Messages.get("Failover.startWriterFailover")); @@ -492,11 +502,9 @@ protected void failoverWriter() throws SQLException { // It's expected that this method synchronously returns when topology is stabilized, // i.e. when cluster control plane has already chosen a new writer. if (!this.pluginService.forceRefreshHostList(true, this.failoverTimeoutMsSetting)) { - // "Unable to establish SQL connection to writer node" this.failoverWriterFailedCounter.inc(); - LOGGER.severe(Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverWriter]"})); - throw new FailoverFailedSQLException( - Messages.get("Failover.unableToRefreshHostList", new Object[]{"[failoverWriter]"})); + LOGGER.severe(Messages.get("Failover.unableToRefreshHostList")); + throw new FailoverFailedSQLException(Messages.get("Failover.unableToRefreshHostList")); } final List updatedHosts = this.pluginService.getAllHosts(); @@ -556,11 +564,11 @@ protected void failoverWriter() throws SQLException { new Object[]{this.pluginService.getCurrentHostSpec()})); this.failoverWriterSuccessCounter.inc(); - - } catch (FailoverSuccessSQLException ex) { telemetryContext.setSuccess(true); - telemetryContext.setException(ex); - this.failoverWriterSuccessCounter.inc(); + SQLException failoverSuccessException = getFailoverSuccessException(); + telemetryContext.setException(failoverSuccessException); + throw failoverSuccessException; + } catch (FailoverSuccessSQLException ex) { throw ex; } catch (Exception ex) { telemetryContext.setSuccess(false); @@ -570,7 +578,7 @@ protected void failoverWriter() throws SQLException { } finally { LOGGER.finest(() -> Messages.get( "Failover.writerFailoverElapsed", - new Object[]{TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.failoverStartTimeNano)})); + new Object[]{TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - failoverStartTimeNano)})); telemetryContext.closeContext(); if (this.telemetryFailoverAdditionalTopTraceSetting) { telemetryFactory.postCopy(telemetryContext, TelemetryTraceLevel.FORCE_TOP_LEVEL); diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/ReaderFailoverResult.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/ReaderFailoverResult.java new file mode 100644 index 000000000..53b417208 --- /dev/null +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/ReaderFailoverResult.java @@ -0,0 +1,39 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.jdbc.plugin.failover2; + +import java.sql.Connection; +import org.checkerframework.checker.nullness.qual.NonNull; +import software.amazon.jdbc.HostSpec; + +public class ReaderFailoverResult { + private final Connection connection; + private final HostSpec hostSpec; + + public ReaderFailoverResult(@NonNull Connection connection, @NonNull HostSpec hostSpec) { + this.connection = connection; + this.hostSpec = hostSpec; + } + + public @NonNull Connection getConnection() { + return connection; + } + + public @NonNull HostSpec getHostSpec() { + return hostSpec; + } +} diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties index 42e04e772..1b01ccca3 100644 --- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties +++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties @@ -173,11 +173,13 @@ ExecutionTimeConnectionPlugin.executionTime=Executed {0} in {1} nanos. Failover.transactionResolutionUnknownError=Transaction resolution unknown. Please re-configure session state if required and try restarting the transaction. Failover.connectionChangedError=The active SQL connection has changed due to a connection failure. Please re-configure session state if required. Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to the new writer ''{0}''. +Failover.failoverReaderTimeout=The reader failover process was not able to establish a connection before timing out. +Failover.failoverReaderUnableToRefreshHostList=The request to discover the new topology was unsuccessful. Failover.parameterValue={0}={1} Failover.unableToConnect=Unable to establish a SQL connection due to an unexpected error. Failover.unableToConnectToWriter=Unable to establish SQL connection to the writer instance. Failover.unableToConnectToReader=Unable to establish SQL connection to the reader instance. -Failover.unableToRefreshHostList={0} The request to discover the new topology timed out or was unsuccessful. +Failover.unableToRefreshHostList=The request to discover the new topology timed out or was unsuccessful. Failover.unexpectedReaderRole=The new writer was identified to be ''{0}'', but querying the instance for its role returned a role of {1}. Failover.detectedException=Detected an exception while executing a command: {0} Failover.failoverDisabled=Cluster-aware failover is disabled. @@ -190,6 +192,8 @@ Failover.noOperationsAfterConnectionClosed=No operations allowed after connectio Failover.noWriterHost=Unable to find writer in updated host list: Failover.readerFailoverElapsed=Reader failover elapsed in {0} ms. Failover.writerFailoverElapsed=Writer failover elapsed in {0} ms. +Failover.failedReaderConnection=[Reader Failover] Failed to connect to host: ''{0}'' +Failover.errorSelectingReaderHost=An error occurred while attempting to select a reader host candidate: ''{0}''. Candidates: {1} # Federated Auth Plugin FederatedAuthPlugin.unableToDetermineRegion=Unable to determine connection region. If you are using a non-standard RDS URL, please set the ''{0}'' property. From 9316886f299269363f57b8a890fdb90d60ad4e2c Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Thu, 19 Dec 2024 17:23:23 -0800 Subject: [PATCH 06/10] failover1: fix telemetry --- .../failover/FailoverConnectionPlugin.java | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java index 039ab60fd..46ed1314c 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java @@ -578,22 +578,6 @@ protected void failover(final HostSpec failedHost) throws SQLException { } else { failoverReader(failedHost); } - - if (isInTransaction || this.pluginService.isInTransaction()) { - if (this.pluginManagerService != null) { - this.pluginManagerService.setInTransaction(false); - } - // "Transaction resolution unknown. Please re-configure session state if required and try - // restarting transaction." - final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); - LOGGER.info(errorMessage); - throw new TransactionStateUnknownSQLException(); - } else { - // "The active SQL connection has changed due to a connection failure. Please re-configure - // session state if required. " - LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); - throw new FailoverSuccessSQLException(); - } } protected void failoverReader(final HostSpec failedHostSpec) throws SQLException { @@ -629,20 +613,20 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException this.pluginService.setCurrentConnection(result.getConnection(), result.getHost()); - this.pluginService.getCurrentHostSpec().removeAlias(oldAliases.toArray(new String[]{})); + this.pluginService.getCurrentHostSpec().removeAlias(oldAliases.toArray(new String[] {})); updateTopology(true); LOGGER.info( () -> Messages.get( "Failover.establishedConnection", - new Object[]{this.pluginService.getCurrentHostSpec()})); + new Object[] {this.pluginService.getCurrentHostSpec()})); this.failoverReaderSuccessCounter.inc(); - - } catch (FailoverSuccessSQLException ex) { telemetryContext.setSuccess(true); - telemetryContext.setException(ex); - this.failoverReaderSuccessCounter.inc(); + SQLException failoverSuccessException = getFailoverSuccessException(); + telemetryContext.setException(failoverSuccessException); + throw failoverSuccessException; + } catch (FailoverSuccessSQLException ex) { throw ex; } catch (Exception ex) { telemetryContext.setSuccess(false); @@ -657,6 +641,24 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException } } + protected SQLException getFailoverSuccessException() { + if (isInTransaction || this.pluginService.isInTransaction()) { + if (this.pluginManagerService != null) { + this.pluginManagerService.setInTransaction(false); + } + // "Transaction resolution unknown. Please re-configure session state if required and try + // restarting transaction." + final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); + LOGGER.info(errorMessage); + return new TransactionStateUnknownSQLException(); + } else { + // "The active SQL connection has changed due to a connection failure. Please re-configure + // session state if required. " + LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); + return new FailoverSuccessSQLException(); + } + } + protected void failoverWriter() throws SQLException { TelemetryFactory telemetryFactory = this.pluginService.getTelemetryFactory(); TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext( From 9d69dc1c5ecc49929707d0fa70c84177361d4e5d Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Thu, 19 Dec 2024 17:58:57 -0800 Subject: [PATCH 07/10] Cleanup --- .../failover/FailoverConnectionPlugin.java | 59 ++++++++----------- .../failover2/FailoverConnectionPlugin.java | 40 ++++++------- ..._advanced_jdbc_wrapper_messages.properties | 3 +- 3 files changed, 44 insertions(+), 58 deletions(-) diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java index 46ed1314c..44f7a6694 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java @@ -36,7 +36,6 @@ import software.amazon.jdbc.HostSpec; import software.amazon.jdbc.JdbcCallable; import software.amazon.jdbc.NodeChangeOptions; -import software.amazon.jdbc.OldConnectionSuggestedAction; import software.amazon.jdbc.PluginManagerService; import software.amazon.jdbc.PluginService; import software.amazon.jdbc.PropertyDefinition; @@ -297,11 +296,6 @@ void initHostProvider( this.writerFailoverHandler = writerFailoverHandlerSupplier.get(); } - @Override - public OldConnectionSuggestedAction notifyConnectionChanged(final EnumSet changes) { - return OldConnectionSuggestedAction.NO_OPINION; - } - @Override public void notifyNodeListChanged(final Map> changes) { @@ -494,7 +488,7 @@ private boolean isWriter(final HostSpec hostSpec) { return hostSpec.getRole() == HostRole.WRITER; } - private void processFailoverFailure(final String message) throws SQLException { + private void throwFailoverFailedException(final String message) throws SQLException { LOGGER.severe(message); throw new FailoverFailedSQLException(message); } @@ -594,9 +588,8 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException if (failedHostSpec != null && failedHostSpec.getRawAvailability() == HostAvailability.AVAILABLE) { failedHost = failedHostSpec; } - final ReaderFailoverResult result = - readerFailoverHandler.failover(this.pluginService.getHosts(), failedHost); + final ReaderFailoverResult result = readerFailoverHandler.failover(this.pluginService.getHosts(), failedHost); if (result != null) { final SQLException exception = result.getException(); if (exception != null) { @@ -605,9 +598,7 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException } if (result == null || !result.isConnected()) { - // "Unable to establish SQL connection to reader instance" - processFailoverFailure(Messages.get("Failover.unableToConnectToReader")); - this.failoverReaderFailedCounter.inc(); + throwFailoverFailedException(Messages.get("Failover.unableToConnectToReader")); return; } @@ -620,13 +611,11 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException () -> Messages.get( "Failover.establishedConnection", new Object[] {this.pluginService.getCurrentHostSpec()})); - + throwFailoverSuccessException(); + } catch (FailoverSuccessSQLException ex) { this.failoverReaderSuccessCounter.inc(); telemetryContext.setSuccess(true); - SQLException failoverSuccessException = getFailoverSuccessException(); - telemetryContext.setException(failoverSuccessException); - throw failoverSuccessException; - } catch (FailoverSuccessSQLException ex) { + telemetryContext.setException(ex); throw ex; } catch (Exception ex) { telemetryContext.setSuccess(false); @@ -641,7 +630,7 @@ protected void failoverReader(final HostSpec failedHostSpec) throws SQLException } } - protected SQLException getFailoverSuccessException() { + protected void throwFailoverSuccessException() throws SQLException { if (isInTransaction || this.pluginService.isInTransaction()) { if (this.pluginManagerService != null) { this.pluginManagerService.setInTransaction(false); @@ -650,12 +639,12 @@ protected SQLException getFailoverSuccessException() { // restarting transaction." final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); LOGGER.info(errorMessage); - return new TransactionStateUnknownSQLException(); + throw new TransactionStateUnknownSQLException(); } else { // "The active SQL connection has changed due to a connection failure. Please re-configure // session state if required. " LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); - return new FailoverSuccessSQLException(); + throw new FailoverSuccessSQLException(); } } @@ -667,8 +656,7 @@ protected void failoverWriter() throws SQLException { try { LOGGER.info(() -> Messages.get("Failover.startWriterFailover")); - final WriterFailoverResult failoverResult = - this.writerFailoverHandler.failover(this.pluginService.getAllHosts()); + final WriterFailoverResult failoverResult = this.writerFailoverHandler.failover(this.pluginService.getAllHosts()); if (failoverResult != null) { final SQLException exception = failoverResult.getException(); if (exception != null) { @@ -677,23 +665,25 @@ protected void failoverWriter() throws SQLException { } if (failoverResult == null || !failoverResult.isConnected()) { - // "Unable to establish SQL connection to writer node" - processFailoverFailure(Messages.get("Failover.unableToConnectToWriter")); - this.failoverWriterFailedCounter.inc(); + throwFailoverFailedException(Messages.get("Failover.unableToConnectToWriter")); + return; + } + + List hosts = failoverResult.getTopology(); + final HostSpec writerHostSpec = getWriter(hosts); + if (writerHostSpec == null) { + throwFailoverFailedException( + Messages.get( + "Failover.noWriterHostAfterReconnecting", + new Object[]{Utils.logTopology(hosts, "")})); return; } - // successfully re-connected to a writer node - final HostSpec writerHostSpec = getWriter(failoverResult.getTopology()); final List allowedHosts = this.pluginService.getHosts(); if (!allowedHosts.contains(writerHostSpec)) { - this.failoverWriterFailedCounter.inc(); - processFailoverFailure( + throwFailoverFailedException( Messages.get("Failover.newWriterNotAllowed", - new Object[] { - writerHostSpec == null ? "" : writerHostSpec.getHost(), - Utils.logTopology(allowedHosts, "") - })); + new Object[] {writerHostSpec.getHost(), Utils.logTopology(allowedHosts, "")})); return; } @@ -705,8 +695,7 @@ protected void failoverWriter() throws SQLException { new Object[]{this.pluginService.getCurrentHostSpec()})); this.pluginService.refreshHostList(); - - this.failoverWriterSuccessCounter.inc(); + throwFailoverSuccessException(); } catch (FailoverSuccessSQLException ex) { telemetryContext.setSuccess(true); telemetryContext.setException(ex); diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java index 070c87052..7c5ad8148 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java @@ -334,7 +334,7 @@ protected void failoverReader() throws SQLException { throw new FailoverFailedSQLException(Messages.get("Failover.failoverReaderUnableToRefreshHostList")); } - try { + try { ReaderFailoverResult result = getReaderFailoverConnection(failoverEndNano); this.pluginService.setCurrentConnection(result.getConnection(), result.getHostSpec()); } catch (TimeoutException e) { @@ -345,12 +345,13 @@ protected void failoverReader() throws SQLException { LOGGER.info( () -> Messages.get( "Failover.establishedConnection", - new Object[]{this.pluginService.getCurrentHostSpec()})); + new Object[] {this.pluginService.getCurrentHostSpec()})); + throwFailoverSuccessException(); + } catch (FailoverSuccessSQLException ex) { this.failoverReaderSuccessCounter.inc(); telemetryContext.setSuccess(true); - SQLException failoverSuccessException = getFailoverSuccessException(); - telemetryContext.setException(failoverSuccessException); - throw failoverSuccessException; + telemetryContext.setException(ex); + throw ex; } catch (Exception ex) { telemetryContext.setSuccess(false); telemetryContext.setException(ex); @@ -394,19 +395,15 @@ protected ReaderFailoverResult getReaderFailoverConnection(long failoverEndTimeN this.failoverReaderHostSelectorStrategySetting); } catch (UnsupportedOperationException | SQLException ex) { LOGGER.finest( - Messages.get( - "Failover.errorSelectingReaderHost", - new Object[]{ - ex.getMessage(), - Utils.logTopology(new ArrayList<>(remainingReaders), "") - })); + Utils.logTopology( + new ArrayList<>(remainingReaders), + Messages.get("Failover.errorSelectingReaderHost", new Object[]{ex.getMessage()}))); break; } if (readerCandidate == null) { LOGGER.finest( - Messages.get("Failover.readerCandidateNull", - new Object[]{Utils.logTopology(new ArrayList<>(remainingReaders), "")})); + Utils.logTopology(new ArrayList<>(remainingReaders), Messages.get("Failover.readerCandidateNull"))); break; } @@ -470,7 +467,7 @@ protected ReaderFailoverResult getReaderFailoverConnection(long failoverEndTimeN throw new TimeoutException(Messages.get("Failover.failoverReaderTimeout")); } - protected SQLException getFailoverSuccessException() { + protected void throwFailoverSuccessException() throws SQLException { if (isInTransaction || this.pluginService.isInTransaction()) { if (this.pluginManagerService != null) { this.pluginManagerService.setInTransaction(false); @@ -479,12 +476,12 @@ protected SQLException getFailoverSuccessException() { // restarting transaction." final String errorMessage = Messages.get("Failover.transactionResolutionUnknownError"); LOGGER.info(errorMessage); - return new TransactionStateUnknownSQLException(); + throw new TransactionStateUnknownSQLException(); } else { // "The active SQL connection has changed due to a connection failure. Please re-configure // session state if required. " LOGGER.severe(() -> Messages.get("Failover.connectionChangedError")); - return new FailoverSuccessSQLException(); + throw new FailoverSuccessSQLException(); } } @@ -552,7 +549,8 @@ protected void failoverWriter() throws SQLException { // do nothing } this.failoverWriterFailedCounter.inc(); - LOGGER.severe(Messages.get("Failover.unexpectedReaderRole", new Object[]{writerCandidate.getHost(), role})); + LOGGER.severe( + Messages.get("Failover.unexpectedReaderRole", new Object[]{writerCandidate.getHost(), role})); throw new FailoverFailedSQLException(Messages.get("Failover.unexpectedReaderRole")); } @@ -562,13 +560,11 @@ protected void failoverWriter() throws SQLException { () -> Messages.get( "Failover.establishedConnection", new Object[]{this.pluginService.getCurrentHostSpec()})); - + throwFailoverSuccessException(); + } catch (FailoverSuccessSQLException ex) { this.failoverWriterSuccessCounter.inc(); telemetryContext.setSuccess(true); - SQLException failoverSuccessException = getFailoverSuccessException(); - telemetryContext.setException(failoverSuccessException); - throw failoverSuccessException; - } catch (FailoverSuccessSQLException ex) { + telemetryContext.setException(ex); throw ex; } catch (Exception ex) { telemetryContext.setSuccess(false); diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties index 1b01ccca3..687677393 100644 --- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties +++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties @@ -175,6 +175,7 @@ Failover.connectionChangedError=The active SQL connection has changed due to a c Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to the new writer ''{0}''. Failover.failoverReaderTimeout=The reader failover process was not able to establish a connection before timing out. Failover.failoverReaderUnableToRefreshHostList=The request to discover the new topology was unsuccessful. +Failover.noWriterHostAfterReconnecting=The writer failover process successfully reconnected, but no writer was found in the updated topology: {0} Failover.parameterValue={0}={1} Failover.unableToConnect=Unable to establish a SQL connection due to an unexpected error. Failover.unableToConnectToWriter=Unable to establish SQL connection to the writer instance. @@ -193,7 +194,7 @@ Failover.noWriterHost=Unable to find writer in updated host list: Failover.readerFailoverElapsed=Reader failover elapsed in {0} ms. Failover.writerFailoverElapsed=Writer failover elapsed in {0} ms. Failover.failedReaderConnection=[Reader Failover] Failed to connect to host: ''{0}'' -Failover.errorSelectingReaderHost=An error occurred while attempting to select a reader host candidate: ''{0}''. Candidates: {1} +Failover.errorSelectingReaderHost=An error occurred while attempting to select a reader host candidate: ''{0}''. Candidates: # Federated Auth Plugin FederatedAuthPlugin.unableToDetermineRegion=Unable to determine connection region. If you are using a non-standard RDS URL, please set the ''{0}'' property. From 710e13ec5b558428e98b840f264c70cdbe7f5bfb Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Fri, 20 Dec 2024 10:53:55 -0800 Subject: [PATCH 08/10] failover1: create failover handlers in connectInternal --- .../failover/FailoverConnectionPlugin.java | 55 ++++++--- .../amazon/jdbc/util/ConnectionUrlParser.java | 21 ---- ..._advanced_jdbc_wrapper_messages.properties | 2 + .../FailoverConnectionPluginTest.java | 106 +++++------------- .../jdbc/util/ConnectionUrlParserTest.java | 35 ------ 5 files changed, 70 insertions(+), 149 deletions(-) diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java index 44f7a6694..423ce7006 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java @@ -43,7 +43,6 @@ import software.amazon.jdbc.plugin.AbstractConnectionPlugin; import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsHelper; import software.amazon.jdbc.targetdriverdialect.TargetDriverDialect; -import software.amazon.jdbc.util.ConnectionUrlParser; import software.amazon.jdbc.util.Messages; import software.amazon.jdbc.util.RdsUrlType; import software.amazon.jdbc.util.RdsUtils; @@ -107,6 +106,8 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin { private RdsUrlType rdsUrlType = null; private HostListProviderService hostListProviderService; private final AuroraStaleDnsHelper staleDnsHelper; + private Supplier writerFailoverHandlerSupplier; + private Supplier readerFailoverHandlerSupplier; public static final AwsWrapperProperty FAILOVER_CLUSTER_TOPOLOGY_REFRESH_RATE_MS = new AwsWrapperProperty( @@ -257,7 +258,6 @@ public void initHostProvider( final JdbcCallable initHostProviderFunc) throws SQLException { initHostProvider( - initialUrl, hostListProviderService, initHostProviderFunc, () -> @@ -278,22 +278,20 @@ public void initHostProvider( } void initHostProvider( - final String initialUrl, final HostListProviderService hostListProviderService, final JdbcCallable initHostProviderFunc, - final Supplier readerFailoverHandlerSupplier, - final Supplier writerFailoverHandlerSupplier) + final Supplier readerFailoverHandlerSupplier, + final Supplier writerFailoverHandlerSupplier) throws SQLException { + this.readerFailoverHandlerSupplier = readerFailoverHandlerSupplier; + this.writerFailoverHandlerSupplier = writerFailoverHandlerSupplier; + this.hostListProviderService = hostListProviderService; if (!this.enableFailoverSetting) { return; } initHostProviderFunc.call(); - initFailoverMode(initialUrl); - - this.readerFailoverHandler = readerFailoverHandlerSupplier.get(); - this.writerFailoverHandler = writerFailoverHandlerSupplier.get(); } @Override @@ -337,11 +335,6 @@ private boolean isNodeStillValid(final String node, final Map connectFunc, boolean isForceConnect) throws SQLException { + this.initFailoverMode(); + if (this.readerFailoverHandler == null) { + if (this.readerFailoverHandlerSupplier == null) { + throw new SQLException(Messages.get("Failover.nullReaderFailoverHandlerSupplier")); + } + this.readerFailoverHandler = this.readerFailoverHandlerSupplier.get(); + } + + if (this.writerFailoverHandler == null) { + if (this.writerFailoverHandlerSupplier == null) { + throw new SQLException(Messages.get("Failover.nullWriterFailoverHandlerSupplier")); + } + this.writerFailoverHandler = this.writerFailoverHandlerSupplier.get(); + } + Connection conn = null; try { conn = @@ -842,4 +852,17 @@ public Connection forceConnect( throws SQLException { return connectInternal(driverProtocol, hostSpec, props, isInitialConnection, forceConnectFunc, true); } + + // The below methods are for testing purposes + void setRdsUrlType(final RdsUrlType rdsUrlType) { + this.rdsUrlType = rdsUrlType; + } + + void setWriterFailoverHandler(final WriterFailoverHandler writerFailoverHandler) { + this.writerFailoverHandler = writerFailoverHandler; + } + + void setReaderFailoverHandler(final ReaderFailoverHandler readerFailoverHandler) { + this.readerFailoverHandler = readerFailoverHandler; + } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/util/ConnectionUrlParser.java b/wrapper/src/main/java/software/amazon/jdbc/util/ConnectionUrlParser.java index be812991b..9fa9e0afa 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/util/ConnectionUrlParser.java +++ b/wrapper/src/main/java/software/amazon/jdbc/util/ConnectionUrlParser.java @@ -87,27 +87,6 @@ public static HostSpec parseHostPortPair(final String url, final HostRole role, return getHostSpec(hostPortPair, role, hostSpecBuilderSupplier.get()); } - public static @Nullable String parseHost(final String connString) { - final Matcher matcher = CONNECTION_STRING_PATTERN.matcher(connString); - if (!matcher.matches()) { - return null; - } - - final String hosts = matcher.group("hosts") == null ? null : matcher.group("hosts").trim(); - if (hosts == null) { - return null; - } - - final String[] hostArray = hosts.split(HOSTS_SEPARATOR); - if (hostArray.length == 0) { - return null; - } - - final String url = hostArray[0]; - final String[] hostPortPair = url.split(HOST_PORT_SEPARATOR, 2); - return hostPortPair.length == 0 ? null : hostPortPair[0]; - } - private static HostSpec getHostSpec(final String[] hostPortPair, final HostRole hostRole, final HostSpecBuilder hostSpecBuilder) { String hostId = rdsUtils.getRdsInstanceId(hostPortPair[0]); diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties index 687677393..ec42fb3c0 100644 --- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties +++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties @@ -176,6 +176,8 @@ Failover.exceptionConnectingToWriter=An exception occurred while trying to conne Failover.failoverReaderTimeout=The reader failover process was not able to establish a connection before timing out. Failover.failoverReaderUnableToRefreshHostList=The request to discover the new topology was unsuccessful. Failover.noWriterHostAfterReconnecting=The writer failover process successfully reconnected, but no writer was found in the updated topology: {0} +Failover.nullReaderFailoverHandlerSupplier=The failover plugin was unable to create a reader failover handler because the supplier was unexpectedly null. +Failover.nullWriterFailoverHandlerSupplier=The failover plugin was unable to create a writer failover handler because the supplier was unexpectedly null. Failover.parameterValue={0}={1} Failover.unableToConnect=Unable to establish a SQL connection due to an unexpected error. Failover.unableToConnectToWriter=Unable to establish SQL connection to the writer instance. diff --git a/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPluginTest.java b/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPluginTest.java index f1075d9fd..2123848d1 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPluginTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPluginTest.java @@ -34,7 +34,6 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -50,7 +49,6 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import software.amazon.jdbc.HostListProvider; import software.amazon.jdbc.HostListProviderService; import software.amazon.jdbc.HostRole; import software.amazon.jdbc.HostSpec; @@ -61,7 +59,6 @@ import software.amazon.jdbc.hostavailability.HostAvailability; import software.amazon.jdbc.hostavailability.SimpleHostAvailabilityStrategy; import software.amazon.jdbc.hostlistprovider.AuroraHostListProvider; -import software.amazon.jdbc.hostlistprovider.DynamicHostListProvider; import software.amazon.jdbc.util.RdsUrlType; import software.amazon.jdbc.util.SqlState; import software.amazon.jdbc.util.telemetry.GaugeCallable; @@ -75,6 +72,11 @@ class FailoverConnectionPluginTest { private static final Class MONITOR_METHOD_INVOKE_ON = Connection.class; private static final String MONITOR_METHOD_NAME = "Connection.executeQuery"; private static final Object[] EMPTY_ARGS = {}; + private final List defaultHosts = Arrays.asList( + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("writer").port(1234).role(HostRole.WRITER).build(), + new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("reader1").port(1234).role(HostRole.READER).build()); @Mock PluginService mockPluginService; @Mock Connection mockConnection; @@ -82,8 +84,8 @@ class FailoverConnectionPluginTest { @Mock HostListProviderService mockHostListProviderService; @Mock AuroraHostListProvider mockHostListProvider; @Mock JdbcCallable mockInitHostProviderFunc; - @Mock ClusterAwareReaderFailoverHandler mockReaderFailoverHandler; - @Mock ClusterAwareWriterFailoverHandler mockWriterFailoverHandler; + @Mock ReaderFailoverHandler mockReaderFailoverHandler; + @Mock WriterFailoverHandler mockWriterFailoverHandler; @Mock ReaderFailoverResult mockReaderResult; @Mock WriterFailoverResult mockWriterResult; @Mock JdbcCallable mockSqlFunction; @@ -111,8 +113,13 @@ void init() throws SQLException { when(mockPluginService.getCurrentHostSpec()).thenReturn(mockHostSpec); when(mockPluginService.connect(any(HostSpec.class), eq(properties))).thenReturn(mockConnection); when(mockPluginService.getTelemetryFactory()).thenReturn(mockTelemetryFactory); + when(mockPluginService.getHosts()).thenReturn(defaultHosts); + when(mockPluginService.getAllHosts()).thenReturn(defaultHosts); when(mockReaderFailoverHandler.failover(any(), any())).thenReturn(mockReaderResult); when(mockWriterFailoverHandler.failover(any())).thenReturn(mockWriterResult); + when(mockWriterResult.isConnected()).thenReturn(true); + when(mockWriterResult.getTopology()).thenReturn(defaultHosts); + when(mockReaderResult.isConnected()).thenReturn(true); when(mockPluginService.getTelemetryFactory()).thenReturn(mockTelemetryFactory); when(mockTelemetryFactory.openTelemetryContext(anyString(), any())).thenReturn(mockTelemetryContext); @@ -157,7 +164,7 @@ void test_updateTopology() throws SQLException { initializePlugin(); // Test updateTopology with failover disabled - when(mockHostListProvider.getRdsUrlType()).thenReturn(RdsUrlType.RDS_PROXY); + plugin.setRdsUrlType(RdsUrlType.RDS_PROXY); plugin.updateTopology(false); verify(mockPluginService, never()).forceRefreshHostList(); verify(mockPluginService, never()).refreshHostList(); @@ -196,50 +203,41 @@ void test_updateTopology_withForceUpdate(final boolean forceUpdate) throws SQLEx } @Test - void test_failover_failoverReader() throws SQLException { + void test_failover_failoverWriter() throws SQLException { when(mockPluginService.isInTransaction()).thenReturn(true); initializePlugin(); final FailoverConnectionPlugin spyPlugin = spy(plugin); - doNothing().when(spyPlugin).failoverWriter(); + doThrow(FailoverSuccessSQLException.class).when(spyPlugin).failoverWriter(); spyPlugin.failoverMode = FailoverMode.STRICT_WRITER; - SQLException exception = assertThrows(SQLException.class, () -> spyPlugin.failover(mockHostSpec)); - assertEquals(SqlState.CONNECTION_FAILURE_DURING_TRANSACTION.getState(), exception.getSQLState()); + assertThrows(FailoverSuccessSQLException.class, () -> spyPlugin.failover(mockHostSpec)); verify(spyPlugin).failoverWriter(); } @Test - void test_failover_failoverWriter() throws SQLException { + void test_failover_failoverReader() throws SQLException { when(mockPluginService.isInTransaction()).thenReturn(false); initializePlugin(); final FailoverConnectionPlugin spyPlugin = spy(plugin); - doNothing().when(spyPlugin).failoverReader(eq(mockHostSpec)); + doThrow(FailoverSuccessSQLException.class).when(spyPlugin).failoverReader(eq(mockHostSpec)); spyPlugin.failoverMode = FailoverMode.READER_OR_WRITER; - SQLException exception = assertThrows(SQLException.class, () -> spyPlugin.failover(mockHostSpec)); - assertEquals(SqlState.COMMUNICATION_LINK_CHANGED.getState(), exception.getSQLState()); + assertThrows(FailoverSuccessSQLException.class, () -> spyPlugin.failover(mockHostSpec)); verify(spyPlugin).failoverReader(eq(mockHostSpec)); } @Test void test_failoverReader_withValidFailedHostSpec_successFailover() throws SQLException { - final HostSpec hostSpec = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("hostA") - .build(); - final List hosts = Collections.singletonList(hostSpec); - when(mockHostSpec.getAliases()).thenReturn(new HashSet<>(Arrays.asList("alias1", "alias2"))); when(mockHostSpec.getRawAvailability()).thenReturn(HostAvailability.AVAILABLE); - when(mockPluginService.getAllHosts()).thenReturn(hosts); - when(mockPluginService.getHosts()).thenReturn(hosts); when(mockReaderResult.isConnected()).thenReturn(true); when(mockReaderResult.getConnection()).thenReturn(mockConnection); - when(mockReaderResult.getHost()).thenReturn(hostSpec); + when(mockReaderResult.getHost()).thenReturn(defaultHosts.get(1)); initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, @@ -248,14 +246,14 @@ void test_failoverReader_withValidFailedHostSpec_successFailover() throws SQLExc final FailoverConnectionPlugin spyPlugin = spy(plugin); doNothing().when(spyPlugin).updateTopology(true); - spyPlugin.failoverReader(mockHostSpec); + assertThrows(FailoverSuccessSQLException.class, () -> spyPlugin.failoverReader(mockHostSpec)); - verify(mockReaderFailoverHandler).failover(eq(hosts), eq(mockHostSpec)); - verify(mockPluginService).setCurrentConnection(eq(mockConnection), eq(hostSpec)); + verify(mockReaderFailoverHandler).failover(eq(defaultHosts), eq(mockHostSpec)); + verify(mockPluginService).setCurrentConnection(eq(mockConnection), eq(defaultHosts.get(1))); } @Test - void test_failoverReader_withVNoFailedHostSpec_withException() throws SQLException { + void test_failoverReader_withNoFailedHostSpec_withException() throws SQLException { final HostSpec hostSpec = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("hostA") .build(); final List hosts = Collections.singletonList(hostSpec); @@ -269,7 +267,6 @@ void test_failoverReader_withVNoFailedHostSpec_withException() throws SQLExcepti initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, @@ -292,7 +289,6 @@ void test_failoverWriter_failedFailover_throwsException() throws SQLException { initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, @@ -315,7 +311,6 @@ void test_failoverWriter_failedFailover_withNoResult() throws SQLException { initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, @@ -331,26 +326,19 @@ void test_failoverWriter_failedFailover_withNoResult() throws SQLException { @Test void test_failoverWriter_successFailover() throws SQLException { - final HostSpec hostSpec = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("hostA") - .build(); - final List hosts = Collections.singletonList(hostSpec); - when(mockHostSpec.getAliases()).thenReturn(new HashSet<>(Arrays.asList("alias1", "alias2"))); - when(mockPluginService.getAllHosts()).thenReturn(hosts); - when(mockPluginService.getHosts()).thenReturn(hosts); initializePlugin(); plugin.initHostProvider( - "initialUrl", mockHostListProviderService, mockInitHostProviderFunc, () -> mockReaderFailoverHandler, () -> mockWriterFailoverHandler); - final SQLException exception = assertThrows(SQLException.class, () -> plugin.failoverWriter()); - assertEquals(SqlState.CONNECTION_UNABLE_TO_CONNECT.getState(), exception.getSQLState()); + final SQLException exception = assertThrows(FailoverSuccessSQLException.class, () -> plugin.failoverWriter()); + assertEquals(SqlState.COMMUNICATION_LINK_CHANGED.getState(), exception.getSQLState()); - verify(mockWriterFailoverHandler).failover(eq(hosts)); + verify(mockWriterFailoverHandler).failover(eq(defaultHosts)); } @Test @@ -442,43 +430,7 @@ void test_execute_withDirectExecute() throws SQLException { private void initializePlugin() { plugin = new FailoverConnectionPlugin(mockPluginService, properties); - } - - private static class FooHostListProvider implements HostListProvider, DynamicHostListProvider { - - @Override - public List refresh() { - return new ArrayList<>(); - } - - @Override - public List refresh(Connection connection) { - return new ArrayList<>(); - } - - @Override - public List forceRefresh() { - return new ArrayList<>(); - } - - @Override - public List forceRefresh(Connection connection) { - return new ArrayList<>(); - } - - @Override - public HostRole getHostRole(Connection conn) { - return HostRole.WRITER; - } - - @Override - public HostSpec identifyConnection(Connection connection) throws SQLException { - return new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("foo").build(); - } - - @Override - public String getClusterId() throws UnsupportedOperationException { - throw new UnsupportedOperationException(); - } + plugin.setWriterFailoverHandler(mockWriterFailoverHandler); + plugin.setReaderFailoverHandler(mockReaderFailoverHandler); } } diff --git a/wrapper/src/test/java/software/amazon/jdbc/util/ConnectionUrlParserTest.java b/wrapper/src/test/java/software/amazon/jdbc/util/ConnectionUrlParserTest.java index 6f08c2fa9..50875bda0 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/util/ConnectionUrlParserTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/util/ConnectionUrlParserTest.java @@ -110,41 +110,6 @@ void testParsingUrlWithNestedParams(final String url, final String expected) { assertEquals(expected, props.getProperty("param")); } - @ParameterizedTest - @MethodSource("getTestParseHostArgs") - void testParseHost(final String connString, final String expected) { - assertEquals(expected, ConnectionUrlParser.parseHost(connString)); - } - - private static Stream getTestParseHostArgs() { - return Stream.of( - Arguments.of( - "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com", - "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), - Arguments.of( - "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com:5432", - "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), - Arguments.of( - "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com:5432?connectTimeout=30000", - "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), - Arguments.of( - "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com/test", - "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), - Arguments.of( - "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com:5432/test", - "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), - Arguments.of( - "jdbc:postgresql://mydb.cluster-XYZ.us-east-2.rds.amazonaws.com:5432?connectTimeout=30000/test", - "mydb.cluster-XYZ.us-east-2.rds.amazonaws.com"), - Arguments.of( - "jdbc:postgresql://mydb.cluster-ro-XYZ.us-east-2.rds.amazonaws.com:5432?connectTimeout=30000/test", - "mydb.cluster-ro-XYZ.us-east-2.rds.amazonaws.com"), - Arguments.of( - "jdbc:postgresql://mydb.cluster-XYZ.rds.cn-northwest-1.amazonaws.com.cn:5432/test", - "mydb.cluster-XYZ.rds.cn-northwest-1.amazonaws.com.cn") - ); - } - private static Stream testGetHostsFromConnectionUrlArguments() { return Stream.of( Arguments.of("protocol//", new ArrayList()), From f1b48359a71b1f5a087a99ccae7ab85bb7427b70 Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Fri, 20 Dec 2024 14:46:17 -0800 Subject: [PATCH 09/10] Fix bug where original readers were not being selected after verifying that they have READER role --- .../failover2/FailoverConnectionPlugin.java | 19 +++++++++++++------ ..._advanced_jdbc_wrapper_messages.properties | 1 + 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java index 7c5ad8148..7b537f0e8 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java @@ -384,6 +384,7 @@ protected ReaderFailoverResult getReaderFailoverConnection(long failoverEndTimeN boolean isOriginalWriterStillWriter = false; do { + // First, try all original readers final Set remainingReaders = new HashSet<>(readerCandidates); while (!remainingReaders.isEmpty() && System.nanoTime() < failoverEndTimeNano) { HostSpec readerCandidate; @@ -414,12 +415,19 @@ protected ReaderFailoverResult getReaderFailoverConnection(long failoverEndTimeN } // Since the roles in the host list might not be accurate, we execute a query to check the instance's role. - if (this.pluginService.getHostRole(candidateConn) == HostRole.WRITER) { + HostRole role = this.pluginService.getHostRole(candidateConn); + if (role == HostRole.READER) { + return new ReaderFailoverResult(candidateConn, readerCandidate); + } else if (role == HostRole.WRITER) { // The reader candidate is actually a writer, which is not valid when failoverMode is STRICT_READER. // We will remove it from the list of reader candidates and remaining readers to avoid retrying it. readerCandidates.remove(readerCandidate); remainingReaders.remove(readerCandidate); candidateConn.close(); + } else { + LOGGER.fine( + Messages.get("Failover.strictReaderUnknownHostRole", new Object[]{readerCandidate.getUrl()})); + readerCandidates.remove(readerCandidate); } } catch (SQLException ex) { remainingReaders.remove(readerCandidate); @@ -450,14 +458,13 @@ protected ReaderFailoverResult getReaderFailoverConnection(long failoverEndTimeN HostRole role = this.pluginService.getHostRole(candidateConn); if (role == HostRole.READER) { return new ReaderFailoverResult(candidateConn, originalWriter); - } - - // We are in STRICT_READER mode and the connection is not a reader, so it is not valid. - - if (role == HostRole.WRITER) { + } else if (role == HostRole.WRITER) { isOriginalWriterStillWriter = true; + } else { + LOGGER.fine(Messages.get("Failover.strictReaderUnknownHostRole", new Object[]{originalWriter.getUrl()})); } + // We are in STRICT_READER mode and the connection is not a reader, so it is not valid. candidateConn.close(); } catch (SQLException ex) { LOGGER.fine(Messages.get("Failover.failedReaderConnection", new Object[]{originalWriter.getUrl()})); diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties index ec42fb3c0..8beaef8ad 100644 --- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties +++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties @@ -184,6 +184,7 @@ Failover.unableToConnectToWriter=Unable to establish SQL connection to the write Failover.unableToConnectToReader=Unable to establish SQL connection to the reader instance. Failover.unableToRefreshHostList=The request to discover the new topology timed out or was unsuccessful. Failover.unexpectedReaderRole=The new writer was identified to be ''{0}'', but querying the instance for its role returned a role of {1}. +Failover.strictReaderUnknownHostRole=Unable to determine host role for ''{0}''. Since failover mode is set to STRICT_READER and the host may be a writer, it will not be selected for reader failover. Failover.detectedException=Detected an exception while executing a command: {0} Failover.failoverDisabled=Cluster-aware failover is disabled. Failover.establishedConnection=Connected to: {0} From 4ec5a6f79b2ecda80a5d78e4e8ce150688002ff7 Mon Sep 17 00:00:00 2001 From: aaron-congo Date: Fri, 20 Dec 2024 15:49:26 -0800 Subject: [PATCH 10/10] Update host role if it has changed after reader failover --- .../failover2/FailoverConnectionPlugin.java | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java index 7b537f0e8..beafb4179 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java @@ -410,24 +410,24 @@ protected ReaderFailoverResult getReaderFailoverConnection(long failoverEndTimeN try { Connection candidateConn = this.pluginService.connect(readerCandidate, copyProp); - if (this.failoverMode != STRICT_READER) { - return new ReaderFailoverResult(candidateConn, readerCandidate); - } - // Since the roles in the host list might not be accurate, we execute a query to check the instance's role. HostRole role = this.pluginService.getHostRole(candidateConn); - if (role == HostRole.READER) { - return new ReaderFailoverResult(candidateConn, readerCandidate); - } else if (role == HostRole.WRITER) { + if (role == HostRole.READER || this.failoverMode != STRICT_READER) { + HostSpec updatedHostSpec = new HostSpec(readerCandidate, role); + return new ReaderFailoverResult(candidateConn, updatedHostSpec); + } + + // The role is WRITER or UNKNOWN, and we are in STRICT_READER mode, so the connection is not valid. + remainingReaders.remove(readerCandidate); + candidateConn.close(); + + if (role == HostRole.WRITER) { // The reader candidate is actually a writer, which is not valid when failoverMode is STRICT_READER. - // We will remove it from the list of reader candidates and remaining readers to avoid retrying it. + // We will remove it from the list of reader candidates to avoid retrying it in future iterations. readerCandidates.remove(readerCandidate); - remainingReaders.remove(readerCandidate); - candidateConn.close(); } else { LOGGER.fine( Messages.get("Failover.strictReaderUnknownHostRole", new Object[]{readerCandidate.getUrl()})); - readerCandidates.remove(readerCandidate); } } catch (SQLException ex) { remainingReaders.remove(readerCandidate); @@ -450,22 +450,20 @@ protected ReaderFailoverResult getReaderFailoverConnection(long failoverEndTimeN // Try the original writer, which may have been demoted to a reader. try { Connection candidateConn = this.pluginService.connect(originalWriter, copyProp); - if (this.failoverMode != STRICT_READER) { - return new ReaderFailoverResult(candidateConn, originalWriter); + HostRole role = this.pluginService.getHostRole(candidateConn); + if (role == HostRole.READER || this.failoverMode != STRICT_READER) { + HostSpec updatedHostSpec = new HostSpec(originalWriter, role); + return new ReaderFailoverResult(candidateConn, updatedHostSpec); } - // We are in STRICT_READER mode, so we need to verify the host's role. - HostRole role = this.pluginService.getHostRole(candidateConn); - if (role == HostRole.READER) { - return new ReaderFailoverResult(candidateConn, originalWriter); - } else if (role == HostRole.WRITER) { + // The role is WRITER or UNKNOWN, and we are in STRICT_READER mode, so the connection is not valid. + candidateConn.close(); + + if (role == HostRole.WRITER) { isOriginalWriterStillWriter = true; } else { LOGGER.fine(Messages.get("Failover.strictReaderUnknownHostRole", new Object[]{originalWriter.getUrl()})); } - - // We are in STRICT_READER mode and the connection is not a reader, so it is not valid. - candidateConn.close(); } catch (SQLException ex) { LOGGER.fine(Messages.get("Failover.failedReaderConnection", new Object[]{originalWriter.getUrl()})); }