Skip to content

Commit

Permalink
refactored for the new pipeline based queues
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Feb 11, 2018
1 parent 29b9efc commit 267567b
Show file tree
Hide file tree
Showing 17 changed files with 793 additions and 288 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ redis-3.0.7.tar.gz
*.iml
.idea
.gradle
.classpath
.project
6 changes: 4 additions & 2 deletions dyno-queues-redis/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ dependencies {
compile project(':dyno-queues-core')

compile "com.google.inject:guice:3.0"
compile 'com.netflix.dyno:dyno-core:1.5.9'
compile "com.netflix.dyno:dyno-jedis:1.5.9"

compile 'com.netflix.dyno:dyno-core:1.6.2'
compile "com.netflix.dyno:dyno-jedis:1.6.2"

compile "com.netflix.archaius:archaius-core:0.7.5"
compile "com.netflix.servo:servo-core:0.12.17"
compile 'com.netflix.eureka:eureka-client:1.8.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,7 @@
*/
package com.netflix.dyno.queues.redis;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.netflix.appinfo.AmazonInfo;
import com.netflix.appinfo.AmazonInfo.MetaDataKey;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Application;
import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -42,6 +25,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.netflix.dyno.queues.DynoQueue;
import com.netflix.dyno.queues.Message;

/**
* @author Viren
*
Expand Down Expand Up @@ -214,207 +200,5 @@ private String getNextShard() {
String s = shards.get(indx);
return s;
}


public static class Builder {

private Clock clock;

private String queueName;

private EurekaClient ec;

private String dynomiteClusterName;

private String redisKeyPrefix;

private int unackTime;

private String currentShard;

private Function<Host, String> hostToShardMap;

private int redisPoolSize;

private int quorumPort;

private int nonQuorumPort;

private List<Host> hosts;

/**
* @param clock the Clock instance to set
* @return instance of builder
*/
public Builder setClock(Clock clock) {
this.clock = clock;
return this;
}

/**
* @param queueName the queueName to set
* @return instance of builder
*/
public Builder setQueueName(String queueName) {
this.queueName = queueName;
return this;
}

/**
* @param ec the ec to set
* @return instance of builder
*/
public Builder setEc(EurekaClient ec) {
this.ec = ec;
return this;
}

/**
* @param dynomiteClusterName the dynomiteClusterName to set
* @return instance of builder
*/
public Builder setDynomiteClusterName(String dynomiteClusterName) {
this.dynomiteClusterName = dynomiteClusterName;
return this;
}

/**
* @param redisKeyPrefix the redisKeyPrefix to set
* @return instance of builder
*/
public Builder setRedisKeyPrefix(String redisKeyPrefix) {
this.redisKeyPrefix = redisKeyPrefix;
return this;
}

/**
* @param unackTime the unackTime to set
* @return instance of builder
*/
public Builder setUnackTime(int unackTime) {
this.unackTime = unackTime;
return this;
}

/**
* @param currentShard the currentShard to set
* @return instance of builder
*/
public Builder setCurrentShard(String currentShard) {
this.currentShard = currentShard;
return this;
}

/**
* @param hostToShardMap the hostToShardMap to set
* @return instance of builder
*/
public Builder setHostToShardMap(Function<Host, String> hostToShardMap) {
this.hostToShardMap = hostToShardMap;
return this;
}

/**
* @param redisPoolSize the redisPoolSize to set
* @return instance of builder
*/
public Builder setRedisPoolSize(int redisPoolSize) {
this.redisPoolSize = redisPoolSize;
return this;
}

/**
* @param quorumPort the quorumPort to set
* @return instance of builder
*/
public Builder setQuorumPort(int quorumPort) {
this.quorumPort = quorumPort;
return this;
}

/**
* @param nonQuorumPort the nonQuorumPort to set
* @return instance of builder
*/
public Builder setNonQuorumPort(int nonQuorumPort) {
this.nonQuorumPort = nonQuorumPort;
return this;
}

public Builder setHosts(List<Host> hosts) {
this.hosts = hosts;
return this;
}

public MultiRedisQueue build() {
if (clock == null) {
clock = Clock.systemDefaultZone();
}
if(hosts == null) {
hosts = getHostsFromEureka(ec, dynomiteClusterName);
}
Map<String, Host> shardMap = new HashMap<>();
for(Host host : hosts) {
String shard = hostToShardMap.apply(host);
shardMap.put(shard, host);
}

JedisPoolConfig config = new JedisPoolConfig();
config.setTestOnBorrow(true);
config.setTestOnCreate(true);
config.setMaxTotal(redisPoolSize);
config.setMaxIdle(5);
config.setMaxWaitMillis(60_000);

Map<String, RedisQueue> queues = new HashMap<>();
for(String queueShard : shardMap.keySet()) {
String host = shardMap.get(queueShard).getIpAddress();

JedisPool pool = new JedisPool(config, host, quorumPort, 0);
JedisPool readPool = new JedisPool(config, host, nonQuorumPort, 0);

RedisQueue q = new RedisQueue(clock, redisKeyPrefix, queueName, queueShard, unackTime, unackTime, pool);
q.setNonQuorumPool(readPool);
queues.put(queueShard, q);
}
MultiRedisQueue queue = new MultiRedisQueue(queueName, currentShard, queues);
return queue;
}

private static List<Host> getHostsFromEureka(EurekaClient ec, String applicationName) {

Application app = ec.getApplication(applicationName);
List<Host> hosts = new ArrayList<Host>();

if (app == null) {
return hosts;
}

List<InstanceInfo> ins = app.getInstances();

if (ins == null || ins.isEmpty()) {
return hosts;
}

hosts = Lists.newArrayList(Collections2.transform(ins,

new Function<InstanceInfo, Host>() {
@Override
public Host apply(InstanceInfo info) {

Host.Status status = info.getStatus() == InstanceStatus.UP ? Host.Status.Up : Host.Status.Down;
String rack = null;
if (info.getDataCenterInfo() instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo)info.getDataCenterInfo();
rack = amazonInfo.get(MetaDataKey.availabilityZone);
}
Host host = new Host(info.getHostName(), info.getIPAddr(), rack, status);
return host;
}
}));
return hosts;
}
}


}
Loading

0 comments on commit 267567b

Please sign in to comment.