Skip to content

Commit

Permalink
Refactor replication server handlers (#747)
Browse files Browse the repository at this point in the history
* Refactor ReplicationServer handlers

* Fix tests

* Remove checkIndexId from LuceneServer

* Correct imports after merge

* Correct test import
  • Loading branch information
sarthakn7 authored Oct 4, 2024
1 parent cdbdfb0 commit 64786de
Show file tree
Hide file tree
Showing 16 changed files with 710 additions and 500 deletions.
448 changes: 35 additions & 413 deletions src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,61 @@
import com.yelp.nrtsearch.server.grpc.AddReplicaRequest;
import com.yelp.nrtsearch.server.grpc.AddReplicaResponse;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AddReplicaHandler implements Handler<AddReplicaRequest, AddReplicaResponse> {
public class AddReplicaHandler extends Handler<AddReplicaRequest, AddReplicaResponse> {
private static final Logger logger = LoggerFactory.getLogger(AddReplicaHandler.class);

private final boolean useKeepAlive;
private final boolean verifyIndexId;

public AddReplicaHandler(boolean useKeepAlive) {
this.useKeepAlive = useKeepAlive;
public AddReplicaHandler(GlobalState globalState, boolean verifyIndexId) {
super(globalState);
this.useKeepAlive = globalState.getConfiguration().getUseKeepAliveForReplication();
this.verifyIndexId = verifyIndexId;
}

@Override
public AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) {
public void handle(
AddReplicaRequest addReplicaRequest, StreamObserver<AddReplicaResponse> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManager(addReplicaRequest.getIndexName());
checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
AddReplicaResponse reply = handle(indexState, addReplicaRequest);
logger.info("AddReplicaHandler returned " + reply);
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn("error while trying addReplicas " + addReplicaRequest.getIndexName(), e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
"error while trying to addReplicas for index: "
+ addReplicaRequest.getIndexName())
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) {
ShardState shardState = indexState.getShard(0);
if (shardState.isPrimary() == false) {
if (!shardState.isPrimary()) {
throw new IllegalArgumentException(
"index \"" + indexState.getName() + "\" was not started or is not a primary");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,67 @@
import com.yelp.nrtsearch.server.grpc.CopyFiles;
import com.yelp.nrtsearch.server.grpc.TransferStatus;
import com.yelp.nrtsearch.server.grpc.TransferStatusCode;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.nrt.NRTReplicaNode;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import com.yelp.nrtsearch.server.monitoring.NrtMetrics;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.replicator.nrt.CopyJob;
import org.apache.lucene.replicator.nrt.FileMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopyFilesHandler implements Handler<CopyFiles, TransferStatus> {
public class CopyFilesHandler extends Handler<CopyFiles, TransferStatus> {
private static final Logger logger = LoggerFactory.getLogger(CopyFilesHandler.class);
private static final long CHECK_SLEEP_TIME_MS = 10;
private static final int CHECKS_PER_STATUS_MESSAGE = 3000; // at least 30s between status messages
private final boolean verifyIndexId;

public CopyFilesHandler(GlobalState globalState, boolean verifyIndexId) {
super(globalState);
this.verifyIndexId = verifyIndexId;
}

@Override
public void handle(
public void handle(CopyFiles request, StreamObserver<TransferStatus> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManager(request.getIndexName());
checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
// we need to send multiple responses to client from this method
handle(indexState, request, responseObserver);
logger.info("CopyFilesHandler returned successfully");
} catch (StatusRuntimeException e) {
logger.warn("error while trying copyFiles " + request.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn(
String.format(
"error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"error on copyFiles for primaryGen: %s, for index: %s",
request.getPrimaryGen(), request.getIndexName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private void handle(
IndexState indexState,
CopyFiles copyFilesRequest,
StreamObserver<TransferStatus> responseObserver)
Expand Down Expand Up @@ -117,10 +161,4 @@ public void handle(
NrtMetrics.nrtMergeSize.labelValues(indexName).observe(job.getTotalBytesCopied());
}
}

@Override
public TransferStatus handle(IndexState indexState, CopyFiles protoRequest)
throws HandlerException {
throw new UnsupportedOperationException("This method is in not implemented for this class");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,45 @@
import com.yelp.nrtsearch.server.grpc.GetNodesRequest;
import com.yelp.nrtsearch.server.grpc.GetNodesResponse;
import com.yelp.nrtsearch.server.grpc.NodeInfo;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.nrt.NRTPrimaryNode;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import com.yelp.nrtsearch.server.utils.HostPort;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GetNodesInfoHandler implements Handler<GetNodesRequest, GetNodesResponse> {
public class GetNodesInfoHandler extends Handler<GetNodesRequest, GetNodesResponse> {
private static final Logger logger = LoggerFactory.getLogger(GetNodesInfoHandler.class);

public GetNodesInfoHandler(GlobalState globalState) {
super(globalState);
}

@Override
public GetNodesResponse handle(IndexState indexState, GetNodesRequest getNodesRequest)
throws HandlerException {
public void handle(
GetNodesRequest getNodesRequest, StreamObserver<GetNodesResponse> responseObserver) {
try {
IndexState indexState = getGlobalState().getIndex(getNodesRequest.getIndexName());
GetNodesResponse reply = handle(indexState);
logger.debug("GetNodesInfoHandler returned GetNodeResponse of size " + reply.getNodesCount());
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Exception e) {
logger.warn("error on GetNodesInfoHandler", e);
responseObserver.onError(
Status.INTERNAL
.withDescription("error on GetNodesInfoHandler")
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private GetNodesResponse handle(IndexState indexState) {
GetNodesResponse.Builder builder = GetNodesResponse.newBuilder();
ShardState shardState = indexState.getShard(0);
if (!shardState.isPrimary() || !shardState.isStarted()) {
Expand Down
50 changes: 0 additions & 50 deletions src/main/java/com/yelp/nrtsearch/server/luceneserver/Handler.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,68 @@
import com.yelp.nrtsearch.server.grpc.NewNRTPoint;
import com.yelp.nrtsearch.server.grpc.TransferStatus;
import com.yelp.nrtsearch.server.grpc.TransferStatusCode;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NewNRTPointHandler implements Handler<NewNRTPoint, TransferStatus> {
public class NewNRTPointHandler extends Handler<NewNRTPoint, TransferStatus> {
private static final Logger logger = LoggerFactory.getLogger(NewNRTPointHandler.class);
private final boolean verifyIndexId;

public NewNRTPointHandler(GlobalState globalState, boolean verifyIndexId) {
super(globalState);
this.verifyIndexId = verifyIndexId;
}

@Override
public TransferStatus handle(IndexState indexState, NewNRTPoint newNRTPointRequest)
throws HandlerException {
public void handle(NewNRTPoint request, StreamObserver<TransferStatus> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManager(request.getIndexName());
checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
TransferStatus reply = handle(indexState, request);
logger.debug(
"NewNRTPointHandler returned status "
+ reply.getCode()
+ " message: "
+ reply.getMessage());
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
logger.warn(
String.format(
"error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s",
request.getIndexName(), request.getVersion(), request.getPrimaryGen()),
e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn(
String.format(
"error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s",
request.getIndexName(), request.getVersion(), request.getPrimaryGen()),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"error on newNRTPoint for indexName: %s, for version: %s, primaryGen: %s",
request.getIndexName(), request.getVersion(), request.getPrimaryGen()))
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private TransferStatus handle(IndexState indexState, NewNRTPoint newNRTPointRequest) {
ShardState shardState = indexState.getShard(0);
if (shardState.isReplica() == false) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,66 @@
import com.yelp.nrtsearch.server.grpc.CopyStateRequest;
import com.yelp.nrtsearch.server.grpc.FileMetadata;
import com.yelp.nrtsearch.server.grpc.FilesMetadata;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.nrt.NRTPrimaryNode;
import com.yelp.nrtsearch.server.luceneserver.state.GlobalState;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Map;
import org.apache.lucene.replicator.nrt.FileMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecvCopyStateHandler extends Handler<CopyStateRequest, CopyState> {
private static final Logger logger = LoggerFactory.getLogger(RecvCopyStateHandler.class);

private final boolean verifyIndexId;

public RecvCopyStateHandler(GlobalState globalState, boolean verifyIndexId) {
super(globalState);
this.verifyIndexId = verifyIndexId;
}

public class RecvCopyStateHandler implements Handler<CopyStateRequest, CopyState> {
@Override
public CopyState handle(IndexState indexState, CopyStateRequest copyStateRequest) {
public void handle(CopyStateRequest request, StreamObserver<CopyState> responseObserver) {
try {
IndexStateManager indexStateManager =
getGlobalState().getIndexStateManager(request.getIndexName());
checkIndexId(request.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
CopyState reply = handle(indexState, request);
logger.debug(
"RecvCopyStateHandler returned, completedMergeFiles count: "
+ reply.getCompletedMergeFilesCount());
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
logger.warn("error while trying recvCopyState " + request.getIndexName(), e);
responseObserver.onError(e);
} catch (Exception e) {
logger.warn(
String.format(
"error on recvCopyState for replicaId: %s, for index: %s",
request.getReplicaId(), request.getIndexName()),
e);
responseObserver.onError(
Status.INTERNAL
.withDescription(
String.format(
"error on recvCopyState for replicaId: %s, for index: %s",
request.getReplicaId(), request.getIndexName()))
.augmentDescription(e.getMessage())
.asRuntimeException());
}
}

private CopyState handle(IndexState indexState, CopyStateRequest copyStateRequest) {
ShardState shardState = indexState.getShard(0);
if (shardState.isPrimary() == false) {
throw new IllegalArgumentException(
Expand Down
Loading

0 comments on commit 64786de

Please sign in to comment.