Skip to content

Commit

Permalink
Add option to enable keepAlive for replication server (#686)
Browse files Browse the repository at this point in the history
* Add option to enable keepAlive for replication server

* Updated doc

* Add test for keepAlive

* Fixed test
  • Loading branch information
sarthakn7 authored Aug 14, 2024
1 parent 08246b7 commit 5a5a82f
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 10 deletions.
5 changes: 5 additions & 0 deletions docs/server_configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ Example server configuration
- If enabled, registers JVM metrics with prometheus.
- true

* - useKeepAliveForReplication
- bool
- If enabled, the primary will enable keepAlive on the replication channel with keepAliveTime 1 minute and keepAliveTimeout 10 seconds. Replicas ignore this option.
- true

.. list-table:: `Threadpool Configuration <https://github.com/Yelp/nrtsearch/blob/master/src/main/java/com/yelp/nrtsearch/server/config/ThreadPoolConfiguration.java>`_ (``threadPoolConfiguration.*``)
:widths: 25 10 50 25
:header-rows: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class LuceneServerConfiguration {
private final boolean enableGlobalBucketAccess;
private final int lowPriorityCopyPercentage;
private final boolean verifyReplicationIndexId;
private final boolean useKeepAliveForReplication;

@Inject
public LuceneServerConfiguration(InputStream yamlStream) {
Expand Down Expand Up @@ -190,6 +191,7 @@ public LuceneServerConfiguration(InputStream yamlStream) {
enableGlobalBucketAccess = configReader.getBoolean("enableGlobalBucketAccess", false);
lowPriorityCopyPercentage = configReader.getInteger("lowPriorityCopyPercentage", 0);
verifyReplicationIndexId = configReader.getBoolean("verifyReplicationIndexId", true);
useKeepAliveForReplication = configReader.getBoolean("useKeepAliveForReplication", false);

List<String> indicesWithOverrides = configReader.getKeysOrEmpty("indexLiveSettingsOverrides");
Map<String, IndexLiveSettings> liveSettingsMap = new HashMap<>();
Expand Down Expand Up @@ -394,6 +396,10 @@ public boolean getVerifyReplicationIndexId() {
return verifyReplicationIndexId;
}

public boolean getUseKeepAliveForReplication() {
return useKeepAliveForReplication;
}

public IndexLiveSettings getLiveSettingsOverride(String indexName) {
return indexLiveSettingsOverrides.getOrDefault(
indexName, IndexLiveSettings.newBuilder().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1875,7 +1875,10 @@ public void addReplicas(
checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId);

IndexState indexState = indexStateManager.getCurrent();
AddReplicaResponse reply = new AddReplicaHandler().handle(indexState, addReplicaRequest);
boolean useKeepAliveForReplication =
globalState.getConfiguration().getUseKeepAliveForReplication();
AddReplicaResponse reply =
new AddReplicaHandler(useKeepAliveForReplication).handle(indexState, addReplicaRequest);
logger.info("AddReplicaHandler returned " + reply.toString());
responseStreamObserver.onNext(reply);
responseStreamObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,30 @@ public DiscoveryFileAndPort(String discoveryFile, int port) {

/** Construct client connecting to ReplicationServer server at {@code host:port}. */
public ReplicationServerClient(String host, int port) {
this(
this(host, port, false);
}

/** Construct client connecting to ReplicationServer server at {@code host:port}. */
public ReplicationServerClient(String host, int port, boolean useKeepAlive) {
this(createManagedChannel(host, port, useKeepAlive), host, port, "");
}

private static ManagedChannel createManagedChannel(String host, int port, boolean useKeepAlive) {
ManagedChannelBuilder<?> managedChannelBuilder =
ManagedChannelBuilder.forAddress(host, port)
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
// needing certificates.
.usePlaintext()
.maxInboundMessageSize(MAX_MESSAGE_BYTES_SIZE)
.build(),
host,
port,
"");
.maxInboundMessageSize(MAX_MESSAGE_BYTES_SIZE);
setKeepAlive(managedChannelBuilder, useKeepAlive);
return managedChannelBuilder.build();
}

static void setKeepAlive(ManagedChannelBuilder<?> managedChannelBuilder, boolean useKeepAlive) {
if (useKeepAlive) {
managedChannelBuilder
.keepAliveTime(1, TimeUnit.MINUTES)
.keepAliveTimeout(10, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
import java.io.IOException;

public class AddReplicaHandler implements Handler<AddReplicaRequest, AddReplicaResponse> {

private final boolean useKeepAlive;

public AddReplicaHandler(boolean useKeepAlive) {
this.useKeepAlive = useKeepAlive;
}

@Override
public AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) {
ShardState shardState = indexState.getShard(0);
Expand All @@ -37,7 +44,7 @@ public AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addRep
addReplicaRequest.getReplicaId(),
// channel for primary to talk to replica
new ReplicationServerClient(
addReplicaRequest.getHostName(), addReplicaRequest.getPort()));
addReplicaRequest.getHostName(), addReplicaRequest.getPort(), useKeepAlive));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.yelp.nrtsearch.clientlib.Node;
import com.yelp.nrtsearch.server.grpc.LuceneServer.ReplicationServerImpl;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient.DiscoveryFileAndPort;
import com.yelp.nrtsearch.server.luceneserver.GlobalState;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.StatusRuntimeException;
Expand All @@ -36,6 +41,7 @@
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand Down Expand Up @@ -163,4 +169,29 @@ public void testDiscoveryFilePrimaryChange() throws IOException {
}
}
}

@Test
@SuppressWarnings("rawtypes")
public void testKeepAliveEnabled() {
ManagedChannelBuilder managedChannelBuilder = mock(ManagedChannelBuilder.class);
when(managedChannelBuilder.keepAliveTime(anyLong(), any(TimeUnit.class)))
.thenReturn(managedChannelBuilder);
when(managedChannelBuilder.keepAliveTimeout(anyLong(), any(TimeUnit.class)))
.thenReturn(managedChannelBuilder);
when(managedChannelBuilder.keepAliveWithoutCalls(anyBoolean()))
.thenReturn(managedChannelBuilder);

ReplicationServerClient.setKeepAlive(managedChannelBuilder, true);
verify(managedChannelBuilder).keepAliveTime(1, TimeUnit.MINUTES);
verify(managedChannelBuilder).keepAliveTimeout(10, TimeUnit.SECONDS);
verify(managedChannelBuilder).keepAliveWithoutCalls(true);
}

@Test
@SuppressWarnings("rawtypes")
public void testKeepAliveDisabled() {
ManagedChannelBuilder managedChannelBuilder = mock(ManagedChannelBuilder.class);
ReplicationServerClient.setKeepAlive(managedChannelBuilder, false);
verifyNoMoreInteractions(managedChannelBuilder);
}
}

0 comments on commit 5a5a82f

Please sign in to comment.