Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #442 from Netflix/cleanup
Browse files Browse the repository at this point in the history
Pinned Guice dependecy to 4.1.0
  • Loading branch information
pctreddy authored Mar 1, 2018
2 parents db7dbdd + b57adc7 commit 84db22c
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 93 deletions.
2 changes: 1 addition & 1 deletion contribs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ dependencies {
compile project(':conductor-core')

compile 'com.amazonaws:aws-java-sdk-sqs:latest.release'
compile ('com.google.inject:guice:4.1+') { force= true}
compile ('com.google.inject:guice:4.1.0')

compile 'net.thisptr:jackson-jq:+'

Expand Down
4 changes: 2 additions & 2 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ dependencies {
compile project(':conductor-common')

compile 'io.reactivex:rxjava:1.2.2'
compile 'com.google.inject:guice:4.0+'
compile 'com.google.inject.extensions:guice-multibindings:4.0'
compile 'com.google.inject:guice:4.1.0'
compile 'com.google.inject.extensions:guice-multibindings:4.1.0'
compile 'com.netflix.spectator:spectator-api:0.40.0'
compile 'com.netflix.eureka:eureka-client:latest.release'
compile ('com.fasterxml.jackson.core:jackson-databind:2.7.5')
Expand Down
4 changes: 2 additions & 2 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ dependencies {

//Guice
compile 'com.sun.jersey.contribs:jersey-guice:1.19.+'
compile 'com.google.inject:guice:4.+'
compile 'com.google.inject.extensions:guice-servlet:4.1.+'
compile 'com.google.inject:guice:4.1.0'
compile 'com.google.inject.extensions:guice-servlet:4.1.0'

//Swagger
compile 'io.swagger:swagger-jersey-jaxrs:1.5.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -68,29 +67,29 @@ enum DB {
redis, dynomite, memory, redis_cluster, mysql
}

private ServerModule sm;
private ServerModule serverModule;

private Server server;

private ConductorConfig cc;
private ConductorConfig conductorConfig;

private DB db;
private DB database;

public ConductorServer(ConductorConfig cc) {
this.cc = cc;
String dynoClusterName = cc.getProperty("workflow.dynomite.cluster.name", "");
public ConductorServer(ConductorConfig conductorConfig) {
this.conductorConfig = conductorConfig;
String dynoClusterName = conductorConfig.getProperty("workflow.dynomite.cluster.name", "");

List<Host> dynoHosts = new LinkedList<>();
String dbstring = cc.getProperty("db", "memory");
String dbstring = conductorConfig.getProperty("db", "memory");
try {
db = DB.valueOf(dbstring);
database = DB.valueOf(dbstring);
}catch(IllegalArgumentException ie) {
logger.error("Invalid db name: " + dbstring + ", supported values are: " + Arrays.toString(DB.values()));
System.exit(1);
}

if(!(db.equals(DB.memory) || db.equals(DB.mysql))) {
String hosts = cc.getProperty("workflow.dynomite.cluster.hosts", null);
if(!(database.equals(DB.memory) || database.equals(DB.mysql))) {
String hosts = conductorConfig.getProperty("workflow.dynomite.cluster.hosts", null);
if(hosts == null) {
System.err.println("Missing dynomite/redis hosts. Ensure 'workflow.dynomite.cluster.hosts' has been set in the supplied configuration.");
logger.error("Missing dynomite/redis hosts. Ensure 'workflow.dynomite.cluster.hosts' has been set in the supplied configuration.");
Expand All @@ -109,59 +108,41 @@ public ConductorServer(ConductorConfig cc) {

}else {
//Create a single shard host supplier
Host dynoHost = new Host("localhost", 0, cc.getAvailabilityZone(), Status.Up);
Host dynoHost = new Host("localhost", 0, conductorConfig.getAvailabilityZone(), Status.Up);
dynoHosts.add(dynoHost);
}
init(dynoClusterName, dynoHosts);
}

private void init(String dynoClusterName, List<Host> dynoHosts) {
HostSupplier hs = new HostSupplier() {

@Override
public Collection<Host> getHosts() {
return dynoHosts;
}
};
HostSupplier hostSupplier = () -> dynoHosts;

JedisCommands jedis = null;

switch(db) {
switch(database) {
case redis:
case dynomite:
ConnectionPoolConfigurationImpl cp = new ConnectionPoolConfigurationImpl(dynoClusterName).withTokenSupplier(new TokenMapSupplier() {

HostToken token = new HostToken(1L, dynoHosts.get(0));

@Override
public List<HostToken> getTokens(Set<Host> activeHosts) {
return Arrays.asList(token);
}

@Override
public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
return token;
}


}).setLocalRack(cc.getAvailabilityZone()).setLocalDataCenter(cc.getRegion());
cp.setSocketTimeout(0);
cp.setConnectTimeout(0);
cp.setMaxConnsPerHost(cc.getIntProperty("workflow.dynomite.connection.maxConnsPerHost", 10));

ConnectionPoolConfigurationImpl connectionPoolConfiguration = new ConnectionPoolConfigurationImpl(dynoClusterName)
.withTokenSupplier(getTokenMapSupplier(dynoHosts))
.setLocalRack(conductorConfig.getAvailabilityZone())
.setLocalDataCenter(conductorConfig.getRegion())
.setSocketTimeout(0)
.setConnectTimeout(0)
.setMaxConnsPerHost(conductorConfig.getIntProperty("workflow.dynomite.connection.maxConnsPerHost", 10));

jedis = new DynoJedisClient.Builder()
.withHostSupplier(hs)
.withApplicationName(cc.getAppId())
.withDynomiteClusterName(dynoClusterName)
.withCPConfig(cp)
.build();
.withHostSupplier(hostSupplier)
.withApplicationName(conductorConfig.getAppId())
.withDynomiteClusterName(dynoClusterName)
.withCPConfig(connectionPoolConfiguration)
.build();

logger.info("Starting conductor server using dynomite/redis cluster " + dynoClusterName);

break;

case mysql:
logger.info("Starting conductor server using MySQL data store", db);
logger.info("Starting conductor server using MySQL data store", database);
break;
case memory:
jedis = new JedisMock();
Expand Down Expand Up @@ -189,11 +170,28 @@ public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
break;
}

this.sm = new ServerModule(jedis, hs, cc, db);
this.serverModule = new ServerModule(jedis, hostSupplier, conductorConfig, database);
}


private TokenMapSupplier getTokenMapSupplier(List<Host> dynoHosts) {
return new TokenMapSupplier() {

HostToken token = new HostToken(1L, dynoHosts.get(0));

@Override
public List<HostToken> getTokens(Set<Host> activeHosts) {
return Arrays.asList(token);
}

@Override
public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
return token;
}
};
}

public ServerModule getGuiceModule() {
return sm;
return serverModule;
}

public synchronized void start(int port, boolean join) throws Exception {
Expand All @@ -202,7 +200,7 @@ public synchronized void start(int port, boolean join) throws Exception {
throw new IllegalStateException("Server is already running");
}

Guice.createInjector(sm);
Guice.createInjector(serverModule);

//Swagger
String resourceBasePath = Main.class.getResource("/swagger-ui").toExternalForm();
Expand Down
39 changes: 17 additions & 22 deletions server/src/main/java/com/netflix/conductor/server/ServerModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.inject.AbstractModule;
Expand Down Expand Up @@ -58,22 +57,22 @@ public class ServerModule extends AbstractModule {

private JedisCommands dynoConn;

private HostSupplier hs;
private HostSupplier hostSupplier;

private String region;

private String localRack;

private ConductorConfig config;
private ConductorConfig conductorConfig;

private ConductorServer.DB db;

public ServerModule(JedisCommands jedis, HostSupplier hs, ConductorConfig config, ConductorServer.DB db) {
public ServerModule(JedisCommands jedis, HostSupplier hostSupplier, ConductorConfig conductorConfig, ConductorServer.DB db) {
this.dynoConn = jedis;
this.hs = hs;
this.config = config;
this.region = config.getRegion();
this.localRack = config.getAvailabilityZone();
this.hostSupplier = hostSupplier;
this.conductorConfig = conductorConfig;
this.region = conductorConfig.getRegion();
this.localRack = conductorConfig.getAvailabilityZone();
this.db = db;

}
Expand All @@ -83,15 +82,15 @@ protected void configure() {

configureExecutorService();

bind(Configuration.class).toInstance(config);
bind(Configuration.class).toInstance(conductorConfig);

if (db == ConductorServer.DB.mysql) {
install(new MySQLWorkflowModule());
} else {
String localDC = localRack;
localDC = localDC.replaceAll(region, "");
DynoShardSupplier ss = new DynoShardSupplier(hs, region, localDC);
DynoQueueDAO queueDao = new DynoQueueDAO(dynoConn, dynoConn, ss, config);
DynoShardSupplier ss = new DynoShardSupplier(hostSupplier, region, localDC);
DynoQueueDAO queueDao = new DynoQueueDAO(dynoConn, dynoConn, ss, conductorConfig);

bind(MetadataDAO.class).to(RedisMetadataDAO.class);
bind(ExecutionDAO.class).to(RedisExecutionDAO.class);
Expand All @@ -108,10 +107,10 @@ protected void configure() {
install(new CoreModule());
install(new JerseyModule());

new HttpTask(new RestClientManager(), config);
new HttpTask(new RestClientManager(), conductorConfig);
new JsonJqTransform();

List<AbstractModule> additionalModules = config.getAdditionalModules();
List<AbstractModule> additionalModules = conductorConfig.getAdditionalModules();
if(additionalModules != null) {
for(AbstractModule additionalModule : additionalModules) {
install(additionalModule);
Expand All @@ -126,14 +125,10 @@ public ExecutorService getExecutorService(){

private void configureExecutorService(){
AtomicInteger count = new AtomicInteger(0);
this.es = java.util.concurrent.Executors.newFixedThreadPool(maxThreads, new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("conductor-worker-" + count.getAndIncrement());
return t;
}
});
this.es = java.util.concurrent.Executors.newFixedThreadPool(maxThreads, runnable -> {
Thread conductorWorkerThread = new Thread(runnable);
conductorWorkerThread.setName("conductor-worker-" + count.getAndIncrement());
return conductorWorkerThread;
});
}
}
3 changes: 1 addition & 2 deletions test-harness/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ dependencies {
testCompile 'org.rarefiedredis.redis:redis-java:0.0.17'
testCompile 'com.sun.jersey.contribs:jersey-guice:1.19.+'

testCompile 'com.google.inject:guice:4.+'
testCompile 'com.google.inject.extensions:guice-servlet:4.+'
testCompile 'com.google.inject.extensions:guice-servlet:4.1.0'
testCompile 'io.swagger:swagger-jersey-jaxrs:1.5.0'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -51,18 +50,20 @@ public class TestModule extends AbstractModule {

private int maxThreads = 50;

private ExecutorService es;
private ExecutorService executorService;

@Override
protected void configure() {

System.setProperty("workflow.system.task.worker.callback.seconds", "0");
System.setProperty("workflow.system.task.worker.queue.size", "10000");
System.setProperty("workflow.system.task.worker.thread.count", "10");

configureExecutorService();

ConductorConfig config = new ConductorConfig();
bind(Configuration.class).toInstance(config);
JedisCommands jedisMock = new JedisMock();


DynoQueueDAO queueDao = new DynoQueueDAO(jedisMock, jedisMock, new ShardSupplier() {

Expand All @@ -86,24 +87,19 @@ public String getCurrentShard() {
bind(DynoProxy.class).toInstance(proxy);
install(new CoreModule());
bind(UserTask.class).asEagerSingleton();

}

@Provides
public ExecutorService getExecutorService(){
return this.es;
return this.executorService;
}

private void configureExecutorService(){
AtomicInteger count = new AtomicInteger(0);
this.es = java.util.concurrent.Executors.newFixedThreadPool(maxThreads, new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("workflow-worker-" + count.getAndIncrement());
return t;
}
});
this.executorService = java.util.concurrent.Executors.newFixedThreadPool(maxThreads, runnable -> {
Thread workflowWorkerThread = new Thread(runnable);
workflowWorkerThread.setName(String.format("workflow-worker-%d", count.getAndIncrement()));
return workflowWorkerThread;
});
}
}

0 comments on commit 84db22c

Please sign in to comment.