diff --git a/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java b/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java index 343b9e5..2c3c7fd 100644 --- a/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java +++ b/dyno-queues-core/src/main/java/com/netflix/dyno/queues/DynoQueue.java @@ -186,6 +186,8 @@ public interface DynoQueue extends Closeable { */ public Message get(String messageId); + public List bulkPop(int messageCount, int wait, TimeUnit unit); + /** * * @return Size of the queue. diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java index 0032560..e9054ba 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java @@ -808,6 +808,100 @@ public String getMsgWithPredicate(String predicate, boolean localShardOnly) { }); } + @Override + public List bulkPop(int messageCount, int wait, TimeUnit unit) { + + if (messageCount < 1) { + return Collections.emptyList(); + } + + Stopwatch sw = monitor.start(monitor.pop, messageCount); + try { + long start = clock.millis(); + long waitFor = unit.toMillis(wait); + numIdsToPrefetch.addAndGet(messageCount); + + prefetchIds(); + while (prefetchedIds.size() < messageCount && ((clock.millis() - start) < waitFor)) { + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + prefetchIds(); + } + return atomicBulkPopHelper(shardName, messageCount, prefetchedIds); + + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + sw.stop(); + } + + } + + private List atomicBulkPopHelper(String shard, int messageCount, + ConcurrentLinkedQueue prefetchedIdQueue) { + + double now = Long.valueOf(clock.millis() + 1).doubleValue(); + double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue(); + + // The script requires the scores as whole numbers + NumberFormat fmt = NumberFormat.getIntegerInstance(); + fmt.setGroupingUsed(false); + String nowScoreString = fmt.format(now); + String unackScoreString = fmt.format(unackScore); + + List messageIds = new ArrayList<>(); + for (int i = 0; i < messageCount; ++i) { + messageIds.add(prefetchedIdQueue.poll()); + } + + String atomicBulkPopScript="local hkey=KEYS[1]\n" + + "local num_msgs=ARGV[1]\n" + + "local peek_until=ARGV[2]\n" + + "local unack_score=ARGV[3]\n" + + "local queue_shard_name=ARGV[4]\n" + + "local unack_shard_name=ARGV[5]\n" + + "local msg_start_idx = 6\n" + + "local idx = 1\n" + + "local return_vals={}\n" + + "for i=0,num_msgs-1 do\n" + + " local message_id=ARGV[msg_start_idx + i]\n" + + " local exists = redis.call('zscore', queue_shard_name, message_id)\n" + + " if (exists) then\n" + + " if (exists <=peek_until) then\n" + + " local value = redis.call('hget', hkey, message_id)\n" + + " if (value) then\n" + + " local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" + + " if (zadd_ret) then\n" + + " redis.call('zrem', queue_shard_name, message_id)\n" + + " return_vals[idx]=value\n" + + " idx=idx+1\n" + + " end\n" + + " end\n" + + " end\n" + + " else\n" + + " return {}\n" + + " end\n" + + "end\n" + + "return return_vals"; + + String unackShardName = getUnackKey(queueName, shardName); + + ImmutableList.Builder builder = ImmutableList.builder(); + builder.add(Integer.toString(messageCount)); + builder.add(nowScoreString); + builder.add(unackScoreString); + builder.add(localQueueShard); + builder.add(unackShardName); + for (int i = 0; i < messageCount; ++i) { + builder.add(messageIds.get(i)); + } + + List payloads; + // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'. + payloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScript, + Collections.singletonList(messageStoreKey), builder.build()); + + return payloads; + } /** * * Similar to popWithMsgId() but completes all the operations in one round trip. diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/MultiRedisQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/MultiRedisQueue.java index c15756f..94b69f9 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/MultiRedisQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/MultiRedisQueue.java @@ -183,6 +183,11 @@ public Message popMsgWithPredicate(String predicate, boolean localShardOnly) { throw new UnsupportedOperationException(); } + @Override + public List bulkPop(int messageCount, int wait, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + @Override public Message get(String messageId) { for (DynoQueue q : queues.values()) { diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/RedisPipelineQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/RedisPipelineQueue.java index 5deeb9c..1c3b466 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/RedisPipelineQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/v2/RedisPipelineQueue.java @@ -497,6 +497,11 @@ public Message popMsgWithPredicate(String predicate, boolean localShardOnly) { throw new UnsupportedOperationException(); } + @Override + public List bulkPop(int messageCount, int wait, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + @Override public Message get(String messageId) {