Skip to content

Commit

Permalink
Add unsafeBulkPop() and localGet()
Browse files Browse the repository at this point in the history
unsafeBulkPop() allows bulk popping from all shards.

loaclGet() does a get() with a non quorum connection.

TODO: unsafeBulkPop() will return nil if messageCount > size().
Fix this.
TODO 2: Do code cleanup.
  • Loading branch information
smukil committed Nov 14, 2019
1 parent d4305d4 commit 5089a7c
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,16 @@ public interface DynoQueue extends Closeable {
*/
public Message get(String messageId);

/**
*
* Same as get(), but uses the non quorum connection.
* @param messageId message to be retrieved.
* @return Retrieves the message stored in the queue by the messageId. Null if not found.
*/
public Message localGet(String messageId);

public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit);
public List<Message> unsafeBulkPop(int messageCount, int wait, TimeUnit unit);

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ private int doPrefetchIdsHelper(String queueShardName, AtomicInteger prefetchCou
// Attempt to peek up to 'numToPrefetch' message Ids.
Set<String> ids = doPeekIdsFromShardHelper(queueShardName, prefetchFromTs, 0, numToPrefetch);

// TODO: Check for duplicates.
// Store prefetched IDs in a queue.
prefetchedIdQueue.addAll(ids);

Expand Down Expand Up @@ -826,7 +827,8 @@ public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit) {
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
prefetchIds();
}
return atomicBulkPopHelper(shardName, messageCount, prefetchedIds);
int numToPop = (prefetchedIds.size() > messageCount) ? messageCount : prefetchedIds.size();
return atomicBulkPopHelper(numToPop, prefetchedIds, true);

} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -836,8 +838,47 @@ public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit) {

}

private List<Message> atomicBulkPopHelper(String shard, int messageCount,
ConcurrentLinkedQueue<String> prefetchedIdQueue) {
@Override
public List<Message> unsafeBulkPop(int messageCount, int wait, TimeUnit unit) {
if (messageCount < 1) {
return Collections.emptyList();
}

Stopwatch sw = monitor.start(monitor.pop, messageCount);
try {
long start = clock.millis();
long waitFor = unit.toMillis(wait);
unsafeNumIdsToPrefetchAllShards.addAndGet(messageCount);

prefetchIdsAllShards();
while(unsafeGetNumPrefetchedIds() < messageCount && ((clock.millis() - start) < waitFor)) {
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
prefetchIdsAllShards();
}

int numToPop = (unsafeGetNumPrefetchedIds() > messageCount) ? messageCount : unsafeGetNumPrefetchedIds();
ConcurrentLinkedQueue<String> messageIds = new ConcurrentLinkedQueue<>();
int numPrefetched = 0;
for (String shard : allShards) {
String queueShardName = getQueueShardKey(queueName, shard);
int prefetchedIdsSize = unsafePrefetchedIdsAllShardsMap.get(queueShardName).size();
for (int i = 0; i < prefetchedIdsSize; ++i) {
messageIds.add(unsafePrefetchedIdsAllShardsMap.get(queueShardName).poll());
if (++numPrefetched == numToPop) break;
}
if (numPrefetched == numToPop) break;
}
return atomicBulkPopHelper(numToPop, messageIds, false);
} catch(Exception e) {
throw new RuntimeException(e);
} finally {
sw.stop();
}
}

// TODO: Do code cleanup/consolidation
private List<Message> atomicBulkPopHelper(int messageCount,
ConcurrentLinkedQueue<String> prefetchedIdQueue, boolean localShardOnly) {

double now = Long.valueOf(clock.millis() + 1).doubleValue();
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
Expand All @@ -853,7 +894,7 @@ private List<Message> atomicBulkPopHelper(String shard, int messageCount,
messageIds.add(prefetchedIdQueue.poll());
}

String atomicBulkPopScript="local hkey=KEYS[1]\n" +
String atomicBulkPopScriptLocalOnly="local hkey=KEYS[1]\n" +
"local num_msgs=ARGV[1]\n" +
"local peek_until=ARGV[2]\n" +
"local unack_score=ARGV[3]\n" +
Expand Down Expand Up @@ -883,25 +924,84 @@ private List<Message> atomicBulkPopHelper(String shard, int messageCount,
"end\n" +
"return return_vals";

String unackShardName = getUnackKey(queueName, shardName);

ImmutableList.Builder builder = ImmutableList.builder();
builder.add(Integer.toString(messageCount));
builder.add(nowScoreString);
builder.add(unackScoreString);
builder.add(localQueueShard);
builder.add(unackShardName);
for (int i = 0; i < messageCount; ++i) {
builder.add(messageIds.get(i));
}
String atomicBulkPopScript="local hkey=KEYS[1]\n" +
"local num_msgs=ARGV[1]\n" +
"local num_shards=ARGV[2]\n" +
"local peek_until=ARGV[3]\n" +
"local unack_score=ARGV[4]\n" +
"local shard_start_idx = 5\n" +
"local msg_start_idx = 5 + (num_shards * 2)\n" +
"local out_idx = 1\n" +
"local return_vals={}\n" +
"for i=0,num_msgs-1 do\n" +
" local found_msg=false\n" +
" local message_id=ARGV[msg_start_idx + i]\n" +
" for j=0,num_shards-1 do\n" +
" local queue_shard_name=ARGV[shard_start_idx + (j*2)]\n" +
" local unack_shard_name=ARGV[shard_start_idx + (j*2) + 1]\n" +
" local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
" if (exists) then\n" +
" found_msg=true\n" +
" if (exists <=peek_until) then\n" +
" local value = redis.call('hget', hkey, message_id)\n" +
" if (value) then\n" +
" local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
" if (zadd_ret) then\n" +
" redis.call('zrem', queue_shard_name, message_id)\n" +
" return_vals[out_idx]=value\n" +
" out_idx=out_idx+1\n" +
" break\n" +
" end\n" +
" end\n" +
" end\n" +
" end\n" +
" end\n" +
" if (found_msg == false) then\n" +
" return {}\n" +
" end\n" +
"end\n" +
"return return_vals";

List<Message> payloads;
// Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
payloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScript,
Collections.singletonList(messageStoreKey), builder.build());
if (localShardOnly) {
String unackShardName = getUnackKey(queueName, shardName);

ImmutableList.Builder builder = ImmutableList.builder();
builder.add(Integer.toString(messageCount));
builder.add(nowScoreString);
builder.add(unackScoreString);
builder.add(localQueueShard);
builder.add(unackShardName);
for (int i = 0; i < messageCount; ++i) {
builder.add(messageIds.get(i));
}
// Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
payloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScriptLocalOnly,
Collections.singletonList(messageStoreKey), builder.build());
} else {
ImmutableList.Builder builder = ImmutableList.builder();
builder.add(Integer.toString(messageCount));
builder.add(Integer.toString(allShards.size()));
builder.add(nowScoreString);
builder.add(unackScoreString);
for (String shard : allShards) {
String queueShard = getQueueShardKey(queueName, shard);
String unackShardName = getUnackKey(queueName, shard);
builder.add(queueShard);
builder.add(unackShardName);
}
for (int i = 0; i < messageCount; ++i) {
builder.add(messageIds.get(i));
}

// Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
payloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScript,
Collections.singletonList(messageStoreKey), builder.build());
}

return payloads;
}

/**
*
* Similar to popWithMsgId() but completes all the operations in one round trip.
Expand Down Expand Up @@ -1019,6 +1119,29 @@ public Message get(String messageId) {
}
}

@Override
public Message localGet(String messageId) {

Stopwatch sw = monitor.get.start();

try {

return execute("localGet", messageStoreKey, () -> {
String json = nonQuorumConn.hget(messageStoreKey, messageId);
if (json == null) {
logger.warn("Cannot get the message payload " + messageId);
return null;
}

Message msg = om.readValue(json, Message.class);
return msg;
});

} finally {
sw.stop();
}
}

@Override
public long size() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Override
public List<Message> unsafeBulkPop(int messageCount, int wait, TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Override
public Message get(String messageId) {
for (DynoQueue q : queues.values()) {
Expand All @@ -199,6 +204,11 @@ public Message get(String messageId) {
return null;
}

@Override
public Message localGet(String messageId) {
throw new UnsupportedOperationException();
}

@Override
public long size() {
long size = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ public List<Message> bulkPop(int messageCount, int wait, TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Override
public List<Message> unsafeBulkPop(int messageCount, int wait, TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Override
public Message get(String messageId) {

Expand All @@ -528,6 +533,11 @@ public Message get(String messageId) {
}
}

@Override
public Message localGet(String messageId) {
throw new UnsupportedOperationException();
}

@Override
public long size() {

Expand Down

0 comments on commit 5089a7c

Please sign in to comment.