Skip to content

Commit

Permalink
Refactor metacat-connector-druid http client to support custom extens…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
jtuglu-netflix committed Dec 4, 2024
1 parent 52066ec commit 2b7f281
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
*/
public final class DruidConfigConstants {
/**
* DRUID_COORDINATOR_URI.
* DRUID_ROUTER_URI .
*/
public static final String DRUID_COORDINATOR_URI = "druid.uri";
public static final String DRUID_ROUTER_URI = "druid.uri";

//Http client
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ public class DruidConnectorFactory extends SpringConnectorFactory {
*/
DruidConnectorFactory(
final DruidConnectorInfoConverter druidConnectorInfoConverter,
final ConnectorContext connectorContext
final ConnectorContext connectorContext,
final Class<? extends DruidHttpClientConfig> clientConfigClass
) {
super(druidConnectorInfoConverter, connectorContext);
super.registerClazz(DruidConnectorConfig.class, DruidHttpClientConfig.class);
super.registerClazz(DruidConnectorConfig.class, clientConfigClass);
super.refresh();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.metacat.common.server.connectors.ConnectorFactory;
import com.netflix.metacat.common.server.connectors.ConnectorPlugin;
import com.netflix.metacat.common.server.connectors.ConnectorTypeConverter;
import com.netflix.metacat.connector.druid.configs.DruidHttpClientConfigImpl;
import com.netflix.metacat.connector.druid.converter.DruidConnectorInfoConverter;

/**
Expand All @@ -45,7 +46,10 @@ public String getType() {
@Override
public ConnectorFactory create(final ConnectorContext connectorContext) {
return new DruidConnectorFactory(
new DruidConnectorInfoConverter(connectorContext.getCatalogName()), connectorContext);
new DruidConnectorInfoConverter(connectorContext.getCatalogName()),
connectorContext,
DruidHttpClientConfigImpl.class
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,33 @@
* @since 1.2.0
*/
@Slf4j
public class DruidHttpClientImpl implements MetacatDruidClient {
private String druidURI;
private final RestTemplate restTemplate;
private final MetacatJsonLocator jsonLocator = new MetacatJsonLocator();
public class BaseDruidClientImpl implements MetacatDruidClient {
protected String druidURI;
protected final RestTemplate restTemplate;
protected final MetacatJsonLocator jsonLocator = new MetacatJsonLocator();

/**
* Constructor.
*
* @param connectorContext connector context
* @param restTemplate rest template
*/
public DruidHttpClientImpl(final ConnectorContext connectorContext,
final RestTemplate restTemplate) {
public BaseDruidClientImpl(
final ConnectorContext connectorContext,
final RestTemplate restTemplate
) {
this.restTemplate = restTemplate;
final Map<String, String> config = connectorContext.getConfiguration();
final String coordinatorUri = config.get(DruidConfigConstants.DRUID_COORDINATOR_URI);
if (coordinatorUri == null) {
final String routerUri = config.get(DruidConfigConstants.DRUID_ROUTER_URI);
if (routerUri == null) {
throw new MetacatException("Druid cluster ending point not provided.");
}
try {
new URI(coordinatorUri);
new URI(routerUri);
} catch (URISyntaxException exception) {
throw new MetacatException("Druid ending point invalid");
}
this.druidURI = coordinatorUri;
this.druidURI = routerUri;
log.info("druid server uri={}", this.druidURI);
}

Expand Down Expand Up @@ -100,7 +102,7 @@ public ObjectNode getLatestDataByName(final String dataSourceName) {
if (result == null) {
throw new MetacatException(String.format("Druid cluster: %s result not found.", dataSourceName));
}
final String latestSegment = DruidHttpClientUtil.getLatestSegment(result);
final String latestSegment = DruidClientUtil.getLatestSegment(result);
log.debug("Get the latest segment {}", latestSegment);
url = String.format(druidURI + "/%s/segments/%s", dataSourceName, latestSegment);
result = restTemplate.getForObject(url, String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
* @author zhenl
* @since 1.2.0
*/
public final class DruidHttpClientUtil {
private DruidHttpClientUtil() {
public final class DruidClientUtil {
private DruidClientUtil() {
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,14 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.metacat.connector.druid.configs;

import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.common.server.connectors.util.TimeUtil;
import com.netflix.metacat.connector.druid.DruidConfigConstants;
import com.netflix.metacat.connector.druid.MetacatDruidClient;
import com.netflix.metacat.connector.druid.client.DruidHttpClientImpl;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

import java.net.UnknownHostException;
Expand All @@ -36,11 +17,10 @@
/**
* DruidHttpClientConfig.
*
* @author zhenl
* @since 1.2.0
* @author zhenl jtuglu
* @since 1.2.0 TODO: change
*/
@Configuration
public class DruidHttpClientConfig {
public interface DruidHttpClientConfig {
/**
* Druid client instance.
*
Expand All @@ -50,10 +30,10 @@ public class DruidHttpClientConfig {
* @throws UnknownHostException exception for unknownhost
*/
@Bean
public MetacatDruidClient createMetacatDruidClient(
default MetacatDruidClient createMetacatDruidClient(
final ConnectorContext connectorContext,
final RestTemplate restTemplate) throws UnknownHostException {
return new DruidHttpClientImpl(connectorContext, restTemplate);
throw new UnsupportedOperationException("Method 'createMetacatDruidClient' not implemented.");
}

/**
Expand All @@ -63,8 +43,8 @@ public MetacatDruidClient createMetacatDruidClient(
* @return RestTemplate
*/
@Bean
public RestTemplate restTemplate(final ConnectorContext connectorContext) {
return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient(connectorContext)));
default RestTemplate restTemplate(final ConnectorContext connectorContext) {
throw new UnsupportedOperationException("Method 'restTemplate' not implemented.");
}

/**
Expand All @@ -74,7 +54,7 @@ public RestTemplate restTemplate(final ConnectorContext connectorContext) {
* @return HttpClient
*/
@Bean
public HttpClient httpClient(final ConnectorContext connectorContext) {
default HttpClient httpClient(final ConnectorContext connectorContext) {
final int timeout = (int) TimeUtil.toTime(
connectorContext.getConfiguration().getOrDefault(DruidConfigConstants.HTTP_TIMEOUT, "5s"),
TimeUnit.SECONDS,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.metacat.connector.druid.configs;

import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.connector.druid.MetacatDruidClient;
import com.netflix.metacat.connector.druid.client.BaseDruidClientImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

import java.net.UnknownHostException;

/**
* DruidHttpClientConfig.
*
* @author zhenl jtuglu
* @since 1.2.0
*/
@Configuration
public class DruidHttpClientConfigImpl implements DruidHttpClientConfig {
/**
* Druid client instance.
*
* @param connectorContext connector context
* @param restTemplate rest template
* @return MetacatDruidClient
* @throws UnknownHostException exception for unknownhost
*/
@Bean
public MetacatDruidClient createMetacatDruidClient(
final ConnectorContext connectorContext,
final RestTemplate restTemplate) throws UnknownHostException {
return new BaseDruidClientImpl(connectorContext, restTemplate);
}

/**
* Rest template.
*
* @param connectorContext connector context
* @return RestTemplate
*/
@Bean
public RestTemplate restTemplate(final ConnectorContext connectorContext) {
return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient(connectorContext)));
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package com.netflix.metacat.connector.druid

import com.netflix.metacat.connector.druid.client.DruidHttpClientUtil
import com.netflix.metacat.connector.druid.client.DruidClientUtil
import spock.lang.Specification

/**
* DruidHttpClientUtilSpec.
* @author zhenl
* @since 1.2.0
*/
class DruidHttpClientUtilSpec extends Specification{
class DruidClientUtilSpec extends Specification{

def "Test for getLatestDataByName"() {

when:
def ret = DruidHttpClientUtil.getLatestSegment(getInput())
def ret = DruidClientUtil.getLatestSegment(getInput())

then:
ret.equals("algodash_map_row_report_agg_2016-09-03T00:00:00.000Z_2016-09-04T00:00:00.000Z_2016-10-10T21:19:50.893Z");
Expand Down

0 comments on commit 2b7f281

Please sign in to comment.