From 267567b3692fc2f319663af2335fc986b73e5f1e Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 11 Feb 2018 15:01:19 -0800 Subject: [PATCH] refactored for the new pipeline based queues --- .gitignore | 2 + dyno-queues-redis/build.gradle | 6 +- .../dyno/queues/redis/MultiRedisQueue.java | 222 +-------------- .../dyno/queues/redis/QueueBuilder.java | 253 ++++++++++++++++++ .../netflix/dyno/queues/redis/RedisQueue.java | 93 +++---- .../queues/redis/conn/DynoClientProxy.java | 91 +++++++ .../dyno/queues/redis/conn/DynoJedisPipe.java | 65 +++++ .../dyno/queues/redis/conn/JedisProxy.java | 97 +++++++ .../netflix/dyno/queues/redis/conn/Pipe.java | 83 ++++++ .../queues/redis/conn/RedisConnection.java | 52 ++++ .../dyno/queues/redis/conn/RedisPipe.java | 64 +++++ .../{redis => shard}/DynoShardSupplier.java | 2 +- .../{redis => shard}/SingleShardSupplier.java | 2 +- .../dyno/queues/redis/BenchmarkTests.java | 40 ++- .../queues/redis/DynoShardSupplierTest.java | 1 + .../queues/redis/RedisDynoQueueTest2.java | 4 +- gradle/wrapper/gradle-wrapper.properties | 4 +- 17 files changed, 793 insertions(+), 288 deletions(-) create mode 100644 dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/QueueBuilder.java create mode 100644 dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/DynoClientProxy.java create mode 100644 dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/DynoJedisPipe.java create mode 100644 dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/JedisProxy.java create mode 100644 dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/Pipe.java create mode 100644 dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/RedisConnection.java create mode 100644 dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/RedisPipe.java rename dyno-queues-redis/src/main/java/com/netflix/dyno/queues/{redis => shard}/DynoShardSupplier.java (97%) rename dyno-queues-redis/src/main/java/com/netflix/dyno/queues/{redis => shard}/SingleShardSupplier.java (96%) diff --git a/.gitignore b/.gitignore index abf0c50..342edea 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ redis-3.0.7.tar.gz *.iml .idea .gradle +.classpath +.project diff --git a/dyno-queues-redis/build.gradle b/dyno-queues-redis/build.gradle index 27074b8..21f8b4c 100644 --- a/dyno-queues-redis/build.gradle +++ b/dyno-queues-redis/build.gradle @@ -3,8 +3,10 @@ dependencies { compile project(':dyno-queues-core') compile "com.google.inject:guice:3.0" - compile 'com.netflix.dyno:dyno-core:1.5.9' - compile "com.netflix.dyno:dyno-jedis:1.5.9" + + compile 'com.netflix.dyno:dyno-core:1.6.2' + compile "com.netflix.dyno:dyno-jedis:1.6.2" + compile "com.netflix.archaius:archaius-core:0.7.5" compile "com.netflix.servo:servo-core:0.12.17" compile 'com.netflix.eureka:eureka-client:1.8.1' diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/MultiRedisQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/MultiRedisQueue.java index f4d3d20..92bce97 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/MultiRedisQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/MultiRedisQueue.java @@ -15,24 +15,7 @@ */ package com.netflix.dyno.queues.redis; -import com.google.common.base.Function; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; -import com.netflix.appinfo.AmazonInfo; -import com.netflix.appinfo.AmazonInfo.MetaDataKey; -import com.netflix.appinfo.InstanceInfo; -import com.netflix.appinfo.InstanceInfo.InstanceStatus; -import com.netflix.discovery.EurekaClient; -import com.netflix.discovery.shared.Application; -import com.netflix.dyno.connectionpool.Host; -import com.netflix.dyno.queues.DynoQueue; -import com.netflix.dyno.queues.Message; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; - import java.io.IOException; -import java.time.Clock; -import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -42,6 +25,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import com.netflix.dyno.queues.DynoQueue; +import com.netflix.dyno.queues.Message; + /** * @author Viren * @@ -214,207 +200,5 @@ private String getNextShard() { String s = shards.get(indx); return s; } - - - public static class Builder { - - private Clock clock; - - private String queueName; - - private EurekaClient ec; - - private String dynomiteClusterName; - - private String redisKeyPrefix; - - private int unackTime; - - private String currentShard; - - private Function hostToShardMap; - - private int redisPoolSize; - - private int quorumPort; - - private int nonQuorumPort; - - private List hosts; - - /** - * @param clock the Clock instance to set - * @return instance of builder - */ - public Builder setClock(Clock clock) { - this.clock = clock; - return this; - } - - /** - * @param queueName the queueName to set - * @return instance of builder - */ - public Builder setQueueName(String queueName) { - this.queueName = queueName; - return this; - } - - /** - * @param ec the ec to set - * @return instance of builder - */ - public Builder setEc(EurekaClient ec) { - this.ec = ec; - return this; - } - - /** - * @param dynomiteClusterName the dynomiteClusterName to set - * @return instance of builder - */ - public Builder setDynomiteClusterName(String dynomiteClusterName) { - this.dynomiteClusterName = dynomiteClusterName; - return this; - } - - /** - * @param redisKeyPrefix the redisKeyPrefix to set - * @return instance of builder - */ - public Builder setRedisKeyPrefix(String redisKeyPrefix) { - this.redisKeyPrefix = redisKeyPrefix; - return this; - } - - /** - * @param unackTime the unackTime to set - * @return instance of builder - */ - public Builder setUnackTime(int unackTime) { - this.unackTime = unackTime; - return this; - } - - /** - * @param currentShard the currentShard to set - * @return instance of builder - */ - public Builder setCurrentShard(String currentShard) { - this.currentShard = currentShard; - return this; - } - - /** - * @param hostToShardMap the hostToShardMap to set - * @return instance of builder - */ - public Builder setHostToShardMap(Function hostToShardMap) { - this.hostToShardMap = hostToShardMap; - return this; - } - - /** - * @param redisPoolSize the redisPoolSize to set - * @return instance of builder - */ - public Builder setRedisPoolSize(int redisPoolSize) { - this.redisPoolSize = redisPoolSize; - return this; - } - - /** - * @param quorumPort the quorumPort to set - * @return instance of builder - */ - public Builder setQuorumPort(int quorumPort) { - this.quorumPort = quorumPort; - return this; - } - - /** - * @param nonQuorumPort the nonQuorumPort to set - * @return instance of builder - */ - public Builder setNonQuorumPort(int nonQuorumPort) { - this.nonQuorumPort = nonQuorumPort; - return this; - } - - public Builder setHosts(List hosts) { - this.hosts = hosts; - return this; - } - - public MultiRedisQueue build() { - if (clock == null) { - clock = Clock.systemDefaultZone(); - } - if(hosts == null) { - hosts = getHostsFromEureka(ec, dynomiteClusterName); - } - Map shardMap = new HashMap<>(); - for(Host host : hosts) { - String shard = hostToShardMap.apply(host); - shardMap.put(shard, host); - } - - JedisPoolConfig config = new JedisPoolConfig(); - config.setTestOnBorrow(true); - config.setTestOnCreate(true); - config.setMaxTotal(redisPoolSize); - config.setMaxIdle(5); - config.setMaxWaitMillis(60_000); - - Map queues = new HashMap<>(); - for(String queueShard : shardMap.keySet()) { - String host = shardMap.get(queueShard).getIpAddress(); - - JedisPool pool = new JedisPool(config, host, quorumPort, 0); - JedisPool readPool = new JedisPool(config, host, nonQuorumPort, 0); - - RedisQueue q = new RedisQueue(clock, redisKeyPrefix, queueName, queueShard, unackTime, unackTime, pool); - q.setNonQuorumPool(readPool); - queues.put(queueShard, q); - } - MultiRedisQueue queue = new MultiRedisQueue(queueName, currentShard, queues); - return queue; - } - - private static List getHostsFromEureka(EurekaClient ec, String applicationName) { - - Application app = ec.getApplication(applicationName); - List hosts = new ArrayList(); - - if (app == null) { - return hosts; - } - - List ins = app.getInstances(); - - if (ins == null || ins.isEmpty()) { - return hosts; - } - - hosts = Lists.newArrayList(Collections2.transform(ins, - - new Function() { - @Override - public Host apply(InstanceInfo info) { - - Host.Status status = info.getStatus() == InstanceStatus.UP ? Host.Status.Up : Host.Status.Down; - String rack = null; - if (info.getDataCenterInfo() instanceof AmazonInfo) { - AmazonInfo amazonInfo = (AmazonInfo)info.getDataCenterInfo(); - rack = amazonInfo.get(MetaDataKey.availabilityZone); - } - Host host = new Host(info.getHostName(), info.getIPAddr(), rack, status); - return host; - } - })); - return hosts; - } - } - } diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/QueueBuilder.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/QueueBuilder.java new file mode 100644 index 0000000..722f188 --- /dev/null +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/QueueBuilder.java @@ -0,0 +1,253 @@ +/** + * + */ +package com.netflix.dyno.queues.redis; + +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.netflix.appinfo.AmazonInfo; +import com.netflix.appinfo.AmazonInfo.MetaDataKey; +import com.netflix.appinfo.InstanceInfo; +import com.netflix.appinfo.InstanceInfo.InstanceStatus; +import com.netflix.discovery.EurekaClient; +import com.netflix.discovery.shared.Application; +import com.netflix.dyno.connectionpool.Host; +import com.netflix.dyno.connectionpool.Host.Status; +import com.netflix.dyno.contrib.EurekaHostsSupplier; +import com.netflix.dyno.jedis.DynoJedisClient; +import com.netflix.dyno.queues.DynoQueue; +import com.netflix.dyno.queues.redis.conn.DynoClientProxy; +import com.netflix.dyno.queues.redis.conn.JedisProxy; +import com.netflix.dyno.queues.redis.conn.RedisConnection; + +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +/** + * @author Viren + * + */ +public class QueueBuilder { + + private Clock clock; + + private String queueName; + + private EurekaClient ec; + + private String dynomiteClusterName; + + private String redisKeyPrefix; + + private int unackTime; + + private String currentShard; + + private Function hostToShardMap; + + private int nonQuorumPort; + + private List hosts; + + private JedisPoolConfig redisPoolConfig; + + /** + * @param clock the Clock instance to set + * @return instance of QueueBuilder + */ + public QueueBuilder setClock(Clock clock) { + this.clock = clock; + return this; + } + + /** + * @param queueName the queueName to set + * @return instance of QueueBuilder + */ + public QueueBuilder setQueueName(String queueName) { + this.queueName = queueName; + return this; + } + + /** + * @param redisKeyPrefix Prefix used for all the keys in Redis + * @return instance of QueueBuilder + */ + public QueueBuilder setRedisKeyPrefix(String redisKeyPrefix) { + this.redisKeyPrefix = redisKeyPrefix; + return this; + } + + /** + * @param ec the ec to set + * @return instance of QueueBuilder + */ + + /** + * + * @param ec EurekaClient instance used to discover the hosts in the dynomite cluster + * @param dynomiteClusterName Name of the Dynomite Cluster to be used + * @param nonQuorumPort Direct Redis port used to make non-quorumed queries - used when querying the message counts + * @return instance of QueueBuilder + */ + public QueueBuilder useDynomiteCluster(EurekaClient ec, String dynomiteClusterName, int nonQuorumPort) { + this.ec = ec; + this.dynomiteClusterName = dynomiteClusterName; + this.nonQuorumPort = nonQuorumPort; + return this; + } + + /** + * + * @param redisPoolConfig + * @return instance of QueueBuilder + */ + public QueueBuilder useNonDynomiteRedis(JedisPoolConfig redisPoolConfig, List redisHosts) { + this.redisPoolConfig = redisPoolConfig; + this.hosts = redisHosts; + return this; + } + + /** + * + * @param hostToShardMap Mapping from a Host to a queue shard + * @return instance of QueueBuilder + */ + public QueueBuilder setHostToShardMap(Function hostToShardMap) { + this.hostToShardMap = hostToShardMap; + return this; + } + + /** + * @param unackTime Time in millisecond, after which the uncked messages will be re-queued for the delivery + * @return instance of QueueBuilder + */ + public QueueBuilder setUnackTime(int unackTime) { + this.unackTime = unackTime; + return this; + } + + /** + * @param currentShard Name of the current shard + * @return instance of QueueBuilder + */ + public QueueBuilder setCurrentShard(String currentShard) { + this.currentShard = currentShard; + return this; + } + + public DynoQueue build() { + if (clock == null) { + clock = Clock.systemDefaultZone(); + } + + boolean useDynomite = false; + //When using Dynomite + if(dynomiteClusterName != null) { + hosts = getHostsFromEureka(ec, dynomiteClusterName); + useDynomite = true; + } + Map shardMap = new HashMap<>(); + for(Host host : hosts) { + String shard = hostToShardMap.apply(host); + shardMap.put(shard, host); + } + + DynoJedisClient dynoClientRead = null; + DynoJedisClient dynoClient = null; + if(useDynomite) { + String appId = queueName; + EurekaHostsSupplier hostSupplier = new EurekaHostsSupplier(dynomiteClusterName, ec) { + @Override + public List getHosts() { + List hosts = super.getHosts(); + List updatedHosts = new ArrayList<>(hosts.size()); + hosts.forEach(host -> { + updatedHosts.add(new Host(host.getHostName(), host.getIpAddress(), nonQuorumPort, host.getRack(), host.getDatacenter(), host.isUp() ? Status.Up : Status.Down)); + }); + return updatedHosts; + } + }; + + dynoClientRead = new DynoJedisClient.Builder().withApplicationName(appId).withDynomiteClusterName(dynomiteClusterName).withHostSupplier(hostSupplier).build(); + dynoClient = new DynoJedisClient.Builder().withApplicationName(appId).withDynomiteClusterName(dynomiteClusterName).withDiscoveryClient(ec).build(); + } + + + Map queues = new HashMap<>(); + for(String queueShard : shardMap.keySet()) { + + Host host = shardMap.get(queueShard); + String hostAddress = host.getIpAddress(); + if(hostAddress == null || "".equals(hostAddress)) { + hostAddress = host.getHostName(); + } + RedisConnection redisConn = null; + RedisConnection redisConnRead = null; + + if(useDynomite) { + redisConn = new DynoClientProxy(dynoClient); + redisConnRead = new DynoClientProxy(dynoClientRead); + } else{ + JedisPool pool = new JedisPool(redisPoolConfig, hostAddress, host.getPort(), 0); + redisConn = new JedisProxy(pool); + redisConnRead = new JedisProxy(pool); + } + + RedisQueue q = new RedisQueue(clock, redisKeyPrefix, queueName, queueShard, unackTime, unackTime, redisConn); + q.setNonQuorumPool(redisConnRead); + + queues.put(queueShard, q); + } + + if(queues.size() == 1) { + //This is a queue with a single shard + return queues.values().iterator().next(); + } + + MultiRedisQueue queue = new MultiRedisQueue(queueName, currentShard, queues); + return queue; + } + + + private static List getHostsFromEureka(EurekaClient ec, String applicationName) { + + Application app = ec.getApplication(applicationName); + List hosts = new ArrayList(); + + if (app == null) { + return hosts; + } + + List ins = app.getInstances(); + + if (ins == null || ins.isEmpty()) { + return hosts; + } + + hosts = Lists.newArrayList(Collections2.transform(ins, + + new Function() { + @Override + public Host apply(InstanceInfo info) { + + Host.Status status = info.getStatus() == InstanceStatus.UP ? Host.Status.Up : Host.Status.Down; + String rack = null; + if (info.getDataCenterInfo() instanceof AmazonInfo) { + AmazonInfo amazonInfo = (AmazonInfo)info.getDataCenterInfo(); + rack = amazonInfo.get(MetaDataKey.availabilityZone); + } + Host host = new Host(info.getHostName(), info.getIPAddr(), rack, status); + return host; + } + })); + return hosts; + } +} diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueue.java index a040428..1b0d033 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueue.java @@ -15,24 +15,6 @@ */ package com.netflix.dyno.queues.redis; -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.netflix.dyno.connectionpool.HashPartitioner; -import com.netflix.dyno.connectionpool.impl.hash.Murmur3HashPartitioner; -import com.netflix.dyno.queues.DynoQueue; -import com.netflix.dyno.queues.Message; -import com.netflix.servo.monitor.Stopwatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Response; -import redis.clients.jedis.Tuple; -import redis.clients.jedis.params.sortedset.ZAddParams; - import java.io.IOException; import java.time.Clock; import java.util.ArrayList; @@ -47,6 +29,25 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.netflix.dyno.connectionpool.HashPartitioner; +import com.netflix.dyno.connectionpool.impl.hash.Murmur3HashPartitioner; +import com.netflix.dyno.queues.DynoQueue; +import com.netflix.dyno.queues.Message; +import com.netflix.dyno.queues.redis.conn.Pipe; +import com.netflix.dyno.queues.redis.conn.RedisConnection; +import com.netflix.servo.monitor.Stopwatch; + +import redis.clients.jedis.Response; +import redis.clients.jedis.Tuple; +import redis.clients.jedis.params.sortedset.ZAddParams; + /** * * @author Viren @@ -74,9 +75,9 @@ public class RedisQueue implements DynoQueue { private ObjectMapper om; - private JedisPool connPool; + private RedisConnection connPool; - private JedisPool nonQuorumPool; + private RedisConnection nonQuorumPool; private ScheduledExecutorService schedulerForUnacksProcessing; @@ -84,17 +85,13 @@ public class RedisQueue implements DynoQueue { private HashPartitioner partitioner = new Murmur3HashPartitioner(); - private int maxHashBuckets = 1024; + private int maxHashBuckets = 32; - public RedisQueue(String redisKeyPrefix, String queueName, String shardName, int unackTime, JedisPool pool) { - this(redisKeyPrefix, queueName, shardName, unackTime, unackTime, pool); - } - - public RedisQueue(String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, JedisPool pool) { + public RedisQueue(String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, RedisConnection pool) { this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, shardName, unackScheduleInMS, unackTime, pool); } - public RedisQueue(Clock clock, String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, JedisPool pool) { + public RedisQueue(Clock clock, String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, RedisConnection pool) { this.clock = clock; this.queueName = queueName; this.shardName = shardName; @@ -121,7 +118,7 @@ public RedisQueue(Clock clock, String redisKeyPrefix, String queueName, String s schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS); - logger.info(RedisQueue.class.getName() + " is ready to serve " + queueName); + logger.info(RedisQueue.class.getName() + " is ready to serve " + queueName + ", shard=" + shardName); } @@ -129,7 +126,7 @@ public RedisQueue(Clock clock, String redisKeyPrefix, String queueName, String s * * @param nonQuorumPool When using a cluster like Dynomite, which relies on the quorum reads, supply a separate non-quorum read connection for ops like size etc. */ - public void setNonQuorumPool(JedisPool nonQuorumPool) { + public void setNonQuorumPool(RedisConnection nonQuorumPool) { this.nonQuorumPool = nonQuorumPool; } @@ -147,10 +144,10 @@ public int getUnackTime() { public List push(final List messages) { Stopwatch sw = monitor.start(monitor.push, messages.size()); - Jedis conn = connPool.getResource(); + RedisConnection conn = connPool.getResource(); try { - Pipeline pipe = conn.pipelined(); + Pipe pipe = conn.pipelined(); for (Message message : messages) { String json = om.writeValueAsString(message); @@ -188,7 +185,7 @@ private String unackShardKey(String messageId) { public List peek(final int messageCount) { Stopwatch sw = monitor.peek.start(); - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { @@ -248,10 +245,10 @@ private List _pop(List batch) throws Exception { List popped = new LinkedList<>(); ZAddParams zParams = ZAddParams.zAddParams().nx(); - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { - Pipeline pipe = jedis.pipelined(); + Pipe pipe = jedis.pipelined(); List> zadds = new ArrayList<>(batch.size()); for (int i = 0; i < batch.size(); i++) { String msgId = batch.get(i); @@ -317,7 +314,7 @@ private List _pop(List batch) throws Exception { public boolean ack(String messageId) { Stopwatch sw = monitor.ack.start(); - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { @@ -339,8 +336,8 @@ public boolean ack(String messageId) { public void ack(List messages) { Stopwatch sw = monitor.ack.start(); - Jedis jedis = connPool.getResource(); - Pipeline pipe = jedis.pipelined(); + RedisConnection jedis = connPool.getResource(); + Pipe pipe = jedis.pipelined(); List> responses = new LinkedList<>(); try { for(Message msg : messages) { @@ -359,7 +356,7 @@ public void ack(List messages) { pipe.sync(); pipe.close(); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } finally { jedis.close(); @@ -373,7 +370,7 @@ public void ack(List messages) { public boolean setUnackTimeout(String messageId, long timeout) { Stopwatch sw = monitor.ack.start(); - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { @@ -395,7 +392,7 @@ public boolean setUnackTimeout(String messageId, long timeout) { @Override public boolean setTimeout(String messageId, long timeout) { - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { String json = jedis.hget(messageStoreKey(messageId), messageId); @@ -429,7 +426,7 @@ public boolean setTimeout(String messageId, long timeout) { public boolean remove(String messageId) { Stopwatch sw = monitor.remove.start(); - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { @@ -454,7 +451,7 @@ public boolean remove(String messageId) { public Message get(String messageId) { Stopwatch sw = monitor.get.start(); - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { String json = jedis.hget(messageStoreKey(messageId), messageId); @@ -480,7 +477,7 @@ public Message get(String messageId) { public long size() { Stopwatch sw = monitor.size.start(); - Jedis jedis = nonQuorumPool.getResource(); + RedisConnection jedis = nonQuorumPool.getResource(); try { long size = jedis.zcard(myQueueShard); @@ -496,7 +493,7 @@ public Map> shardSizes() { Stopwatch sw = monitor.size.start(); Map> shardSizes = new HashMap<>(); - Jedis jedis = nonQuorumPool.getResource(); + RedisConnection jedis = nonQuorumPool.getResource(); try { long size = jedis.zcard(myQueueShard); @@ -521,7 +518,7 @@ public Map> shardSizes() { @Override public void clear() { - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { jedis.del(myQueueShard); @@ -541,7 +538,7 @@ public void clear() { } private Set peekIds(int offset, int count) { - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { double now = Long.valueOf(clock.millis() + 1).doubleValue(); Set scanned = jedis.zrangeByScore(myQueueShard, 0, now, offset, count); @@ -561,7 +558,7 @@ public void processUnacks() { private void processUnacks(String unackShardKey) { Stopwatch sw = monitor.processUnack.start(); - Jedis jedis = connPool.getResource(); + RedisConnection jedis = connPool.getResource(); try { @@ -613,6 +610,4 @@ public void close() throws IOException { schedulerForPrefetchProcessing.shutdown(); monitor.close(); } - - } diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/DynoClientProxy.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/DynoClientProxy.java new file mode 100644 index 0000000..18be436 --- /dev/null +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/DynoClientProxy.java @@ -0,0 +1,91 @@ +/** + * + */ +package com.netflix.dyno.queues.redis.conn; + +import java.util.Set; + +import com.netflix.dyno.jedis.DynoJedisClient; + +import redis.clients.jedis.Tuple; + +/** + * @author Viren + * + */ +public class DynoClientProxy implements RedisConnection { + + private DynoJedisClient jedis; + + + public DynoClientProxy(DynoJedisClient jedis) { + this.jedis = jedis; + } + + @Override + public RedisConnection getResource() { + return this; + } + + @Override + public void close() { + //nothing! + } + + @Override + public Pipe pipelined() { + return new DynoJedisPipe(jedis.pipelined()); + } + + @Override + public String hget(String key, String member) { + return jedis.hget(key, member); + } + + @Override + public Long zrem(String key, String member) { + return jedis.zrem(key, member); + } + + @Override + public Long hdel(String key, String member) { + return jedis.hdel(key, member); + + } + + @Override + public Double zscore(String key, String member) { + return jedis.zscore(key, member); + } + + @Override + public void zadd(String key, double score, String member) { + jedis.zadd(key, score, member); + } + + @Override + public void hset(String key, String member, String json) { + jedis.hset(key, member, json); + } + + @Override + public long zcard(String key) { + return jedis.zcard(key); + } + + @Override + public void del(String key) { + jedis.del(key); + } + + @Override + public Set zrangeByScore(String key, int min, double max, int offset, int count) { + return jedis.zrangeByScore(key, min, max, offset, count); + } + + @Override + public Set zrangeByScoreWithScores(String key, int min, double max, int offset, int count) { + return jedis.zrangeByScoreWithScores(key, min, max, offset, count); + } + +} diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/DynoJedisPipe.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/DynoJedisPipe.java new file mode 100644 index 0000000..90b9e05 --- /dev/null +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/DynoJedisPipe.java @@ -0,0 +1,65 @@ +/** + * + */ +package com.netflix.dyno.queues.redis.conn; + +import com.netflix.dyno.jedis.DynoJedisPipeline; + +import redis.clients.jedis.Response; +import redis.clients.jedis.params.sortedset.ZAddParams; + +/** + * @author Viren + * + */ +public class DynoJedisPipe implements Pipe { + + private DynoJedisPipeline pipe; + + public DynoJedisPipe(DynoJedisPipeline pipe) { + this.pipe = pipe; + } + + @Override + public void hset(String key, String field, String value) { + pipe.hset(key, field, value); + + } + + @Override + public Response zadd(String key, double score, String member) { + return pipe.zadd(key, score, member); + } + + @Override + public Response zadd(String key, double score, String member, ZAddParams zParams) { + return pipe.zadd(key, score, member, zParams); + } + + @Override + public Response zrem(String key, String member) { + return pipe.zrem(key, member); + } + + @Override + public Response hget(String key, String member) { + return pipe.hget(key, member); + } + + @Override + public Response hdel(String key, String member) { + return pipe.hdel(key, member); + } + + @Override + public void sync() { + pipe.sync(); + } + + @Override + public void close() throws Exception { + pipe.close(); + } + + +} diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/JedisProxy.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/JedisProxy.java new file mode 100644 index 0000000..0a11689 --- /dev/null +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/JedisProxy.java @@ -0,0 +1,97 @@ +/** + * + */ +package com.netflix.dyno.queues.redis.conn; + +import java.util.Set; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.Tuple; + +/** + * @author Viren + * + */ +public class JedisProxy implements RedisConnection { + + private JedisPool pool; + + private Jedis jedis; + + public JedisProxy(JedisPool pool) { + this.pool = pool; + } + + public JedisProxy(Jedis jedis) { + this.jedis = jedis; + } + + @Override + public RedisConnection getResource() { + Jedis jedis = pool.getResource(); + return new JedisProxy(jedis); + } + + @Override + public void close() { + jedis.close(); + } + + @Override + public Pipe pipelined() { + return new RedisPipe(jedis.pipelined()); + } + + @Override + public String hget(String key, String member) { + return jedis.hget(key, member); + } + + @Override + public Long zrem(String key, String member) { + return jedis.zrem(key, member); + } + + @Override + public Long hdel(String key, String member) { + return jedis.hdel(key, member); + + } + + @Override + public Double zscore(String key, String member) { + return jedis.zscore(key, member); + } + + @Override + public void zadd(String key, double unackScore, String member) { + jedis.zadd(key, unackScore, member); + } + + @Override + public void hset(String key, String member, String json) { + jedis.hset(key, member, json); + } + + @Override + public long zcard(String key) { + return jedis.zcard(key); + } + + @Override + public void del(String key) { + jedis.del(key); + } + + @Override + public Set zrangeByScore(String key, int min, double max, int offset, int count) { + return jedis.zrangeByScore(key, min, max, offset, count); + } + + @Override + public Set zrangeByScoreWithScores(String key, int min, double max, int offset, int count) { + return jedis.zrangeByScoreWithScores(key, min, max, offset, count); + } + +} diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/Pipe.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/Pipe.java new file mode 100644 index 0000000..2cd3e4f --- /dev/null +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/Pipe.java @@ -0,0 +1,83 @@ +package com.netflix.dyno.queues.redis.conn; + +import com.netflix.dyno.jedis.DynoJedisPipeline; + +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; +import redis.clients.jedis.params.sortedset.ZAddParams; + +/** + * + * @author Viren + *

+ * Abstraction of Redis Pipeline. + * The abstraction is required as there is no common interface between DynoJedisPipeline and Jedis' Pipeline classes. + *

+ * @see DynoJedisPipeline + * @see Pipeline + * + */ +public interface Pipe { + + /** + * + * @param key + * @param field + * @param value + */ + public void hset(String key, String field, String value); + + /** + * + * @param key + * @param score + * @param member + * @return + */ + public Response zadd(String key, double score, String member); + + /** + * + * @param key + * @param score + * @param member + * @param zParams + * @return + */ + public Response zadd(String key, double score, String member, ZAddParams zParams); + + /** + * + * @param key + * @param member + * @return + */ + public Response zrem(String key, String member); + + /** + * + * @param key + * @param member + * @return + */ + public Response hget(String key, String member); + + /** + * + * @param key + * @param member + * @return + */ + public Response hdel(String key, String member); + + /** + * + */ + public void sync(); + + /** + * + * @throws Exception + */ + public void close() throws Exception; +} \ No newline at end of file diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/RedisConnection.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/RedisConnection.java new file mode 100644 index 0000000..0f56192 --- /dev/null +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/RedisConnection.java @@ -0,0 +1,52 @@ +package com.netflix.dyno.queues.redis.conn; + +import java.util.Set; + +import com.netflix.dyno.jedis.DynoJedisClient; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Tuple; + +/** + * Abstraction of Redis connection. + * + * @author viren + *

+ * The methods are 1-1 proxies from Jedis. See Jedis documentation for the details. + *

+ * @see Jedis + * @see DynoJedisClient + */ +public interface RedisConnection { + + /** + * + * @return Returns the underlying connection resource. For connection pool, returns the actual connection + */ + public RedisConnection getResource(); + + public String hget(String messkeyageStoreKey, String member); + + public Long zrem(String key, String member); + + public Long hdel(String key, String member); + + public Double zscore(String key, String member); + + public void zadd(String key, double score, String member); + + public void hset(String key, String id, String json); + + public long zcard(String key); + + public void del(String key); + + public Set zrangeByScore(String key, int min, double max, int offset, int count); + + public Set zrangeByScoreWithScores(String key, int min, double max, int offset, int count); + + public void close(); + + public Pipe pipelined(); + +} \ No newline at end of file diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/RedisPipe.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/RedisPipe.java new file mode 100644 index 0000000..216a6f2 --- /dev/null +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/conn/RedisPipe.java @@ -0,0 +1,64 @@ +/** + * + */ +package com.netflix.dyno.queues.redis.conn; + +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; +import redis.clients.jedis.params.sortedset.ZAddParams; + +/** + * @author Viren + * + */ +public class RedisPipe implements Pipe { + + private Pipeline pipe; + + public RedisPipe(Pipeline pipe) { + this.pipe = pipe; + } + + @Override + public void hset(String key, String field, String value) { + pipe.hset(key, field, value); + + } + + @Override + public Response zadd(String key, double score, String member) { + return pipe.zadd(key, score, member); + } + + @Override + public Response zadd(String key, double score, String member, ZAddParams zParams) { + return pipe.zadd(key, score, member, zParams); + } + + @Override + public Response zrem(String key, String member) { + return pipe.zrem(key, member); + } + + @Override + public Response hget(String key, String member) { + return pipe.hget(key, member); + } + + @Override + public Response hdel(String key, String member) { + return pipe.hdel(key, member); + } + + @Override + public void sync() { + pipe.sync(); + } + + @Override + public void close() throws Exception { + pipe.close(); + } + + +} diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/DynoShardSupplier.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/shard/DynoShardSupplier.java similarity index 97% rename from dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/DynoShardSupplier.java rename to dyno-queues-redis/src/main/java/com/netflix/dyno/queues/shard/DynoShardSupplier.java index 54245e2..c1a43d2 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/DynoShardSupplier.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/shard/DynoShardSupplier.java @@ -16,7 +16,7 @@ /** * */ -package com.netflix.dyno.queues.redis; +package com.netflix.dyno.queues.shard; import java.util.Set; import java.util.stream.Collectors; diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/SingleShardSupplier.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/shard/SingleShardSupplier.java similarity index 96% rename from dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/SingleShardSupplier.java rename to dyno-queues-redis/src/main/java/com/netflix/dyno/queues/shard/SingleShardSupplier.java index 716b423..0162812 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/SingleShardSupplier.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/shard/SingleShardSupplier.java @@ -16,7 +16,7 @@ /** * */ -package com.netflix.dyno.queues.redis; +package com.netflix.dyno.queues.shard; import java.util.Set; diff --git a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/BenchmarkTests.java b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/BenchmarkTests.java index 63e99e5..c9f9c45 100644 --- a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/BenchmarkTests.java +++ b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/BenchmarkTests.java @@ -4,13 +4,15 @@ package com.netflix.dyno.queues.redis; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import com.netflix.dyno.connectionpool.Host; +import com.netflix.dyno.queues.DynoQueue; import com.netflix.dyno.queues.Message; -import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** @@ -19,18 +21,31 @@ */ public class BenchmarkTests { - private RedisQueue queue; + private DynoQueue queue; public BenchmarkTests() { + List hosts = new LinkedList<>(); + hosts.add(new Host("localhost", 6379, "us-east-1a")); + QueueBuilder qb = new QueueBuilder(); + JedisPoolConfig config = new JedisPoolConfig(); config.setTestOnBorrow(true); config.setTestOnCreate(true); config.setMaxTotal(10); config.setMaxIdle(5); config.setMaxWaitMillis(60_000); - JedisPool pool = new JedisPool(config, "localhost", 6379); - queue = new RedisQueue("perf", "TEST_QUEUE", "x", 60000_000, pool); + + queue = qb + .setCurrentShard("a") + .setHostToShardMap((Host h) -> h.getRack().substring(h.getRack().length()-1)) + .setQueueName("testq") + .setRedisKeyPrefix("keyprefix") + .setUnackTime(60_000_000) + .useNonDynomiteRedis(config, hosts) + .build(); + + System.out.println("Instance: " + queue.getClass().getName()); } public void publish() { @@ -55,9 +70,10 @@ public void publish() { public void consume() { try { + long s = System.currentTimeMillis(); int loopCount = 100; - int batchSize = 2000; + int batchSize = 3500; int count = 0; for(int i = 0; i < loopCount; i++) { List popped = queue.pop(batchSize, 1, TimeUnit.MILLISECONDS); @@ -67,7 +83,7 @@ public void consume() { long e = System.currentTimeMillis(); long diff = e-s; long throughput = 1000 * ((count)/diff); - System.out.println("Consume time: " + diff + ", read throughput: " + throughput + " msg/sec, read: " + count); + System.out.println("Consume time: " + diff + ", read throughput: " + throughput + " msg/sec, messages read: " + count); }catch(Exception e) { e.printStackTrace(); } @@ -86,13 +102,11 @@ public static void main(String[] args) throws Exception { try { BenchmarkTests tests = new BenchmarkTests(); - - for(int i = 0; i < 20; i++) { - tests.publish(); - tests.consume(); - } - - } finally { + tests.publish(); + tests.consume(); + } catch(Exception e) { + e.printStackTrace(); + }finally { System.exit(0); } } diff --git a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/DynoShardSupplierTest.java b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/DynoShardSupplierTest.java index 874fb6e..65c3066 100644 --- a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/DynoShardSupplierTest.java +++ b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/DynoShardSupplierTest.java @@ -32,6 +32,7 @@ import com.netflix.dyno.connectionpool.Host; import com.netflix.dyno.connectionpool.Host.Status; +import com.netflix.dyno.queues.shard.DynoShardSupplier; import com.netflix.dyno.connectionpool.HostSupplier; /** diff --git a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/RedisDynoQueueTest2.java b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/RedisDynoQueueTest2.java index 12e1c6e..d8ba84a 100644 --- a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/RedisDynoQueueTest2.java +++ b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/RedisDynoQueueTest2.java @@ -43,6 +43,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.netflix.dyno.queues.Message; +import com.netflix.dyno.queues.redis.conn.JedisProxy; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; @@ -75,7 +76,8 @@ public static void setUpBeforeClass() throws Exception { JedisPool pool = new JedisPool(config, "localhost", 6379); dynoClient = new Jedis("localhost", 6379, 0, 0); dynoClient.flushAll(); - rdq = new RedisQueue(redisKeyPrefix, queueName, "x", 1_000, pool); + + rdq = new RedisQueue(redisKeyPrefix, queueName, "x", 1_000, 1_000, new JedisProxy(pool)); messageKeyPrefix = redisKeyPrefix + ".MESSAGE."; } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 53b60c3..9a4bb89 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Wed Sep 20 15:04:48 PDT 2017 +#Sun Feb 04 11:53:34 PST 2018 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-3.1-all.zip