From f3dd1041a0c8a9c4b040238a940cf85acf4ebb1d Mon Sep 17 00:00:00 2001 From: Rob Zienert Date: Mon, 4 Dec 2017 13:03:31 -0800 Subject: [PATCH 1/3] ignoring IDEA files --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index e2f8b8c..aee7a2c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ dyno-queues-core/build build redis-3.0.7/ redis-3.0.7.tar.gz + +*.iml +.idea From b9caecc9dbe785220e496ec0d6e4c672954f02ec Mon Sep 17 00:00:00 2001 From: Rob Zienert Date: Mon, 4 Dec 2017 13:20:30 -0800 Subject: [PATCH 2/3] ignoring .gradle dir --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index aee7a2c..abf0c50 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ redis-3.0.7.tar.gz *.iml .idea +.gradle From a022a1eef73d28fd723c57c113d4de5a035c5d34 Mon Sep 17 00:00:00 2001 From: Rob Zienert Date: Mon, 4 Dec 2017 13:21:02 -0800 Subject: [PATCH 3/3] refactoring queue implementations to accept a Clock --- .../dyno/queues/redis/MultiRedisQueue.java | 43 +++++++++----- .../dyno/queues/redis/RedisDynoQueue.java | 59 +++++++++++-------- .../netflix/dyno/queues/redis/RedisQueue.java | 52 ++++++++-------- .../dyno/queues/redis/RedisQueues.java | 31 +++++++--- 4 files changed, 112 insertions(+), 73 deletions(-) diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/MultiRedisQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/MultiRedisQueue.java index 1499974..f4d3d20 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/MultiRedisQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/MultiRedisQueue.java @@ -15,17 +15,6 @@ */ package com.netflix.dyno.queues.redis; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; @@ -38,10 +27,21 @@ import com.netflix.dyno.connectionpool.Host; import com.netflix.dyno.queues.DynoQueue; import com.netflix.dyno.queues.Message; - import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + /** * @author Viren * @@ -217,6 +217,8 @@ private String getNextShard() { public static class Builder { + + private Clock clock; private String queueName; @@ -239,6 +241,15 @@ public static class Builder { private int nonQuorumPort; private List hosts; + + /** + * @param clock the Clock instance to set + * @return instance of builder + */ + public Builder setClock(Clock clock) { + this.clock = clock; + return this; + } /** * @param queueName the queueName to set @@ -336,6 +347,9 @@ public Builder setHosts(List hosts) { } public MultiRedisQueue build() { + if (clock == null) { + clock = Clock.systemDefaultZone(); + } if(hosts == null) { hosts = getHostsFromEureka(ec, dynomiteClusterName); } @@ -352,15 +366,14 @@ public MultiRedisQueue build() { config.setMaxIdle(5); config.setMaxWaitMillis(60_000); - Map queues = new HashMap<>(); for(String queueShard : shardMap.keySet()) { String host = shardMap.get(queueShard).getIpAddress(); JedisPool pool = new JedisPool(config, host, quorumPort, 0); JedisPool readPool = new JedisPool(config, host, nonQuorumPort, 0); - - RedisQueue q = new RedisQueue(redisKeyPrefix, queueName, queueShard, unackTime, pool); + + RedisQueue q = new RedisQueue(clock, redisKeyPrefix, queueName, queueShard, unackTime, unackTime, pool); q.setNonQuorumPool(readPool); queues.put(queueShard, q); } diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java index 4797b65..6a6dd48 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java @@ -15,7 +15,24 @@ */ package com.netflix.dyno.queues.redis; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Uninterruptibles; +import com.netflix.dyno.connectionpool.exception.DynoException; +import com.netflix.dyno.queues.DynoQueue; +import com.netflix.dyno.queues.Message; +import com.netflix.servo.monitor.Stopwatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCommands; +import redis.clients.jedis.Tuple; +import redis.clients.jedis.params.sortedset.ZAddParams; + import java.io.IOException; +import java.time.Clock; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -31,24 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Uninterruptibles; -import com.netflix.dyno.connectionpool.exception.DynoException; -import com.netflix.dyno.queues.DynoQueue; -import com.netflix.dyno.queues.Message; -import com.netflix.servo.monitor.Stopwatch; - -import redis.clients.jedis.JedisCommands; -import redis.clients.jedis.Tuple; -import redis.clients.jedis.params.sortedset.ZAddParams; - /** * * @author Viren @@ -58,6 +57,8 @@ public class RedisDynoQueue implements DynoQueue { private final Logger logger = LoggerFactory.getLogger(RedisDynoQueue.class); + private Clock clock; + private String queueName; private List allShards; @@ -91,7 +92,13 @@ public class RedisDynoQueue implements DynoQueue { public RedisDynoQueue(String redisKeyPrefix, String queueName, Set allShards, String shardName) { this(redisKeyPrefix, queueName, allShards, shardName, 60_000); } + public RedisDynoQueue(String redisKeyPrefix, String queueName, Set allShards, String shardName, int unackScheduleInMS) { + this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, allShards, shardName, unackScheduleInMS); + } + + public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set allShards, String shardName, int unackScheduleInMS) { + this.clock = clock; this.redisKeyPrefix = redisKeyPrefix; this.queueName = queueName; this.allShards = allShards.stream().collect(Collectors.toList()); @@ -157,7 +164,7 @@ public List push(final List messages) { String json = om.writeValueAsString(message); quorumConn.hset(messageStoreKey, message.getId(), json); double priority = message.getPriority() / 100; - double score = Long.valueOf(System.currentTimeMillis() + message.getTimeout()).doubleValue() + priority; + double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority; String shard = getNextShard(); String queueShard = getQueueShardKey(queueName, shard); quorumConn.zadd(queueShard, score, message.getId()); @@ -210,11 +217,11 @@ public List pop(int messageCount, int wait, TimeUnit unit) { Stopwatch sw = monitor.start(monitor.pop, messageCount); try { - long start = System.currentTimeMillis(); + long start = clock.millis(); long waitFor = unit.toMillis(wait); prefetch.addAndGet(messageCount); prefetchIds(); - while(prefetchedIds.size() < messageCount && ((System.currentTimeMillis() - start) < waitFor)) { + while(prefetchedIds.size() < messageCount && ((clock.millis() - start) < waitFor)) { Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); prefetchIds(); } @@ -255,7 +262,7 @@ private void prefetchIds() { private List _pop(int messageCount) throws Exception { - double unackScore = Long.valueOf(System.currentTimeMillis() + unackTime).doubleValue(); + double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue(); String unackQueueName = getUnackKey(queueName, shardName); List popped = new LinkedList<>(); @@ -343,7 +350,7 @@ public boolean setUnackTimeout(String messageId, long timeout) { try { return execute(() -> { - double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue(); + double unackScore = Long.valueOf(clock.millis() + timeout).doubleValue(); for (String shard : allShards) { String unackShardKey = getUnackKey(queueName, shard); @@ -379,7 +386,7 @@ public boolean setTimeout(String messageId, long timeout) { Double score = quorumConn.zscore(queueShard, messageId); if(score != null) { double priorityd = message.getPriority() / 100; - double newScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue() + priorityd; + double newScore = Long.valueOf(clock.millis() + timeout).doubleValue() + priorityd; ZAddParams params = ZAddParams.zAddParams().xx(); quorumConn.zadd(queueShard, newScore, messageId, params); json = om.writeValueAsString(message); @@ -510,7 +517,7 @@ public void clear() { private Set peekIds(int offset, int count) { return execute(() -> { - double now = Long.valueOf(System.currentTimeMillis() + 1).doubleValue(); + double now = Long.valueOf(clock.millis() + 1).doubleValue(); Set scanned = quorumConn.zrangeByScore(myQueueShard, 0, now, offset, count); return scanned; }); @@ -530,7 +537,7 @@ public void processUnacks() { int batchSize = 1_000; String unackQueueName = getUnackKey(queueName, shardName); - double now = Long.valueOf(System.currentTimeMillis()).doubleValue(); + double now = Long.valueOf(clock.millis()).doubleValue(); Set unacks = quorumConn.zrangeByScoreWithScores(unackQueueName, 0, now, 0, batchSize); diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueue.java index aeee37c..a040428 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueue.java @@ -15,22 +15,6 @@ */ package com.netflix.dyno.queues.redis; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -40,7 +24,8 @@ import com.netflix.dyno.queues.DynoQueue; import com.netflix.dyno.queues.Message; import com.netflix.servo.monitor.Stopwatch; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Pipeline; @@ -48,6 +33,20 @@ import redis.clients.jedis.Tuple; import redis.clients.jedis.params.sortedset.ZAddParams; +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + /** * * @author Viren @@ -57,6 +56,8 @@ public class RedisQueue implements DynoQueue { private final Logger logger = LoggerFactory.getLogger(RedisQueue.class); + private Clock clock; + private String queueName; private String shardName; @@ -90,6 +91,11 @@ public RedisQueue(String redisKeyPrefix, String queueName, String shardName, int } public RedisQueue(String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, JedisPool pool) { + this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, shardName, unackScheduleInMS, unackTime, pool); + } + + public RedisQueue(Clock clock, String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, JedisPool pool) { + this.clock = clock; this.queueName = queueName; this.shardName = shardName; this.messageStoreKeyPrefix = redisKeyPrefix + ".MESSAGE."; @@ -150,7 +156,7 @@ public List push(final List messages) { String json = om.writeValueAsString(message); pipe.hset(messageStoreKey(message.getId()), message.getId(), json); double priority = message.getPriority() / 100.0; - double score = Long.valueOf(System.currentTimeMillis() + message.getTimeout()).doubleValue() + priority; + double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority; pipe.zadd(myQueueShard, score, message.getId()); } pipe.sync(); @@ -237,7 +243,7 @@ public synchronized List pop(int messageCount, int wait, TimeUnit unit) private List _pop(List batch) throws Exception { - double unackScore = Long.valueOf(System.currentTimeMillis() + unackTime).doubleValue(); + double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue(); List popped = new LinkedList<>(); ZAddParams zParams = ZAddParams.zAddParams().nx(); @@ -371,7 +377,7 @@ public boolean setUnackTimeout(String messageId, long timeout) { try { - double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue(); + double unackScore = Long.valueOf(clock.millis() + timeout).doubleValue(); Double score = jedis.zscore(unackShardKey(messageId), messageId); if (score != null) { jedis.zadd(unackShardKey(messageId), unackScore, messageId); @@ -402,7 +408,7 @@ public boolean setTimeout(String messageId, long timeout) { Double score = jedis.zscore(myQueueShard, messageId); if (score != null) { double priorityd = message.getPriority() / 100.0; - double newScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue() + priorityd; + double newScore = Long.valueOf(clock.millis() + timeout).doubleValue() + priorityd; jedis.zadd(myQueueShard, newScore, messageId); json = om.writeValueAsString(message); jedis.hset(messageStoreKey(message.getId()), message.getId(), json); @@ -537,7 +543,7 @@ public void clear() { private Set peekIds(int offset, int count) { Jedis jedis = connPool.getResource(); try { - double now = Long.valueOf(System.currentTimeMillis() + 1).doubleValue(); + double now = Long.valueOf(clock.millis() + 1).doubleValue(); Set scanned = jedis.zrangeByScore(myQueueShard, 0, now, offset, count); return scanned; } finally { @@ -566,7 +572,7 @@ private void processUnacks(String unackShardKey) { int batchSize = 1_000; - double now = Long.valueOf(System.currentTimeMillis()).doubleValue(); + double now = Long.valueOf(clock.millis()).doubleValue(); Set unacks = jedis.zrangeByScoreWithScores(unackShardKey, 0, now, 0, batchSize); diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueues.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueues.java index 30bbbfe..291c06f 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueues.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisQueues.java @@ -15,17 +15,17 @@ */ package com.netflix.dyno.queues.redis; +import com.netflix.dyno.queues.DynoQueue; +import com.netflix.dyno.queues.ShardSupplier; +import redis.clients.jedis.JedisCommands; + import java.io.Closeable; import java.io.IOException; +import java.time.Clock; import java.util.Collection; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import com.netflix.dyno.queues.DynoQueue; -import com.netflix.dyno.queues.ShardSupplier; - -import redis.clients.jedis.JedisCommands; - /** * @author Viren * @@ -34,6 +34,8 @@ */ public class RedisQueues implements Closeable { + private Clock clock; + private JedisCommands quorumConn; private JedisCommands nonQuorumConn; @@ -51,7 +53,6 @@ public class RedisQueues implements Closeable { private ConcurrentHashMap queues; /** - * * @param quorumConn Dyno connection with dc_quorum enabled * @param nonQuorumConn Dyno connection to local Redis * @param redisKeyPrefix prefix applied to the Redis keys @@ -59,9 +60,21 @@ public class RedisQueues implements Closeable { * @param unackTime Time in millisecond within which a message needs to be acknowledged by the client, after which the message is re-queued. * @param unackHandlerIntervalInMS Time in millisecond at which the un-acknowledgement processor runs */ - public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, - int unackHandlerIntervalInMS) { + public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS) { + this(Clock.systemDefaultZone(), quorumConn, nonQuorumConn, redisKeyPrefix, shardSupplier, unackTime, unackHandlerIntervalInMS); + } + /** + * @param clock Time provider + * @param quorumConn Dyno connection with dc_quorum enabled + * @param nonQuorumConn Dyno connection to local Redis + * @param redisKeyPrefix prefix applied to the Redis keys + * @param shardSupplier Provider for the shards for the queues created + * @param unackTime Time in millisecond within which a message needs to be acknowledged by the client, after which the message is re-queued. + * @param unackHandlerIntervalInMS Time in millisecond at which the un-acknowledgement processor runs + */ + public RedisQueues(Clock clock, JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS) { + this.clock = clock; this.quorumConn = quorumConn; this.nonQuorumConn = nonQuorumConn; this.redisKeyPrefix = redisKeyPrefix; @@ -88,7 +101,7 @@ public DynoQueue get(String queueName) { } synchronized (this) { - queue = new RedisDynoQueue(redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS) + queue = new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS) .withUnackTime(unackTime) .withNonQuorumConn(nonQuorumConn) .withQuorumConn(quorumConn);