Skip to content

Commit

Permalink
support TOPK command for Redis Cluster with Rebloom (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
joyang1 authored and gkorland committed Jul 8, 2019
1 parent 25b50bf commit ea16ca2
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 120 deletions.
20 changes: 0 additions & 20 deletions jrebloom.iml

This file was deleted.

38 changes: 19 additions & 19 deletions src/main/java/io/rebloom/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private Connection sendCommand(Jedis conn, String key, ProtocolCommand command,
System.arraycopy(args, 0, fullArgs, 1, args.length);
return sendCommand(conn, command, fullArgs);
}

private Connection sendCommand(Jedis conn, ProtocolCommand command, String ...args) {
Connection client = conn.getClient();
client.sendCommand(command, args);
Expand Down Expand Up @@ -215,10 +215,10 @@ public boolean delete(String name) {
return conn.del(name) != 0;
}
}

/**
* TOPK.RESERVE key topk width depth decay
*
*
* Reserve a topk filter.
* @param key The key of the filter
* @param topk
Expand All @@ -231,7 +231,7 @@ public boolean delete(String name) {
*/
public void topkCreateFilter(String key, long topk, long width, long depth, double decay) {
try (Jedis conn = _conn()) {
String rep = sendCommand(conn, TopKCommand.RESERVE, SafeEncoder.encode(key), Protocol.toByteArray(topk),
String rep = sendCommand(conn, TopKCommand.RESERVE, SafeEncoder.encode(key), Protocol.toByteArray(topk),
Protocol.toByteArray(width), Protocol.toByteArray(depth),Protocol.toByteArray(decay))
.getStatusCodeReply();

Expand All @@ -240,10 +240,10 @@ public void topkCreateFilter(String key, long topk, long width, long depth, doub
}
}
}

/**
* TOPK.ADD key item [item ...]
*
*
* Adds an item to the filter
* @param key The key of the filter
* @param items The items to add to the filter
Expand All @@ -254,27 +254,27 @@ public List<String> topkAdd(String key, String ...items) {
return sendCommand(conn, key, TopKCommand.ADD, items).getMultiBulkReply();
}
}

/**
* TOPK.INCRBY key item increment [item increment ...]
*
*
* Adds an item to the filter
* @param key The key of the filter
* @param item The item to to increment
* @return list of items dropped from the list.
* @param item The item to increment
* @return item dropped from the list.
*/
public String topkIncrBy(String key, String item, long increment) {
try (Jedis conn = _conn()) {
return sendCommand(conn, TopKCommand.INCRBY, SafeEncoder.encode(key), SafeEncoder.encode(item), Protocol.toByteArray(increment))
.getMultiBulkReply().get(0);
}
}

/**
* TOPK.QUERY key item [item ...]
*
*
* Checks whether an item is one of Top-K items.
*
*
* @param key The key of the filter
* @param items The items to check in the list
* @return list of indicator for each item requested
Expand All @@ -287,12 +287,12 @@ public List<Boolean> topkQuery(String key, String ...items) {
.collect(Collectors.toList());
}
}

/**
* TOPK.COUNT key item [item ...]
*
*
* Returns count for an item.
*
*
* @param key The key of the filter
* @param items The items to check in the list
* @return list of counters per item.
Expand All @@ -303,12 +303,12 @@ public List<Long> topkCount(String key, String ...items) {
.getIntegerMultiBulkReply();
}
}

/**
* TOPK.LIST key
*
*
* Return full list of items in Top K list.
*
*
* @param key The key of the filter
* @return list of items in the list.
*/
Expand Down
145 changes: 145 additions & 0 deletions src/main/java/io/rebloom/client/ClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.*;
import redis.clients.jedis.Client;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @author TommyYang on 2018/12/17
Expand Down Expand Up @@ -87,12 +91,21 @@ public ClusterClient(Set<HostAndPort> jedisClusterNode, int connectionTimeout, i
}


private void sendCommand(Connection conn, String key, ProtocolCommand command, String ...args) {
String[] fullArgs = new String[args.length + 1];
fullArgs[0] = key;
System.arraycopy(args, 0, fullArgs, 1, args.length);
conn.sendCommand(command, fullArgs);
}

/**
* Reserve a bloom filter.
* @param name The key of the filter
* @param initCapacity Optimize for this many items
* @param errorRate The desired rate of false positives
*
* @return true if the filter create success, false if the filter create error.
*
* Note that if a filter is not reserved, a new one is created when {@link #add(String, byte[])}
* is called.
*/
Expand Down Expand Up @@ -224,6 +237,11 @@ public boolean[] execute(Jedis connection) {
}).run(name);
}

/**
* Remove the filter
* @param name
* @return true if delete the filter, false is not delete the filter
*/
public boolean delete(String name) {
return (new JedisClusterCommand<Boolean>(this.connectionHandler, this.maxAttempts) {
public Boolean execute(Jedis connection) {
Expand All @@ -234,7 +252,134 @@ public Boolean execute(Jedis connection) {
}).run(name);
}

/**
* TOPK.RESERVE key topk width depth decay
*
* Reserve a topk filter.
* @param key The key of the filter
* @param topk
* @param width
* @param depth
* @param decay
*
* Note that if a filter is not reserved, a new one is created when {@link #add(String, byte[])}
* is called.
*/
public void topkCreateFilter(String key, long topk, long width, long depth, double decay) {
(new JedisClusterCommand<Void>(this.connectionHandler, this.maxAttempts){
@Override
public Void execute(Jedis jedis) {
Connection conn = jedis.getClient();
conn.sendCommand(TopKCommand.RESERVE, SafeEncoder.encode(key), Protocol.toByteArray(topk),
Protocol.toByteArray(width), Protocol.toByteArray(depth),Protocol.toByteArray(decay));
String resp = conn.getStatusCodeReply();
if (!resp.equals("OK")){
throw new JedisException(resp);
}
return null;
}
}).run(key);
}

/**
* TOPK.ADD key item [item ...]
*
* Adds an item to the filter
* @param key The key of the filter
* @param items The items to add to the filter
* @return list of items dropped from the list.
*/
public List<String> topkAdd(String key, String ...items) {
return (new JedisClusterCommand<List<String>>(this.connectionHandler, this.maxAttempts){
@Override
public List<String> execute(Jedis jedis) {
Connection conn = jedis.getClient();
sendCommand(conn, key, TopKCommand.ADD, items);
return conn.getMultiBulkReply();
}
}).run(key);
}

/**
* TOPK.INCRBY key item increment [item increment ...]
*
* Adds an item to the filter
* @param key The key of the filter
* @param item The item to increment
* @return item dropped from the list.
*/
public String topkIncrBy(String key, String item, long increment) {
return (new JedisClusterCommand<String>(this.connectionHandler, this.maxAttempts){
@Override
public String execute(Jedis jedis) {
Connection conn = jedis.getClient();
conn.sendCommand(TopKCommand.INCRBY, SafeEncoder.encode(key), SafeEncoder.encode(item), Protocol.toByteArray(increment));
return conn.getMultiBulkReply().get(0);
}
}).run(key);
}

/**
* TOPK.QUERY key item [item ...]
*
* Checks whether an item is one of Top-K items.
*
* @param key The key of the filter
* @param items The items to check in the list
* @return list of indicator for each item requested
*/
public List<Boolean> topkQuery(String key, String ...items) {
return (new JedisClusterCommand<List<Boolean>>(this.connectionHandler, this.maxAttempts){
@Override
public List<Boolean> execute(Jedis jedis) {
Connection conn = jedis.getClient();
sendCommand(conn, key, TopKCommand.QUERY, items);
return conn.getIntegerMultiBulkReply()
.stream()
.map(s -> s != 0)
.collect(Collectors.toList());
}
}).run(key);
}

/**
* TOPK.COUNT key item [item ...]
*
* Returns count for an item.
*
* @param key The key of the filter
* @param items The items to check in the list
* @return list of counters per item.
*/
public List<Long> topkCount(String key, String ...items) {
return (new JedisClusterCommand<List<Long>>(this.connectionHandler, this.maxAttempts){
@Override
public List<Long> execute(Jedis jedis) {
Connection conn = jedis.getClient();
sendCommand(conn, key, TopKCommand.COUNT, items);
return conn.getIntegerMultiBulkReply();
}
}).run(key);
}

/**
* TOPK.LIST key
*
* Return full list of items in Top K list.
*
* @param key The key of the filter
* @return list of items in the list.
*/
public List<String> topkList(String key) {
return (new JedisClusterCommand<List<String>>(this.connectionHandler, this.maxAttempts){
@Override
public List<String> execute(Jedis jedis) {
Connection conn = jedis.getClient();
conn.sendCommand(TopKCommand.LIST, key);
return conn.getMultiBulkReply();
}
}).run(key);
}

@SafeVarargs
private final <T> boolean[] sendMultiCommand(Connection conn, Command cmd, T name, T... value) {
Expand Down
Loading

0 comments on commit ea16ca2

Please sign in to comment.