Skip to content

Commit

Permalink
Fix query warming for backend global state (#460)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored May 31, 2022
1 parent c7406db commit d813c8d
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ static class LuceneServerImpl extends LuceneServerGrpc.LuceneServerImplBase {
initQueryCache(configuration);
initExtendableComponents(configuration, plugins);

this.globalState = GlobalState.createState(configuration, incArchiver);
this.globalState = GlobalState.createState(configuration, incArchiver, archiver);
this.searchThreadPoolExecutor = globalState.getSearchThreadPoolExecutor();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,18 @@ public static GlobalState createState(LuceneServerConfiguration luceneServerConf
public static GlobalState createState(
LuceneServerConfiguration luceneServerConfiguration, Archiver incArchiver)
throws IOException {
return createState(luceneServerConfiguration, incArchiver, null);
}

public static GlobalState createState(
LuceneServerConfiguration luceneServerConfiguration,
Archiver incArchiver,
Archiver legacyArchiver)
throws IOException {
if (luceneServerConfiguration.getStateConfig().useLegacyStateManagement()) {
return new LegacyGlobalState(luceneServerConfiguration, incArchiver);
} else {
return new BackendGlobalState(luceneServerConfiguration, incArchiver);
return new BackendGlobalState(luceneServerConfiguration, incArchiver, legacyArchiver);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,19 @@ public Path getRootDir() {
}

public void initWarmer(Archiver archiver) {
initWarmer(archiver, name);
}

public void initWarmer(Archiver archiver, String indexName) {
LuceneServerConfiguration configuration = globalState.getConfiguration();
WarmerConfig warmerConfig = configuration.getWarmerConfig();
if (warmerConfig.isWarmOnStartup() || warmerConfig.getMaxWarmingQueries() > 0) {
this.warmer =
new Warmer(
archiver, configuration.getServiceName(), name, warmerConfig.getMaxWarmingQueries());
archiver,
configuration.getServiceName(),
indexName,
warmerConfig.getMaxWarmingQueries());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,11 @@ public Map<String, Lookup> getSuggesters() {
throw new UnsupportedOperationException("Suggesters only supported by LEGACY state backend");
}

@Override
public void initWarmer(Archiver archiver) {
initWarmer(archiver, uniqueName);
}

@Override
public void close() throws IOException {
for (Map.Entry<Integer, ShardState> entry : shards.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ private static class ImmutableState {
// volatile for atomic replacement
private volatile ImmutableState immutableState;
private final StateBackend stateBackend;
private final Archiver legacyArchiver;

/**
* Build unique index name from index name and instance id (UUID).
Expand All @@ -100,7 +101,24 @@ public static String getUniqueIndexName(String indexName, String id) {
public BackendGlobalState(
LuceneServerConfiguration luceneServerConfiguration, Archiver incArchiver)
throws IOException {
this(luceneServerConfiguration, incArchiver, null);
}

/**
* Constructor.
*
* @param luceneServerConfiguration server config
* @param incArchiver archiver for remote backends
* @param legacyArchiver legacy archiver
* @throws IOException on filesystem error
*/
public BackendGlobalState(
LuceneServerConfiguration luceneServerConfiguration,
Archiver incArchiver,
Archiver legacyArchiver)
throws IOException {
super(luceneServerConfiguration, incArchiver);
this.legacyArchiver = legacyArchiver;
stateBackend = createStateBackend();
GlobalStateInfo globalStateInfo = stateBackend.loadOrCreateGlobalState();
// init index state managers
Expand Down Expand Up @@ -316,7 +334,7 @@ private StartIndexResponse startIndex(
IndexStateManager indexStateManager, StartIndexRequest startIndexRequest) throws IOException {
StartIndexHandler startIndexHandler =
new StartIndexHandler(
null,
legacyArchiver,
getIncArchiver().orElse(null),
getConfiguration().getArchiveDirectory(),
getConfiguration().getBackupWithInArchiver(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

public class Warmer {
private static final Logger logger = LoggerFactory.getLogger(Warmer.class);
private static final String WARMING_QUERIES_RESOURCE = "_warming_queries";
public static final String WARMING_QUERIES_RESOURCE = "_warming_queries";
public static final String WARMING_QUERIES_DIR = "warming_queries";
private static final String WARMING_QUERIES_FILE = "warming_queries.txt";

Expand Down
70 changes: 64 additions & 6 deletions src/test/java/com/yelp/nrtsearch/server/grpc/TestServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.yelp.nrtsearch.server.backup.Archiver;
import com.yelp.nrtsearch.server.backup.ArchiverImpl;
import com.yelp.nrtsearch.server.backup.BackupDiffManager;
import com.yelp.nrtsearch.server.backup.ContentDownloader;
import com.yelp.nrtsearch.server.backup.ContentDownloaderImpl;
import com.yelp.nrtsearch.server.backup.FileCompressAndUploader;
import com.yelp.nrtsearch.server.backup.IndexArchiver;
import com.yelp.nrtsearch.server.backup.Tar;
import com.yelp.nrtsearch.server.backup.TarImpl;
import com.yelp.nrtsearch.server.backup.VersionManager;
import com.yelp.nrtsearch.server.config.IndexStartConfig.IndexDataLocationType;
Expand Down Expand Up @@ -64,8 +67,9 @@

public class TestServer {
private static final List<TestServer> createdServers = new ArrayList<>();
private static final String SERVICE_NAME = "test_server";
private static final String TEST_BUCKET = "test-server-data-bucket";
public static final String SERVICE_NAME = "test_server";
public static final String TEST_BUCKET = "test-server-data-bucket";
public static final String S3_ENDPOINT = "http://127.0.0.1:8011";
private static final List<String> simpleFieldNames = List.of("id", "field1", "field2");
private static final List<Field> simpleFields =
List.of(
Expand Down Expand Up @@ -94,6 +98,8 @@ public class TestServer {
private Server replicationServer;
private LuceneServerClient client;
private LuceneServerImpl serverImpl;
private Archiver legacyArchiver;
private Archiver indexArchiver;

private static void initS3(TemporaryFolder folder) throws IOException {
if (api == null) {
Expand Down Expand Up @@ -122,7 +128,7 @@ private IndexArchiver createIndexArchiver(Path archiverDir) throws IOException {
Files.createDirectories(archiverDir);

AmazonS3 s3 = new AmazonS3Client(new AnonymousAWSCredentials());
s3.setEndpoint("http://127.0.0.1:8011");
s3.setEndpoint(S3_ENDPOINT);
s3.createBucket(TEST_BUCKET);
TransferManager transferManager =
TransferManagerBuilder.standard().withS3Client(s3).withShutDownThreadPools(false).build();
Expand All @@ -147,17 +153,31 @@ private IndexArchiver createIndexArchiver(Path archiverDir) throws IOException {
false);
}

private Archiver createLegacyArchiver(Path archiverDir) throws IOException {
Files.createDirectories(archiverDir);

AmazonS3 s3 = new AmazonS3Client(new AnonymousAWSCredentials());
s3.setEndpoint(S3_ENDPOINT);
s3.createBucket(TEST_BUCKET);
return new ArchiverImpl(
s3, TEST_BUCKET, archiverDir, new TarImpl(Tar.CompressionMode.LZ4), true);
}

public void restart() throws IOException {
restart(false);
}

public void restart(boolean clearData) throws IOException {
cleanup(clearData);
IndexArchiver indexArchiver =
createIndexArchiver(Paths.get(configuration.getArchiveDirectory()));
legacyArchiver = createLegacyArchiver(Paths.get(configuration.getArchiveDirectory()));
indexArchiver = createIndexArchiver(Paths.get(configuration.getArchiveDirectory()));
serverImpl =
new LuceneServerImpl(
configuration, null, indexArchiver, new CollectorRegistry(), Collections.emptyList());
configuration,
legacyArchiver,
indexArchiver,
new CollectorRegistry(),
Collections.emptyList());

replicationServer =
ServerBuilder.forPort(0)
Expand All @@ -180,6 +200,22 @@ public String getServiceName() {
return serverImpl.getGlobalState().getConfiguration().getServiceName();
}

public GlobalState getGlobalState() {
return serverImpl.getGlobalState();
}

public LuceneServerClient getClient() {
return client;
}

public Archiver getLegacyArchiver() {
return legacyArchiver;
}

public Archiver getIndexArchiver() {
return indexArchiver;
}

public void cleanup() {
cleanup(false);
}
Expand Down Expand Up @@ -446,6 +482,10 @@ public static class Builder {

private boolean syncInitialNrtPoint = true;

private int maxWarmingQueries = 0;
private int warmingParallelism = 1;
private boolean warmOnStartup = false;

private String additionalConfig = "";

Builder(TemporaryFolder folder) {
Expand Down Expand Up @@ -493,6 +533,14 @@ public Builder withSyncInitialNrtPoint(boolean enable) {
return this;
}

public Builder withWarming(
int maxWarmingQueries, int warmingParallelism, boolean warmOnStartup) {
this.maxWarmingQueries = maxWarmingQueries;
this.warmingParallelism = warmingParallelism;
this.warmOnStartup = warmOnStartup;
return this;
}

public TestServer build() throws IOException {
initS3(folder);
String configFile =
Expand All @@ -502,6 +550,7 @@ public TestServer build() throws IOException {
backendConfig(),
autoStartConfig(),
archiverConfig(),
warmingConfig(),
"syncInitialNrtPoint: " + syncInitialNrtPoint,
additionalConfig);
return new TestServer(
Expand Down Expand Up @@ -538,6 +587,15 @@ private String autoStartConfig() {
" port: " + port);
}

private String warmingConfig() {
return String.join(
"\n",
"warmer:",
" maxWarmingQueries: " + maxWarmingQueries,
" warmingParallelism: " + warmingParallelism,
" warmOnStartup: " + warmOnStartup);
}

private String baseConfig() {
return String.join(
"\n",
Expand Down
Loading

0 comments on commit d813c8d

Please sign in to comment.