From 03af6f5359fa45c4d18e31ed30103f885f09767d Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Fri, 24 Jan 2020 18:40:13 -0800 Subject: [PATCH] Make findStaleMessages() return stale messages from all shards Note: All items returned MUST be checked at the app level if they've already been processed before acting on them (eg: removing them) --- .../dyno/queues/redis/RedisDynoQueue.java | 92 ++++++++++--------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java index 8c99fc9..8ac7887 100644 --- a/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java +++ b/dyno-queues-redis/src/main/java/com/netflix/dyno/queues/redis/RedisDynoQueue.java @@ -1467,60 +1467,64 @@ public List findStaleMessages() { List stale_msgs = new ArrayList<>(); - int batchSize = 1_000; + int batchSize = 10; double now = Long.valueOf(clock.millis()).doubleValue(); long num_stale = 0; - Set elems = nonQuorumConn.zrangeByScore(localQueueShard, 0, now, 0, batchSize); + for (String shard : allShards) { + String queueShardName = getQueueShardKey(queueName, shard); + Set elems = nonQuorumConn.zrangeByScore(queueShardName, 0, now, 0, batchSize); - if (elems.size() == 0) { - return stale_msgs; - } + if (elems.size() == 0) { + continue; + } - String findStaleMsgsScript = "local hkey=KEYS[1]\n" + - "local queue_shard=ARGV[1]\n" + - "local unack_shard=ARGV[2]\n" + - "local num_msgs=ARGV[3]\n" + - "\n" + - "local stale_msgs={}\n" + - "local num_stale_idx = 1\n" + - "for i=0,num_msgs-1 do\n" + - " local msg_id=ARGV[4+i]\n" + - "\n" + - " local exists_hash = redis.call('hget', hkey, msg_id)\n" + - " local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" + - " local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" + - "\n" + - " if (exists_hash and exists_queue) then\n" + - " elseif (not (exists_unack)) then\n" + - " stale_msgs[num_stale_idx] = msg_id\n" + - " num_stale_idx = num_stale_idx + 1\n" + - " end\n" + - "end\n" + - "\n" + - "return stale_msgs\n"; + String findStaleMsgsScript = "local hkey=KEYS[1]\n" + + "local queue_shard=ARGV[1]\n" + + "local unack_shard=ARGV[2]\n" + + "local num_msgs=ARGV[3]\n" + + "\n" + + "local stale_msgs={}\n" + + "local num_stale_idx = 1\n" + + "for i=0,num_msgs-1 do\n" + + " local msg_id=ARGV[4+i]\n" + + "\n" + + " local exists_hash = redis.call('hget', hkey, msg_id)\n" + + " local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" + + " local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" + + "\n" + + " if (exists_hash and exists_queue) then\n" + + " elseif (not (exists_unack)) then\n" + + " stale_msgs[num_stale_idx] = msg_id\n" + + " num_stale_idx = num_stale_idx + 1\n" + + " end\n" + + "end\n" + + "\n" + + "return stale_msgs\n"; - String unackKey = getUnackKey(queueName, shardName); - ImmutableList.Builder builder = ImmutableList.builder(); - builder.add(localQueueShard); - builder.add(unackKey); - builder.add(Integer.toString(elems.size())); - for (String msg : elems) { - builder.add(msg); - } + String unackKey = getUnackKey(queueName, shard); + ImmutableList.Builder builder = ImmutableList.builder(); + builder.add(queueShardName); + builder.add(unackKey); + builder.add(Integer.toString(elems.size())); + for (String msg : elems) { + builder.add(msg); + } - ArrayList stale_msg_ids = (ArrayList) ((DynoJedisClient)quorumConn).eval(findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build()); - num_stale = stale_msg_ids.size(); - if (num_stale > 0) { - logger.info("findStaleMsgs(): Found " + num_stale + " messages present in queue but not in hashmap"); - } + ArrayList stale_msg_ids = (ArrayList) ((DynoJedisClient)quorumConn).eval(findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build()); + num_stale = stale_msg_ids.size(); + if (num_stale > 0) { + logger.info("findStaleMsgs(): Found " + num_stale + " messages present in queue but not in hashmap"); + } - for (String m : stale_msg_ids) { - Message msg = new Message(); - msg.setId(m); - stale_msgs.add(msg); + for (String m : stale_msg_ids) { + Message msg = new Message(); + msg.setId(m); + stale_msgs.add(msg); + } } + return stale_msgs; }); }