Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add configuration option for the preferred protocol version to use during handshake #422

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,15 @@ It is important to note that, the reconnect will create a new connection object,

== Protocol Parser

This client supports both `RESP2` and `RESP3` protocols, at the connection handshake time the client will automatically detect which version is supported by the server and use it.
This client supports both `RESP2` and `RESP3` protocols.
By default, the client attempts to negotiate support for `RESP3` at connection handshake time.

It is possible to use the {@link io.vertx.redis.client.RedisOptions#setPreferredProtocolVersion} method to select the preferred version, `RESP2` or `RESP3`:

[source,$lang]
----
{@link examples.RedisExamples#preferredProtocolVersion1}
----

The parser internally creates an "infinite" readable buffer from all the chunks received from the server, in order to avoid creating too much garbage in terms of memory collection, a tunnable watermark value is configurable at JVM startup time.
The system property `io.vertx.redis.parser.watermark` defines how much data is keept in this readable buffer before it gets discarded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, RedisCo
obj.setProtocolNegotiation((Boolean)member.getValue());
}
break;
case "preferredProtocolVersion":
if (member.getValue() instanceof String) {
obj.setPreferredProtocolVersion(io.vertx.redis.client.ProtocolVersion.valueOf((String)member.getValue()));
}
break;
case "password":
if (member.getValue() instanceof String) {
obj.setPassword((String)member.getValue());
Expand Down Expand Up @@ -76,6 +81,9 @@ static void toJson(RedisConnectOptions obj, JsonObject json) {
static void toJson(RedisConnectOptions obj, java.util.Map<String, Object> json) {
json.put("maxNestedArrays", obj.getMaxNestedArrays());
json.put("protocolNegotiation", obj.isProtocolNegotiation());
if (obj.getPreferredProtocolVersion() != null) {
json.put("preferredProtocolVersion", obj.getPreferredProtocolVersion().name());
}
if (obj.getPassword() != null) {
json.put("password", obj.getPassword());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, RedisOp
obj.setProtocolNegotiation((Boolean)member.getValue());
}
break;
case "preferredProtocolVersion":
if (member.getValue() instanceof String) {
obj.setPreferredProtocolVersion(io.vertx.redis.client.ProtocolVersion.valueOf((String)member.getValue()));
}
break;
case "poolName":
if (member.getValue() instanceof String) {
obj.setPoolName((String)member.getValue());
Expand Down Expand Up @@ -173,6 +178,9 @@ static void toJson(RedisOptions obj, java.util.Map<String, Object> json) {
json.put("password", obj.getPassword());
}
json.put("protocolNegotiation", obj.isProtocolNegotiation());
if (obj.getPreferredProtocolVersion() != null) {
json.put("preferredProtocolVersion", obj.getPreferredProtocolVersion().name());
}
if (obj.getPoolName() != null) {
json.put("poolName", obj.getPoolName());
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/examples/RedisExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,4 +247,8 @@ public void example13(Vertx vertx) {
public void tracing1(RedisOptions options) {
options.setTracingPolicy(TracingPolicy.ALWAYS);
}

public void preferredProtocolVersion1(RedisOptions options) {
options.setPreferredProtocolVersion(ProtocolVersion.RESP2);
}
}
29 changes: 29 additions & 0 deletions src/main/java/io/vertx/redis/client/ProtocolVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.vertx.redis.client;

import io.vertx.codegen.annotations.VertxGen;

/**
* Redis protocol version to be used.
*/
@VertxGen
public enum ProtocolVersion {
/**
* RESP2
*/
RESP2("2"),
/**
* RESP3
*/
RESP3("3"),
;

private final String value;

ProtocolVersion(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}
25 changes: 25 additions & 0 deletions src/main/java/io/vertx/redis/client/RedisConnectOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public abstract class RedisConnectOptions {
private List<String> endpoints;
private int maxNestedArrays;
private boolean protocolNegotiation;
private ProtocolVersion preferredProtocolVersion;
private int maxWaitingHandlers;

private void init() {
Expand All @@ -41,6 +42,7 @@ private void init() {
public RedisConnectOptions(RedisOptions options) {
setEndpoints(new ArrayList<>(options.getEndpoints()));
setProtocolNegotiation(options.isProtocolNegotiation());
setPreferredProtocolVersion(options.getPreferredProtocolVersion());
setMaxNestedArrays(options.getMaxNestedArrays());
setPassword(options.getPassword());
setMaxWaitingHandlers(options.getMaxWaitingHandlers());
Expand All @@ -55,6 +57,7 @@ public RedisConnectOptions(RedisConnectOptions other) {
init();
this.maxNestedArrays = other.maxNestedArrays;
this.protocolNegotiation = other.protocolNegotiation;
this.preferredProtocolVersion = other.preferredProtocolVersion;
this.password = other.password;
this.endpoints = new ArrayList<>(other.endpoints);
this.maxWaitingHandlers = other.maxWaitingHandlers;
Expand Down Expand Up @@ -111,6 +114,28 @@ public RedisConnectOptions setProtocolNegotiation(boolean protocolNegotiation) {
return this;
}

/**
* Returns the preferred protocol version to be used during protocol negotiation. When not set,
* defaults to RESP 3. When protocol negotiation is disabled, this setting has no effect.
*
* @return preferred protocol version
*/
public ProtocolVersion getPreferredProtocolVersion() {
return preferredProtocolVersion;
}

/**
* Sets the preferred protocol version to be used during protocol negotiation. When not set,
* defaults to RESP 3. When protocol negotiation is disabled, this setting has no effect.
*
* @param preferredProtocolVersion preferred protocol version
* @return fluent self
*/
public RedisConnectOptions setPreferredProtocolVersion(ProtocolVersion preferredProtocolVersion) {
this.preferredProtocolVersion = preferredProtocolVersion;
return this;
}

/**
* Get the default password for cluster/sentinel connections, if not set it will try to
* extract it from the current default endpoint.
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/io/vertx/redis/client/RedisOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class RedisOptions {
private RedisReplicas useReplicas;
private volatile String password;
private boolean protocolNegotiation;
private ProtocolVersion preferredProtocolVersion;
private long hashSlotCacheTTL;
private TracingPolicy tracingPolicy;

Expand Down Expand Up @@ -88,6 +89,7 @@ public RedisOptions(RedisOptions other) {
this.useReplicas = other.useReplicas;
this.password = other.password;
this.protocolNegotiation = other.protocolNegotiation;
this.preferredProtocolVersion = other.preferredProtocolVersion;
this.hashSlotCacheTTL = other.hashSlotCacheTTL;
this.maxWaitingHandlers = other.maxWaitingHandlers;
this.tracingPolicy = other.tracingPolicy;
Expand Down Expand Up @@ -524,6 +526,29 @@ public RedisOptions setProtocolNegotiation(boolean protocolNegotiation) {
return this;
}

/**
* Returns the preferred protocol version to be used during protocol negotiation. When not set,
* defaults to RESP 3. When protocol negotiation is disabled, this setting has no effect.
*
* @return preferred protocol version
*/
public ProtocolVersion getPreferredProtocolVersion() {
return preferredProtocolVersion;
}

/**
* Sets the preferred protocol version to be used during protocol negotiation. When not set,
* defaults to RESP 3. When protocol negotiation is disabled, this setting has no effect.
*
* @param preferredProtocolVersion preferred protocol version
* @return fluent self
*/
public RedisOptions setPreferredProtocolVersion(ProtocolVersion preferredProtocolVersion) {
this.preferredProtocolVersion = preferredProtocolVersion;
return this;
}


/**
* Set a user defined pool name (for metrics reporting).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,11 @@ private Future<Void> hello(ContextInternal ctx, RedisConnection connection, Redi
if (!options.isProtocolNegotiation()) {
return ping(ctx, connection);
} else {
Request hello = Request.cmd(Command.HELLO).arg(RESPParser.VERSION);
String version = RESPParser.VERSION;
if (options.getPreferredProtocolVersion() != null) {
version = options.getPreferredProtocolVersion().getValue();
}
Request hello = Request.cmd(Command.HELLO).arg(version);

String password = redisURI.password() != null ? redisURI.password() : options.getPassword();
String user = redisURI.user();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.vertx.redis.client.test;

import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.ProtocolVersion;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.GenericContainer;

@RunWith(VertxUnitRunner.class)
public class RedisProtocolVersionTest {
@ClassRule
public static final GenericContainer<?> redis = new GenericContainer<>("redis:7")
.withExposedPorts(6379);

@Rule
public final RunTestOnContext rule = new RunTestOnContext();

@Test
public void resp2(TestContext test) {
RedisOptions options = new RedisOptions()
.setConnectionString("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort())
.setPreferredProtocolVersion(ProtocolVersion.RESP2);

Redis client = Redis.createClient(rule.vertx(), options);

Async async = test.async();
client
.send(Request.cmd(Command.DEL).arg("myhash"))
.flatMap(ignored -> {
return client.send(Request.cmd(Command.HSET).arg("myhash").arg("field1").arg(1).arg("field2").arg(2));
})
.flatMap(response -> {
test.assertEquals(2, response.toInteger());
return client.send(Request.cmd(Command.HGETALL).arg("myhash"));
})
.onSuccess(response -> {
test.assertTrue(response.toString().startsWith("[")); // list

test.assertTrue(response.containsKey("field1"));
test.assertEquals(1, response.get("field1").toInteger());

test.assertTrue(response.containsKey("field2"));
test.assertEquals(2, response.get("field2").toInteger());

test.assertEquals("field1", response.get(0).toString());

client.close();
async.complete();
}).onFailure(test::fail);
}

@Test
public void resp3(TestContext test) {
RedisOptions options = new RedisOptions()
.setConnectionString("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort())
.setPreferredProtocolVersion(ProtocolVersion.RESP3);

Redis client = Redis.createClient(rule.vertx(), options);

Async async = test.async();
client
.send(Request.cmd(Command.DEL).arg("myhash"))
.flatMap(ignored -> {
return client.send(Request.cmd(Command.HSET).arg("myhash").arg("field1").arg(3).arg("field2").arg(4));
})
.flatMap(response -> {
test.assertEquals(2, response.toInteger());
return client.send(Request.cmd(Command.HGETALL).arg("myhash"));
})
.onSuccess(response -> {
test.assertTrue(response.toString().startsWith("{")); // map

test.assertTrue(response.containsKey("field1"));
test.assertEquals(3, response.get("field1").toInteger());

test.assertTrue(response.containsKey("field2"));
test.assertEquals(4, response.get("field2").toInteger());

try {
response.get(0);
test.fail("Map-typed Multi should fail on get(int)");
} catch (Exception expected) {
}

client.close();
async.complete();
}).onFailure(test::fail);
}
}