diff --git a/docs/server_configuration.rst b/docs/server_configuration.rst index 2e766ba94..854ff0d83 100644 --- a/docs/server_configuration.rst +++ b/docs/server_configuration.rst @@ -136,6 +136,11 @@ Example server configuration - If enabled, registers JVM metrics with prometheus. - true + * - useKeepAliveForReplication + - bool + - If enabled, the primary will enable keepAlive on the replication channel with keepAliveTime 1 minute and keepAliveTimeout 10 seconds. Replicas ignore this option. + - false + .. list-table:: `Threadpool Configuration `_ (``threadPoolConfiguration.*``) :widths: 25 10 50 25 :header-rows: 1 diff --git a/src/main/java/com/yelp/nrtsearch/server/config/LuceneServerConfiguration.java b/src/main/java/com/yelp/nrtsearch/server/config/LuceneServerConfiguration.java index 9b97e317e..24615bf60 100644 --- a/src/main/java/com/yelp/nrtsearch/server/config/LuceneServerConfiguration.java +++ b/src/main/java/com/yelp/nrtsearch/server/config/LuceneServerConfiguration.java @@ -115,6 +115,7 @@ public class LuceneServerConfiguration { private final boolean enableGlobalBucketAccess; private final int lowPriorityCopyPercentage; private final boolean verifyReplicationIndexId; + private final boolean useKeepAliveForReplication; @Inject public LuceneServerConfiguration(InputStream yamlStream) { @@ -190,6 +191,7 @@ public LuceneServerConfiguration(InputStream yamlStream) { enableGlobalBucketAccess = configReader.getBoolean("enableGlobalBucketAccess", false); lowPriorityCopyPercentage = configReader.getInteger("lowPriorityCopyPercentage", 0); verifyReplicationIndexId = configReader.getBoolean("verifyReplicationIndexId", true); + useKeepAliveForReplication = configReader.getBoolean("useKeepAliveForReplication", false); List indicesWithOverrides = configReader.getKeysOrEmpty("indexLiveSettingsOverrides"); Map liveSettingsMap = new HashMap<>(); @@ -394,6 +396,10 @@ public boolean getVerifyReplicationIndexId() { return verifyReplicationIndexId; } + public boolean 
getUseKeepAliveForReplication() { + return useKeepAliveForReplication; + } + public IndexLiveSettings getLiveSettingsOverride(String indexName) { return indexLiveSettingsOverrides.getOrDefault( indexName, IndexLiveSettings.newBuilder().build()); diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java index 29abc8b81..edffad740 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java @@ -1875,7 +1875,10 @@ public void addReplicas( checkIndexId(addReplicaRequest.getIndexId(), indexStateManager.getIndexId(), verifyIndexId); IndexState indexState = indexStateManager.getCurrent(); - AddReplicaResponse reply = new AddReplicaHandler().handle(indexState, addReplicaRequest); + boolean useKeepAliveForReplication = + globalState.getConfiguration().getUseKeepAliveForReplication(); + AddReplicaResponse reply = + new AddReplicaHandler(useKeepAliveForReplication).handle(indexState, addReplicaRequest); logger.info("AddReplicaHandler returned " + reply.toString()); responseStreamObserver.onNext(reply); responseStreamObserver.onCompleted(); diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClient.java b/src/main/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClient.java index f9af4a2fd..8d95e5e76 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClient.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClient.java @@ -81,16 +81,30 @@ public DiscoveryFileAndPort(String discoveryFile, int port) { /** Construct client connecting to ReplicationServer server at {@code host:port}. */ public ReplicationServerClient(String host, int port) { - this( + this(host, port, false); + } + + /** Construct client connecting to ReplicationServer server at {@code host:port}, configuring gRPC keepAlive on the channel when {@code useKeepAlive} is true. 
*/ + public ReplicationServerClient(String host, int port, boolean useKeepAlive) { + this(createManagedChannel(host, port, useKeepAlive), host, port, ""); + } + + private static ManagedChannel createManagedChannel(String host, int port, boolean useKeepAlive) { + ManagedChannelBuilder managedChannelBuilder = ManagedChannelBuilder.forAddress(host, port) - // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid - // needing certificates. .usePlaintext() - .maxInboundMessageSize(MAX_MESSAGE_BYTES_SIZE) - .build(), - host, - port, - ""); + .maxInboundMessageSize(MAX_MESSAGE_BYTES_SIZE); + setKeepAlive(managedChannelBuilder, useKeepAlive); + return managedChannelBuilder.build(); + } + + static void setKeepAlive(ManagedChannelBuilder managedChannelBuilder, boolean useKeepAlive) { + if (useKeepAlive) { + managedChannelBuilder + .keepAliveTime(1, TimeUnit.MINUTES) + .keepAliveTimeout(10, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true); + } } /** diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java index 4d1c6b6f5..0b5cd7dbb 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddReplicaHandler.java @@ -21,6 +21,13 @@ import java.io.IOException; public class AddReplicaHandler implements Handler { + + private final boolean useKeepAlive; + + public AddReplicaHandler(boolean useKeepAlive) { + this.useKeepAlive = useKeepAlive; + } + @Override public AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addReplicaRequest) { ShardState shardState = indexState.getShard(0); @@ -37,7 +44,7 @@ public AddReplicaResponse handle(IndexState indexState, AddReplicaRequest addRep addReplicaRequest.getReplicaId(), // channel for primary to talk to replica new ReplicationServerClient( - addReplicaRequest.getHostName(), 
addReplicaRequest.getPort())); + addReplicaRequest.getHostName(), addReplicaRequest.getPort(), useKeepAlive)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java b/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java index 3299d6c10..240539696 100644 --- a/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java +++ b/src/test/java/com/yelp/nrtsearch/server/grpc/ReplicationServerClientTest.java @@ -19,7 +19,11 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.ObjectMapper; @@ -27,6 +31,7 @@ import com.yelp.nrtsearch.server.grpc.LuceneServer.ReplicationServerImpl; import com.yelp.nrtsearch.server.grpc.ReplicationServerClient.DiscoveryFileAndPort; import com.yelp.nrtsearch.server.luceneserver.GlobalState; +import io.grpc.ManagedChannelBuilder; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.StatusRuntimeException; @@ -36,6 +41,7 @@ import java.nio.file.Paths; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -163,4 +169,29 @@ public void testDiscoveryFilePrimaryChange() throws IOException { } } } + + @Test + @SuppressWarnings("rawtypes") + public void testKeepAliveEnabled() { + ManagedChannelBuilder managedChannelBuilder = mock(ManagedChannelBuilder.class); + when(managedChannelBuilder.keepAliveTime(anyLong(), any(TimeUnit.class))) + .thenReturn(managedChannelBuilder); + 
when(managedChannelBuilder.keepAliveTimeout(anyLong(), any(TimeUnit.class))) + .thenReturn(managedChannelBuilder); + when(managedChannelBuilder.keepAliveWithoutCalls(anyBoolean())) + .thenReturn(managedChannelBuilder); + + ReplicationServerClient.setKeepAlive(managedChannelBuilder, true); + verify(managedChannelBuilder).keepAliveTime(1, TimeUnit.MINUTES); + verify(managedChannelBuilder).keepAliveTimeout(10, TimeUnit.SECONDS); + verify(managedChannelBuilder).keepAliveWithoutCalls(true); + } + + @Test + @SuppressWarnings("rawtypes") + public void testKeepAliveDisabled() { + ManagedChannelBuilder managedChannelBuilder = mock(ManagedChannelBuilder.class); + ReplicationServerClient.setKeepAlive(managedChannelBuilder, false); + verifyNoMoreInteractions(managedChannelBuilder); + } }