Skip to content

Commit

Permalink
fix: limitless plugin - add null checks (#1152)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronchung-bitquill authored Oct 18, 2024
1 parent 7c5f391 commit ed3bbe7
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private Connection connectInternalWithDialect(
try {
conn = connectFunc.call();
} catch (final SQLException e) {
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, conn, hostSpec);
return retryConnectWithLeastLoadedRouters(limitlessRouters, props, null, hostSpec);
}
}
return conn;
Expand All @@ -174,7 +174,7 @@ private Connection connectInternalWithDialect(
HostRole.WRITER, RoundRobinHostSelector.STRATEGY_ROUND_ROBIN);
LOGGER.fine(Messages.get(
"LimitlessConnectionPlugin.selectedHost",
new Object[] {selectedHostSpec.getHost()}));
new Object[] {selectedHostSpec != null ? selectedHostSpec.getHost() : "null"}));
} catch (SQLException e) {
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.errorSelectingRouter", new Object[] {e.getMessage()}));
if (conn == null || conn.isClosed()) {
Expand All @@ -186,10 +186,12 @@ private Connection connectInternalWithDialect(
try {
return pluginService.connect(selectedHostSpec, props);
} catch (SQLException e) {
LOGGER.fine(Messages.get(
"LimitlessConnectionPlugin.failedToConnectToHost",
new Object[] {selectedHostSpec.getHost()}));
selectedHostSpec.setAvailability(HostAvailability.NOT_AVAILABLE);
if (selectedHostSpec != null) {
LOGGER.fine(Messages.get(
"LimitlessConnectionPlugin.failedToConnectToHost",
new Object[] {selectedHostSpec.getHost()}));
selectedHostSpec.setAvailability(HostAvailability.NOT_AVAILABLE);
}
if (conn == null || conn.isClosed()) {
conn = connectFunc.call();
}
Expand Down Expand Up @@ -238,21 +240,31 @@ private void initLimitlessRouterMonitorService() {
}
}

private Connection retryConnectWithLeastLoadedRouters(final List<HostSpec> limitlessRouters, final Properties props,
final Connection conn, final HostSpec hostSpec) throws SQLException {
private Connection retryConnectWithLeastLoadedRouters(
final List<HostSpec> limitlessRouters,
final Properties props,
final Connection conn,
final HostSpec hostSpec) throws SQLException {

List<HostSpec> currentRouters = limitlessRouters;
int retryCount = 0;
final int maxRetries = MAX_RETRIES.getInteger(props);

while (retryCount++ < maxRetries) {
if (currentRouters.stream().noneMatch(h -> h.getAvailability().equals(HostAvailability.AVAILABLE))) {
currentRouters = synchronouslyGetLimitlessRoutersWithRetry(conn, hostSpec.getPort(), props);
if (conn != null && !conn.isClosed()) {
currentRouters = synchronouslyGetLimitlessRoutersWithRetry(conn, hostSpec.getPort(), props);
}

if (currentRouters == null

Check warning on line 259 in wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPlugin.java

View workflow job for this annotation

GitHub Actions / Qodana Community for JVM

Constant values

Condition `currentRouters == null` is always `false`
|| currentRouters.isEmpty()
|| currentRouters.stream().noneMatch(h -> h.getAvailability().equals(HostAvailability.AVAILABLE))) {
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.noRoutersAvailableForRetry"));
return conn;
if (conn != null && !conn.isClosed()) {
return conn;
} else {
throw new SQLException(Messages.get("LimitlessConnectionPlugin.noRoutersAvailable"));
}
}
}

Expand Down Expand Up @@ -281,8 +293,12 @@ private Connection retryConnectWithLeastLoadedRouters(final List<HostSpec> limit
new Object[] {selectedHostSpec.getHost()}));
}
}
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.maxRetriesExceeded"));
return conn;
if (conn != null && !conn.isClosed()) {
LOGGER.warning(Messages.get("LimitlessConnectionPlugin.maxRetriesExceeded"));
return conn;
} else {
throw new SQLException(Messages.get("LimitlessConnectionPlugin.maxRetriesExceeded"));
}
}

private List<HostSpec> synchronouslyGetLimitlessRoutersWithRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void run() {

while (!this.stopped.get()) {
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
"node response time thread", TelemetryTraceLevel.TOP_LEVEL);
"limitless router monitor thread", TelemetryTraceLevel.TOP_LEVEL);
telemetryContext.setAttribute("url", hostSpec.getUrl());
try {
this.openConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
@FunctionalInterface
public interface LimitlessRouterMonitorInitializer {
LimitlessRouterMonitor createLimitlessRouterMonitor(
final PluginService pluginService,
final HostSpec hostSpec,
final SlidingExpirationCacheWithCleanupThread<String, List<HostSpec>> limitlessRouterCache,
final String limitlessRouterCacheKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,21 @@ public class LimitlessRouterServiceImpl implements LimitlessRouterService {
);

public LimitlessRouterServiceImpl(final @NonNull PluginService pluginService) {
this(pluginService, LimitlessRouterMonitor::new, new LimitlessQueryHelper(pluginService));
this(
pluginService,
(hostSpec,
routerCache,
routerCacheKey,
props,
intervalMs) ->
new LimitlessRouterMonitor(
pluginService,
hostSpec,
routerCache,
routerCacheKey,
props,
intervalMs),
new LimitlessQueryHelper(pluginService));
}

public LimitlessRouterServiceImpl(
Expand Down Expand Up @@ -127,7 +141,6 @@ public void startMonitoring(final @NonNull HostSpec hostSpec,
limitlessRouterMonitorKey,
key -> this.limitlessRouterMonitorInitializer
.createLimitlessRouterMonitor(
pluginService,
hostSpec,
limitlessRouterCache,
this.pluginService.getHostListProvider().getClusterId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public enum RdsUrlType {
RDS_CUSTOM_CLUSTER(true, true),
RDS_PROXY(true, false),
RDS_INSTANCE(true, false),
RDS_AURORA_LIMITLESS_DB_SHARD_GROUP(true, true),
RDS_AURORA_LIMITLESS_DB_SHARD_GROUP(true, false),
OTHER(false, false);

private final boolean isRds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ IamAuthConnectionPlugin.missingRequiredConfigParameter=Configuration parameter '

# Limitless Connection Plugin
LimitlessConnectionPlugin.connectWithHost=Connecting to host {0}.
LimitlessConnectionPlugin.usingProvidedConnectUrl=Connecting using provided connection URL.
LimitlessConnectionPlugin.errorSelectingRouter=An error occurred while selecting Limitless Transaction Router: {0}
LimitlessConnectionPlugin.failedToConnectToHost=Failed to connect to host {0}.
LimitlessConnectionPlugin.incorrectConfiguration=Limitless Connection Plugin is unable to run. Please ensure the connection settings are correct.
LimitlessConnectionPlugin.limitlessRouterCacheEmpty=Limitless Router cache is empty. This normal during application start up when the cache is not yet populated.
Expand All @@ -198,6 +198,7 @@ LimitlessConnectionPlugin.selectedHost=Host {0} has been selected.
LimitlessConnectionPlugin.selectedHostForRetry=Host {0} has been selected for connection retry.
LimitlessConnectionPlugin.synchronouslyGetLimitlessRouters=Fetching Limitless Routers synchronously.
LimitlessConnectionPlugin.unsupportedDialectOrDatabase=Unsupported dialect ''{0}'' encountered. Please ensure JDBC connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
LimitlessConnectionPlugin.usingProvidedConnectUrl=Connecting using provided connection URL.

# Limitless Query Helper
LimitlessQueryHelper.unsupportedDialectOrDatabase=Unsupported dialect ''{0}'' encountered. Please ensure JDBC connection parameters are correct, and refer to the documentation to ensure that the connecting database is compatible with the Limitless Connection Plugin.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void testGetLimitlessRouters() throws SQLException {

final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl(
mockPluginService,
(a, b, c, d, e, f) -> mockLimitlessRouterMonitor,
(a, b, c, d, e) -> mockLimitlessRouterMonitor,
mockLimitlessQueryHelper);
limitlessRouterService.startMonitoring(hostSpec, props, intervalMs);
final List<HostSpec> actualEndpointHostSpecList = limitlessRouterService.getLimitlessRouters(CLUSTER_ID, props);
Expand Down Expand Up @@ -112,7 +112,7 @@ void testForceGetLimitlessRoutersWithConn() throws SQLException {

final LimitlessRouterServiceImpl limitlessRouterService = new LimitlessRouterServiceImpl(
mockPluginService,
(a, b, c, d, e, f) -> mockLimitlessRouterMonitor,
(a, b, c, d, e) -> mockLimitlessRouterMonitor,
mockLimitlessQueryHelper);

final List<HostSpec> actualHostSpecList = limitlessRouterService
Expand Down

0 comments on commit ed3bbe7

Please sign in to comment.