Skip to content

Commit

Permalink
Fix/limitless dialect check + refactor synchronous router fetch (#1148)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronchung-bitquill authored Oct 16, 2024
1 parent cd6f739 commit 7c5f391
Show file tree
Hide file tree
Showing 17 changed files with 480 additions and 210 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### :bug: Fixed
- Use the cluster URL as the default cluster ID ([PR #1131](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1131)).
- Fix logic in SlidingExpirationCache and SlidingExpirationCacheWithCleanupThread ([PR #1142](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1142)).
- Limitless Connection Plugin to check dialect and attempt recovery in case an unsupported dialect is encountered ([PR #1148](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1148)).

### :crab: Changed
- Updated expected URL patterns for Limitless Databases ([PR #1147](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1147)).

## [2.4.0] - 2024-09-25

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Amazon Aurora Limitless Database is a new type of database that can horizontally scale to handle millions of write transactions per second and manage petabytes of data.
Users will be able to use the AWS JDBC Driver with Aurora Limitless Databases and optimize their experience using the Limitless Connection Plugin.
To learn more about Aurora Limitless Database, see the [Amazon Aurora Limitless documentation](// TODO).
To learn more about Aurora Limitless Database, see the [Amazon Aurora Limitless documentation](https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-aurora-limitless-database/).

## Why use the Limitless Connection Plugin?

Expand Down
4 changes: 4 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/HostSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ public long getWeight() {
return this.weight;
}

public void setWeight(long weight) {
this.weight = weight;
}

public void addAlias(final String... alias) {
if (alias == null || alias.length < 1) {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc.dialect;

public interface AuroraLimitlessDialect extends Dialect {
String getLimitlessRouterEndpointQuery();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Suitable for the following AWS PG configurations.
* - Regional Cluster
*/
public class AuroraPgDialect extends PgDialect {
public class AuroraPgDialect extends PgDialect implements AuroraLimitlessDialect {
private static final Logger LOGGER = Logger.getLogger(AuroraPgDialect.class.getName());

private static final String extensionsSql =
Expand All @@ -53,6 +53,8 @@ public class AuroraPgDialect extends PgDialect {

private static final String NODE_ID_QUERY = "SELECT aurora_db_instance_identifier()";
private static final String IS_READER_QUERY = "SELECT pg_is_in_recovery()";
protected static final String LIMITLESS_ROUTER_ENDPOINT_QUERY =
"select router_endpoint, load from aurora_limitless_router_endpoints()";

@Override
public boolean isDialect(final Connection connection) {
Expand Down Expand Up @@ -149,4 +151,9 @@ public HostListProviderSupplier getHostListProvider() {
IS_READER_QUERY);
};
}

@Override
public String getLimitlessRouterEndpointQuery() {
return LIMITLESS_ROUTER_ENDPOINT_QUERY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ public Dialect getDialect(

if (driverProtocol.contains("postgresql")) {
RdsUrlType type = this.rdsHelper.identifyRdsType(host);
if (RdsUrlType.RDS_AURORA_LIMITLESS_DB_SHARD_GROUP.equals(type)) {
this.canUpdate = false;
this.dialectCode = DialectCodes.AURORA_PG;
this.dialect = knownDialectsByCode.get(DialectCodes.AURORA_PG);
return this.dialect;
}
if (type.isRdsCluster()) {
this.canUpdate = true;
this.dialectCode = DialectCodes.AURORA_PG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.PropertyDefinition;
import software.amazon.jdbc.RoundRobinHostSelector;
import software.amazon.jdbc.dialect.AuroraLimitlessDialect;
import software.amazon.jdbc.dialect.Dialect;
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.Utils;
import software.amazon.jdbc.wrapper.HighestWeightHostSelector;

public class LimitlessConnectionPlugin extends AbstractConnectionPlugin {
Expand All @@ -62,15 +65,15 @@ public class LimitlessConnectionPlugin extends AbstractConnectionPlugin {
"limitlessConnectMaxRetries",
"5",
"Max number of connection retries the Limitless Connection Plugin will attempt.");

protected final PluginService pluginService;
protected final @NonNull Properties properties;
private final @NonNull Supplier<LimitlessRouterService> limitlessRouterServiceSupplier;
protected final Properties properties;
private final Supplier<LimitlessRouterService> limitlessRouterServiceSupplier;
private LimitlessRouterService limitlessRouterService;
private static final Set<String> subscribedMethods =
Collections.unmodifiableSet(new HashSet<String>() {
{
add("connect");
add("forceConnect");
}
});

Expand All @@ -86,7 +89,7 @@ public Set<String> getSubscribedMethods() {
public LimitlessConnectionPlugin(final PluginService pluginService, final @NonNull Properties properties) {
this(pluginService,
properties,
LimitlessRouterServiceImpl::new);
() -> new LimitlessRouterServiceImpl(pluginService));
}

public LimitlessConnectionPlugin(
Expand All @@ -109,58 +112,75 @@ public Connection connect(
return connectInternal(driverProtocol, hostSpec, props, isInitialConnection, connectFunc);
}

@Override
public Connection forceConnect(
private Connection connectInternal(
final @NonNull String driverProtocol,
final @NonNull HostSpec hostSpec,
final @NonNull Properties props,
final boolean isInitialConnection,
final @NonNull JdbcCallable<Connection, SQLException> forceConnectFunc)
throws SQLException {
return connectInternal(driverProtocol, hostSpec, props, isInitialConnection, forceConnectFunc);
final JdbcCallable<Connection, SQLException> connectFunc) throws SQLException {
final Dialect dialect = this.pluginService.getDialect();
if (dialect instanceof AuroraLimitlessDialect) {
return connectInternalWithDialect(driverProtocol, hostSpec, props, isInitialConnection, connectFunc);
} else {
return connectInternalWithoutDialect(driverProtocol, hostSpec, props, isInitialConnection, connectFunc);
}
}

private Connection connectInternal(
private Connection connectInternalWithDialect(
final @NonNull String driverProtocol,
final @NonNull HostSpec hostSpec,
final @NonNull Properties props,
final boolean isInitialConnection,
final JdbcCallable<Connection,
SQLException> connectFunc) throws SQLException {
final JdbcCallable<Connection, SQLException> connectFunc) throws SQLException {

initLimitlessRouterMonitorService();
if (isInitialConnection) {
this.limitlessRouterService
.startMonitoring(pluginService, hostSpec, properties, INTERVAL_MILLIS.getInteger(properties));
.startMonitoring(hostSpec, properties, INTERVAL_MILLIS.getInteger(properties));
}

List<HostSpec> limitlessRouters = this.limitlessRouterService.getLimitlessRouters(
this.pluginService.getHostListProvider().getClusterId(), props);

if (limitlessRouters.isEmpty()) {
Connection conn = null;
if (Utils.isNullOrEmpty(limitlessRouters)) {
conn = connectFunc.call();
LOGGER.finest(Messages.get("LimitlessConnectionPlugin.limitlessRouterCacheEmpty"));
final boolean waitForRouterInfo = WAIT_F0R_ROUTER_INFO.getBoolean(props);
if (waitForRouterInfo) {
limitlessRouters = synchronouslyGetLimitlessRoutersWithRetry(props);
limitlessRouters = synchronouslyGetLimitlessRoutersWithRetry(conn, hostSpec.getPort(), props);
} else {
LOGGER.finest(Messages.get("LimitlessConnectionPlugin.usingProvidedConnectUrl"));
return connectFunc.call();
return conn;
}
} else if (limitlessRouters.contains(hostSpec)) {
}

if (limitlessRouters.contains(hostSpec)) {
LOGGER.finest(Messages.get("LimitlessConnectionPlugin.connectWithHost", new Object[] {hostSpec.getHost()}));
return connectFunc.call();
if (conn == null || conn.isClosed()) {
try {
conn = connectFunc.call();
} catch (final SQLException e) {
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, conn, hostSpec);
}
}
return conn;
}

RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(props, limitlessRouters);
final HostSpec selectedHostSpec;
HostSpec selectedHostSpec;
try {
selectedHostSpec = this.pluginService.getHostSpecByStrategy(limitlessRouters,
HostRole.WRITER, RoundRobinHostSelector.STRATEGY_ROUND_ROBIN);
LOGGER.fine(Messages.get(
"LimitlessConnectionPlugin.selectedHost",
new Object[] {selectedHostSpec.getHost()}));
} catch (UnsupportedOperationException e) {
LOGGER.severe(Messages.get("LimitlessConnectionPlugin.incorrectConfiguration"));
throw e;
} catch (SQLException e) {
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.errorSelectingRouter", new Object[] {e.getMessage()}));
if (conn == null || conn.isClosed()) {
conn = connectFunc.call();
}
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, conn, hostSpec);
}

try {
Expand All @@ -170,78 +190,117 @@ private Connection connectInternal(
"LimitlessConnectionPlugin.failedToConnectToHost",
new Object[] {selectedHostSpec.getHost()}));
selectedHostSpec.setAvailability(HostAvailability.NOT_AVAILABLE);
if (conn == null || conn.isClosed()) {
conn = connectFunc.call();
}
// Retry connect prioritising healthiest router for best chance of connection over load-balancing with round-robin
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, e);
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, conn, hostSpec);
}
}

private Connection connectInternalWithoutDialect(
final @NonNull String driverProtocol,
final @NonNull HostSpec hostSpec,
final @NonNull Properties props,
final boolean isInitialConnection,
final JdbcCallable<Connection, SQLException> connectFunc) throws SQLException {

final Connection conn = connectFunc.call();

final Dialect dialect = this.pluginService.getDialect();
if (!(dialect instanceof AuroraLimitlessDialect)) {
throw new UnsupportedOperationException(Messages.get("LimitlessConnectionPlugin.unsupportedDialectOrDatabase",
new Object[] {dialect}));
}

initLimitlessRouterMonitorService();
if (isInitialConnection) {
this.limitlessRouterService
.startMonitoring(hostSpec, properties, INTERVAL_MILLIS.getInteger(properties));
}

List<HostSpec> limitlessRouters = this.limitlessRouterService.getLimitlessRouters(
this.pluginService.getHostListProvider().getClusterId(), props);
if (Utils.isNullOrEmpty(limitlessRouters)) {
LOGGER.finest(Messages.get("LimitlessConnectionPlugin.limitlessRouterCacheEmpty"));
final boolean waitForRouterInfo = WAIT_F0R_ROUTER_INFO.getBoolean(props);
if (waitForRouterInfo) {
synchronouslyGetLimitlessRoutersWithRetry(conn, hostSpec.getPort(), props);
}
}

return conn;
}

private void initLimitlessRouterMonitorService() {
if (limitlessRouterService == null) {
this.limitlessRouterService = this.limitlessRouterServiceSupplier.get();
}
}

private Connection retryConnectWithLeastLoadedRouters(final List<HostSpec> limitlessRouters,
final Properties props, final SQLException originalException) throws SQLException {
private Connection retryConnectWithLeastLoadedRouters(final List<HostSpec> limitlessRouters, final Properties props,
final Connection conn, final HostSpec hostSpec) throws SQLException {

List<HostSpec> currentRouters = limitlessRouters;
int retryCount = 0;
final int maxRetries = MAX_RETRIES.getInteger(props);

while (retryCount++ < maxRetries) {
if (!currentRouters.stream().anyMatch(h -> h.getAvailability().equals(HostAvailability.AVAILABLE))) {
currentRouters = synchronouslyGetLimitlessRoutersWithRetry(props);
if (currentRouters.stream().noneMatch(h -> h.getAvailability().equals(HostAvailability.AVAILABLE))) {
currentRouters = synchronouslyGetLimitlessRoutersWithRetry(conn, hostSpec.getPort(), props);
if (currentRouters == null

Check warning on line 251 in wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPlugin.java

View workflow job for this annotation

GitHub Actions / Qodana Community for JVM

Constant values

Condition `currentRouters == null` is always `false`
|| currentRouters.isEmpty()
|| !currentRouters.stream().anyMatch(h -> h.getAvailability().equals(HostAvailability.AVAILABLE))) {
throw new SQLException(
Messages.get("LimitlessConnectionPlugin.noRoutersAvailableForRetry"),
originalException);
|| currentRouters.stream().noneMatch(h -> h.getAvailability().equals(HostAvailability.AVAILABLE))) {
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.noRoutersAvailableForRetry"));
return conn;
}
}

final HostSpec selectedHostSpec;
HostSpec selectedHostSpec = hostSpec;

Check warning on line 259 in wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPlugin.java

View workflow job for this annotation

GitHub Actions / Qodana Community for JVM

Unused assignment

Variable `selectedHostSpec` initializer `hostSpec` is redundant
try {
// Select healthiest router for best chance of connection over load-balancing with round-robin
selectedHostSpec = this.pluginService.getHostSpecByStrategy(limitlessRouters,
HostRole.WRITER, HighestWeightHostSelector.STRATEGY_HIGHEST_WEIGHT);
LOGGER.finest(Messages.get(
"LimitlessConnectionPlugin.selectedHostForRetry",
new Object[] {selectedHostSpec.getHost()}));
} catch (UnsupportedOperationException e) {
} catch (final UnsupportedOperationException e) {
LOGGER.severe(Messages.get("LimitlessConnectionPlugin.incorrectConfiguration"));
throw e;
} catch (final SQLException e) {
// error from host selector
continue;
}

try {
return pluginService.connect(selectedHostSpec, props);
} catch (SQLException e) {
} catch (final SQLException e) {
selectedHostSpec.setAvailability(HostAvailability.NOT_AVAILABLE);
LOGGER.finest(Messages.get(
"LimitlessConnectionPlugin.failedToConnectToHost",
new Object[] {selectedHostSpec.getHost()}));
}
}
throw new SQLException(Messages.get("LimitlessConnectionPlugin.noRoutersAvailableForRetry"), originalException);
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.maxRetriesExceeded"));
return conn;
}

private List<HostSpec> synchronouslyGetLimitlessRoutersWithRetry(final Properties props) throws SQLException {
private List<HostSpec> synchronouslyGetLimitlessRoutersWithRetry(
final Connection conn, final int hostPort, final Properties props) throws SQLException {
LOGGER.finest(Messages.get("LimitlessConnectionPlugin.synchronouslyGetLimitlessRouters"));
int retryCount = -1; // start at -1 since the first try is not a retry.
int maxRetries = GET_ROUTER_MAX_RETRIES.getInteger(props);
int retryIntervalMs = GET_ROUTER_RETRY_INTERVAL_MILLIS.getInteger(props);
List<HostSpec> newLimitlessRouters = null;
do {
try {
newLimitlessRouters = this.limitlessRouterService.forceGetLimitlessRouters(
this.pluginService.getHostListProvider().getClusterId(), props);
List<HostSpec> newLimitlessRouters = this.limitlessRouterService.forceGetLimitlessRoutersWithConn(
conn, hostPort, props);
if (newLimitlessRouters != null && !newLimitlessRouters.isEmpty()) {
return newLimitlessRouters;
}
Thread.sleep(retryIntervalMs);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(Messages.get("LimitlessConnectionPlugin.interruptedThread"));
} finally {
retryCount++;
}
Expand Down
Loading

0 comments on commit 7c5f391

Please sign in to comment.