Skip to content

Commit

Permalink
dyno support for z_add with params is missing, so using alternative api
Browse files Browse the repository at this point in the history
  • Loading branch information
Viren Baraiya committed Oct 21, 2016
1 parent 8449117 commit d58bd56
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public interface DynoQueue {
* Sets the unack timeout on the message (changes the default timeout to the new value). Useful when extended lease is required for a message by consumer before sending ack.
* @param messageId ID of the message to be acknowledged
* @param timeout time in milliseconds for which the message will remain in un-ack state. If no ack is received after the timeout period has expired, the message is put back into the queue
*
* @return true if the message id was found and updated with new timeout. false otherwise.
*/
public void setUnackTimeout(String messageId, long timeout);
public boolean setUnackTimeout(String messageId, long timeout);

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@

import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.sortedset.ZAddParams;

/**
*
Expand Down Expand Up @@ -356,21 +355,24 @@ public boolean ack(String messageId) {
}

@Override
public void setUnackTimeout(String messageId, long timeout) {
public boolean setUnackTimeout(String messageId, long timeout) {

Stopwatch sw = monitor.ack.start();

try {

execute(() -> {
return execute(() -> {
double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue();
for (String shard : allShards) {

String unackShardKey = getUnackKey(queueName, shard);
ZAddParams params = ZAddParams.zAddParams().xx();
quorumConn.zadd(unackShardKey, unackScore, messageId, params);
Double score = quorumConn.zscore(unackShardKey, messageId);
if(score != null) {
quorumConn.zadd(unackShardKey, unackScore, messageId);
return true;
}
}
return true;
return false;
});

} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ public Long zcard(final String key) {
@Override
public Double zscore(final String key, final String member) {
try {
return zscore(key, member);
return redis.zscore(key, member);
} catch (Exception e) {
throw new JedisException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void testTimeoutUpdate() {
assertNotNull(popped);
assertEquals(1, popped.size());

rdq.setUnackTimeout(id, 500);
boolean updated = rdq.setUnackTimeout(id, 500);
assertTrue(updated);
popped = rdq.pop(1, 1, TimeUnit.SECONDS);
assertNotNull(popped);
assertEquals(0, popped.size());
Expand All @@ -140,13 +141,15 @@ public void testTimeoutUpdate() {
assertNotNull(popped);
assertEquals(1, popped.size());

rdq.setUnackTimeout(id, 10_000); //10 seconds!
updated = rdq.setUnackTimeout(id, 10_000); //10 seconds!
assertTrue(updated);
rdq.processUnacks();
popped = rdq.pop(1, 1, TimeUnit.SECONDS);
assertNotNull(popped);
assertEquals(0, popped.size());

rdq.setUnackTimeout(id, 0);
updated = rdq.setUnackTimeout(id, 0);
assertTrue(updated);
rdq.processUnacks();
popped = rdq.pop(1, 1, TimeUnit.SECONDS);
assertNotNull(popped);
Expand Down

0 comments on commit d58bd56

Please sign in to comment.