Skip to content

Commit

Permalink
Use READONCE io context for nrt, add MMap grouping options (#774)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored Oct 28, 2024
1 parent 015c1c5 commit 47f4f9b
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.protobuf.util.JsonFormat;
import com.yelp.nrtsearch.server.grpc.IndexLiveSettings;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
import com.yelp.nrtsearch.server.index.DirectoryFactory;
import com.yelp.nrtsearch.server.utils.JsonUtils;
import com.yelp.nrtsearch.server.warming.WarmerConfig;
import java.io.IOException;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class NrtsearchConfig {
private final int lowPriorityCopyPercentage;
private final boolean verifyReplicationIndexId;
private final boolean useKeepAliveForReplication;
private final DirectoryFactory.MMapGrouping mmapGrouping;

@Inject
public NrtsearchConfig(InputStream yamlStream) {
Expand Down Expand Up @@ -180,6 +182,11 @@ public NrtsearchConfig(InputStream yamlStream) {
lowPriorityCopyPercentage = configReader.getInteger("lowPriorityCopyPercentage", 0);
verifyReplicationIndexId = configReader.getBoolean("verifyReplicationIndexId", true);
useKeepAliveForReplication = configReader.getBoolean("useKeepAliveForReplication", false);
mmapGrouping =
configReader.get(
"mmapGrouping",
o -> DirectoryFactory.parseMMapGrouping(o.toString()),
DirectoryFactory.MMapGrouping.SEGMENT);

List<String> indicesWithOverrides = configReader.getKeysOrEmpty("indexLiveSettingsOverrides");
Map<String, IndexLiveSettings> liveSettingsMap = new HashMap<>();
Expand Down Expand Up @@ -360,6 +367,10 @@ public boolean getUseKeepAliveForReplication() {
return useKeepAliveForReplication;
}

public DirectoryFactory.MMapGrouping getMMapGrouping() {
return mmapGrouping;
}

public IndexLiveSettings getLiveSettingsOverride(String indexName) {
return indexLiveSettingsOverrides.getOrDefault(
indexName, IndexLiveSettings.newBuilder().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void handle(FileInfo fileInfoRequest, StreamObserver<RawFileChunk> respon
IndexState indexState = indexStateManager.getCurrent();
ShardState shardState = indexState.getShard(0);
try (IndexInput luceneFile =
shardState.indexDir.openInput(fileInfoRequest.getFileName(), IOContext.DEFAULT)) {
shardState.indexDir.openInput(fileInfoRequest.getFileName(), IOContext.READONCE)) {
long len = luceneFile.length();
long pos = fileInfoRequest.getFpStart();
luceneFile.seek(pos);
Expand Down
73 changes: 55 additions & 18 deletions src/main/java/com/yelp/nrtsearch/server/index/DirectoryFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
*/
package com.yelp.nrtsearch.server.index;

import com.google.common.annotations.VisibleForTesting;
import com.yelp.nrtsearch.server.config.IndexPreloadConfig;
import com.yelp.nrtsearch.server.config.NrtsearchConfig;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.Function;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MMapDirectory;
Expand All @@ -28,6 +32,13 @@
/** A factory to open a {@link Directory} from a provided filesystem path. */
public abstract class DirectoryFactory {

// How files should be grouped in memory Arenas when using MMapDirectory
public enum MMapGrouping {
SEGMENT,
SEGMENT_EXCEPT_SI,
NONE
}

/** Sole constructor. */
public DirectoryFactory() {}

Expand All @@ -43,14 +54,15 @@ public DirectoryFactory() {}
* Returns an instance, using the specified implementation {FSDirectory, MMapDirectory,
* NIOFSDirectory}.
*/
public static DirectoryFactory get(final String dirImpl) {
public static DirectoryFactory get(final String dirImpl, NrtsearchConfig config) {
if (dirImpl.equals("FSDirectory")) {
return new DirectoryFactory() {
@Override
public Directory open(Path path, IndexPreloadConfig preloadConfig) throws IOException {
Directory directory = FSDirectory.open(path);
if (directory instanceof MMapDirectory mMapDirectory) {
mMapDirectory.setPreload(preloadConfig.preloadPredicate());
setMMapGrouping(mMapDirectory, config.getMMapGrouping());
}
return directory;
}
Expand All @@ -61,6 +73,7 @@ public Directory open(Path path, IndexPreloadConfig preloadConfig) throws IOExce
public Directory open(Path path, IndexPreloadConfig preloadConfig) throws IOException {
MMapDirectory mMapDirectory = new MMapDirectory(path);
mMapDirectory.setPreload(preloadConfig.preloadPredicate());
setMMapGrouping(mMapDirectory, config.getMMapGrouping());
return mMapDirectory;
}
};
Expand Down Expand Up @@ -105,33 +118,57 @@ public Directory open(Path path, IndexPreloadConfig preloadConfig) throws IOExce
} else {
return finalCtor.newInstance(path, preloadConfig);
}
} catch (InstantiationException ie) {
} catch (InstantiationException | InvocationTargetException | IllegalAccessException ie) {
throw new RuntimeException(
"failed to instantiate directory class \""
+ dirImpl
+ "\" on path=\""
+ path
+ "\"",
ie);
} catch (InvocationTargetException ite) {
throw new RuntimeException(
"failed to instantiate directory class \""
+ dirImpl
+ "\" on path=\""
+ path
+ "\"",
ite);
} catch (IllegalAccessException iae) {
throw new RuntimeException(
"failed to instantiate directory class \""
+ dirImpl
+ "\" on path=\""
+ path
+ "\"",
iae);
}
}
};
}
}

// Function to group segments by their names, excluding ".si" files
public static Function<String, Optional<String>> SEGMENT_EXCEPT_SI_FUNCTION =
(filename) -> {
if (filename.endsWith(".si")) {
return Optional.empty();
}
return MMapDirectory.GROUP_BY_SEGMENT.apply(filename);
};

/**
* Set MMapGrouping for the directory.
*
* @param directory the MMapDirectory
* @param grouping the MMapGrouping
*/
@VisibleForTesting
static void setMMapGrouping(MMapDirectory directory, MMapGrouping grouping) {
switch (grouping) {
case SEGMENT -> directory.setGroupingFunction(MMapDirectory.GROUP_BY_SEGMENT);
case SEGMENT_EXCEPT_SI -> directory.setGroupingFunction(SEGMENT_EXCEPT_SI_FUNCTION);
case NONE -> directory.setGroupingFunction(MMapDirectory.NO_GROUPING);
}
}

/**
* Parse MMapGrouping from string.
*
* @param grouping the string representation of the grouping
* @return MMapGrouping
* @throws IllegalArgumentException if the grouping is invalid
*/
public static MMapGrouping parseMMapGrouping(String grouping) {
return switch (grouping) {
case "SEGMENT" -> MMapGrouping.SEGMENT;
case "SEGMENT_EXCEPT_SI" -> MMapGrouping.SEGMENT_EXCEPT_SI;
case "NONE" -> MMapGrouping.NONE;
default -> throw new IllegalArgumentException("Invalid MMapGrouping: " + grouping);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ public ImmutableIndexState(
}
indexMergeSchedulerAutoThrottle =
mergedSettings.getIndexMergeSchedulerAutoThrottle().getValue();
directoryFactory = DirectoryFactory.get(mergedSettings.getDirectory().getValue());
directoryFactory =
DirectoryFactory.get(
mergedSettings.getDirectory().getValue(), globalState.getConfiguration());

// live settings
mergedLiveSettings =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,4 +579,9 @@ public void close() throws IOException {
nrtDataManager.close();
super.close();
}

@Override
public FileMetaData readLocalFileMetaData(String fileName) throws IOException {
return NrtUtils.readOnceLocalFileMetaData(fileName, lastFileMetaData, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,9 @@ public void syncFromCurrentPrimary(long primaryWaitMs, long maxTimeMs) throws IO
}
logger.info("Finished syncing nrt point from current primary, current version: {}", curVersion);
}

@Override
public FileMetaData readLocalFileMetaData(String fileName) throws IOException {
return NrtUtils.readOnceLocalFileMetaData(fileName, lastFileMetaData, this);
}
}
95 changes: 95 additions & 0 deletions src/main/java/com/yelp/nrtsearch/server/nrt/NrtUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2024 Yelp Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yelp.nrtsearch.server.nrt;

import static org.apache.lucene.replicator.nrt.Node.VERBOSE_FILES;
import static org.apache.lucene.replicator.nrt.Node.bytesToString;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Map;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.replicator.nrt.FileMetaData;
import org.apache.lucene.replicator.nrt.Node;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

public class NrtUtils {

/**
* Modified version of {@link Node} implementation that opens files with the READONCE io context.
*
* @param fileName the name of the file to read
* @param cache a map to cache file metadata
* @param node the nrt node reading the metadata
* @return the metadata of the file, or null if the file is corrupt or does not exist
* @throws IOException if an I/O error occurs
*/
public static FileMetaData readOnceLocalFileMetaData(
String fileName, Map<String, FileMetaData> cache, Node node) throws IOException {

FileMetaData result;
if (cache != null) {
// We may already have this file cached from the last NRT point:
result = cache.get(fileName);
} else {
result = null;
}

if (result == null) {
// Pull from the filesystem
long checksum;
long length;
byte[] header;
byte[] footer;
try (IndexInput in = node.getDirectory().openInput(fileName, IOContext.READONCE)) {
try {
length = in.length();
header = CodecUtil.readIndexHeader(in);
footer = CodecUtil.readFooter(in);
checksum = CodecUtil.retrieveChecksum(in);
} catch (@SuppressWarnings("unused") EOFException | CorruptIndexException cie) {
// File exists but is busted: we must copy it. This happens when node had crashed,
// corrupting an un-fsync'd file. On init we try
// to delete such unreferenced files, but virus checker can block that, leaving this bad
// file.
if (VERBOSE_FILES) {
node.message("file " + fileName + ": will copy [existing file is corrupt]");
}
return null;
}
if (VERBOSE_FILES) {
node.message("file " + fileName + " has length=" + bytesToString(length));
}
} catch (@SuppressWarnings("unused") FileNotFoundException | NoSuchFileException e) {
if (VERBOSE_FILES) {
node.message("file " + fileName + ": will copy [file does not exist]");
}
return null;
}

// NOTE: checksum is redundant w/ footer, but we break it out separately because when the bits
// cross the wire we need direct access to
// checksum when copying to catch bit flips:
result = new FileMetaData(header, footer, length, checksum);
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.protobuf.Int32Value;
import com.yelp.nrtsearch.server.grpc.IndexLiveSettings;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
import com.yelp.nrtsearch.server.index.DirectoryFactory;
import java.io.ByteArrayInputStream;
import org.apache.lucene.search.suggest.document.CompletionPostingsFormat.FSTLoadMode;
import org.junit.Test;
Expand Down Expand Up @@ -206,4 +207,18 @@ public void testVerifyReplicationIndexId_set() {
NrtsearchConfig luceneConfig = getForConfig(config);
assertFalse(luceneConfig.getVerifyReplicationIndexId());
}

@Test
public void testMMapGrouping_default() {
String config = "nodeName: \"lucene_server_foo\"";
NrtsearchConfig luceneConfig = getForConfig(config);
assertEquals(DirectoryFactory.MMapGrouping.SEGMENT, luceneConfig.getMMapGrouping());
}

@Test
public void testMMapGrouping_set() {
String config = "mmapGrouping: NONE";
NrtsearchConfig luceneConfig = getForConfig(config);
assertEquals(DirectoryFactory.MMapGrouping.NONE, luceneConfig.getMMapGrouping());
}
}
Loading

0 comments on commit 47f4f9b

Please sign in to comment.