From 8449117e496425fe9cc71289de8d7d4940a90b8f Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Thu, 20 Oct 2016 20:09:08 -0700 Subject: [PATCH 1/2] bug fix for the setAckTimeout API --- .../com/netflix/dyno/queues/DynoQueue.java | 4 ++-- .../dyno/queues/redis/RedisDynoQueue.java | 20 ++++++++--------- .../netflix/dyno/queues/jedis/JedisMock.java | 22 +++++++++++++++++++ 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java b/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java index bc35125..be85bc6 100644 --- a/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java +++ b/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java @@ -80,9 +80,9 @@ public interface DynoQueue { * Sets the unack timeout on the message (changes the default timeout to the new value). Useful when extended lease is required for a message by consumer before sending ack. * @param messageId ID of the message to be acknowledged * @param timeout time in milliseconds for which the message will remain in un-ack state. If no ack is received after the timeout period has expired, the message is put back into the queue - * @return true if the message was found pending acknowledgement and is now ack'ed. false if the message id is invalid or message is no longer present in the queue. + * */ - public boolean setUnackTimeout(String messageId, long timeout); + public void setUnackTimeout(String messageId, long timeout); /** * diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java index 52e6b84..2641726 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java @@ -47,6 +47,7 @@ import redis.clients.jedis.JedisCommands; import redis.clients.jedis.Tuple; +import redis.clients.jedis.params.sortedset.ZAddParams; /** * @@ -355,24 +356,21 @@ public boolean ack(String messageId) { } @Override - public boolean setUnackTimeout(String messageId, long timeout) { + public void setUnackTimeout(String messageId, long timeout) { Stopwatch sw = monitor.ack.start(); try { - return execute(() -> { - + execute(() -> { + double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue(); for (String shard : allShards) { + String unackShardKey = getUnackKey(queueName, shard); - Long removed = quorumConn.zrem(unackShardKey, messageId); - if (removed > 0) { - double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue(); - quorumConn.zadd(unackShardKey, unackScore, messageId); - return true; - } + ZAddParams params = ZAddParams.zAddParams().xx(); + quorumConn.zadd(unackShardKey, unackScore, messageId, params); } - return false; + return true; }); } finally { @@ -580,7 +578,7 @@ private R executeWithRetry(ExecutorService es, Callable r, int retryCount try { - return es.submit(r).get(1000, TimeUnit.SECONDS); //TODO: replace this with 10 + return es.submit(r).get(10, TimeUnit.SECONDS); } catch (ExecutionException e) { diff --git a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/jedis/JedisMock.java b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/jedis/JedisMock.java index 15c499e..e548c69 100644 --- a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/jedis/JedisMock.java +++ b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/jedis/JedisMock.java @@ -36,6 +36,7 @@ import redis.clients.jedis.ScanResult; import redis.clients.jedis.Tuple; import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.params.sortedset.ZAddParams; /** * @author Viren @@ -727,6 +728,27 @@ public Long zadd(final String key, final double score, final String member) { throw new JedisException(e); } } + + @Override + public Long zadd(String key, double score, String member, ZAddParams params) { + + try { + + if(params.contains("xx")) { + Double existing = redis.zscore(key, member); + if(existing == null) { + return 0L; + } + return redis.zadd(key, new ZsetPair(member, score)); + }else { + return redis.zadd(key, new ZsetPair(member, score)); + } + + } catch (Exception e) { + throw new JedisException(e); + } + } + @Override public Long zadd(final String key, final Map scoreMembers) { From d58bd566915f5a39300ed199f10d4ede59514fdc Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Thu, 20 Oct 2016 22:00:42 -0700 Subject: [PATCH 2/2] dyno support for z_add with params is missing, so using alternative api --- .../java/com/netflix/dyno/queues/DynoQueue.java | 4 ++-- .../netflix/dyno/queues/redis/RedisDynoQueue.java | 14 ++++++++------ .../com/netflix/dyno/queues/jedis/JedisMock.java | 2 +- .../dyno/queues/redis/RedisDynoQueueTest.java | 9 ++++++--- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java b/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java index be85bc6..ec5018c 100644 --- a/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java +++ b/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java @@ -80,9 +80,9 @@ public interface DynoQueue { * Sets the unack timeout on the message (changes the default timeout to the new value). Useful when extended lease is required for a message by consumer before sending ack. * @param messageId ID of the message to be acknowledged * @param timeout time in milliseconds for which the message will remain in un-ack state. If no ack is received after the timeout period has expired, the message is put back into the queue - * + * @return true if the message id was found and updated with new timeout. false otherwise. */ - public void setUnackTimeout(String messageId, long timeout); + public boolean setUnackTimeout(String messageId, long timeout); /** * diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java index 2641726..78dcc97 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java @@ -47,7 +47,6 @@ import redis.clients.jedis.JedisCommands; import redis.clients.jedis.Tuple; -import redis.clients.jedis.params.sortedset.ZAddParams; /** * @@ -356,21 +355,24 @@ public boolean ack(String messageId) { } @Override - public void setUnackTimeout(String messageId, long timeout) { + public boolean setUnackTimeout(String messageId, long timeout) { Stopwatch sw = monitor.ack.start(); try { - execute(() -> { + return execute(() -> { double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue(); for (String shard : allShards) { String unackShardKey = getUnackKey(queueName, shard); - ZAddParams params = ZAddParams.zAddParams().xx(); - quorumConn.zadd(unackShardKey, unackScore, messageId, params); + Double score = quorumConn.zscore(unackShardKey, messageId); + if(score != null) { + quorumConn.zadd(unackShardKey, unackScore, messageId); + return true; + } } - return true; + return false; }); } finally { diff --git a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/jedis/JedisMock.java b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/jedis/JedisMock.java index e548c69..fffbea0 100644 --- a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/jedis/JedisMock.java +++ b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/jedis/JedisMock.java @@ -859,7 +859,7 @@ public Long zcard(final String key) { @Override public Double zscore(final String key, final String member) { try { - return zscore(key, member); + return redis.zscore(key, member); } catch (Exception e) { throw new JedisException(e); } diff --git a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/RedisDynoQueueTest.java b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/RedisDynoQueueTest.java index 8a220fe..afd9e01 100644 --- a/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/RedisDynoQueueTest.java +++ b/dyno-queues-redis/src/test/java/com/netflix/dyno/queues/redis/RedisDynoQueueTest.java @@ -129,7 +129,8 @@ public void testTimeoutUpdate() { assertNotNull(popped); assertEquals(1, popped.size()); - rdq.setUnackTimeout(id, 500); + boolean updated = rdq.setUnackTimeout(id, 500); + assertTrue(updated); popped = rdq.pop(1, 1, TimeUnit.SECONDS); assertNotNull(popped); assertEquals(0, popped.size()); @@ -140,13 +141,15 @@ public void testTimeoutUpdate() { assertNotNull(popped); assertEquals(1, popped.size()); - rdq.setUnackTimeout(id, 10_000); //10 seconds! + updated = rdq.setUnackTimeout(id, 10_000); //10 seconds! + assertTrue(updated); rdq.processUnacks(); popped = rdq.pop(1, 1, TimeUnit.SECONDS); assertNotNull(popped); assertEquals(0, popped.size()); - rdq.setUnackTimeout(id, 0); + updated = rdq.setUnackTimeout(id, 0); + assertTrue(updated); rdq.processUnacks(); popped = rdq.pop(1, 1, TimeUnit.SECONDS); assertNotNull(popped);