From 2a0870248bc274fbfd9d3ed96a4863b5b1ee5846 Mon Sep 17 00:00:00 2001 From: Rene Cordier Date: Tue, 18 Jul 2023 17:12:50 +0700 Subject: [PATCH] JAMES-3929 Upgrade to opensearch-java 2.6.0 -> Refactor code to use http5 and drop the rest client --- backends-common/opensearch/pom.xml | 17 +++- .../backends/opensearch/ClientProvider.java | 95 +++++++++++-------- .../opensearch/ReactorOpenSearchClient.java | 11 +-- 3 files changed, 69 insertions(+), 54 deletions(-) diff --git a/backends-common/opensearch/pom.xml b/backends-common/opensearch/pom.xml index c78ac542f59f..1fac4a89cc16 100644 --- a/backends-common/opensearch/pom.xml +++ b/backends-common/opensearch/pom.xml @@ -71,6 +71,18 @@ org.apache.commons commons-configuration2 + + + org.apache.httpcomponents.client5 + httpclient5 + 5.1.4 + + + + org.apache.httpcomponents.core5 + httpcore5 + 5.1.5 + org.apache.logging.log4j log4j-to-slf4j @@ -85,11 +97,6 @@ opensearch-java 2.6.0 - - org.opensearch.client - opensearch-rest-client - 2.8.0 - org.slf4j jcl-over-slf4j diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java index d253fc6d6b32..7a8520d8691c 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java @@ -25,6 +25,7 @@ import java.security.cert.CertificateException; import java.time.Duration; import java.time.LocalDateTime; +import java.util.concurrent.TimeUnit; import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -34,21 +35,24 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.time.DurationFormatUtils; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.conn.ssl.DefaultHostnameVerifier; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; -import org.apache.http.ssl.SSLContextBuilder; -import org.apache.http.ssl.TrustStrategy; -import org.apache.james.backends.opensearch.json.jackson.JacksonJsonpMapper; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.CredentialsStore; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.apache.hc.core5.ssl.TrustStrategy; import org.apache.james.util.concurrent.NamedThreadFactory; -import org.opensearch.client.RestClient; import org.opensearch.client.opensearch.OpenSearchAsyncClient; -import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +63,7 @@ public class ClientProvider implements Provider { private static class HttpAsyncClientConfigurer { - + private static final AuthScope ANY = new AuthScope(null, null, -1, null, null); private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true; private static final HostnameVerifier ACCEPT_ANY_HOSTNAME = (hostname, sslSession) -> true; @@ -74,9 +78,6 @@ private HttpAsyncClientBuilder configure(HttpAsyncClientBuilder builder) { configureHostScheme(builder); configureTimeout(builder); - configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal); - configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute); - builder.setThreadFactory(NamedThreadFactory.withName("OpenSearch-driver")); return builder; @@ -98,19 +99,35 @@ private void configureHostScheme(HttpAsyncClientBuilder builder) { } private void configureSSLOptions(HttpAsyncClientBuilder builder) { - try { - builder - .setSSLContext(sslContext()) - .setSSLHostnameVerifier(hostnameVerifier()); - } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) { - throw new RuntimeException("Cannot set SSL options to the builder", e); - } + builder.setConnectionManager(connectionManager()); } private void configureTimeout(HttpAsyncClientBuilder builder) { builder.setDefaultRequestConfig(requestConfig()); } + private PoolingAsyncClientConnectionManager connectionManager() { + PoolingAsyncClientConnectionManagerBuilder builder = PoolingAsyncClientConnectionManagerBuilder + .create() + .setTlsStrategy(tlsStrategy()); + + configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal); + configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute); + + return builder.build(); + } + + private TlsStrategy tlsStrategy() { + try { + return ClientTlsStrategyBuilder.create() + .setSslContext(sslContext()) + .setHostnameVerifier(hostnameVerifier()) + .build(); + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) { + throw new RuntimeException("Cannot set SSL options to the builder", e); + } + } + private SSLContext sslContext() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException, IOException { @@ -151,9 +168,9 @@ private HostnameVerifier hostnameVerifier() { private RequestConfig requestConfig() { return RequestConfig.custom() - .setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis())) - .setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis())) - .setSocketTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis())) + .setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS) + .setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS) + .setResponseTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()), TimeUnit.MILLISECONDS) .build(); } @@ -171,9 +188,9 @@ private SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder) t private void configureAuthentication(HttpAsyncClientBuilder builder) { configuration.getCredential() .ifPresent(credential -> { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(credential.getUsername(), String.valueOf(credential.getPassword()))); + CredentialsStore credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(ANY, + new UsernamePasswordCredentials(credential.getUsername(), credential.getPassword())); builder.setDefaultCredentialsProvider(credentialsProvider); }); } @@ -182,7 +199,7 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) { private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class); private final OpenSearchConfiguration configuration; - private final RestClient lowLevelRestClient; + private final OpenSearchTransport openSearchTransport; private final OpenSearchAsyncClient openSearchClient; private final HttpAsyncClientConfigurer httpAsyncClientConfigurer; private final ReactorOpenSearchClient client; @@ -191,13 +208,13 @@ private void configureAuthentication(HttpAsyncClientBuilder builder) { public ClientProvider(OpenSearchConfiguration configuration) { this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration); this.configuration = configuration; - this.lowLevelRestClient = buildRestClient(); + this.openSearchTransport = buildTransport(); this.openSearchClient = connect(); - this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient); + this.client = new ReactorOpenSearchClient(this.openSearchClient); } - private RestClient buildRestClient() { - return RestClient.builder(hostsToHttpHosts()) + private OpenSearchTransport buildTransport() { + return ApacheHttpClient5TransportBuilder.builder(hostsToHttpHosts()) .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure) .build(); } @@ -217,14 +234,12 @@ private OpenSearchAsyncClient connect() { private OpenSearchAsyncClient connectToCluster() { LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now()); - RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper()); - - return new OpenSearchAsyncClient(transport); + return new OpenSearchAsyncClient(openSearchTransport); } private HttpHost[] hostsToHttpHosts() { return configuration.getHosts().stream() - .map(host -> new HttpHost(host.getHostName(), host.getPort(), configuration.getHostScheme().name())) + .map(host -> new HttpHost(configuration.getHostScheme().name(), host.getHostName(), host.getPort())) .toArray(HttpHost[]::new); } @@ -235,6 +250,6 @@ public ReactorOpenSearchClient get() { @PreDestroy public void close() throws IOException { - lowLevelRestClient.close(); + openSearchTransport.close(); } } diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java index 05e41ecebec8..4b722d3af2ae 100644 --- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java +++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; -import org.opensearch.client.RestClient; import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.cluster.HealthRequest; import org.opensearch.client.opensearch.cluster.HealthResponse; @@ -58,11 +57,9 @@ public class ReactorOpenSearchClient implements AutoCloseable { private final OpenSearchAsyncClient client; - private final RestClient lowLevelRestClient; - public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient lowLevelRestClient) { + public ReactorOpenSearchClient(OpenSearchAsyncClient client) { this.client = client; - this.lowLevelRestClient = lowLevelRestClient; } public Mono bulk(BulkRequest bulkRequest) throws IOException { @@ -81,10 +78,6 @@ public Mono deleteByQuery(DeleteByQueryRequest deleteRequ return toReactor(client.deleteByQuery(deleteRequest)); } - public RestClient getLowLevelClient() { - return lowLevelRestClient; - } - public Mono index(IndexRequest indexRequest) throws IOException { return toReactor(client.index(indexRequest)); } @@ -127,7 +120,7 @@ public Mono> get(GetRequest getRequest) throws IOExcepti @Override public void close() throws IOException { - lowLevelRestClient.close(); + client._transport().close(); } private static Mono toReactor(CompletableFuture async) {