Refactor handler interface and LuceneServer class (#743)
* Refactor handlers for LuceneServer

* Remove static singletons

* Spotless apply

* Added a javadoc, removed an unused method

* Passing GlobalState to SearchHandler from Warmer
sarthakn7 authored Oct 3, 2024
1 parent 14e4421 commit a243ea5
Showing 39 changed files with 2,433 additions and 1,323 deletions.
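
The diffs below show handlers now extending a common Handler<T, S> base class that carries GlobalState instead of relying on static singletons. As a reading aid, here is a minimal sketch of what that base class plausibly provides, inferred purely from the call sites in this commit (super(globalState), getGlobalState(), and the unary and client-streaming handle variants); the committed com.yelp.nrtsearch.server.luceneserver.handler.Handler may differ:

    package com.yelp.nrtsearch.server.luceneserver.handler;

    import com.yelp.nrtsearch.server.luceneserver.GlobalState;
    import io.grpc.stub.StreamObserver;

    // Hypothetical reconstruction, not the committed class.
    public abstract class Handler<T, S> {
      private final GlobalState globalState;

      protected Handler(GlobalState globalState) {
        this.globalState = globalState;
      }

      // Handlers read server-wide state through this accessor
      // instead of static singletons.
      protected GlobalState getGlobalState() {
        return globalState;
      }

      // Unary RPCs (e.g. createSnapshot) override this variant.
      public void handle(T request, StreamObserver<S> responseObserver) {
        throw new UnsupportedOperationException("Unary handle not implemented");
      }

      // Client-streaming RPCs (e.g. addDocuments) override this variant.
      public StreamObserver<T> handle(StreamObserver<S> responseObserver) {
        throw new UnsupportedOperationException("Streaming handle not implemented");
      }
    }
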
1,338 changes: 163 additions & 1,175 deletions src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java

Large diffs are not rendered by default.

src/main/java/com/yelp/nrtsearch/server/luceneserver/AddDocumentHandler.java
@@ -15,31 +15,236 @@
*/
package com.yelp.nrtsearch.server.luceneserver;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.protobuf.ProtocolStringList;
import com.yelp.nrtsearch.server.grpc.AddDocumentRequest;
import com.yelp.nrtsearch.server.grpc.AddDocumentResponse;
import com.yelp.nrtsearch.server.grpc.DeadlineUtils;
import com.yelp.nrtsearch.server.grpc.FacetHierarchyPath;
import com.yelp.nrtsearch.server.luceneserver.field.FieldDef;
import com.yelp.nrtsearch.server.luceneserver.field.IdFieldDef;
import com.yelp.nrtsearch.server.luceneserver.field.IndexableFieldDef;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class AddDocumentHandler {
+public class AddDocumentHandler extends Handler<AddDocumentRequest, AddDocumentResponse> {
private static final Logger logger = LoggerFactory.getLogger(AddDocumentHandler.class);

public AddDocumentHandler(GlobalState globalState) {
super(globalState);
}

@Override
public StreamObserver<AddDocumentRequest> handle(
StreamObserver<AddDocumentResponse> responseObserver) {
return new StreamObserver<>() {
Multimap<String, Future<Long>> futures = HashMultimap.create();
// Map of {indexName: addDocumentRequestQueue}
Map<String, ArrayBlockingQueue<AddDocumentRequest>> addDocumentRequestQueueMap =
new ConcurrentHashMap<>();
// Map of {indexName: count}
Map<String, Long> countMap = new ConcurrentHashMap<>();

private int getAddDocumentsMaxBufferLen(String indexName) {
try {
return getGlobalState().getIndex(indexName).getAddDocumentsMaxBufferLen();
} catch (Exception e) {
String error =
String.format("Index %s does not exist, unable to add documents", indexName);
logger.error(error, e);
throw Status.INVALID_ARGUMENT.withDescription(error).withCause(e).asRuntimeException();
}
}

private ArrayBlockingQueue<AddDocumentRequest> getAddDocumentRequestQueue(String indexName) {
if (addDocumentRequestQueueMap.containsKey(indexName)) {
return addDocumentRequestQueueMap.get(indexName);
} else {
int addDocumentsMaxBufferLen = getAddDocumentsMaxBufferLen(indexName);
ArrayBlockingQueue<AddDocumentRequest> addDocumentRequestQueue =
new ArrayBlockingQueue<>(addDocumentsMaxBufferLen);
addDocumentRequestQueueMap.put(indexName, addDocumentRequestQueue);
return addDocumentRequestQueue;
}
}

private long getCount(String indexName) {
return countMap.getOrDefault(indexName, 0L);
}

private void incrementCount(String indexName) {
if (countMap.containsKey(indexName)) {
countMap.put(indexName, countMap.get(indexName) + 1);
} else {
countMap.put(indexName, 1L);
}
}

@Override
public void onNext(AddDocumentRequest addDocumentRequest) {
String indexName = addDocumentRequest.getIndexName();
ArrayBlockingQueue<AddDocumentRequest> addDocumentRequestQueue;
try {
addDocumentRequestQueue = getAddDocumentRequestQueue(indexName);
} catch (Exception e) {
onError(e);
return;
}
logger.debug(
String.format(
"onNext, index: %s, addDocumentRequestQueue size: %s",
indexName, addDocumentRequestQueue.size()));
incrementCount(indexName);
addDocumentRequestQueue.add(addDocumentRequest);
if (addDocumentRequestQueue.remainingCapacity() == 0) {
logger.debug(
String.format(
"indexing addDocumentRequestQueue size: %s, total: %s",
addDocumentRequestQueue.size(), getCount(indexName)));
try {
DeadlineUtils.checkDeadline("addDocuments: onNext", "INDEXING");

List<AddDocumentRequest> addDocRequestList = new ArrayList<>(addDocumentRequestQueue);
Future<Long> future =
getGlobalState()
.submitIndexingTask(
Context.current()
.wrap(
new DocumentIndexer(
getGlobalState(), addDocRequestList, indexName)));
futures.put(indexName, future);
} catch (Exception e) {
responseObserver.onError(e);
} finally {
addDocumentRequestQueue.clear();
}
}
}

@Override
public void onError(Throwable t) {
logger.warn("addDocuments Cancelled", t);
responseObserver.onError(t);
}

private String onCompletedForIndex(String indexName) {
ArrayBlockingQueue<AddDocumentRequest> addDocumentRequestQueue =
getAddDocumentRequestQueue(indexName);
logger.debug(
String.format(
"onCompleted, addDocumentRequestQueue: %s", addDocumentRequestQueue.size()));
long highestGen = -1;
try {
DeadlineUtils.checkDeadline("addDocuments: onCompletedForIndex", "INDEXING");

// index the left over docs
if (!addDocumentRequestQueue.isEmpty()) {
logger.debug(
String.format(
"indexing left over addDocumentRequestQueue of size: %s",
addDocumentRequestQueue.size()));
List<AddDocumentRequest> addDocRequestList = new ArrayList<>(addDocumentRequestQueue);
// Since we are already running in the indexing threadpool, run the indexing job
// for the remaining documents directly. This serializes indexing of remaining
// documents for multiple indices, but avoids deadlocking if there aren't more
// threads than the maximum number of parallel addDocuments calls.
long gen =
new DocumentIndexer(getGlobalState(), addDocRequestList, indexName)
.runIndexingJob();
if (gen > highestGen) {
highestGen = gen;
}
}
// collect futures, block if needed
int numIndexingChunks = futures.size();
long t0 = System.nanoTime();
for (Future<Long> result : futures.get(indexName)) {
Long gen = result.get();
logger.debug("Indexing returned sequence-number {}", gen);
if (gen > highestGen) {
highestGen = gen;
}
}
long t1 = System.nanoTime();
logger.debug(
"Indexing job completed for {} docs, in {} chunks, with latest sequence number: {}, took: {} micro seconds",
getCount(indexName),
numIndexingChunks,
highestGen,
((t1 - t0) / 1000));
return String.valueOf(highestGen);
} catch (Exception e) {
logger.warn("error while trying to addDocuments", e);
throw Status.INTERNAL
.withDescription("error while trying to addDocuments ")
.augmentDescription(e.getMessage())
.withCause(e)
.asRuntimeException();
} finally {
addDocumentRequestQueue.clear();
countMap.put(indexName, 0L);
}
}

@Override
public void onCompleted() {
try {
getGlobalState()
.submitIndexingTask(
Context.current()
.wrap(
() -> {
try {
// TODO: this should return a map on index to genId in the response
String genId = "-1";
for (String indexName : addDocumentRequestQueueMap.keySet()) {
genId = onCompletedForIndex(indexName);
}
responseObserver.onNext(
AddDocumentResponse.newBuilder()
.setGenId(genId)
.setPrimaryId(getGlobalState().getEphemeralId())
.build());
responseObserver.onCompleted();
} catch (Throwable t) {
responseObserver.onError(t);
}
return null;
}));
} catch (RejectedExecutionException e) {
logger.error("Threadpool is full, unable to submit indexing completion job");
responseObserver.onError(
Status.RESOURCE_EXHAUSTED
.withDescription("Threadpool is full, unable to submit indexing completion job")
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}
};
}

/**
* DocumentsContext is created for each gRPC AddDocumentRequest. It holds all Lucene document
* context for the AddDocumentRequest, including the root document and optional child documents if
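For reference, this is how a client might drive the streaming addDocuments RPC that the handler above serves: each onNext() lands in the per-index queue, a full queue triggers an async indexing chunk, and onCompleted() flushes leftovers via onCompletedForIndex(). A minimal sketch assuming the generated async stub from LuceneServerGrpc; the host, port, index name, and buildRequests helper are placeholders, not from this commit:

    import com.yelp.nrtsearch.server.grpc.AddDocumentRequest;
    import com.yelp.nrtsearch.server.grpc.AddDocumentResponse;
    import com.yelp.nrtsearch.server.grpc.LuceneServerGrpc;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import io.grpc.stub.StreamObserver;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    public class AddDocumentsClientSketch {
      public static void main(String[] args) throws InterruptedException {
        // Placeholder endpoint; use your server's actual host and port.
        ManagedChannel channel =
            ManagedChannelBuilder.forAddress("localhost", 6000).usePlaintext().build();
        LuceneServerGrpc.LuceneServerStub stub = LuceneServerGrpc.newStub(channel);

        CountDownLatch done = new CountDownLatch(1);
        // The server answers once, after onCompleted(), with the last gen id.
        StreamObserver<AddDocumentResponse> responseObserver =
            new StreamObserver<>() {
              @Override
              public void onNext(AddDocumentResponse response) {
                System.out.println("genId: " + response.getGenId());
              }

              @Override
              public void onError(Throwable t) {
                t.printStackTrace();
                done.countDown();
              }

              @Override
              public void onCompleted() {
                done.countDown();
              }
            };

        StreamObserver<AddDocumentRequest> requestObserver = stub.addDocuments(responseObserver);
        for (AddDocumentRequest request : buildRequests()) {
          requestObserver.onNext(request);
        }
        requestObserver.onCompleted(); // flushes remaining buffered documents
        done.await();
        channel.shutdown();
      }

      // Hypothetical helper; real requests carry field values as well.
      private static List<AddDocumentRequest> buildRequests() {
        return List.of(AddDocumentRequest.newBuilder().setIndexName("test_index").build());
      }
    }
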
src/main/java/com/yelp/nrtsearch/server/luceneserver/CreateSnapshotHandler.java
@@ -19,29 +19,54 @@
import com.yelp.nrtsearch.server.grpc.CreateSnapshotRequest;
import com.yelp.nrtsearch.server.grpc.CreateSnapshotResponse;
import com.yelp.nrtsearch.server.grpc.SnapshotId;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Collection;
import org.apache.lucene.facet.taxonomy.SearcherTaxonomyManager;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class CreateSnapshotHandler
-    implements Handler<CreateSnapshotRequest, CreateSnapshotResponse> {
+public class CreateSnapshotHandler extends Handler<CreateSnapshotRequest, CreateSnapshotResponse> {
+  private static final Logger logger = LoggerFactory.getLogger(CreateSnapshotHandler.class);
+
+  public CreateSnapshotHandler(GlobalState globalState) {
+    super(globalState);
+  }
+
   @Override
-  public CreateSnapshotResponse handle(
-      IndexState indexState, CreateSnapshotRequest createSnapshotRequest) throws HandlerException {
+  public void handle(
+      CreateSnapshotRequest createSnapshotRequest,
+      StreamObserver<CreateSnapshotResponse> responseObserver) {
     try {
-      return createSnapshot(indexState, createSnapshotRequest);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+      IndexState indexState = getGlobalState().getIndex(createSnapshotRequest.getIndexName());
+      CreateSnapshotResponse reply = createSnapshot(indexState, createSnapshotRequest);
+      logger.info(String.format("CreateSnapshotHandler returned results %s", reply.toString()));
+      responseObserver.onNext(reply);
+      responseObserver.onCompleted();
+    } catch (Exception e) {
+      logger.warn(
+          String.format(
+              "error while trying to createSnapshot for index %s",
+              createSnapshotRequest.getIndexName()),
+          e);
+      responseObserver.onError(
+          Status.UNKNOWN
+              .withDescription(
+                  String.format(
+                      "error while trying to createSnapshot for index %s",
+                      createSnapshotRequest.getIndexName()))
+              .augmentDescription(e.getMessage())
+              .asRuntimeException());
     }
   }

-  public CreateSnapshotResponse createSnapshot(
+  private CreateSnapshotResponse createSnapshot(
IndexState indexState, CreateSnapshotRequest createSnapshotRequest) throws IOException {
indexState.verifyStarted();

@@ -116,46 +141,4 @@ public static String getSnapshotIdAsString(SnapshotId snapshotId) {
+ ":"
+ snapshotId.getStateGen();
}

/**
* Get names of all index files in a given snapshot.
*
* @param indexState index state
* @param snapshotId snapshot id
* @return collection of file names
* @throws IOException
*/
public static Collection<String> getSegmentFilesInSnapshot(
IndexState indexState, SnapshotId snapshotId) throws IOException {
String snapshotIdAsString = CreateSnapshotHandler.getSnapshotIdAsString(snapshotId);
IndexState.Gens snapshot = new IndexState.Gens(snapshotIdAsString);
if (indexState.getShards().size() != 1) {
throw new IllegalStateException(
String.format(
"%s shards found index %s instead of exactly 1",
indexState.getShards().size(), indexState.getName()));
}
ShardState state = indexState.getShards().entrySet().iterator().next().getValue();
SearcherTaxonomyManager.SearcherAndTaxonomy searcherAndTaxonomy = null;
IndexReader indexReader = null;
try {
searcherAndTaxonomy = state.acquire();
indexReader =
DirectoryReader.openIfChanged(
(DirectoryReader) searcherAndTaxonomy.searcher.getIndexReader(),
state.snapshots.getIndexCommit(snapshot.indexGen));
if (!(indexReader instanceof StandardDirectoryReader)) {
throw new IllegalStateException("Unable to find segments to backup");
}
StandardDirectoryReader standardDirectoryReader = (StandardDirectoryReader) indexReader;
return standardDirectoryReader.getSegmentInfos().files(true);
} finally {
if (searcherAndTaxonomy != null) {
state.release(searcherAndTaxonomy);
}
if (indexReader != null) {
indexReader.close();
}
}
}
}
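
Taken together with the commit note about removing static singletons, the server side presumably constructs each handler once with GlobalState and delegates to it per RPC. The LuceneServer.java diff above is collapsed, so the following wiring is an inference rather than the committed code; it assumes the generated LuceneServerGrpc.LuceneServerImplBase base class:

    import com.yelp.nrtsearch.server.grpc.AddDocumentRequest;
    import com.yelp.nrtsearch.server.grpc.AddDocumentResponse;
    import com.yelp.nrtsearch.server.grpc.CreateSnapshotRequest;
    import com.yelp.nrtsearch.server.grpc.CreateSnapshotResponse;
    import com.yelp.nrtsearch.server.grpc.LuceneServerGrpc;
    import com.yelp.nrtsearch.server.luceneserver.AddDocumentHandler;
    import com.yelp.nrtsearch.server.luceneserver.CreateSnapshotHandler;
    import com.yelp.nrtsearch.server.luceneserver.GlobalState;
    import io.grpc.stub.StreamObserver;

    // Hypothetical sketch of the refactored service wiring, not the committed code.
    public class LuceneServerSketch extends LuceneServerGrpc.LuceneServerImplBase {
      private final AddDocumentHandler addDocumentHandler;
      private final CreateSnapshotHandler createSnapshotHandler;

      public LuceneServerSketch(GlobalState globalState) {
        // One instance per handler, each holding GlobalState,
        // instead of static singletons.
        this.addDocumentHandler = new AddDocumentHandler(globalState);
        this.createSnapshotHandler = new CreateSnapshotHandler(globalState);
      }

      @Override
      public StreamObserver<AddDocumentRequest> addDocuments(
          StreamObserver<AddDocumentResponse> responseObserver) {
        return addDocumentHandler.handle(responseObserver);
      }

      @Override
      public void createSnapshot(
          CreateSnapshotRequest request, StreamObserver<CreateSnapshotResponse> responseObserver) {
        createSnapshotHandler.handle(request, responseObserver);
      }
    }
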
(Diffs for the remaining 36 changed files did not load.)
