Skip to content

Commit

Permalink
Merge pull request #26 from robzienert/clock-refactor
Browse files Browse the repository at this point in the history
Refactor queues to use java.time.Clock
  • Loading branch information
v1r3n authored Dec 18, 2017
2 parents 0e98adf + a022a1e commit 29b9efc
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 73 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ dyno-queues-core/build
build
redis-3.0.7/
redis-3.0.7.tar.gz

*.iml
.idea
.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@
*/
package com.netflix.dyno.queues.redis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
Expand All @@ -38,10 +27,21 @@
import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* @author Viren
*
Expand Down Expand Up @@ -217,6 +217,8 @@ private String getNextShard() {


public static class Builder {

private Clock clock;

private String queueName;

Expand All @@ -239,6 +241,15 @@ public static class Builder {
private int nonQuorumPort;

private List<Host> hosts;

/**
* @param clock the Clock instance to set
* @return instance of builder
*/
public Builder setClock(Clock clock) {
this.clock = clock;
return this;
}

/**
* @param queueName the queueName to set
Expand Down Expand Up @@ -336,6 +347,9 @@ public Builder setHosts(List<Host> hosts) {
}

public MultiRedisQueue build() {
if (clock == null) {
clock = Clock.systemDefaultZone();
}
if(hosts == null) {
hosts = getHostsFromEureka(ec, dynomiteClusterName);
}
Expand All @@ -352,15 +366,14 @@ public MultiRedisQueue build() {
config.setMaxIdle(5);
config.setMaxWaitMillis(60_000);


Map<String, RedisQueue> queues = new HashMap<>();
for(String queueShard : shardMap.keySet()) {
String host = shardMap.get(queueShard).getIpAddress();

JedisPool pool = new JedisPool(config, host, quorumPort, 0);
JedisPool readPool = new JedisPool(config, host, nonQuorumPort, 0);
RedisQueue q = new RedisQueue(redisKeyPrefix, queueName, queueShard, unackTime, pool);

RedisQueue q = new RedisQueue(clock, redisKeyPrefix, queueName, queueShard, unackTime, unackTime, pool);
q.setNonQuorumPool(readPool);
queues.put(queueShard, q);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,24 @@
*/
package com.netflix.dyno.queues.redis;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.dyno.connectionpool.exception.DynoException;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.servo.monitor.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.sortedset.ZAddParams;

import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
Expand All @@ -31,24 +48,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.dyno.connectionpool.exception.DynoException;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.servo.monitor.Stopwatch;

import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.sortedset.ZAddParams;

/**
*
* @author Viren
Expand All @@ -58,6 +57,8 @@ public class RedisDynoQueue implements DynoQueue {

private final Logger logger = LoggerFactory.getLogger(RedisDynoQueue.class);

private Clock clock;

private String queueName;

private List<String> allShards;
Expand Down Expand Up @@ -91,7 +92,13 @@ public class RedisDynoQueue implements DynoQueue {
public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName) {
this(redisKeyPrefix, queueName, allShards, shardName, 60_000);
}

public RedisDynoQueue(String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS) {
this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, allShards, shardName, unackScheduleInMS);
}

public RedisDynoQueue(Clock clock, String redisKeyPrefix, String queueName, Set<String> allShards, String shardName, int unackScheduleInMS) {
this.clock = clock;
this.redisKeyPrefix = redisKeyPrefix;
this.queueName = queueName;
this.allShards = allShards.stream().collect(Collectors.toList());
Expand Down Expand Up @@ -157,7 +164,7 @@ public List<String> push(final List<Message> messages) {
String json = om.writeValueAsString(message);
quorumConn.hset(messageStoreKey, message.getId(), json);
double priority = message.getPriority() / 100;
double score = Long.valueOf(System.currentTimeMillis() + message.getTimeout()).doubleValue() + priority;
double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
String shard = getNextShard();
String queueShard = getQueueShardKey(queueName, shard);
quorumConn.zadd(queueShard, score, message.getId());
Expand Down Expand Up @@ -210,11 +217,11 @@ public List<Message> pop(int messageCount, int wait, TimeUnit unit) {

Stopwatch sw = monitor.start(monitor.pop, messageCount);
try {
long start = System.currentTimeMillis();
long start = clock.millis();
long waitFor = unit.toMillis(wait);
prefetch.addAndGet(messageCount);
prefetchIds();
while(prefetchedIds.size() < messageCount && ((System.currentTimeMillis() - start) < waitFor)) {
while(prefetchedIds.size() < messageCount && ((clock.millis() - start) < waitFor)) {
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
prefetchIds();
}
Expand Down Expand Up @@ -255,7 +262,7 @@ private void prefetchIds() {

private List<Message> _pop(int messageCount) throws Exception {

double unackScore = Long.valueOf(System.currentTimeMillis() + unackTime).doubleValue();
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
String unackQueueName = getUnackKey(queueName, shardName);

List<Message> popped = new LinkedList<>();
Expand Down Expand Up @@ -343,7 +350,7 @@ public boolean setUnackTimeout(String messageId, long timeout) {
try {

return execute(() -> {
double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue();
double unackScore = Long.valueOf(clock.millis() + timeout).doubleValue();
for (String shard : allShards) {

String unackShardKey = getUnackKey(queueName, shard);
Expand Down Expand Up @@ -379,7 +386,7 @@ public boolean setTimeout(String messageId, long timeout) {
Double score = quorumConn.zscore(queueShard, messageId);
if(score != null) {
double priorityd = message.getPriority() / 100;
double newScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue() + priorityd;
double newScore = Long.valueOf(clock.millis() + timeout).doubleValue() + priorityd;
ZAddParams params = ZAddParams.zAddParams().xx();
quorumConn.zadd(queueShard, newScore, messageId, params);
json = om.writeValueAsString(message);
Expand Down Expand Up @@ -510,7 +517,7 @@ public void clear() {
private Set<String> peekIds(int offset, int count) {

return execute(() -> {
double now = Long.valueOf(System.currentTimeMillis() + 1).doubleValue();
double now = Long.valueOf(clock.millis() + 1).doubleValue();
Set<String> scanned = quorumConn.zrangeByScore(myQueueShard, 0, now, offset, count);
return scanned;
});
Expand All @@ -530,7 +537,7 @@ public void processUnacks() {
int batchSize = 1_000;
String unackQueueName = getUnackKey(queueName, shardName);

double now = Long.valueOf(System.currentTimeMillis()).doubleValue();
double now = Long.valueOf(clock.millis()).doubleValue();

Set<Tuple> unacks = quorumConn.zrangeByScoreWithScores(unackQueueName, 0, now, 0, batchSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,6 @@
*/
package com.netflix.dyno.queues.redis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -40,14 +24,29 @@
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import com.netflix.servo.monitor.Stopwatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.sortedset.ZAddParams;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
*
* @author Viren
Expand All @@ -57,6 +56,8 @@ public class RedisQueue implements DynoQueue {

private final Logger logger = LoggerFactory.getLogger(RedisQueue.class);

private Clock clock;

private String queueName;

private String shardName;
Expand Down Expand Up @@ -90,6 +91,11 @@ public RedisQueue(String redisKeyPrefix, String queueName, String shardName, int
}

public RedisQueue(String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, JedisPool pool) {
this(Clock.systemDefaultZone(), redisKeyPrefix, queueName, shardName, unackScheduleInMS, unackTime, pool);
}

public RedisQueue(Clock clock, String redisKeyPrefix, String queueName, String shardName, int unackScheduleInMS, int unackTime, JedisPool pool) {
this.clock = clock;
this.queueName = queueName;
this.shardName = shardName;
this.messageStoreKeyPrefix = redisKeyPrefix + ".MESSAGE.";
Expand Down Expand Up @@ -150,7 +156,7 @@ public List<String> push(final List<Message> messages) {
String json = om.writeValueAsString(message);
pipe.hset(messageStoreKey(message.getId()), message.getId(), json);
double priority = message.getPriority() / 100.0;
double score = Long.valueOf(System.currentTimeMillis() + message.getTimeout()).doubleValue() + priority;
double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
pipe.zadd(myQueueShard, score, message.getId());
}
pipe.sync();
Expand Down Expand Up @@ -237,7 +243,7 @@ public synchronized List<Message> pop(int messageCount, int wait, TimeUnit unit)

private List<Message> _pop(List<String> batch) throws Exception {

double unackScore = Long.valueOf(System.currentTimeMillis() + unackTime).doubleValue();
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

List<Message> popped = new LinkedList<>();
ZAddParams zParams = ZAddParams.zAddParams().nx();
Expand Down Expand Up @@ -371,7 +377,7 @@ public boolean setUnackTimeout(String messageId, long timeout) {

try {

double unackScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue();
double unackScore = Long.valueOf(clock.millis() + timeout).doubleValue();
Double score = jedis.zscore(unackShardKey(messageId), messageId);
if (score != null) {
jedis.zadd(unackShardKey(messageId), unackScore, messageId);
Expand Down Expand Up @@ -402,7 +408,7 @@ public boolean setTimeout(String messageId, long timeout) {
Double score = jedis.zscore(myQueueShard, messageId);
if (score != null) {
double priorityd = message.getPriority() / 100.0;
double newScore = Long.valueOf(System.currentTimeMillis() + timeout).doubleValue() + priorityd;
double newScore = Long.valueOf(clock.millis() + timeout).doubleValue() + priorityd;
jedis.zadd(myQueueShard, newScore, messageId);
json = om.writeValueAsString(message);
jedis.hset(messageStoreKey(message.getId()), message.getId(), json);
Expand Down Expand Up @@ -537,7 +543,7 @@ public void clear() {
private Set<String> peekIds(int offset, int count) {
Jedis jedis = connPool.getResource();
try {
double now = Long.valueOf(System.currentTimeMillis() + 1).doubleValue();
double now = Long.valueOf(clock.millis() + 1).doubleValue();
Set<String> scanned = jedis.zrangeByScore(myQueueShard, 0, now, offset, count);
return scanned;
} finally {
Expand Down Expand Up @@ -566,7 +572,7 @@ private void processUnacks(String unackShardKey) {

int batchSize = 1_000;

double now = Long.valueOf(System.currentTimeMillis()).doubleValue();
double now = Long.valueOf(clock.millis()).doubleValue();

Set<Tuple> unacks = jedis.zrangeByScoreWithScores(unackShardKey, 0, now, 0, batchSize);

Expand Down
Loading

0 comments on commit 29b9efc

Please sign in to comment.