Skip to content

Commit

Permalink
[Update] Using http5 client instead of opensearch rest client
Browse files Browse the repository at this point in the history
  • Loading branch information
Arsnael committed Feb 5, 2024
1 parent a7a973e commit 39fa24e
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 54 deletions.
17 changes: 12 additions & 5 deletions backends-common/opensearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
</dependency>
<!-- Needed for opensearch-java dependency -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.2.3</version>
</dependency>
<!-- Needed for opensearch-java dependency -->
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
Expand All @@ -85,11 +97,6 @@
<artifactId>opensearch-java</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-rest-client</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.security.cert.CertificateException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

import javax.annotation.PreDestroy;
import javax.inject.Inject;
Expand All @@ -34,21 +35,24 @@

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.CredentialsStore;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.ssl.TrustStrategy;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.opensearch.client.RestClient;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,7 +63,7 @@
public class ClientProvider implements Provider<ReactorOpenSearchClient> {

private static class HttpAsyncClientConfigurer {

private static final AuthScope ANY = new AuthScope(null, null, -1, null, null);
private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true;
private static final HostnameVerifier ACCEPT_ANY_HOSTNAME = (hostname, sslSession) -> true;

Expand All @@ -74,9 +78,6 @@ private HttpAsyncClientBuilder configure(HttpAsyncClientBuilder builder) {
configureHostScheme(builder);
configureTimeout(builder);

configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal);
configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute);

builder.setThreadFactory(NamedThreadFactory.withName("OpenSearch-driver"));

return builder;
Expand All @@ -98,19 +99,35 @@ private void configureHostScheme(HttpAsyncClientBuilder builder) {
}

private void configureSSLOptions(HttpAsyncClientBuilder builder) {
try {
builder
.setSSLContext(sslContext())
.setSSLHostnameVerifier(hostnameVerifier());
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) {
throw new RuntimeException("Cannot set SSL options to the builder", e);
}
builder.setConnectionManager(connectionManager());
}

private void configureTimeout(HttpAsyncClientBuilder builder) {
builder.setDefaultRequestConfig(requestConfig());
}

private PoolingAsyncClientConnectionManager connectionManager() {
PoolingAsyncClientConnectionManagerBuilder builder = PoolingAsyncClientConnectionManagerBuilder
.create()
.setTlsStrategy(tlsStrategy());

configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal);
configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute);

return builder.build();
}

private TlsStrategy tlsStrategy() {
try {
return ClientTlsStrategyBuilder.create()
.setSslContext(sslContext())
.setHostnameVerifier(hostnameVerifier())
.build();
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) {
throw new RuntimeException("Cannot set SSL options to the builder", e);
}
}

private SSLContext sslContext() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
CertificateException, IOException {

Expand Down Expand Up @@ -151,9 +168,9 @@ private HostnameVerifier hostnameVerifier() {

private RequestConfig requestConfig() {
return RequestConfig.custom()
.setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
.setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
.setSocketTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
.setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS)
.setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS)
.setResponseTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS)
.build();
}

Expand All @@ -171,9 +188,9 @@ private SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder) t
private void configureAuthentication(HttpAsyncClientBuilder builder) {
configuration.getCredential()
.ifPresent(credential -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(credential.getUsername(), String.valueOf(credential.getPassword())));
CredentialsStore credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(ANY,
new UsernamePasswordCredentials(credential.getUsername(), credential.getPassword()));
builder.setDefaultCredentialsProvider(credentialsProvider);
});
}
Expand All @@ -182,7 +199,7 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);

private final OpenSearchConfiguration configuration;
private final RestClient lowLevelRestClient;
private final OpenSearchTransport openSearchTransport;
private final OpenSearchAsyncClient openSearchClient;
private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
private final ReactorOpenSearchClient client;
Expand All @@ -191,13 +208,13 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) {
public ClientProvider(OpenSearchConfiguration configuration) {
this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
this.configuration = configuration;
this.lowLevelRestClient = buildRestClient();
this.openSearchTransport = buildTransport();
this.openSearchClient = connect();
this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient);
this.client = new ReactorOpenSearchClient(this.openSearchClient);
}

private RestClient buildRestClient() {
return RestClient.builder(hostsToHttpHosts())
private OpenSearchTransport buildTransport() {
return ApacheHttpClient5TransportBuilder.builder(hostsToHttpHosts())
.setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)
.build();
}
Expand All @@ -217,14 +234,12 @@ private OpenSearchAsyncClient connect() {
private OpenSearchAsyncClient connectToCluster() {
LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now());

RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper());

return new OpenSearchAsyncClient(transport);
return new OpenSearchAsyncClient(openSearchTransport);
}

private HttpHost[] hostsToHttpHosts() {
return configuration.getHosts().stream()
.map(host -> new HttpHost(host.getHostName(), host.getPort(), configuration.getHostScheme().name()))
.map(host -> new HttpHost(configuration.getHostScheme().name(), host.getHostName(), host.getPort()))
.toArray(HttpHost[]::new);
}

Expand All @@ -235,6 +250,6 @@ public ReactorOpenSearchClient get() {

@PreDestroy
public void close() throws IOException {
lowLevelRestClient.close();
openSearchTransport.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import org.opensearch.client.RestClient;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch.cluster.HealthRequest;
import org.opensearch.client.opensearch.cluster.HealthResponse;
Expand Down Expand Up @@ -58,11 +57,9 @@

public class ReactorOpenSearchClient implements AutoCloseable {
private final OpenSearchAsyncClient client;
private final RestClient lowLevelRestClient;

public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient lowLevelRestClient) {
public ReactorOpenSearchClient(OpenSearchAsyncClient client) {
this.client = client;
this.lowLevelRestClient = lowLevelRestClient;
}

public Mono<BulkResponse> bulk(BulkRequest bulkRequest) throws IOException {
Expand All @@ -81,10 +78,6 @@ public Mono<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest deleteRequ
return toReactor(client.deleteByQuery(deleteRequest));
}

public RestClient getLowLevelClient() {
return lowLevelRestClient;
}

public <T> Mono<IndexResponse> index(IndexRequest<T> indexRequest) throws IOException {
return toReactor(client.index(indexRequest));
}
Expand Down Expand Up @@ -127,7 +120,7 @@ public Mono<GetResponse<ObjectNode>> get(GetRequest getRequest) throws IOExcepti

@Override
public void close() throws IOException {
lowLevelRestClient.close();
client._transport().close();
}

private static <T> Mono<T> toReactor(CompletableFuture<T> async) {
Expand Down

0 comments on commit 39fa24e

Please sign in to comment.