Skip to content

Commit

Permalink
Allow ConsulRawClient of cluster-mode-repository-consul to be con…
Browse files Browse the repository at this point in the history
…figured on ports other than `8500`
  • Loading branch information
linghengqian committed Jan 1, 2024
1 parent d152b65 commit 2310723
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ Apache ShardingSphere 为不同的运行模式提供了不同的元数据持久

### Consul 持久化

`com.ecwid.consul:consul-api:1.4.5` 的 Maven 模块的限制,使用者无法通过 gRPC 端口来连接到 Consul Agent。

`Consul` 实现的 `serverLists` 属性受设计使然,仅可通过 HTTP 端点连接到单个 Consul Agent。
`serverLists` 使用了宽松的 URL 匹配原则。
1.`serverLists` 为空时,将解析到 `http://localhost:8500` 的 Consul Agent 实例。
2.`serverLists``hostname` 时,将解析到 `http://hostname:8500` 的 Consul Agent 实例。
3.`serverLists``hostname:port` 时,将解析到 `http://hostname:port` 的 Consul Agent 实例。
4.`serverLists``http://hostName:port` 时,将解析到 `http://hostName:port` 的 Consul Agent 实例。
5.`serverLists``https://hostName:port` 时,将解析到 `https://hostName:port` 的 Consul Agent 实例。

类型:Consul

适用模式:Cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ Attributes:

### Consul Repository

Due to the limitation of the Maven module of `com.ecwid.consul:consul-api:1.4.5`, users cannot connect to the Consul Agent through the gRPC port.

The `serverLists` property of the `Consul` implementation is by design and can only be connected to a single Consul Agent via an HTTP endpoint.
`serverLists` uses relaxed URL matching principles.
1. When `serverLists` is empty, the Consul Agent instance at `http://localhost:8500` will be resolved.
2. When `serverLists` is `hostname`, it will be resolved to the Consul Agent instance of `http://hostname:8500`.
3. When `serverLists` is `hostname:port`, it will be resolved to the Consul Agent instance of `http://hostname:port`.
4. When `serverLists` is `http://hostName:port`, the Consul Agent instance of `http://hostName:port` will be resolved.
5. When `serverLists` is `https://hostName:port`, the Consul Agent instance of `https://hostName:port` will be resolved.

Type: Consul

Mode: Cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.mode.repository.cluster.consul;

import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
Expand All @@ -27,14 +28,18 @@
import com.ecwid.consul.v1.session.model.Session;
import com.google.common.base.Strings;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.http.HttpStatus;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -63,23 +68,31 @@ public final class ConsulRepository implements ClusterPersistRepository {

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
ConsulRawClient rawClient = Strings.isNullOrEmpty(config.getServerLists()) ? new ConsulRawClient() : new ConsulRawClient(config.getServerLists());
consulClient = new ShardingSphereConsulClient(rawClient);
consulProps = new ConsulProperties(config.getProps());
ConsulRawClient rawClient = createConsulRawClient(config.getServerLists());
consulClient = new ShardingSphereConsulClient(rawClient);
distributedLockHolder = new DistributedLockHolder(getType(), consulClient, consulProps);
watchKeyMap = new HashMap<>(6, 1F);
}

@Override
public String getDirectly(final String key) {
Response<GetValue> response = consulClient.getKVValue(key);
return null == response ? null : response.getValue().getValue();
if (null == response) {
return null;
}
GetValue value = response.getValue();
return null == value ? null : value.getValue();
}

@Override
public List<String> getChildrenKeys(final String key) {
Response<List<String>> response = consulClient.getKVKeysOnly(key);
return null == response ? Collections.emptyList() : response.getValue();
if (null == response) {
return Collections.emptyList();
}
List<String> value = response.getValue();
return null == value ? Collections.emptyList() : value;
}

@Override
Expand All @@ -102,9 +115,15 @@ public void delete(final String key) {
consulClient.deleteKVValue(key);
}

/**
* {@link ConsulRawClient} is a wrapper of blocking HTTP client and does not have a close method.
* Using such a Client does not necessarily conform to the implementation of the relevant SPI. ShardingSphere needs to
* consider solutions similar to <a href="https://github.com/spring-cloud/spring-cloud-consul/issues/475">spring-cloud/spring-cloud-consul#475</a>.
*
* @see ConsulRawClient
*/
@Override
public void close() {
// TODO
}

@Override
Expand All @@ -115,6 +134,24 @@ public void persistEphemeral(final String key, final String value) {
putParams.setAcquireSession(sessionId);
consulClient.setKVValue(key, value, putParams);
generatorFlushSessionTtlTask(consulClient, sessionId);
verifyConsulAgentRunning();
}

@SuppressWarnings("HttpUrlsUsage")
private ConsulRawClient createConsulRawClient(final String serverLists) {
if (Strings.isNullOrEmpty(serverLists)) {
return new ConsulRawClient();
}
URL serverUrl;
try {
serverUrl = new URL(!serverLists.startsWith("https://") && !serverLists.startsWith("http://") ? "http://" + serverLists : serverLists);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
if (-1 == serverUrl.getPort()) {
return new ConsulRawClient(serverUrl.getHost());
}
return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort());
}

private NewSession createNewSession(final String key) {
Expand Down Expand Up @@ -142,23 +179,27 @@ private void watchChildKeyChangeEvent(final String key, final DataChangedEventLi
long currentIndex = 0;
while (running.get()) {
Response<List<GetValue>> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
List<GetValue> value = response.getValue();
if (null == value) {
continue;
}
Long index = response.getConsulIndex();
if (null != index && 0 == currentIndex) {
currentIndex = index;
if (!watchKeyMap.containsKey(key)) {
watchKeyMap.put(key, new HashSet<>());
}
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : response.getValue()) {
for (GetValue each : value) {
watchKeys.add(each.getKey());
}
continue;
}
if (null != index && index > currentIndex) {
currentIndex = index;
Collection<String> newKeys = new HashSet<>(response.getValue().size(), 1F);
Collection<String> newKeys = new HashSet<>(value.size(), 1F);
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : response.getValue()) {
for (GetValue each : value) {
newKeys.add(each.getKey());
if (!watchKeys.contains(each.getKey())) {
watchKeys.add(each.getKey());
Expand Down Expand Up @@ -189,12 +230,24 @@ private void fireDataChangeEvent(final GetValue getValue, final DataChangedEvent
* Flush session by update TTL.
*
* @param consulClient consul client
* @param sessionId session id
* @param sessionId session id
*/
public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L, TimeUnit.SECONDS);
}

/**
* See <a href="https://developer.hashicorp.com/consul/api-docs/v1.17.x/status">Status HTTP API</a> .
*
* @throws RuntimeException Unable to connect to Consul Agent.
*/
private void verifyConsulAgentRunning() {
HttpResponse httpResponse = consulClient.getRawClient().makeGetRequest("/v1/status/leader");
if (HttpStatus.SC_OK != httpResponse.getStatusCode()) {
throw new RuntimeException("Unable to connect to Consul Agent and StatusCode is " + httpResponse.getStatusCode() + ".");
}
}

@Override
public String getType() {
return "Consul";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.shardingsphere.mode.repository.cluster.consul;

import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import lombok.SneakyThrows;
import org.apache.http.HttpStatus;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.awaitility.Awaitility;
Expand All @@ -47,6 +50,7 @@

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -83,6 +87,12 @@ class ConsulRepositoryTest {
@Mock
private List<GetValue> getValueList;

@Mock
private ConsulRawClient consulRawClient;

@Mock
private HttpResponse httpResponse;

private long index = 123456L;

@BeforeEach
Expand Down Expand Up @@ -140,6 +150,9 @@ void assertGetChildrenKeys() {

@Test
void assertPersistEphemeral() {
when(client.getRawClient()).thenReturn(consulRawClient);
when(consulRawClient.makeGetRequest(any(String.class))).thenReturn(httpResponse);
when(httpResponse.getStatusCode()).thenReturn(HttpStatus.SC_OK);
repository.persistEphemeral("key1", "value1");
verify(client).sessionCreate(any(NewSession.class), any(QueryParams.class));
verify(client).setKVValue(any(String.class), any(String.class), any(PutParams.class));
Expand Down Expand Up @@ -205,4 +218,20 @@ void assertPersist() {
repository.persist("key1", "value1");
verify(client).setKVValue(any(String.class), any(String.class));
}

@Test
void assertNullResponse() {
when(response.getValue()).thenReturn(null);
final String key = "/key";
assertDoesNotThrow(() -> {
repository.getDirectly(key);
repository.getChildrenKeys(key);
});
when(responseGetValueList.getValue()).thenReturn(null);
assertDoesNotThrow(() -> {
repository.watch(key, event -> {
});
client.setKVValue(key, "value");
});
}
}

0 comments on commit 2310723

Please sign in to comment.