Skip to content

Commit

Permalink
Merge pull request #283 from smukil/per_conn_consistency
Browse files Browse the repository at this point in the history
Allow client to set per-connection level consistency
  • Loading branch information
smukil authored Oct 21, 2019
2 parents 6dd3d57 + d12f861 commit bf9b5ca
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ public interface ConnectionFactory<CL> {

Connection<CL> createConnectionWithDataStore(HostConnectionPool<CL> pool)
throws DynoConnectException;

Connection<CL> createConnectionWithConsistencyLevel(HostConnectionPool<CL> pool, String consistency)
throws DynoConnectException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ enum CompressionStrategy {
*/
boolean isConnectToDatastore();

/**
* Returns 'true' if a connection-pool level consistency setting was provided.
* @return
*/
boolean isConnectionPoolConsistencyProvided();

boolean isFallbackEnabled();

/**
Expand Down Expand Up @@ -236,6 +242,13 @@ enum CompressionStrategy {
*/
int getPoolReconnectWaitMillis();

/**
* Returns the user-provided connection pool level consistency setting if provided.
*
* @return
*/
String getConnectionPoolConsistency();

String getHashtag();

ConnectionPoolConfiguration setLocalZoneAffinity(boolean condition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class ConnectionPoolConfigurationImpl implements ConnectionPoolConfigurat
private static final boolean DEFAULT_FALLBACK_POLICY = true;
private static final boolean DEFAULT_CONNECT_TO_DATASTORE = false;
private static final int DEFAULT_LOCK_VOTING_SIZE = -1;
private static final String UNSET_CONNECTION_POOL_CONSISTENCY = "UNSET_CONFIG";

private HostSupplier hostSupplier;
private TokenMapSupplier tokenSupplier;
Expand Down Expand Up @@ -87,6 +88,7 @@ public class ConnectionPoolConfigurationImpl implements ConnectionPoolConfigurat
private int lockVotingSize = DEFAULT_LOCK_VOTING_SIZE;
private boolean fallbackEnabled = DEFAULT_FALLBACK_POLICY;
private boolean connectToDatastore = DEFAULT_CONNECT_TO_DATASTORE;
private String connectionPoolConsistency = UNSET_CONNECTION_POOL_CONSISTENCY;


private RetryPolicyFactory retryFactory = new RetryPolicyFactory() {
Expand Down Expand Up @@ -146,6 +148,11 @@ public boolean isConnectToDatastore() {
return connectToDatastore;
}

@Override
public boolean isConnectionPoolConsistencyProvided() {
return !(this.connectionPoolConsistency.compareTo(UNSET_CONNECTION_POOL_CONSISTENCY) == 0);
}

@Override
public boolean isFallbackEnabled() {
return fallbackEnabled;
Expand Down Expand Up @@ -318,6 +325,10 @@ public void setConnectToDatastore(boolean connectToDatastore) {
this.connectToDatastore = connectToDatastore;
}

public void setConnectionPoolConsistency(String consistency) {
this.connectionPoolConsistency = consistency;
}

public ConnectionPoolConfigurationImpl setFallbackEnabled(boolean fallbackEnabled) {
this.fallbackEnabled = fallbackEnabled;
return this;
Expand Down Expand Up @@ -393,6 +404,10 @@ public ConnectionPoolConfigurationImpl setCompressionThreshold(int thresholdInBy
return this;
}

@Override
public String getConnectionPoolConsistency() {
return this.connectionPoolConsistency;
}

public HostSupplier getHostSupplier() {
return hostSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,15 @@ private ConnectionPoolActive(HostConnectionPoolImpl<CL> cp) {
public Connection<CL> createConnection() {

try {
Connection<CL> connection = cpConfig.isConnectToDatastore() ? connFactory.createConnectionWithDataStore(pool) :
connFactory.createConnection(pool);
Connection<CL> connection;
if (cpConfig.isConnectToDatastore()) {
connection = connFactory.createConnectionWithDataStore(pool);
} else if (cpConfig.isConnectionPoolConsistencyProvided()) {
connection = connFactory.createConnectionWithConsistencyLevel(pool, cpConfig.getConnectionPoolConsistency());
} else {
connection = connFactory.createConnection(pool);
}

connection.open();
availableConnections.add(connection);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ public Connection<TestClient> createConnection(HostConnectionPool<TestClient> po
public Connection<TestClient> createConnectionWithDataStore(HostConnectionPool<TestClient> pool) throws DynoConnectException {
return null;
}

@Override
public Connection<TestClient> createConnectionWithConsistencyLevel(HostConnectionPool<TestClient> pool, String consistency) throws DynoConnectException {
return null;
}

};

private Host host1 = new HostBuilder().setHostname("host1").setPort(8080).setRack("localRack").setStatus(Status.Up).createHost();
Expand Down Expand Up @@ -513,6 +519,12 @@ public <R> OperationResult<R> execute(Operation<TestClient, R> op) throws DynoEx
public Connection<TestClient> createConnectionWithDataStore(HostConnectionPool<TestClient> pool) throws DynoConnectException {
return null;
}

@Override
public Connection<TestClient> createConnectionWithConsistencyLevel(HostConnectionPool<TestClient> pool, String consistency) throws DynoConnectException {
return null;
}

};

final ConnectionPoolImpl<TestClient> pool = new ConnectionPoolImpl<TestClient>(badConnectionFactory, cpConfig, cpMonitor);
Expand Down Expand Up @@ -629,6 +641,12 @@ public <R> OperationResult<R> execute(Operation<TestClient, R> op) throws DynoEx
public Connection<TestClient> createConnectionWithDataStore(HostConnectionPool<TestClient> pool) throws DynoConnectException {
return null;
}

@Override
public Connection<TestClient> createConnectionWithConsistencyLevel(HostConnectionPool<TestClient> pool, String consistency) throws DynoConnectException {
return null;
}

};

final RetryNTimes retry = new RetryNTimes(3, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ public Connection<TestClient> createConnection(HostConnectionPool<TestClient> po
public Connection<TestClient> createConnectionWithDataStore(HostConnectionPool<TestClient> pool) throws DynoConnectException {
return null;
}

@Override
public Connection<TestClient> createConnectionWithConsistencyLevel(HostConnectionPool<TestClient> pool, String consistency) throws DynoConnectException {
return null;
}

};

private static ConnectionPoolConfigurationImpl config = new ConnectionPoolConfigurationImpl("TestClient");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public Connection<TestClient> createConnection(HostConnectionPool<TestClient> po
public Connection<TestClient> createConnectionWithDataStore(HostConnectionPool<TestClient> pool) throws DynoConnectException {
return null;
}

@Override
public Connection<TestClient> createConnectionWithConsistencyLevel(HostConnectionPool<TestClient> pool, String consistency) throws DynoConnectException {
return null;
}
};

private static ConnectionPoolConfigurationImpl config = new ConnectionPoolConfigurationImpl("TestClient");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

/*******************************************************************************
* Copyright 2018 Netflix
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/


package com.netflix.dyno.jedis;

import lombok.Getter;

import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

@Getter
public enum DynoConfigCommand implements ProtocolCommand {
CONN_CONSISTENCY("DYNO_CONFIG:CONN_CONSISTENCY");

private final byte[] raw;

DynoConfigCommand(String opName) {
this.raw = SafeEncoder.encode(opName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4690,6 +4690,7 @@ public static class Builder {
private TokenMapSupplier tokenMapSupplier;
private TokenMapSupplier dualWriteTokenMapSupplier;
private boolean isDatastoreClient;
private String connectionPoolConsistency;

public Builder() {
}
Expand Down Expand Up @@ -4765,16 +4766,31 @@ public Builder isDatastoreClient(boolean isDatastoreClient) {
return this;
}

public Builder withConnectionPoolConsistency(String consistency) {
this.connectionPoolConsistency = consistency;
return this;
}

public DynoJedisClient build() {
assert (appName != null);
assert (clusterName != null);

// Make sure that the user doesn't set isDatastoreClient and connectionPoolConsistency together.
if (this.isDatastoreClient == true && this.connectionPoolConsistency != null) {
throw new DynoException("Cannot set isDatastoreClient(true) and also set withConnectionPoolConsistency() together");
}

if (cpConfig == null) {
cpConfig = new ArchaiusConnectionPoolConfiguration(appName);
Logger.info("Dyno Client runtime properties: " + cpConfig.toString());
}
cpConfig.setConnectToDatastore(isDatastoreClient);

// If a connection-pool level consistency setting was provided, add it here.
if (this.connectionPoolConsistency != null) {
cpConfig.setConnectionPoolConsistency(connectionPoolConsistency);
}

if (cpConfig.isDualWriteEnabled()) {
return buildDynoDualWriterClient();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,20 @@ public Connection<Jedis> createConnectionWithDataStore(HostConnectionPool<Jedis>
return new JedisConnection(pool, true);
}

@Override
public Connection<Jedis> createConnectionWithConsistencyLevel(HostConnectionPool<Jedis> pool, String consistency) {
JedisConnection connection = new JedisConnection(pool);
connection.setConsistencyLevel(consistency);
return connection;
}

// TODO: raghu compose redisconnection with jedisconnection in it
public class JedisConnection implements Connection<Jedis> {

private final HostConnectionPool<Jedis> hostPool;
private final Jedis jedisClient;
private final ConnectionContextImpl context = new ConnectionContextImpl();
private String consistencyLevel;

private DynoConnectException lastDynoException;

Expand Down Expand Up @@ -100,6 +108,14 @@ public JedisConnection(HostConnectionPool<Jedis> hostPool, boolean connectDataSt
}
}

public void setConsistencyLevel(String consistency) {
this.consistencyLevel = consistency;
}

public boolean isConsistencyLevelProvided() {
return this.consistencyLevel != null;
}

@Override
public <R> OperationResult<R> execute(Operation<Jedis, R> op) throws DynoException {

Expand Down Expand Up @@ -158,6 +174,9 @@ public Host getHost() {
@Override
public void open() throws DynoException {
jedisClient.connect();
if (isConsistencyLevelProvided()) {
jedisClient.getClient().sendCommand(DynoConfigCommand.CONN_CONSISTENCY, this.consistencyLevel);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public Connection<RedisAsyncConnection<String, String>> createConnectionWithData
throw new UnsupportedOperationException("");
}

@Override
public Connection<RedisAsyncConnection<String, String>> createConnectionWithConsistencyLevel(HostConnectionPool<RedisAsyncConnection<String, String>> pool, String consistency) throws DynoConnectException {
throw new UnsupportedOperationException("");
}

public static class RedissonConnection implements Connection<RedisAsyncConnection<String, String>> {

private final HostConnectionPool<RedisAsyncConnection<String, String>> hostPool;
Expand Down

0 comments on commit bf9b5ca

Please sign in to comment.