Skip to content

Commit

Permalink
Move StartIndexHandler to index package (#753)
Browse files Browse the repository at this point in the history
  • Loading branch information
aprudhomme authored Oct 4, 2024
1 parent 2e2b40d commit 021fbbb
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yelp.nrtsearch.server.luceneserver;
package com.yelp.nrtsearch.server.luceneserver.index;

import com.yelp.nrtsearch.server.grpc.Mode;
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
Expand All @@ -22,9 +22,6 @@
import com.yelp.nrtsearch.server.grpc.StartIndexRequest;
import com.yelp.nrtsearch.server.grpc.StartIndexResponse;
import com.yelp.nrtsearch.server.luceneserver.handler.Handler.HandlerException;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.ShardState;
import com.yelp.nrtsearch.server.luceneserver.nrt.NrtDataManager;
import com.yelp.nrtsearch.server.luceneserver.state.BackendGlobalState;
import com.yelp.nrtsearch.server.remote.RemoteBackend;
Expand All @@ -36,9 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO: this class should be called something else to differentiate from the handler for grpc
// request, or alternatively the call structure needs to be changed
public class StartIndexHandler {
public class StartIndexProcessor {
private static final Set<String> startingIndices = new HashSet<>();

private final String serviceName;
Expand All @@ -47,7 +42,7 @@ public class StartIndexHandler {
private final IndexStateManager indexStateManager;
private final boolean remoteCommit;
private final int discoveryFileUpdateIntervalMs;
private static final Logger logger = LoggerFactory.getLogger(StartIndexHandler.class);
private static final Logger logger = LoggerFactory.getLogger(StartIndexProcessor.class);

/**
* Constructor for StartIndexHandler.
Expand All @@ -59,7 +54,7 @@ public class StartIndexHandler {
* @param remoteCommit whether to commit to remote state
* @param discoveryFileUpdateIntervalMs interval to update backends from discovery file
*/
public StartIndexHandler(
public StartIndexProcessor(
String serviceName,
String ephemeralId,
RemoteBackend remoteBackend,
Expand All @@ -74,8 +69,8 @@ public StartIndexHandler(
this.discoveryFileUpdateIntervalMs = discoveryFileUpdateIntervalMs;
}

public StartIndexResponse handle(IndexState indexState, StartIndexRequest startIndexRequest)
throws StartIndexHandlerException {
public StartIndexResponse process(IndexState indexState, StartIndexRequest startIndexRequest)
throws StartIndexProcessorException {
String indexName = indexState.getName();
synchronized (startingIndices) {
if (indexState.isStarted()) {
Expand All @@ -90,17 +85,17 @@ public StartIndexResponse handle(IndexState indexState, StartIndexRequest startI
}

try {
return handleInternal(indexState, startIndexRequest);
return processInternal(indexState, startIndexRequest);
} finally {
synchronized (startingIndices) {
startingIndices.remove(indexName);
}
}
}

private StartIndexResponse handleInternal(
private StartIndexResponse processInternal(
IndexState indexState, StartIndexRequest startIndexRequest)
throws StartIndexHandlerException {
throws StartIndexProcessorException {
final ShardState shardState = indexState.getShard(0);
final Mode mode = startIndexRequest.getMode();
final long primaryGen;
Expand Down Expand Up @@ -137,7 +132,7 @@ private StartIndexResponse handleInternal(
indexStateManager.start(mode, nrtDataManager, primaryGen, primaryClient);
} catch (Exception e) {
logger.error("Cannot start IndexState/ShardState", e);
throw new StartIndexHandlerException(e);
throw new StartIndexProcessorException(e);
}

StartIndexResponse.Builder startIndexResponseBuilder = StartIndexResponse.newBuilder();
Expand All @@ -146,7 +141,7 @@ private StartIndexResponse handleInternal(
s = shardState.acquire();
} catch (IOException e) {
logger.error("Acquire shard state failed", e);
throw new StartIndexHandlerException(e);
throw new StartIndexProcessorException(e);
}
try {
IndexReader r = s.searcher.getIndexReader();
Expand All @@ -158,7 +153,7 @@ private StartIndexResponse handleInternal(
shardState.release(s);
} catch (IOException e) {
logger.error("Release shard state failed", e);
throw new StartIndexHandlerException(e);
throw new StartIndexProcessorException(e);
}
}
long t1 = System.nanoTime();
Expand All @@ -179,9 +174,9 @@ private ReplicationServerClient getPrimaryClientForRequest(StartIndexRequest req
}
}

public static class StartIndexHandlerException extends HandlerException {
public static class StartIndexProcessorException extends HandlerException {

public StartIndexHandlerException(Exception e) {
public StartIndexProcessorException(Exception e) {
super(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import com.yelp.nrtsearch.server.grpc.StartIndexResponse;
import com.yelp.nrtsearch.server.grpc.StartIndexV2Request;
import com.yelp.nrtsearch.server.grpc.StopIndexRequest;
import com.yelp.nrtsearch.server.luceneserver.StartIndexHandler;
import com.yelp.nrtsearch.server.luceneserver.StartIndexHandler.StartIndexHandlerException;
import com.yelp.nrtsearch.server.luceneserver.index.BackendStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.IndexState;
import com.yelp.nrtsearch.server.luceneserver.index.IndexStateManager;
import com.yelp.nrtsearch.server.luceneserver.index.StartIndexProcessor;
import com.yelp.nrtsearch.server.luceneserver.index.StartIndexProcessor.StartIndexProcessorException;
import com.yelp.nrtsearch.server.luceneserver.state.backend.LocalStateBackend;
import com.yelp.nrtsearch.server.luceneserver.state.backend.RemoteStateBackend;
import com.yelp.nrtsearch.server.luceneserver.state.backend.StateBackend;
Expand Down Expand Up @@ -435,8 +435,8 @@ public synchronized StartIndexResponse startIndexV2(StartIndexV2Request startInd

private StartIndexResponse startIndex(
IndexStateManager indexStateManager, StartIndexRequest startIndexRequest) throws IOException {
StartIndexHandler startIndexHandler =
new StartIndexHandler(
StartIndexProcessor startIndexHandler =
new StartIndexProcessor(
getConfiguration().getServiceName(),
getEphemeralId(),
getRemoteBackend(),
Expand All @@ -447,8 +447,8 @@ private StartIndexResponse startIndex(
.equals(IndexDataLocationType.REMOTE),
getConfiguration().getDiscoveryFileUpdateIntervalMs());
try {
return startIndexHandler.handle(indexStateManager.getCurrent(), startIndexRequest);
} catch (StartIndexHandlerException e) {
return startIndexHandler.process(indexStateManager.getCurrent(), startIndexRequest);
} catch (StartIndexProcessorException e) {
throw new RuntimeException(e);
}
}
Expand Down

0 comments on commit 021fbbb

Please sign in to comment.