Skip to content

Commit

Permalink
Merge pull request #11 from Netflix/dev
Browse files Browse the repository at this point in the history
dyno support for z_add with params is missing, so using alternative api
  • Loading branch information
v1r3n authored Oct 24, 2016
2 parents 8d9bfca + d0f7a99 commit fc21d19
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public interface DynoQueue {
* Sets the unack timeout on the message (changes the default timeout to the new value). Useful when extended lease is required for a message by consumer before sending ack.
* @param messageId ID of the message to be acknowledged
* @param timeout time in milliseconds for which the message will remain in un-ack state. If no ack is received after the timeout period has expired, the message is put back into the queue
* @return true if the message was found pending acknowledgement and is now ack'ed. false if the message id is invalid or message is no longer present in the queue.
* @return true if the message id was found and updated with new timeout. false otherwise.
*/
public boolean setUnackTimeout(String messageId, long timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,12 @@ public boolean setUnackTimeout(String messageId, long timeout) {
try {

return execute(() -> {

double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue();
for (String shard : allShards) {

String unackShardKey = getUnackKey(queueName, shard);
Long removed = quorumConn.zrem(unackShardKey, messageId);
if (removed > 0) {
double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue();
Double score = quorumConn.zscore(unackShardKey, messageId);
if(score != null) {
quorumConn.zadd(unackShardKey, unackScore, messageId);
return true;
}
Expand Down Expand Up @@ -580,7 +580,7 @@ private <R> R executeWithRetry(ExecutorService es, Callable<R> r, int retryCount

try {

return es.submit(r).get(1000, TimeUnit.SECONDS); //TODO: replace this with 10
return es.submit(r).get(10, TimeUnit.SECONDS);

} catch (ExecutionException e) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.sortedset.ZAddParams;

/**
* @author Viren
Expand Down Expand Up @@ -727,6 +728,27 @@ public Long zadd(final String key, final double score, final String member) {
throw new JedisException(e);
}
}

@Override
public Long zadd(String key, double score, String member, ZAddParams params) {

try {

if(params.contains("xx")) {
Double existing = redis.zscore(key, member);
if(existing == null) {
return 0L;
}
return redis.zadd(key, new ZsetPair(member, score));
}else {
return redis.zadd(key, new ZsetPair(member, score));
}

} catch (Exception e) {
throw new JedisException(e);
}
}


@Override
public Long zadd(final String key, final Map<String, Double> scoreMembers) {
Expand Down Expand Up @@ -837,7 +859,7 @@ public Long zcard(final String key) {
@Override
public Double zscore(final String key, final String member) {
try {
return zscore(key, member);
return redis.zscore(key, member);
} catch (Exception e) {
throw new JedisException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void testTimeoutUpdate() {
assertNotNull(popped);
assertEquals(1, popped.size());

rdq.setUnackTimeout(id, 500);
boolean updated = rdq.setUnackTimeout(id, 500);
assertTrue(updated);
popped = rdq.pop(1, 1, TimeUnit.SECONDS);
assertNotNull(popped);
assertEquals(0, popped.size());
Expand All @@ -140,13 +141,15 @@ public void testTimeoutUpdate() {
assertNotNull(popped);
assertEquals(1, popped.size());

rdq.setUnackTimeout(id, 10_000); //10 seconds!
updated = rdq.setUnackTimeout(id, 10_000); //10 seconds!
assertTrue(updated);
rdq.processUnacks();
popped = rdq.pop(1, 1, TimeUnit.SECONDS);
assertNotNull(popped);
assertEquals(0, popped.size());

rdq.setUnackTimeout(id, 0);
updated = rdq.setUnackTimeout(id, 0);
assertTrue(updated);
rdq.processUnacks();
popped = rdq.pop(1, 1, TimeUnit.SECONDS);
assertNotNull(popped);
Expand Down

0 comments on commit fc21d19

Please sign in to comment.