Skip to content

Commit

Permalink
add support for TopK
Browse files Browse the repository at this point in the history
  • Loading branch information
gkorland committed Jul 4, 2019
1 parent 141cb42 commit b4257c5
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.redislabs</groupId>
<artifactId>jrebloom</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>

<name>JReBloom</name>
<description>Official client for ReBloom</description>
Expand Down
118 changes: 116 additions & 2 deletions src/main/java/io/rebloom/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.Pool;
import redis.clients.jedis.util.SafeEncoder;

import java.io.Closeable;
import java.util.*;
import java.util.stream.Collectors;

/**
* Client is the main ReBloom client class, wrapping connection management and all ReBloom commands
Expand All @@ -20,13 +24,20 @@ Jedis _conn() {
return pool.getResource();
}

private Connection sendCommand(Jedis conn, Command command, String ...args) {
private Connection sendCommand(Jedis conn, String key, ProtocolCommand command, String ...args) {
String[] fullArgs = new String[args.length + 1];
fullArgs[0] = key;
System.arraycopy(args, 0, fullArgs, 1, args.length);
return sendCommand(conn, command, fullArgs);
}

private Connection sendCommand(Jedis conn, ProtocolCommand command, String ...args) {
Connection client = conn.getClient();
client.sendCommand(command, args);
return client;
}

private Connection sendCommand(Jedis conn, Command command, byte[]... args) {
private Connection sendCommand(Jedis conn, ProtocolCommand command, byte[]... args) {
Connection client = conn.getClient();
client.sendCommand(command, args);
return client;
Expand Down Expand Up @@ -204,6 +215,109 @@ public boolean delete(String name) {
return conn.del(name) != 0;
}
}

/**
* TOPK.RESERVE key topk width depth decay
*
* Reserve a topk filter.
* @param key The key of the filter
* @param topk
* @param width
* @param depth
* @param decay
*
* Note that if a filter is not reserved, a new one is created when {@link #add(String, byte[])}
* is called.
*/
public void topkCreateFilter(String key, long topk, long width, long depth, double decay) {
try (Jedis conn = _conn()) {
String rep = sendCommand(conn, TopKCommand.RESERVE, SafeEncoder.encode(key), Protocol.toByteArray(topk),
Protocol.toByteArray(width), Protocol.toByteArray(depth),Protocol.toByteArray(decay))
.getStatusCodeReply();

if (!rep.equals("OK")) {
throw new JedisException(rep);
}
}
}

/**
* TOPK.ADD key item [item ...]
*
* Adds an item to the filter
* @param key The key of the filter
* @param items The items to add to the filter
* @return list of items dropped from the list.
*/
public List<String> topkAdd(String key, String ...items) {
try (Jedis conn = _conn()) {
return sendCommand(conn, key, TopKCommand.ADD, items).getMultiBulkReply();
}
}

/**
* TOPK.INCRBY key item increment [item increment ...]
*
* Adds an item to the filter
* @param key The key of the filter
* @param item The item to to increment
* @return list of items dropped from the list.
*/
public String topkIncrBy(String key, String item, long increment) {
try (Jedis conn = _conn()) {
return sendCommand(conn, TopKCommand.INCRBY, SafeEncoder.encode(key), SafeEncoder.encode(item), Protocol.toByteArray(increment))
.getMultiBulkReply().get(0);
}
}

/**
* TOPK.QUERY key item [item ...]
*
* Checks whether an item is one of Top-K items.
*
* @param key The key of the filter
* @param items The items to check in the list
* @return list of indicator for each item requested
*/
public List<Boolean> topkQuery(String key, String ...items) {
try (Jedis conn = _conn()) {
return sendCommand(conn, key, TopKCommand.QUERY, items)
.getIntegerMultiBulkReply()
.stream().map(s -> s!=0)
.collect(Collectors.toList());
}
}

/**
* TOPK.COUNT key item [item ...]
*
* Returns count for an item.
*
* @param key The key of the filter
* @param items The items to check in the list
* @return list of counters per item.
*/
public List<Long> topkCount(String key, String ...items) {
try (Jedis conn = _conn()) {
return sendCommand(conn, key, TopKCommand.COUNT, items)
.getIntegerMultiBulkReply();
}
}

/**
* TOPK.LIST key
*
* Return full list of items in Top K list.
*
* @param key The key of the filter
* @return list of items in the list.
*/
public List<String> topkList(String key) {
try (Jedis conn = _conn()) {
return sendCommand(conn, TopKCommand.LIST, key)
.getMultiBulkReply();
}
}

@Override
public void close(){
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/rebloom/client/TopKCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.rebloom.client;

import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

public enum TopKCommand implements ProtocolCommand {
RESERVE("TOPK.RESERVE"),
ADD("TOPK.ADD"),
INCRBY("TOPK.INCRBY"),
QUERY("TOPK.QUERY"),
COUNT("TOPK.COUNT"),
LIST("TOPK.LIST"),
INFO("TOPK.INFO");

private final byte[] raw;

TopKCommand(String alt) {
raw = SafeEncoder.encode(alt);
}

public byte[] getRaw() {
return raw;
}
}
17 changes: 17 additions & 0 deletions src/test/java/io/rebloom/client/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,22 @@ public void testExample() {
client.createFilter("specialBloom", 10000, 0.0001);
client.add("specialBloom", "foo");
}

@Test
public void createTopKFilter() {
cl.topkCreateFilter("aaa", 30, 2000, 7, 0.925);

assertEquals(Arrays.asList(null, null), cl.topkAdd("aaa", "bb", "cc"));

assertEquals(Arrays.asList(true, false, true), cl.topkQuery("aaa", "bb", "gg", "cc"));

assertEquals(Arrays.asList(1L, 0L, 1L), cl.topkCount("aaa", "bb", "gg", "cc"));

assertTrue( cl.topkList("aaa").stream().allMatch( s -> Arrays.asList("bb", "cc").contains(s) || s == null));

assertEquals(null, cl.topkIncrBy("aaa", "ff", 10));

assertTrue( cl.topkList("aaa").stream().allMatch( s -> Arrays.asList("bb", "cc", "ff").contains(s) || s == null));
}

}

2 comments on commit b4257c5

@joyang1
Copy link
Contributor

@joyang1 joyang1 commented on b4257c5 Jul 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gkorland, ClusterClient is need to add TopKCommand? If need, I can do it.

@gkorland
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joyang1 Yes that will be great, thanks!

Please sign in to comment.