Skip to content

Commit

Permalink
Merge pull request #422 from Ladicek/resp-version
Browse files Browse the repository at this point in the history
add configuration option for the preferred protocol version to use during handshake
  • Loading branch information
Ladicek authored Dec 6, 2023
2 parents 305269e + 8c31dba commit d19e984
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 2 deletions.
10 changes: 9 additions & 1 deletion src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,15 @@ It is important to note that, the reconnect will create a new connection object,

== Protocol Parser

This client supports both `RESP2` and `RESP3` protocols, at the connection handshake time the client will automatically detect which version is supported by the server and use it.
This client supports both `RESP2` and `RESP3` protocols.
By default, the client attempts to negotiate support for `RESP3` at connection handshake time.

It is possible to use the {@link io.vertx.redis.client.RedisOptions#setPreferredProtocolVersion} method to select the preferred version, `RESP2` or `RESP3`:

[source,$lang]
----
{@link examples.RedisExamples#preferredProtocolVersion1}
----

The parser internally creates an "infinite" readable buffer from all the chunks received from the server, in order to avoid creating too much garbage in terms of memory collection, a tunnable watermark value is configurable at JVM startup time.
The system property `io.vertx.redis.parser.watermark` defines how much data is keept in this readable buffer before it gets discarded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, RedisCo
obj.setProtocolNegotiation((Boolean)member.getValue());
}
break;
case "preferredProtocolVersion":
if (member.getValue() instanceof String) {
obj.setPreferredProtocolVersion(io.vertx.redis.client.ProtocolVersion.valueOf((String)member.getValue()));
}
break;
case "password":
if (member.getValue() instanceof String) {
obj.setPassword((String)member.getValue());
Expand Down Expand Up @@ -76,6 +81,9 @@ static void toJson(RedisConnectOptions obj, JsonObject json) {
static void toJson(RedisConnectOptions obj, java.util.Map<String, Object> json) {
json.put("maxNestedArrays", obj.getMaxNestedArrays());
json.put("protocolNegotiation", obj.isProtocolNegotiation());
if (obj.getPreferredProtocolVersion() != null) {
json.put("preferredProtocolVersion", obj.getPreferredProtocolVersion().name());
}
if (obj.getPassword() != null) {
json.put("password", obj.getPassword());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, RedisOp
obj.setProtocolNegotiation((Boolean)member.getValue());
}
break;
case "preferredProtocolVersion":
if (member.getValue() instanceof String) {
obj.setPreferredProtocolVersion(io.vertx.redis.client.ProtocolVersion.valueOf((String)member.getValue()));
}
break;
case "poolName":
if (member.getValue() instanceof String) {
obj.setPoolName((String)member.getValue());
Expand Down Expand Up @@ -173,6 +178,9 @@ static void toJson(RedisOptions obj, java.util.Map<String, Object> json) {
json.put("password", obj.getPassword());
}
json.put("protocolNegotiation", obj.isProtocolNegotiation());
if (obj.getPreferredProtocolVersion() != null) {
json.put("preferredProtocolVersion", obj.getPreferredProtocolVersion().name());
}
if (obj.getPoolName() != null) {
json.put("poolName", obj.getPoolName());
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/examples/RedisExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,4 +247,8 @@ public void example13(Vertx vertx) {
public void tracing1(RedisOptions options) {
options.setTracingPolicy(TracingPolicy.ALWAYS);
}

public void preferredProtocolVersion1(RedisOptions options) {
options.setPreferredProtocolVersion(ProtocolVersion.RESP2);
}
}
29 changes: 29 additions & 0 deletions src/main/java/io/vertx/redis/client/ProtocolVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.vertx.redis.client;

import io.vertx.codegen.annotations.VertxGen;

/**
* Redis protocol version to be used.
*/
@VertxGen
public enum ProtocolVersion {
/**
* RESP2
*/
RESP2("2"),
/**
* RESP3
*/
RESP3("3"),
;

private final String value;

ProtocolVersion(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}
25 changes: 25 additions & 0 deletions src/main/java/io/vertx/redis/client/RedisConnectOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public abstract class RedisConnectOptions {
private List<String> endpoints;
private int maxNestedArrays;
private boolean protocolNegotiation;
private ProtocolVersion preferredProtocolVersion;
private int maxWaitingHandlers;

private void init() {
Expand All @@ -41,6 +42,7 @@ private void init() {
public RedisConnectOptions(RedisOptions options) {
setEndpoints(new ArrayList<>(options.getEndpoints()));
setProtocolNegotiation(options.isProtocolNegotiation());
setPreferredProtocolVersion(options.getPreferredProtocolVersion());
setMaxNestedArrays(options.getMaxNestedArrays());
setPassword(options.getPassword());
setMaxWaitingHandlers(options.getMaxWaitingHandlers());
Expand All @@ -55,6 +57,7 @@ public RedisConnectOptions(RedisConnectOptions other) {
init();
this.maxNestedArrays = other.maxNestedArrays;
this.protocolNegotiation = other.protocolNegotiation;
this.preferredProtocolVersion = other.preferredProtocolVersion;
this.password = other.password;
this.endpoints = new ArrayList<>(other.endpoints);
this.maxWaitingHandlers = other.maxWaitingHandlers;
Expand Down Expand Up @@ -111,6 +114,28 @@ public RedisConnectOptions setProtocolNegotiation(boolean protocolNegotiation) {
return this;
}

/**
* Returns the preferred protocol version to be used during protocol negotiation. When not set,
* defaults to RESP 3. When protocol negotiation is disabled, this setting has no effect.
*
* @return preferred protocol version
*/
public ProtocolVersion getPreferredProtocolVersion() {
return preferredProtocolVersion;
}

/**
* Sets the preferred protocol version to be used during protocol negotiation. When not set,
* defaults to RESP 3. When protocol negotiation is disabled, this setting has no effect.
*
* @param preferredProtocolVersion preferred protocol version
* @return fluent self
*/
public RedisConnectOptions setPreferredProtocolVersion(ProtocolVersion preferredProtocolVersion) {
this.preferredProtocolVersion = preferredProtocolVersion;
return this;
}

/**
* Get the default password for cluster/sentinel connections, if not set it will try to
* extract it from the current default endpoint.
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/io/vertx/redis/client/RedisOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class RedisOptions {
private RedisReplicas useReplicas;
private volatile String password;
private boolean protocolNegotiation;
private ProtocolVersion preferredProtocolVersion;
private long hashSlotCacheTTL;
private TracingPolicy tracingPolicy;

Expand Down Expand Up @@ -88,6 +89,7 @@ public RedisOptions(RedisOptions other) {
this.useReplicas = other.useReplicas;
this.password = other.password;
this.protocolNegotiation = other.protocolNegotiation;
this.preferredProtocolVersion = other.preferredProtocolVersion;
this.hashSlotCacheTTL = other.hashSlotCacheTTL;
this.maxWaitingHandlers = other.maxWaitingHandlers;
this.tracingPolicy = other.tracingPolicy;
Expand Down Expand Up @@ -524,6 +526,29 @@ public RedisOptions setProtocolNegotiation(boolean protocolNegotiation) {
return this;
}

/**
* Returns the preferred protocol version to be used during protocol negotiation. When not set,
* defaults to RESP 3. When protocol negotiation is disabled, this setting has no effect.
*
* @return preferred protocol version
*/
public ProtocolVersion getPreferredProtocolVersion() {
return preferredProtocolVersion;
}

/**
* Sets the preferred protocol version to be used during protocol negotiation. When not set,
* defaults to RESP 3. When protocol negotiation is disabled, this setting has no effect.
*
* @param preferredProtocolVersion preferred protocol version
* @return fluent self
*/
public RedisOptions setPreferredProtocolVersion(ProtocolVersion preferredProtocolVersion) {
this.preferredProtocolVersion = preferredProtocolVersion;
return this;
}


/**
* Set a user defined pool name (for metrics reporting).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,11 @@ private Future<Void> hello(ContextInternal ctx, RedisConnection connection, Redi
if (!options.isProtocolNegotiation()) {
return ping(ctx, connection);
} else {
Request hello = Request.cmd(Command.HELLO).arg(RESPParser.VERSION);
String version = RESPParser.VERSION;
if (options.getPreferredProtocolVersion() != null) {
version = options.getPreferredProtocolVersion().getValue();
}
Request hello = Request.cmd(Command.HELLO).arg(version);

String password = redisURI.password() != null ? redisURI.password() : options.getPassword();
String user = redisURI.user();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.vertx.redis.client.test;

import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.ProtocolVersion;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.GenericContainer;

@RunWith(VertxUnitRunner.class)
public class RedisProtocolVersionTest {
@ClassRule
public static final GenericContainer<?> redis = new GenericContainer<>("redis:7")
.withExposedPorts(6379);

@Rule
public final RunTestOnContext rule = new RunTestOnContext();

@Test
public void resp2(TestContext test) {
RedisOptions options = new RedisOptions()
.setConnectionString("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort())
.setPreferredProtocolVersion(ProtocolVersion.RESP2);

Redis client = Redis.createClient(rule.vertx(), options);

Async async = test.async();
client
.send(Request.cmd(Command.DEL).arg("myhash"))
.flatMap(ignored -> {
return client.send(Request.cmd(Command.HSET).arg("myhash").arg("field1").arg(1).arg("field2").arg(2));
})
.flatMap(response -> {
test.assertEquals(2, response.toInteger());
return client.send(Request.cmd(Command.HGETALL).arg("myhash"));
})
.onSuccess(response -> {
test.assertTrue(response.toString().startsWith("[")); // list

test.assertTrue(response.containsKey("field1"));
test.assertEquals(1, response.get("field1").toInteger());

test.assertTrue(response.containsKey("field2"));
test.assertEquals(2, response.get("field2").toInteger());

test.assertEquals("field1", response.get(0).toString());

client.close();
async.complete();
}).onFailure(test::fail);
}

@Test
public void resp3(TestContext test) {
RedisOptions options = new RedisOptions()
.setConnectionString("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort())
.setPreferredProtocolVersion(ProtocolVersion.RESP3);

Redis client = Redis.createClient(rule.vertx(), options);

Async async = test.async();
client
.send(Request.cmd(Command.DEL).arg("myhash"))
.flatMap(ignored -> {
return client.send(Request.cmd(Command.HSET).arg("myhash").arg("field1").arg(3).arg("field2").arg(4));
})
.flatMap(response -> {
test.assertEquals(2, response.toInteger());
return client.send(Request.cmd(Command.HGETALL).arg("myhash"));
})
.onSuccess(response -> {
test.assertTrue(response.toString().startsWith("{")); // map

test.assertTrue(response.containsKey("field1"));
test.assertEquals(3, response.get("field1").toInteger());

test.assertTrue(response.containsKey("field2"));
test.assertEquals(4, response.get("field2").toInteger());

try {
response.get(0);
test.fail("Map-typed Multi should fail on get(int)");
} catch (Exception expected) {
}

client.close();
async.complete();
}).onFailure(test::fail);
}
}

0 comments on commit d19e984

Please sign in to comment.