Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.

Commit 3e2c0ae

Browse files
committed
Merge branch 'dev' into revert-374-dev_optimize_upstream
2 parents bd09c5f + 84db22c commit 3e2c0ae

File tree

7 files changed

+81
-93
lines changed

7 files changed

+81
-93
lines changed

contribs/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ dependencies {
44
compile project(':conductor-core')
55

66
compile 'com.amazonaws:aws-java-sdk-sqs:latest.release'
7-
compile ('com.google.inject:guice:4.1+') { force= true}
7+
compile ('com.google.inject:guice:4.1.0')
88

99
compile 'net.thisptr:jackson-jq:+'
1010

core/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ dependencies {
33
compile project(':conductor-common')
44

55
compile 'io.reactivex:rxjava:1.2.2'
6-
compile 'com.google.inject:guice:4.0+'
7-
compile 'com.google.inject.extensions:guice-multibindings:4.0'
6+
compile 'com.google.inject:guice:4.1.0'
7+
compile 'com.google.inject.extensions:guice-multibindings:4.1.0'
88
compile 'com.netflix.spectator:spectator-api:0.40.0'
99
compile 'com.netflix.eureka:eureka-client:latest.release'
1010
compile ('com.fasterxml.jackson.core:jackson-databind:2.7.5')

server/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ dependencies {
3232

3333
//Guice
3434
compile 'com.sun.jersey.contribs:jersey-guice:1.19.+'
35-
compile 'com.google.inject:guice:4.+'
36-
compile 'com.google.inject.extensions:guice-servlet:4.1.+'
35+
compile 'com.google.inject:guice:4.1.0'
36+
compile 'com.google.inject.extensions:guice-servlet:4.1.0'
3737

3838
//Swagger
3939
compile 'io.swagger:swagger-jersey-jaxrs:1.5.0'

server/src/main/java/com/netflix/conductor/server/ConductorServer.java

+48-50
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.io.InputStream;
2222
import java.util.Arrays;
23-
import java.util.Collection;
2423
import java.util.EnumSet;
2524
import java.util.LinkedList;
2625
import java.util.List;
@@ -68,29 +67,29 @@ enum DB {
6867
redis, dynomite, memory, redis_cluster, mysql
6968
}
7069

71-
private ServerModule sm;
70+
private ServerModule serverModule;
7271

7372
private Server server;
7473

75-
private ConductorConfig cc;
74+
private ConductorConfig conductorConfig;
7675

77-
private DB db;
76+
private DB database;
7877

79-
public ConductorServer(ConductorConfig cc) {
80-
this.cc = cc;
81-
String dynoClusterName = cc.getProperty("workflow.dynomite.cluster.name", "");
78+
public ConductorServer(ConductorConfig conductorConfig) {
79+
this.conductorConfig = conductorConfig;
80+
String dynoClusterName = conductorConfig.getProperty("workflow.dynomite.cluster.name", "");
8281

8382
List<Host> dynoHosts = new LinkedList<>();
84-
String dbstring = cc.getProperty("db", "memory");
83+
String dbstring = conductorConfig.getProperty("db", "memory");
8584
try {
86-
db = DB.valueOf(dbstring);
85+
database = DB.valueOf(dbstring);
8786
}catch(IllegalArgumentException ie) {
8887
logger.error("Invalid db name: " + dbstring + ", supported values are: " + Arrays.toString(DB.values()));
8988
System.exit(1);
9089
}
9190

92-
if(!(db.equals(DB.memory) || db.equals(DB.mysql))) {
93-
String hosts = cc.getProperty("workflow.dynomite.cluster.hosts", null);
91+
if(!(database.equals(DB.memory) || database.equals(DB.mysql))) {
92+
String hosts = conductorConfig.getProperty("workflow.dynomite.cluster.hosts", null);
9493
if(hosts == null) {
9594
System.err.println("Missing dynomite/redis hosts. Ensure 'workflow.dynomite.cluster.hosts' has been set in the supplied configuration.");
9695
logger.error("Missing dynomite/redis hosts. Ensure 'workflow.dynomite.cluster.hosts' has been set in the supplied configuration.");
@@ -109,59 +108,41 @@ public ConductorServer(ConductorConfig cc) {
109108

110109
}else {
111110
//Create a single shard host supplier
112-
Host dynoHost = new Host("localhost", 0, cc.getAvailabilityZone(), Status.Up);
111+
Host dynoHost = new Host("localhost", 0, conductorConfig.getAvailabilityZone(), Status.Up);
113112
dynoHosts.add(dynoHost);
114113
}
115114
init(dynoClusterName, dynoHosts);
116115
}
117116

118117
private void init(String dynoClusterName, List<Host> dynoHosts) {
119-
HostSupplier hs = new HostSupplier() {
120-
121-
@Override
122-
public Collection<Host> getHosts() {
123-
return dynoHosts;
124-
}
125-
};
118+
HostSupplier hostSupplier = () -> dynoHosts;
126119

127120
JedisCommands jedis = null;
128121

129-
switch(db) {
122+
switch(database) {
130123
case redis:
131124
case dynomite:
132-
ConnectionPoolConfigurationImpl cp = new ConnectionPoolConfigurationImpl(dynoClusterName).withTokenSupplier(new TokenMapSupplier() {
133-
134-
HostToken token = new HostToken(1L, dynoHosts.get(0));
135-
136-
@Override
137-
public List<HostToken> getTokens(Set<Host> activeHosts) {
138-
return Arrays.asList(token);
139-
}
140-
141-
@Override
142-
public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
143-
return token;
144-
}
145-
146-
147-
}).setLocalRack(cc.getAvailabilityZone()).setLocalDataCenter(cc.getRegion());
148-
cp.setSocketTimeout(0);
149-
cp.setConnectTimeout(0);
150-
cp.setMaxConnsPerHost(cc.getIntProperty("workflow.dynomite.connection.maxConnsPerHost", 10));
151-
125+
ConnectionPoolConfigurationImpl connectionPoolConfiguration = new ConnectionPoolConfigurationImpl(dynoClusterName)
126+
.withTokenSupplier(getTokenMapSupplier(dynoHosts))
127+
.setLocalRack(conductorConfig.getAvailabilityZone())
128+
.setLocalDataCenter(conductorConfig.getRegion())
129+
.setSocketTimeout(0)
130+
.setConnectTimeout(0)
131+
.setMaxConnsPerHost(conductorConfig.getIntProperty("workflow.dynomite.connection.maxConnsPerHost", 10));
132+
152133
jedis = new DynoJedisClient.Builder()
153-
.withHostSupplier(hs)
154-
.withApplicationName(cc.getAppId())
155-
.withDynomiteClusterName(dynoClusterName)
156-
.withCPConfig(cp)
157-
.build();
134+
.withHostSupplier(hostSupplier)
135+
.withApplicationName(conductorConfig.getAppId())
136+
.withDynomiteClusterName(dynoClusterName)
137+
.withCPConfig(connectionPoolConfiguration)
138+
.build();
158139

159140
logger.info("Starting conductor server using dynomite/redis cluster " + dynoClusterName);
160141

161142
break;
162143

163144
case mysql:
164-
logger.info("Starting conductor server using MySQL data store", db);
145+
logger.info("Starting conductor server using MySQL data store", database);
165146
break;
166147
case memory:
167148
jedis = new JedisMock();
@@ -189,11 +170,28 @@ public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
189170
break;
190171
}
191172

192-
this.sm = new ServerModule(jedis, hs, cc, db);
173+
this.serverModule = new ServerModule(jedis, hostSupplier, conductorConfig, database);
193174
}
194-
175+
176+
private TokenMapSupplier getTokenMapSupplier(List<Host> dynoHosts) {
177+
return new TokenMapSupplier() {
178+
179+
HostToken token = new HostToken(1L, dynoHosts.get(0));
180+
181+
@Override
182+
public List<HostToken> getTokens(Set<Host> activeHosts) {
183+
return Arrays.asList(token);
184+
}
185+
186+
@Override
187+
public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
188+
return token;
189+
}
190+
};
191+
}
192+
195193
public ServerModule getGuiceModule() {
196-
return sm;
194+
return serverModule;
197195
}
198196

199197
public synchronized void start(int port, boolean join) throws Exception {
@@ -202,7 +200,7 @@ public synchronized void start(int port, boolean join) throws Exception {
202200
throw new IllegalStateException("Server is already running");
203201
}
204202

205-
Guice.createInjector(sm);
203+
Guice.createInjector(serverModule);
206204

207205
//Swagger
208206
String resourceBasePath = Main.class.getResource("/swagger-ui").toExternalForm();

server/src/main/java/com/netflix/conductor/server/ServerModule.java

+17-22
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.List;
2222
import java.util.concurrent.ExecutorService;
23-
import java.util.concurrent.ThreadFactory;
2423
import java.util.concurrent.atomic.AtomicInteger;
2524

2625
import com.google.inject.AbstractModule;
@@ -58,22 +57,22 @@ public class ServerModule extends AbstractModule {
5857

5958
private JedisCommands dynoConn;
6059

61-
private HostSupplier hs;
60+
private HostSupplier hostSupplier;
6261

6362
private String region;
6463

6564
private String localRack;
6665

67-
private ConductorConfig config;
66+
private ConductorConfig conductorConfig;
6867

6968
private ConductorServer.DB db;
7069

71-
public ServerModule(JedisCommands jedis, HostSupplier hs, ConductorConfig config, ConductorServer.DB db) {
70+
public ServerModule(JedisCommands jedis, HostSupplier hostSupplier, ConductorConfig conductorConfig, ConductorServer.DB db) {
7271
this.dynoConn = jedis;
73-
this.hs = hs;
74-
this.config = config;
75-
this.region = config.getRegion();
76-
this.localRack = config.getAvailabilityZone();
72+
this.hostSupplier = hostSupplier;
73+
this.conductorConfig = conductorConfig;
74+
this.region = conductorConfig.getRegion();
75+
this.localRack = conductorConfig.getAvailabilityZone();
7776
this.db = db;
7877

7978
}
@@ -83,15 +82,15 @@ protected void configure() {
8382

8483
configureExecutorService();
8584

86-
bind(Configuration.class).toInstance(config);
85+
bind(Configuration.class).toInstance(conductorConfig);
8786

8887
if (db == ConductorServer.DB.mysql) {
8988
install(new MySQLWorkflowModule());
9089
} else {
9190
String localDC = localRack;
9291
localDC = localDC.replaceAll(region, "");
93-
DynoShardSupplier ss = new DynoShardSupplier(hs, region, localDC);
94-
DynoQueueDAO queueDao = new DynoQueueDAO(dynoConn, dynoConn, ss, config);
92+
DynoShardSupplier ss = new DynoShardSupplier(hostSupplier, region, localDC);
93+
DynoQueueDAO queueDao = new DynoQueueDAO(dynoConn, dynoConn, ss, conductorConfig);
9594

9695
bind(MetadataDAO.class).to(RedisMetadataDAO.class);
9796
bind(ExecutionDAO.class).to(RedisExecutionDAO.class);
@@ -108,10 +107,10 @@ protected void configure() {
108107
install(new CoreModule());
109108
install(new JerseyModule());
110109

111-
new HttpTask(new RestClientManager(), config);
110+
new HttpTask(new RestClientManager(), conductorConfig);
112111
new JsonJqTransform();
113112

114-
List<AbstractModule> additionalModules = config.getAdditionalModules();
113+
List<AbstractModule> additionalModules = conductorConfig.getAdditionalModules();
115114
if(additionalModules != null) {
116115
for(AbstractModule additionalModule : additionalModules) {
117116
install(additionalModule);
@@ -126,14 +125,10 @@ public ExecutorService getExecutorService(){
126125

127126
private void configureExecutorService(){
128127
AtomicInteger count = new AtomicInteger(0);
129-
this.es = java.util.concurrent.Executors.newFixedThreadPool(maxThreads, new ThreadFactory() {
130-
131-
@Override
132-
public Thread newThread(Runnable r) {
133-
Thread t = new Thread(r);
134-
t.setName("conductor-worker-" + count.getAndIncrement());
135-
return t;
136-
}
137-
});
128+
this.es = java.util.concurrent.Executors.newFixedThreadPool(maxThreads, runnable -> {
129+
Thread conductorWorkerThread = new Thread(runnable);
130+
conductorWorkerThread.setName("conductor-worker-" + count.getAndIncrement());
131+
return conductorWorkerThread;
132+
});
138133
}
139134
}

test-harness/build.gradle

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ dependencies {
1616
testCompile 'org.rarefiedredis.redis:redis-java:0.0.17'
1717
testCompile 'com.sun.jersey.contribs:jersey-guice:1.19.+'
1818

19-
testCompile 'com.google.inject:guice:4.+'
20-
testCompile 'com.google.inject.extensions:guice-servlet:4.+'
19+
testCompile 'com.google.inject.extensions:guice-servlet:4.1.0'
2120
testCompile 'io.swagger:swagger-jersey-jaxrs:1.5.0'
2221
}
2322

test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java

+10-14
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.Arrays;
2222
import java.util.Set;
2323
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.ThreadFactory;
2524
import java.util.concurrent.atomic.AtomicInteger;
2625
import java.util.stream.Collectors;
2726

@@ -51,18 +50,20 @@ public class TestModule extends AbstractModule {
5150

5251
private int maxThreads = 50;
5352

54-
private ExecutorService es;
53+
private ExecutorService executorService;
5554

5655
@Override
5756
protected void configure() {
57+
5858
System.setProperty("workflow.system.task.worker.callback.seconds", "0");
5959
System.setProperty("workflow.system.task.worker.queue.size", "10000");
6060
System.setProperty("workflow.system.task.worker.thread.count", "10");
61+
6162
configureExecutorService();
63+
6264
ConductorConfig config = new ConductorConfig();
6365
bind(Configuration.class).toInstance(config);
6466
JedisCommands jedisMock = new JedisMock();
65-
6667

6768
DynoQueueDAO queueDao = new DynoQueueDAO(jedisMock, jedisMock, new ShardSupplier() {
6869

@@ -86,24 +87,19 @@ public String getCurrentShard() {
8687
bind(DynoProxy.class).toInstance(proxy);
8788
install(new CoreModule());
8889
bind(UserTask.class).asEagerSingleton();
89-
9090
}
9191

9292
@Provides
9393
public ExecutorService getExecutorService(){
94-
return this.es;
94+
return this.executorService;
9595
}
9696

9797
private void configureExecutorService(){
9898
AtomicInteger count = new AtomicInteger(0);
99-
this.es = java.util.concurrent.Executors.newFixedThreadPool(maxThreads, new ThreadFactory() {
100-
101-
@Override
102-
public Thread newThread(Runnable r) {
103-
Thread t = new Thread(r);
104-
t.setName("workflow-worker-" + count.getAndIncrement());
105-
return t;
106-
}
107-
});
99+
this.executorService = java.util.concurrent.Executors.newFixedThreadPool(maxThreads, runnable -> {
100+
Thread workflowWorkerThread = new Thread(runnable);
101+
workflowWorkerThread.setName(String.format("workflow-worker-%d", count.getAndIncrement()));
102+
return workflowWorkerThread;
103+
});
108104
}
109105
}

0 commit comments

Comments
 (0)