Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ConsulRawClient of cluster-mode-repository-consul to be configured on ports other than 8500 #29621

Merged
merged 1 commit into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ Apache ShardingSphere 为不同的运行模式提供了不同的元数据持久

### Consul 持久化

受 `com.ecwid.consul:consul-api:1.4.5` 的 Maven 模块的限制,使用者无法通过 gRPC 端口来连接到 Consul Agent。

`Consul` 实现的 `serverLists` 属性受设计使然,仅可通过 HTTP 端点连接到单个 Consul Agent。
`serverLists` 使用了宽松的 URL 匹配原则。
1. 当 `serverLists` 为空时,将解析到 `http://localhost:8500` 的 Consul Agent 实例。
2. 当 `serverLists` 为 `hostname` 时,将解析到 `http://hostname:8500` 的 Consul Agent 实例。
3. 当 `serverLists` 为 `hostname:port` 时,将解析到 `http://hostname:port` 的 Consul Agent 实例。
4. 当 `serverLists` 为 `http://hostName:port` 时,将解析到 `http://hostName:port` 的 Consul Agent 实例。
5. 当 `serverLists` 为 `https://hostName:port` 时,将解析到 `https://hostName:port` 的 Consul Agent 实例。

类型:Consul

适用模式:Cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ Attributes:

### Consul Repository

Due to the limitation of the Maven module of `com.ecwid.consul:consul-api:1.4.5`, users cannot connect to the Consul Agent through the gRPC port.

The `serverLists` property of the `Consul` implementation is by design and can only be connected to a single Consul Agent via an HTTP endpoint.
`serverLists` uses relaxed URL matching principles.
1. When `serverLists` is empty, the Consul Agent instance at `http://localhost:8500` will be resolved.
2. When `serverLists` is `hostname`, it will be resolved to the Consul Agent instance of `http://hostname:8500`.
3. When `serverLists` is `hostname:port`, it will be resolved to the Consul Agent instance of `http://hostname:port`.
4. When `serverLists` is `http://hostName:port`, the Consul Agent instance of `http://hostName:port` will be resolved.
5. When `serverLists` is `https://hostName:port`, the Consul Agent instance of `https://hostName:port` will be resolved.

Type: Consul

Mode: Cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.mode.repository.cluster.consul;

import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
Expand All @@ -27,14 +28,17 @@
import com.ecwid.consul.v1.session.model.Session;
import com.google.common.base.Strings;
import lombok.Getter;
import org.apache.http.HttpStatus;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulPropertyKey;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -63,23 +67,31 @@ public final class ConsulRepository implements ClusterPersistRepository {

@Override
public void init(final ClusterPersistRepositoryConfiguration config) {
ConsulRawClient rawClient = Strings.isNullOrEmpty(config.getServerLists()) ? new ConsulRawClient() : new ConsulRawClient(config.getServerLists());
consulClient = new ShardingSphereConsulClient(rawClient);
consulProps = new ConsulProperties(config.getProps());
ConsulRawClient rawClient = createConsulRawClient(config.getServerLists());
consulClient = new ShardingSphereConsulClient(rawClient);
distributedLockHolder = new DistributedLockHolder(getType(), consulClient, consulProps);
watchKeyMap = new HashMap<>(6, 1F);
}

@Override
public String getDirectly(final String key) {
Response<GetValue> response = consulClient.getKVValue(key);
return null == response ? null : response.getValue().getValue();
if (null == response) {
return null;
}
GetValue value = response.getValue();
return null == value ? null : value.getValue();
}

@Override
public List<String> getChildrenKeys(final String key) {
Response<List<String>> response = consulClient.getKVKeysOnly(key);
return null == response ? Collections.emptyList() : response.getValue();
if (null == response) {
return Collections.emptyList();
}
List<String> value = response.getValue();
return null == value ? Collections.emptyList() : value;
}

@Override
Expand All @@ -102,9 +114,15 @@ public void delete(final String key) {
consulClient.deleteKVValue(key);
}

/**
* {@link ConsulRawClient} is a wrapper of blocking HTTP client and does not have a close method.
* Using such a Client does not necessarily conform to the implementation of the relevant SPI. ShardingSphere needs to
* consider solutions similar to <a href="https://github.com/spring-cloud/spring-cloud-consul/issues/475">spring-cloud/spring-cloud-consul#475</a>.
*
* @see ConsulRawClient
*/
@Override
public void close() {
// TODO
}

@Override
Expand All @@ -115,6 +133,24 @@ public void persistEphemeral(final String key, final String value) {
putParams.setAcquireSession(sessionId);
consulClient.setKVValue(key, value, putParams);
generatorFlushSessionTtlTask(consulClient, sessionId);
verifyConsulAgentRunning();
}

@SuppressWarnings("HttpUrlsUsage")
private ConsulRawClient createConsulRawClient(final String serverLists) {
if (Strings.isNullOrEmpty(serverLists)) {
return new ConsulRawClient();
}
URL serverUrl;
try {
serverUrl = new URL(!serverLists.startsWith("https://") && !serverLists.startsWith("http://") ? "http://" + serverLists : serverLists);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
if (-1 == serverUrl.getPort()) {
return new ConsulRawClient(serverUrl.getHost());
}
return new ConsulRawClient(serverUrl.getHost(), serverUrl.getPort());
}

private NewSession createNewSession(final String key) {
Expand Down Expand Up @@ -142,23 +178,27 @@ private void watchChildKeyChangeEvent(final String key, final DataChangedEventLi
long currentIndex = 0;
while (running.get()) {
Response<List<GetValue>> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
List<GetValue> value = response.getValue();
if (null == value) {
continue;
}
Long index = response.getConsulIndex();
if (null != index && 0 == currentIndex) {
currentIndex = index;
if (!watchKeyMap.containsKey(key)) {
watchKeyMap.put(key, new HashSet<>());
}
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : response.getValue()) {
for (GetValue each : value) {
watchKeys.add(each.getKey());
}
continue;
}
if (null != index && index > currentIndex) {
currentIndex = index;
Collection<String> newKeys = new HashSet<>(response.getValue().size(), 1F);
Collection<String> newKeys = new HashSet<>(value.size(), 1F);
Collection<String> watchKeys = watchKeyMap.get(key);
for (GetValue each : response.getValue()) {
for (GetValue each : value) {
newKeys.add(each.getKey());
if (!watchKeys.contains(each.getKey())) {
watchKeys.add(each.getKey());
Expand Down Expand Up @@ -189,12 +229,24 @@ private void fireDataChangeEvent(final GetValue getValue, final DataChangedEvent
* Flush session by update TTL.
*
* @param consulClient consul client
* @param sessionId session id
* @param sessionId session id
*/
public void generatorFlushSessionTtlTask(final ConsulClient consulClient, final String sessionId) {
SESSION_FLUSH_EXECUTOR.scheduleAtFixedRate(() -> consulClient.renewSession(sessionId, QueryParams.DEFAULT), 1L, 10L, TimeUnit.SECONDS);
}

/**
* See <a href="https://developer.hashicorp.com/consul/api-docs/v1.17.x/status">Status HTTP API</a> .
*
* @throws RuntimeException Unable to connect to Consul Agent.
*/
private void verifyConsulAgentRunning() {
HttpResponse httpResponse = consulClient.getRawClient().makeGetRequest("/v1/status/leader");
if (HttpStatus.SC_OK != httpResponse.getStatusCode()) {
throw new RuntimeException("Unable to connect to Consul Agent and StatusCode is " + httpResponse.getStatusCode() + ".");
}
}

@Override
public String getType() {
return "Consul";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.shardingsphere.mode.repository.cluster.consul;

import com.ecwid.consul.transport.HttpResponse;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import lombok.SneakyThrows;
import org.apache.http.HttpStatus;
import org.apache.shardingsphere.mode.repository.cluster.consul.props.ConsulProperties;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.awaitility.Awaitility;
Expand All @@ -47,6 +50,7 @@

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -83,6 +87,12 @@ class ConsulRepositoryTest {
@Mock
private List<GetValue> getValueList;

@Mock
private ConsulRawClient consulRawClient;

@Mock
private HttpResponse httpResponse;

private long index = 123456L;

@BeforeEach
Expand Down Expand Up @@ -140,6 +150,9 @@ void assertGetChildrenKeys() {

@Test
void assertPersistEphemeral() {
when(client.getRawClient()).thenReturn(consulRawClient);
when(consulRawClient.makeGetRequest(any(String.class))).thenReturn(httpResponse);
when(httpResponse.getStatusCode()).thenReturn(HttpStatus.SC_OK);
repository.persistEphemeral("key1", "value1");
verify(client).sessionCreate(any(NewSession.class), any(QueryParams.class));
verify(client).setKVValue(any(String.class), any(String.class), any(PutParams.class));
Expand Down Expand Up @@ -205,4 +218,20 @@ void assertPersist() {
repository.persist("key1", "value1");
verify(client).setKVValue(any(String.class), any(String.class));
}

@Test
void assertNullResponse() {
when(response.getValue()).thenReturn(null);
final String key = "/key";
assertDoesNotThrow(() -> {
repository.getDirectly(key);
repository.getChildrenKeys(key);
});
when(responseGetValueList.getValue()).thenReturn(null);
assertDoesNotThrow(() -> {
repository.watch(key, event -> {
});
client.setKVValue(key, "value");
});
}
}