Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor
Browse files Browse the repository at this point in the history
MartinMedek committed Nov 16, 2023
1 parent 583426f commit 1eee7a1
Showing 5 changed files with 43 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
import static io.debezium.server.redis.wip.TestConstants.MYSQL_PORT;
import static io.debezium.server.redis.wip.TestConstants.MYSQL_USER;
import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;
import static io.debezium.server.redis.wip.TestConstants.REDIS_SSL_PORT;

import java.util.HashMap;
import java.util.List;
@@ -32,12 +33,21 @@ public List<String> build() {
.collect(Collectors.toList());
}

public Map<String, String> baseRedisConfig(DebeziumTestContainerWrapper redis) {
public static Map<String, String> baseRedisConfig(DebeziumTestContainerWrapper redis) {
return Map.of(
"debezium.sink.type", "redis",
"debezium.sink.redis.address", redis.getContainerAddress() + ":" + REDIS_PORT);
}

public static Map<String, String> baseSslRedisConfig(DebeziumTestContainerWrapper redis) {
return Map.of(
"debezium.sink.type", "redis",
"debezium.sink.redis.address", redis.getContainerAddress() + ":" + REDIS_SSL_PORT,
"debezium.sink.redis.ssl.enabled", "true",
"debezium.source.database.ssl.mode", "disabled",
"debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore");
}

public Map<String, String> baseMySqlConfig(DebeziumTestContainerWrapper mysql) {
Map<String, String> result = new HashMap<>(Map.of("debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector",
"debezium.source.offset.flush.interval.ms", "0",
@@ -53,9 +63,15 @@ public Map<String, String> baseMySqlConfig(DebeziumTestContainerWrapper mysql) {
return result;
}

public DebeziumServerConfigBuilder withBaseMySqlConfig(DebeziumTestContainerWrapper redis, DebeziumTestContainerWrapper mysql) {
public DebeziumServerConfigBuilder withBaseMySqlRedisConfig(DebeziumTestContainerWrapper redis, DebeziumTestContainerWrapper mysql) {
config.putAll(baseRedisConfig(redis));
config.putAll(baseMySqlConfig(mysql));
return this;
}

public DebeziumServerConfigBuilder withBaseMysqlSslRedisConfig(DebeziumTestContainerWrapper redis, DebeziumTestContainerWrapper mysql) {
config.putAll(baseSslRedisConfig(redis));
config.putAll(baseMySqlConfig(mysql));
return this;
}
}
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ private TestConstants() {
// REDIS
public static final String REDIS_IMAGE = "redis";
public static final int REDIS_PORT = 6379;
public static final int REDIS_SSL_PORT = 6378;

// MYSQL
public static final String MYSQL_USER = "debezium";
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
import static io.debezium.server.redis.wip.TestProperties.DEBEZIUM_SERVER_IMAGE;

import java.time.Duration;
import java.util.List;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -26,7 +27,6 @@
import redis.clients.jedis.Jedis;

public class TestContainersRedisTestBase {
protected DebeziumTestContainerWrapper postgres;
protected DebeziumTestContainerWrapper redis;
protected DebeziumTestContainerWrapper server;
protected DebeziumTestContainerWrapper mysql;
@@ -69,4 +69,9 @@ public void tearDown() {
redis.stop();
}

protected void startServerWithEnv(List<String> env) {
server.setEnv(env);
server.start();
}

}
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
import static io.debezium.server.redis.wip.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE;
import static io.debezium.server.redis.wip.TestConstants.LOCALHOST;
import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;
import static io.debezium.server.redis.wip.TestConstants.REDIS_SSL_PORT;
import static io.debezium.server.redis.wip.TestUtils.insertCustomerToMySql;
import static io.debezium.server.redis.wip.TestUtils.waitForStreamLength;

@@ -27,18 +28,17 @@
public class TestContainersSslStreamIT extends TestContainersRedisTestBase {

private static final Logger LOGGER = LoggerFactory.getLogger(TestContainersSslStreamIT.class);
private static final int REDIS_NON_SSL_PORT = 6378;

public TestContainersSslStreamIT() {
super();
redis
.withCommand(
"redis-server --tls-port " + REDIS_PORT + " " +
"--port " + REDIS_NON_SSL_PORT + " " +
"redis-server --tls-port " + REDIS_SSL_PORT + " " +
"--port " + REDIS_PORT + " " +
"--tls-cert-file /etc/certificates/redis.crt " +
"--tls-key-file /etc/certificates/redis.key " +
"--tls-ca-cert-file /etc/certificates/ca.crt")
.withExposedPorts(REDIS_NON_SSL_PORT, REDIS_PORT)
.withExposedPorts(REDIS_SSL_PORT, REDIS_PORT)
.withClasspathResourceMapping("ssl", "/etc/certificates", BindMode.READ_ONLY);
server
.withCommand("-Djavax.net.ssl.keyStore=/ssl/client-keystore.p12",
@@ -51,14 +51,11 @@ public TestContainersSslStreamIT() {

@Test
public void shouldStreamWithSslEnabled() throws IOException, InterruptedException {
server.setEnv(new DebeziumServerConfigBuilder().withBaseMySqlConfig(redis, mysql)
.withValue("debezium.sink.redis.ssl.enabled", "true")
.withValue("debezium.source.database.ssl.mode", "disabled")
.withValue("debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore")
startServerWithEnv(new DebeziumServerConfigBuilder()
.withBaseMysqlSslRedisConfig(redis, mysql)
.build());
server.start();

jedis = new Jedis(new HostAndPort(LOCALHOST, redis.getMappedPort(REDIS_NON_SSL_PORT)));
jedis = new Jedis(new HostAndPort(LOCALHOST, redis.getMappedPort(REDIS_PORT)));
final String STREAM_NAME = "testc.inventory.customers";

waitForStreamLength(jedis, STREAM_NAME, INITIAL_CUSTOMER_COUNT);
Original file line number Diff line number Diff line change
@@ -33,8 +33,7 @@ public class TestContainersStreamIT extends TestContainersRedisTestBase {

@Test
public void shouldStreamChanges() throws InterruptedException, IOException {
server.setEnv(new DebeziumServerConfigBuilder().withBaseMySqlConfig(redis, mysql).build());
server.start();
startServerWithEnv(new DebeziumServerConfigBuilder().withBaseMySqlRedisConfig(redis, mysql).build());

final String STREAM_NAME = "testc.inventory.customers";

@@ -48,11 +47,10 @@ public void shouldStreamChanges() throws InterruptedException, IOException {

@Test
public void shouldFailWithIncorrectRedisAddress() {
server.setEnv(new DebeziumServerConfigBuilder()
.withBaseMySqlConfig(redis, mysql)
startServerWithEnv(new DebeziumServerConfigBuilder()
.withBaseMySqlRedisConfig(redis, mysql)
.withValue("debezium.sink.redis.address", redis.getContainerAddress() + ":" + 1000)
.build());
server.start();

server.waitForContainerLog("Failed to connect to any host resolved for DNS name");
server.waitForStop();
@@ -62,10 +60,9 @@ public void shouldFailWithIncorrectRedisAddress() {
@FixFor("DBZ-4510")
public void shouldRetryAfterRedisCrash() throws Exception {
final int SOCKET_TIMEOUT = 4000;
server.setEnv(new DebeziumServerConfigBuilder().withBaseMySqlConfig(redis, mysql)
startServerWithEnv(new DebeziumServerConfigBuilder().withBaseMySqlRedisConfig(redis, mysql)
.withValue("debezium.sink.redis.socket.timeout.ms", String.valueOf(SOCKET_TIMEOUT))
.build());
server.start();

final String STREAM_NAME = "testc.inventory.customers";
waitForStreamLength(jedis, STREAM_NAME, INITIAL_CUSTOMER_COUNT);
@@ -82,10 +79,9 @@ public void shouldRetryAfterRedisCrash() throws Exception {
@Test
public void shouldTimeoutAfterRedisCrash() throws Exception {
final int SOCKET_TIMEOUT = 2000;
server.setEnv(new DebeziumServerConfigBuilder().withBaseMySqlConfig(redis, mysql)
startServerWithEnv(new DebeziumServerConfigBuilder().withBaseMySqlRedisConfig(redis, mysql)
.withValue("debezium.sink.redis.socket.timeout.ms", String.valueOf(SOCKET_TIMEOUT))
.build());
server.start();

final String STREAM_NAME = "testc.inventory.customers";
waitForStreamLength(jedis, STREAM_NAME, INITIAL_CUSTOMER_COUNT);
@@ -104,8 +100,7 @@ public void shouldTimeoutAfterRedisCrash() throws Exception {
@Test
@FixFor("DBZ-4510")
public void shouldRetryAfterRedisOOM() throws Exception {
server.setEnv(new DebeziumServerConfigBuilder().withBaseMySqlConfig(redis, mysql).build());
server.start();
startServerWithEnv(new DebeziumServerConfigBuilder().withBaseMySqlRedisConfig(redis, mysql).build());

final String STREAM_NAME = "testc.inventory.customers";
final int INSERTED_RECORDS_COUNT = 1000;
@@ -137,10 +132,9 @@ public void shouldRetryAfterRedisOOM() throws Exception {

@Test
public void shouldStreamExtendedMessageFormat() {
server.setEnv(new DebeziumServerConfigBuilder().withBaseMySqlConfig(redis, mysql)
startServerWithEnv(new DebeziumServerConfigBuilder().withBaseMySqlRedisConfig(redis, mysql)
.withValue("debezium.sink.redis.message.format", "extended")
.build());
server.start();
final String STREAM_NAME = "testc.inventory.customers";

waitForStreamLength(jedis, STREAM_NAME, INITIAL_CUSTOMER_COUNT);
@@ -160,12 +154,10 @@ public void shouldStreamExtendedMessageFormat() {
public void shouldStreamSchemaHistory() throws Exception {
final String STREAM_NAME = "metadata:debezium:schema_history";
final String TABLE_NAME = "redis_test";

server.setEnv(new DebeziumServerConfigBuilder()
.withBaseMySqlConfig(redis, mysql)
startServerWithEnv(new DebeziumServerConfigBuilder()
.withBaseMySqlRedisConfig(redis, mysql)
.withValue("debezium.source.schema.history.internal", "io.debezium.server.redis.RedisSchemaHistory")
.build());
server.start();

waitForStreamLength(jedis, STREAM_NAME, INITIAL_SCHEMA_HISTORY_SIZE);
redis.pause();
@@ -192,11 +184,10 @@ public void shouldStreamSchemaHistory() throws Exception {
@FixFor("DBZ-4509")
public void shouldStoreOffsetInRedis() throws Exception {
final String OFFSETS_HASH_NAME = "metadata:debezium:offsets";
server.setEnv(new DebeziumServerConfigBuilder()
.withBaseMySqlConfig(redis, mysql)
startServerWithEnv(new DebeziumServerConfigBuilder()
.withBaseMySqlRedisConfig(redis, mysql)
.withValue("debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore")
.build());
server.start();

awaitHashSizeGte(jedis, OFFSETS_HASH_NAME, 1);
jedis.del(OFFSETS_HASH_NAME);

0 comments on commit 1eee7a1

Please sign in to comment.