Skip to content

Commit

Permalink
Move parallel fetch config into index live settings (#740)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored Oct 3, 2024
1 parent 81575b2 commit 14e4421
Show file tree
Hide file tree
Showing 11 changed files with 781 additions and 587 deletions.
4 changes: 4 additions & 0 deletions clientlib/src/main/proto/yelp/nrtsearch/luceneserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,10 @@ message IndexLiveSettings {
google.protobuf.UInt64Value maxMergePreCopyDurationSec = 14;
// Collect and publish additional index metrics, which may be more expensive in terms of volume, memory and/or compute, default: false
google.protobuf.BoolValue verboseMetrics = 15;
// If fetch parallelism should be done by groups of fields instead of document, default: false
google.protobuf.BoolValue parallelFetchByField = 16;
// The number of documents/fields per parallel fetch task, default: 50
google.protobuf.Int32Value parallelFetchChunkSize = 17;
}

message IndexStateInfo {
Expand Down
19 changes: 18 additions & 1 deletion docs/index_live_settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,21 @@ Specifies the maximum time to wait for replicas to precopy merged segment files.

Must be >= 0

Default: 0
Default: 0

parallelFetchByField
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When using parallelism to fetch field values, this setting determines whether the work should be divided by fields or by documents.

Default: false (divide by documents)


parallelFetchChunkSize
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When using parallelism to fetch field values, this setting determines the maximum number of fields/documents to process in a single task.

Must be > 0

Default: 50
1,061 changes: 546 additions & 515 deletions grpc-gateway/luceneserver.pb.go

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions grpc-gateway/luceneserver.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,21 @@
"required": false,
"type": "boolean"
},
{
"name": "liveSettings.parallelFetchByField",
"description": "If fetch parallelism should be done by groups of fields instead of document, default: false",
"in": "query",
"required": false,
"type": "boolean"
},
{
"name": "liveSettings.parallelFetchChunkSize",
"description": "The number of documents/fields per parallel fetch task, default: 50",
"in": "query",
"required": false,
"type": "integer",
"format": "int32"
},
{
"name": "local",
"description": "When set to true, live settings changes are only applied to the local node. These changes are ephemeral, so will not persist through a restart. Also, the live settings returned in the response will contain the local overrides only when this flag is true.",
Expand Down Expand Up @@ -3531,6 +3546,15 @@
"verboseMetrics": {
"type": "boolean",
"title": "Collect and publish additional index metrics, which may be more expensive in terms of volume, memory and/or compute, default: false"
},
"parallelFetchByField": {
"type": "boolean",
"title": "If fetch parallelism should be done by groups of fields instead of document, default: false"
},
"parallelFetchChunkSize": {
"type": "integer",
"format": "int32",
"title": "The number of documents/fields per parallel fetch task, default: 50"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public class ThreadPoolConfiguration {

public static final int DEFAULT_FETCH_THREADS = 1;
public static final int DEFAULT_FETCH_BUFFERED_ITEMS = DEFAULT_SEARCH_BUFFERED_ITEMS;
public static final int DEFAULT_MIN_PARALLEL_FETCH_NUM_FIELDS = 20;
public static final int DEFAULT_MIN_PARALLEL_FETCH_NUM_HITS = 50;

public static final int DEFAULT_GRPC_THREADS = AVAILABLE_PROCESSORS * 2;
public static final int DEFAULT_GRPC_BUFFERED_ITEMS = 8;
Expand All @@ -57,10 +55,6 @@ public class ThreadPoolConfiguration {
public static final int DEFAULT_VECTOR_MERGE_BUFFERED_ITEMS =
Math.max(100, 2 * DEFAULT_VECTOR_MERGE_THREADS);

private final int minParallelFetchNumFields;
private final int minParallelFetchNumHits;
private final boolean parallelFetchByField;

/**
* Settings for a {@link com.yelp.nrtsearch.server.utils.ThreadPoolExecutorFactory.ExecutorType}.
*
Expand Down Expand Up @@ -126,17 +120,6 @@ public ThreadPoolConfiguration(YamlConfigReader configReader) {
threadPoolSettings.put(
executorType, new ThreadPoolSettings(maxThreads, maxBufferedItems, threadNamePrefix));
}

// TODO: Move these setting somewhere else. They might be better as index live settings.
minParallelFetchNumFields =
configReader.getInteger(
"threadPoolConfiguration.minParallelFetchNumFields",
DEFAULT_MIN_PARALLEL_FETCH_NUM_FIELDS);
minParallelFetchNumHits =
configReader.getInteger(
"threadPoolConfiguration.minParallelFetchNumHits", DEFAULT_MIN_PARALLEL_FETCH_NUM_HITS);
parallelFetchByField =
configReader.getBoolean("threadPoolConfiguration.parallelFetchByField", true);
}

@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down Expand Up @@ -196,16 +179,4 @@ public ThreadPoolSettings getThreadPoolSettings(
ThreadPoolExecutorFactory.ExecutorType executorType) {
return threadPoolSettings.get(executorType);
}

/**
 * Get the minimum number of retrieved fields required before field values are fetched in
 * parallel.
 *
 * @return minimum field count threshold for parallel fetch
 */
public int getMinParallelFetchNumFields() {
  return this.minParallelFetchNumFields;
}

/**
 * Get the minimum number of hits required before field values are fetched in parallel.
 *
 * @return minimum hit count threshold for parallel fetch
 */
public int getMinParallelFetchNumHits() {
  return this.minParallelFetchNumHits;
}

/**
 * Get whether parallel fetch work is divided by groups of fields instead of by documents.
 *
 * @return true if parallel fetch is partitioned by field, false if by document
 */
public boolean getParallelFetchByField() {
  return this.parallelFetchByField;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public abstract class IndexState implements Closeable {

private static final Pattern reSimpleName = Pattern.compile("^[a-zA-Z_][a-zA-Z_0-9]*$");
private final ThreadPoolExecutor searchThreadPoolExecutor;
private final ExecutorService fetchThreadPoolExecutor;
private Warmer warmer = null;

/** The meta field definitions */
Expand All @@ -96,6 +95,20 @@ public abstract class IndexState implements Closeable {
*/
public final DocLookup docLookup = new DocLookup(this::getField);

/**
 * Immutable holder for the configuration used when fetching field values in parallel.
 *
 * @param maxParallelism maximum number of parallel fetch operations
 * @param parallelFetchByField if true, fetches are parallelized by field instead of by document
 * @param parallelFetchChunkSize number of documents/fields in each parallel fetch operation
 * @param fetchExecutor executor service for parallel fetch operations
 */
public record ParallelFetchConfig(
int maxParallelism,
boolean parallelFetchByField,
int parallelFetchChunkSize,
ExecutorService fetchExecutor) {}

/** Search-time analyzer. */
public final Analyzer searchAnalyzer =
new AnalyzerWrapper(Analyzer.PER_FIELD_REUSE_STRATEGY) {
Expand Down Expand Up @@ -222,7 +235,6 @@ public IndexState(GlobalState globalState, String name, Path rootDir) throws IOE
}

searchThreadPoolExecutor = globalState.getSearchThreadPoolExecutor();
fetchThreadPoolExecutor = globalState.getFetchService();
}

/** Get index name. */
Expand Down Expand Up @@ -273,11 +285,6 @@ public ThreadPoolExecutor getSearchThreadPoolExecutor() {
return searchThreadPoolExecutor;
}

/**
 * Get the executor used to run parallel fetch operations for this index.
 *
 * @return executor service for parallel fetch tasks
 */
public ExecutorService getFetchThreadPoolExecutor() {
  return this.fetchThreadPoolExecutor;
}

/**
 * Get the thread pool configuration from the global state.
 *
 * @return server thread pool configuration
 */
public ThreadPoolConfiguration getThreadPoolConfiguration() {
  return this.globalState.getThreadPoolConfiguration();
}
Expand Down Expand Up @@ -370,6 +377,13 @@ public abstract void start(

public abstract FacetsConfig getFacetsConfig();

/**
 * Get configuration for parallel fetch for this index.
 *
 * @return configuration for parallel fetch, including parallelism limits and the executor to use
 */
public abstract ParallelFetchConfig getParallelFetchConfig();

/**
* Get shard state.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import com.yelp.nrtsearch.server.luceneserver.search.SearcherResult;
import com.yelp.nrtsearch.server.monitoring.SearchResponseCollector;
import com.yelp.nrtsearch.server.utils.ObjectToCompositeFieldTransformer;
import com.yelp.nrtsearch.server.utils.ThreadPoolExecutorFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
Expand Down Expand Up @@ -323,23 +322,13 @@ private void fetchFields(SearchContext searchContext)
new ArrayList<>(searchContext.getResponseBuilder().getHitsBuilderList());
hitBuilders.sort(Comparator.comparing(Hit.Builder::getLuceneDocId));

IndexState indexState = searchContext.getIndexState();
int fetchThreadPoolSize =
indexState
.getThreadPoolConfiguration()
.getThreadPoolSettings(ThreadPoolExecutorFactory.ExecutorType.FETCH)
.maxThreads();
int minParallelFetchNumFields =
indexState.getThreadPoolConfiguration().getMinParallelFetchNumFields();
int minParallelFetchNumHits =
indexState.getThreadPoolConfiguration().getMinParallelFetchNumHits();
boolean parallelFetchByField =
indexState.getThreadPoolConfiguration().getParallelFetchByField();

if (parallelFetchByField
&& fetchThreadPoolSize > 1
&& searchContext.getRetrieveFields().keySet().size() > minParallelFetchNumFields
&& hitBuilders.size() > minParallelFetchNumHits) {
IndexState.ParallelFetchConfig parallelFetchConfig =
searchContext.getIndexState().getParallelFetchConfig();

if (parallelFetchConfig.parallelFetchByField()
&& parallelFetchConfig.maxParallelism() > 1
&& searchContext.getRetrieveFields().keySet().size()
> parallelFetchConfig.parallelFetchChunkSize()) {
// Fetch fields in parallel

List<LeafReaderContext> leaves =
Expand All @@ -353,12 +342,13 @@ private void fetchFields(SearchContext searchContext)
}
List<String> fields = new ArrayList<>(searchContext.getRetrieveFields().keySet());

// parallelism is min of fetchThreadPoolSize and fields.size() / MIN_PARALLEL_NUM_FIELDS
// parallelism is min of maxParallelism and fields.size() / parallelFetchChunkSize
// round up
int parallelism =
Math.min(
fetchThreadPoolSize,
(fields.size() + minParallelFetchNumFields - 1) / minParallelFetchNumFields);
parallelFetchConfig.maxParallelism(),
(fields.size() + parallelFetchConfig.parallelFetchChunkSize() - 1)
/ parallelFetchConfig.parallelFetchChunkSize());
List<List<String>> fieldsChunks =
Lists.partition(fields, (fields.size() + parallelism - 1) / parallelism);
List<Future<List<Map<String, CompositeFieldValue>>>> futures = new ArrayList<>();
Expand All @@ -368,11 +358,10 @@ private void fetchFields(SearchContext searchContext)
// Stored fields are not widely used for NRTSearch (not recommended for memory usage)
for (List<String> fieldsChunk : fieldsChunks) {
futures.add(
indexState
.getFetchThreadPoolExecutor()
parallelFetchConfig
.fetchExecutor()
.submit(
new FillFieldsTask(
indexState,
searchContext.getSearcherAndTaxonomy().searcher,
hitIdToLeaves,
hitBuilders,
Expand Down Expand Up @@ -401,26 +390,27 @@ private void fetchFields(SearchContext searchContext)
}
searchContext.getFetchTasks().processHit(searchContext, leaf, hitResponse);
}
} else if (!parallelFetchByField
&& fetchThreadPoolSize > 1
&& hitBuilders.size() > minParallelFetchNumHits) {
} else if (!parallelFetchConfig.parallelFetchByField()
&& parallelFetchConfig.maxParallelism() > 1
&& hitBuilders.size() > parallelFetchConfig.parallelFetchChunkSize()) {
// Fetch docs in parallel

// parallelism is min of fetchThreadPoolSize and hitsBuilder.size() / MIN_PARALLEL_NUM_HITS
// parallelism is min of maxParallelism and hitsBuilder.size() / parallelFetchChunkSize
// round up
int parallelism =
Math.min(
fetchThreadPoolSize,
(hitBuilders.size() + minParallelFetchNumHits - 1) / minParallelFetchNumHits);
parallelFetchConfig.maxParallelism(),
(hitBuilders.size() + parallelFetchConfig.parallelFetchChunkSize() - 1)
/ parallelFetchConfig.parallelFetchChunkSize());
List<List<Hit.Builder>> docChunks =
Lists.partition(hitBuilders, (hitBuilders.size() + parallelism - 1) / parallelism);

// process each document chunk in parallel
List<Future<?>> futures = new ArrayList<>();
for (List<Hit.Builder> docChunk : docChunks) {
futures.add(
indexState
.getFetchThreadPoolExecutor()
parallelFetchConfig
.fetchExecutor()
.submit(new FillDocsTask(searchContext, docChunk, searchContext.getQuery())));
}
for (Future<?> future : futures) {
Expand Down Expand Up @@ -709,21 +699,18 @@ private static SearcherTaxonomyManager.SearcherAndTaxonomy openSnapshotReader(

public static class FillFieldsTask implements Callable<List<Map<String, CompositeFieldValue>>> {

private IndexState state;
private IndexSearcher s;
private List<LeafReaderContext> hitIdToleaves;
private List<Hit.Builder> hitBuilders;
private List<String> fields;
private SearchContext searchContext;

public FillFieldsTask(
IndexState indexState,
IndexSearcher indexSearcher,
List<LeafReaderContext> hitIdToleaves,
List<Hit.Builder> hitBuilders,
List<String> fields,
SearchContext searchContext) {
this.state = indexState;
this.s = indexSearcher;
this.fields = fields;
this.searchContext = searchContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.yelp.nrtsearch.server.luceneserver.nrt.NrtDataManager;
import com.yelp.nrtsearch.server.luceneserver.search.sort.SortParser;
import com.yelp.nrtsearch.server.remote.RemoteBackend;
import com.yelp.nrtsearch.server.utils.ThreadPoolExecutorFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -126,6 +127,7 @@ public class ImmutableIndexState extends IndexState {
public static final int DEFAULT_VIRTUAL_SHARDS = 1;
public static final int DEFAULT_SEGMENTS_PER_TIER = 10;
public static final int DEFAULT_MAX_MERGED_SEGMENT_MB = 5 * 1024;
public static final int DEFAULT_PARALLEL_FETCH_CHUNK_SIZE = 50;

// default live settings as message, so they can be merged with saved settings
public static final IndexLiveSettings DEFAULT_INDEX_LIVE_SETTINGS =
Expand All @@ -149,6 +151,9 @@ public class ImmutableIndexState extends IndexState {
.setDefaultTerminateAfter(Int32Value.newBuilder().setValue(0).build())
.setMaxMergePreCopyDurationSec(UInt64Value.newBuilder().setValue(0))
.setVerboseMetrics(BoolValue.newBuilder().setValue(false).build())
.setParallelFetchByField(BoolValue.newBuilder().setValue(false).build())
.setParallelFetchChunkSize(
Int32Value.newBuilder().setValue(DEFAULT_PARALLEL_FETCH_CHUNK_SIZE).build())
.build();

// Live Settings
Expand All @@ -167,6 +172,7 @@ public class ImmutableIndexState extends IndexState {
private final int defaultTerminateAfter;
private final long maxMergePreCopyDurationSec;
private final boolean verboseMetrics;
private final ParallelFetchConfig parallelFetchConfig;

private final IndexStateManager indexStateManager;
private final String uniqueName;
Expand Down Expand Up @@ -260,6 +266,20 @@ public ImmutableIndexState(
maxMergePreCopyDurationSec =
mergedLiveSettingsWithLocal.getMaxMergePreCopyDurationSec().getValue();
verboseMetrics = mergedLiveSettingsWithLocal.getVerboseMetrics().getValue();
// Parallel fetch config
int maxParallelism =
globalState
.getThreadPoolConfiguration()
.getThreadPoolSettings(ThreadPoolExecutorFactory.ExecutorType.FETCH)
.maxThreads();
boolean parallelFetchByField = mergedLiveSettingsWithLocal.getParallelFetchByField().getValue();
int parallelFetchChunkSize = mergedLiveSettingsWithLocal.getParallelFetchChunkSize().getValue();
parallelFetchConfig =
new ParallelFetchConfig(
maxParallelism,
parallelFetchByField,
parallelFetchChunkSize,
globalState.getFetchService());

// If there is previous shard state, use it. Otherwise, initialize the shard.
if (previousShardState != null) {
Expand Down Expand Up @@ -488,6 +508,11 @@ public FacetsConfig getFacetsConfig() {
return fieldAndFacetState.getFacetsConfig();
}

/** Get the parallel fetch configuration resolved from the merged live settings. */
@Override
public ParallelFetchConfig getParallelFetchConfig() {
  return this.parallelFetchConfig;
}

@Override
public ShardState getShard(int shardOrd) {
ShardState shardState = shards.get(shardOrd);
Expand Down Expand Up @@ -793,6 +818,9 @@ static void validateLiveSettings(IndexLiveSettings liveSettings) {
if (liveSettings.getMaxMergePreCopyDurationSec().getValue() < 0) {
throw new IllegalArgumentException("maxMergePreCopyDurationSec must be >= 0");
}
if (liveSettings.getParallelFetchChunkSize().getValue() <= 0) {
throw new IllegalArgumentException("parallelFetchChunkSize must be > 0");
}
}

private static void validateIndexSort(Sort sort) {
Expand Down
Loading

0 comments on commit 14e4421

Please sign in to comment.