Skip to content

Commit

Permalink
Change api from ThreadPoolExecutor to ExecutorService (#763)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored Oct 9, 2024
1 parent 0c58fce commit c04fd2c
Show file tree
Hide file tree
Showing 17 changed files with 179 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@
import com.yelp.nrtsearch.server.config.ThreadPoolConfiguration;
import com.yelp.nrtsearch.server.monitoring.ThreadPoolCollector;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Static Factory to generate {@link ThreadPoolExecutor} as per the {@link ExecutorType} provided
* Static Factory to generate {@link java.util.concurrent.ExecutorService} as per the {@link
* ExecutorType} provided.
*/
public class ThreadPoolExecutorFactory {
public class ExecutorFactory {
public enum ExecutorType {
SEARCH,
INDEX,
Expand All @@ -38,20 +44,20 @@ public enum ExecutorType {
VECTORMERGE
}

private static final Logger logger = LoggerFactory.getLogger(ThreadPoolExecutorFactory.class);
private static final Logger logger = LoggerFactory.getLogger(ExecutorFactory.class);

private static ThreadPoolExecutorFactory instance;
private static ExecutorFactory instance;

private final ThreadPoolConfiguration threadPoolConfiguration;
private final Map<ExecutorType, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>();
private final Map<ExecutorType, ExecutorService> executorMap = new ConcurrentHashMap<>();

/**
* Initialize the factory with the provided {@link ThreadPoolConfiguration}.
*
* @param threadPoolConfiguration thread pool configuration
*/
public static void init(ThreadPoolConfiguration threadPoolConfiguration) {
instance = new ThreadPoolExecutorFactory(threadPoolConfiguration);
instance = new ExecutorFactory(threadPoolConfiguration);
}

/**
Expand All @@ -60,9 +66,9 @@ public static void init(ThreadPoolConfiguration threadPoolConfiguration) {
* @return instance of the factory
* @throws IllegalStateException if the factory is not initialized
*/
public static ThreadPoolExecutorFactory getInstance() {
public static ExecutorFactory getInstance() {
if (instance == null) {
throw new IllegalStateException("ThreadPoolExecutorFactory not initialized");
throw new IllegalStateException("ExecutorFactory not initialized");
}
return instance;
}
Expand All @@ -72,22 +78,22 @@ public static ThreadPoolExecutorFactory getInstance() {
*
* @param threadPoolConfiguration thread pool configuration
*/
public ThreadPoolExecutorFactory(ThreadPoolConfiguration threadPoolConfiguration) {
public ExecutorFactory(ThreadPoolConfiguration threadPoolConfiguration) {
this.threadPoolConfiguration = threadPoolConfiguration;
}

/**
* Get the {@link ThreadPoolExecutor} for the provided {@link ExecutorType}. The executor is
* cached, so subsequent calls with the same {@link ExecutorType} will return the same executor.
* Get the {@link ExecutorService} for the provided {@link ExecutorType}. The executor is cached,
* so subsequent calls with the same {@link ExecutorType} will return the same executor.
*
* @param executorType {@link ExecutorType}
* @return {@link ThreadPoolExecutor}
* @return {@link ExecutorService}
*/
public ThreadPoolExecutor getThreadPoolExecutor(ExecutorType executorType) {
return executorMap.computeIfAbsent(executorType, this::createThreadPoolExecutor);
public ExecutorService getExecutor(ExecutorType executorType) {
return executorMap.computeIfAbsent(executorType, this::createExecutor);
}

private ThreadPoolExecutor createThreadPoolExecutor(ExecutorType executorType) {
private ExecutorService createExecutor(ExecutorType executorType) {
ThreadPoolConfiguration.ThreadPoolSettings threadPoolSettings =
threadPoolConfiguration.getThreadPoolSettings(executorType);
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.yelp.nrtsearch.server.config;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.yelp.nrtsearch.server.concurrent.ThreadPoolExecutorFactory;
import com.yelp.nrtsearch.server.concurrent.ExecutorFactory;
import com.yelp.nrtsearch.server.utils.JsonUtils;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -56,56 +56,55 @@ public class ThreadPoolConfiguration {
Math.max(100, 2 * DEFAULT_VECTOR_MERGE_THREADS);

/**
* Settings for a {@link ThreadPoolExecutorFactory.ExecutorType}.
* Settings for a {@link ExecutorFactory.ExecutorType}.
*
* @param maxThreads max number of threads
* @param maxBufferedItems max number of buffered items
* @param threadNamePrefix prefix for thread names
*/
public record ThreadPoolSettings(int maxThreads, int maxBufferedItems, String threadNamePrefix) {}

private static final Map<ThreadPoolExecutorFactory.ExecutorType, ThreadPoolSettings>
private static final Map<ExecutorFactory.ExecutorType, ThreadPoolSettings>
defaultThreadPoolSettings =
Map.of(
ThreadPoolExecutorFactory.ExecutorType.SEARCH,
ExecutorFactory.ExecutorType.SEARCH,
new ThreadPoolSettings(
DEFAULT_SEARCHING_THREADS, DEFAULT_SEARCH_BUFFERED_ITEMS, "LuceneSearchExecutor"),
ThreadPoolExecutorFactory.ExecutorType.INDEX,
ExecutorFactory.ExecutorType.INDEX,
new ThreadPoolSettings(
DEFAULT_INDEXING_THREADS,
DEFAULT_INDEXING_BUFFERED_ITEMS,
"LuceneIndexingExecutor"),
ThreadPoolExecutorFactory.ExecutorType.LUCENESERVER,
ExecutorFactory.ExecutorType.LUCENESERVER,
new ThreadPoolSettings(
DEFAULT_GRPC_LUCENESERVER_THREADS,
DEFAULT_GRPC_LUCENESERVER_BUFFERED_ITEMS,
"GrpcLuceneServerExecutor"),
ThreadPoolExecutorFactory.ExecutorType.REPLICATIONSERVER,
ExecutorFactory.ExecutorType.REPLICATIONSERVER,
new ThreadPoolSettings(
DEFAULT_GRPC_REPLICATIONSERVER_THREADS,
DEFAULT_GRPC_REPLICATIONSERVER_BUFFERED_ITEMS,
"GrpcReplicationServerExecutor"),
ThreadPoolExecutorFactory.ExecutorType.FETCH,
ExecutorFactory.ExecutorType.FETCH,
new ThreadPoolSettings(
DEFAULT_FETCH_THREADS, DEFAULT_FETCH_BUFFERED_ITEMS, "LuceneFetchExecutor"),
ThreadPoolExecutorFactory.ExecutorType.GRPC,
ExecutorFactory.ExecutorType.GRPC,
new ThreadPoolSettings(
DEFAULT_GRPC_THREADS, DEFAULT_GRPC_BUFFERED_ITEMS, "GrpcExecutor"),
ThreadPoolExecutorFactory.ExecutorType.METRICS,
ExecutorFactory.ExecutorType.METRICS,
new ThreadPoolSettings(
DEFAULT_METRICS_THREADS, DEFAULT_METRICS_BUFFERED_ITEMS, "MetricsExecutor"),
ThreadPoolExecutorFactory.ExecutorType.VECTORMERGE,
ExecutorFactory.ExecutorType.VECTORMERGE,
new ThreadPoolSettings(
DEFAULT_VECTOR_MERGE_THREADS,
DEFAULT_VECTOR_MERGE_BUFFERED_ITEMS,
"VectorMergeExecutor"));

private final Map<ThreadPoolExecutorFactory.ExecutorType, ThreadPoolSettings> threadPoolSettings;
private final Map<ExecutorFactory.ExecutorType, ThreadPoolSettings> threadPoolSettings;

public ThreadPoolConfiguration(YamlConfigReader configReader) {
threadPoolSettings = new HashMap<>();
for (ThreadPoolExecutorFactory.ExecutorType executorType :
ThreadPoolExecutorFactory.ExecutorType.values()) {
for (ExecutorFactory.ExecutorType executorType : ExecutorFactory.ExecutorType.values()) {
ThreadPoolSettings defaultSettings = defaultThreadPoolSettings.get(executorType);
String poolConfigPrefix = CONFIG_PREFIX + executorType.name().toLowerCase() + ".";
int maxThreads =
Expand Down Expand Up @@ -175,8 +174,7 @@ static int getNumThreads(YamlConfigReader configReader, String key, int defaultV
defaultValue);
}

public ThreadPoolSettings getThreadPoolSettings(
ThreadPoolExecutorFactory.ExecutorType executorType) {
public ThreadPoolSettings getThreadPoolSettings(ExecutorFactory.ExecutorType executorType) {
return threadPoolSettings.get(executorType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.yelp.nrtsearch.server.concurrent.ThreadPoolExecutorFactory;
import com.yelp.nrtsearch.server.concurrent.ExecutorFactory;
import com.yelp.nrtsearch.server.doc.LoadedDocValues;
import com.yelp.nrtsearch.server.doc.LoadedDocValues.SingleSearchVector;
import com.yelp.nrtsearch.server.doc.LoadedDocValues.SingleVector;
Expand Down Expand Up @@ -143,8 +143,7 @@ private static KnnVectorsFormat createVectorsFormat(
vectorIndexingOptions.hasMergeWorkers() ? vectorIndexingOptions.getMergeWorkers() : 1;
ExecutorService executorService =
mergeWorkers > 1
? ThreadPoolExecutorFactory.getInstance()
.getThreadPoolExecutor(ThreadPoolExecutorFactory.ExecutorType.VECTORMERGE)
? ExecutorFactory.getInstance().getExecutor(ExecutorFactory.ExecutorType.VECTORMERGE)
: null;
KnnVectorsFormat vectorsFormat =
switch (vectorSearchType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
*/
package com.yelp.nrtsearch.server.grpc;

import com.yelp.nrtsearch.server.concurrent.ThreadPoolExecutorFactory;
import com.yelp.nrtsearch.server.concurrent.ExecutorFactory;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallExecutorSupplier;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;

/**
* GrpcServerExecutorSupplier provides the thread pool executors for the gRPC server. It provides
* LuceneServer executor for all methods except metrics, for which it provides a separate executor
* so that even if all LuceneServer threads are occupied by search/index requests, metrics requests
* will still keep working. The {@link ServerCallExecutorSupplier} is an experimental API, see these
* links for more details:
* GrpcServerExecutorSupplier provides the executors for the gRPC server. It provides the server
* executor for all methods except metrics, for which it provides a separate executor so that even
* if all server threads are occupied by search/index requests, metrics requests will still keep
* working. The {@link ServerCallExecutorSupplier} is an experimental API, see these links for more
* details:
*
* <ul>
* <li><a href="https://github.com/grpc/grpc-java/issues/7874">grpc-java#7874</a>
Expand All @@ -38,40 +38,36 @@
*/
public class GrpcServerExecutorSupplier implements ServerCallExecutorSupplier {

private final ThreadPoolExecutor luceneServerThreadPoolExecutor;
private final ThreadPoolExecutor metricsThreadPoolExecutor;
private final ThreadPoolExecutor grpcThreadPoolExecutor;
private final ExecutorService serverExecutor;
private final ExecutorService metricsExecutor;
private final ExecutorService grpcExecutor;

public GrpcServerExecutorSupplier() {
luceneServerThreadPoolExecutor =
ThreadPoolExecutorFactory.getInstance()
.getThreadPoolExecutor(ThreadPoolExecutorFactory.ExecutorType.LUCENESERVER);
metricsThreadPoolExecutor =
ThreadPoolExecutorFactory.getInstance()
.getThreadPoolExecutor(ThreadPoolExecutorFactory.ExecutorType.METRICS);
grpcThreadPoolExecutor =
ThreadPoolExecutorFactory.getInstance()
.getThreadPoolExecutor(ThreadPoolExecutorFactory.ExecutorType.GRPC);
serverExecutor =
ExecutorFactory.getInstance().getExecutor(ExecutorFactory.ExecutorType.LUCENESERVER);
metricsExecutor =
ExecutorFactory.getInstance().getExecutor(ExecutorFactory.ExecutorType.METRICS);
grpcExecutor = ExecutorFactory.getInstance().getExecutor(ExecutorFactory.ExecutorType.GRPC);
}

public ThreadPoolExecutor getLuceneServerThreadPoolExecutor() {
return luceneServerThreadPoolExecutor;
public ExecutorService getServerExecutor() {
return serverExecutor;
}

public ThreadPoolExecutor getMetricsThreadPoolExecutor() {
return metricsThreadPoolExecutor;
public ExecutorService getMetricsExecutor() {
return metricsExecutor;
}

public ThreadPoolExecutor getGrpcThreadPoolExecutor() {
return grpcThreadPoolExecutor;
public ExecutorService getGrpcExecutor() {
return grpcExecutor;
}

@Nullable
@Override
public <ReqT, RespT> Executor getExecutor(ServerCall<ReqT, RespT> serverCall, Metadata metadata) {
if ("metrics".equals(serverCall.getMethodDescriptor().getBareMethodName())) {
return metricsThreadPoolExecutor;
return metricsExecutor;
}
return luceneServerThreadPoolExecutor;
return serverExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.google.protobuf.Empty;
import com.google.protobuf.util.JsonFormat;
import com.yelp.nrtsearch.server.analysis.AnalyzerCreator;
import com.yelp.nrtsearch.server.concurrent.ThreadPoolExecutorFactory;
import com.yelp.nrtsearch.server.concurrent.ExecutorFactory;
import com.yelp.nrtsearch.server.config.NrtsearchConfig;
import com.yelp.nrtsearch.server.config.QueryCacheConfig;
import com.yelp.nrtsearch.server.custom.request.CustomRequestProcessor;
Expand Down Expand Up @@ -162,9 +162,8 @@ public void start() throws IOException {
new ReplicationServerImpl(
globalState, luceneServerConfiguration.getVerifyReplicationIndexId()))
.executor(
ThreadPoolExecutorFactory.getInstance()
.getThreadPoolExecutor(
ThreadPoolExecutorFactory.ExecutorType.REPLICATIONSERVER))
ExecutorFactory.getInstance()
.getExecutor(ExecutorFactory.ExecutorType.REPLICATIONSERVER))
.maxInboundMessageSize(MAX_MESSAGE_BYTES_SIZE)
.maxConcurrentCallsPerConnection(
luceneServerConfiguration.getMaxConcurrentCallsPerConnectionForReplication())
Expand All @@ -182,9 +181,8 @@ public void start() throws IOException {
new ReplicationServerImpl(
globalState, luceneServerConfiguration.getVerifyReplicationIndexId()))
.executor(
ThreadPoolExecutorFactory.getInstance()
.getThreadPoolExecutor(
ThreadPoolExecutorFactory.ExecutorType.REPLICATIONSERVER))
ExecutorFactory.getInstance()
.getExecutor(ExecutorFactory.ExecutorType.REPLICATIONSERVER))
.maxInboundMessageSize(MAX_MESSAGE_BYTES_SIZE)
.build()
.start();
Expand Down Expand Up @@ -212,7 +210,7 @@ public void start() throws IOException {
.callExecutor(executorSupplier)
// We still need this executor to run tasks before the point when executorSupplier can
// be called (https://github.com/grpc/grpc-java/issues/8274)
.executor(executorSupplier.getGrpcThreadPoolExecutor())
.executor(executorSupplier.getGrpcExecutor())
.maxInboundMessageSize(MAX_MESSAGE_BYTES_SIZE)
.compressorRegistry(LuceneServerStubBuilder.COMPRESSOR_REGISTRY)
.decompressorRegistry(LuceneServerStubBuilder.DECOMPRESSOR_REGISTRY)
Expand Down Expand Up @@ -374,7 +372,7 @@ static class LuceneServerImpl extends LuceneServerGrpc.LuceneServerImplBase {
DeadlineUtils.setCancellationEnabled(configuration.getDeadlineCancellation());
CompletionPostingsFormatUtil.setCompletionCodecLoadMode(
configuration.getCompletionCodecLoadMode());
ThreadPoolExecutorFactory.init(configuration.getThreadPoolConfiguration());
ExecutorFactory.init(configuration.getThreadPoolConfiguration());

initQueryCache(configuration);
initExtendableComponents(configuration, plugins);
Expand Down
Loading

0 comments on commit c04fd2c

Please sign in to comment.