diff --git a/debezium-server-redis/pom.xml b/debezium-server-redis/pom.xml
index 96047f5f..b114eaf7 100644
--- a/debezium-server-redis/pom.xml
+++ b/debezium-server-redis/pom.xml
@@ -12,7 +12,10 @@
jar
+ true
java
+ debezium-server-test
+ ${artifactId}
@@ -154,9 +157,18 @@
${version.debezium}
${quarkus.container-image.group}
+ ${quarkus.container-image.name}
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ **/*IT.java
+
+
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumServerConfigBuilder.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumServerConfigBuilder.java
similarity index 86%
rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumServerConfigBuilder.java
rename to debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumServerConfigBuilder.java
index a1930c2d..ea82cc70 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumServerConfigBuilder.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumServerConfigBuilder.java
@@ -3,14 +3,14 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.server.redis.wip;
+package io.debezium.server.redis;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_DATABASE;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_PASSWORD;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_PORT;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_USER;
-import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;
-import static io.debezium.server.redis.wip.TestConstants.REDIS_SSL_PORT;
+import static io.debezium.server.redis.TestConstants.MYSQL_DATABASE;
+import static io.debezium.server.redis.TestConstants.MYSQL_PASSWORD;
+import static io.debezium.server.redis.TestConstants.MYSQL_PORT;
+import static io.debezium.server.redis.TestConstants.MYSQL_USER;
+import static io.debezium.server.redis.TestConstants.REDIS_PORT;
+import static io.debezium.server.redis.TestConstants.REDIS_SSL_PORT;
import java.util.HashMap;
import java.util.List;
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumTestContainerWrapper.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumTestContainerWrapper.java
similarity index 98%
rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumTestContainerWrapper.java
rename to debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumTestContainerWrapper.java
index 5b767047..deed72bb 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumTestContainerWrapper.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumTestContainerWrapper.java
@@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.server.redis.wip;
+package io.debezium.server.redis;
import static org.awaitility.Awaitility.await;
import static org.testcontainers.containers.output.OutputFrame.OutputType.STDOUT;
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetIT.java
deleted file mode 100644
index 000f4224..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetIT.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Map;
-
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import io.debezium.connector.postgresql.connection.PostgresConnection;
-import io.debezium.doc.FixFor;
-import io.debezium.util.Testing;
-import io.quarkus.test.common.QuarkusTestResource;
-import io.quarkus.test.junit.QuarkusIntegrationTest;
-import io.quarkus.test.junit.TestProfile;
-
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Jedis;
-
-/**
- * Integration test that verifies reading and writing offsets from Redis key value store
- *
- * @author Oren Elias
- */
-@Disabled
-@QuarkusIntegrationTest
-@TestProfile(RedisOffsetTestProfile.class)
-@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
-public class RedisOffsetIT {
-
- private static final int MESSAGE_COUNT = 4;
- private static final String STREAM_NAME = "testc.inventory.customers";
- private static final String OFFSETS_HASH_NAME = "metadata:debezium:offsets";
-
- protected static Jedis jedis;
-
- @Test
- @FixFor("DBZ-4509")
- public void testRedisStream() throws Exception {
- jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
- TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT);
-
- Map redisOffsets = jedis.hgetAll(OFFSETS_HASH_NAME);
- assertThat(redisOffsets.size() > 0).isTrue();
- }
-
- /**
- * Test retry mechanism when encountering Redis connectivity issues:
- * 1. Make Redis to be unavailable while the server is up
- * 2. Create a new table named redis_test in PostgreSQL and insert 5 records to it
- * 3. Bring Redis up again and make sure the offsets have been written successfully
- */
- @Test
- @FixFor("DBZ-4509")
- public void testRedisConnectionRetry() throws Exception {
- Testing.Print.enable();
-
- Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
- // wait until the offsets are written for the first time
- TestUtils.awaitHashSizeGte(jedis, OFFSETS_HASH_NAME, 1);
-
- // clear the offsets key
- jedis.del(OFFSETS_HASH_NAME);
-
- // pause container
- Testing.print("Pausing container");
- RedisTestResourceLifecycleManager.pause();
-
- final PostgresConnection connection = TestUtils.getPostgresConnection();
- Testing.print("Creating new redis_test table and inserting 5 records to it");
- connection.execute(
- "CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)",
- "INSERT INTO inventory.redis_test VALUES (1)",
- "INSERT INTO inventory.redis_test VALUES (2)",
- "INSERT INTO inventory.redis_test VALUES (3)",
- "INSERT INTO inventory.redis_test VALUES (4)",
- "INSERT INTO inventory.redis_test VALUES (5)");
- connection.close();
-
- Testing.print("Sleeping for 2 seconds to flush records");
- Thread.sleep(2000);
- Testing.print("Unpausing container");
-
- RedisTestResourceLifecycleManager.unpause();
- Testing.print("Sleeping for 2 seconds to reconnect to redis and write offset");
-
- // wait until the offsets are re-written
- TestUtils.awaitHashSizeGte(jedis, OFFSETS_HASH_NAME, 1);
-
- Map redisOffsets = jedis.hgetAll(OFFSETS_HASH_NAME);
- jedis.close();
- assertThat(redisOffsets.size() > 0).isTrue();
- }
-
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetTestProfile.java
deleted file mode 100644
index 30d581a7..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetTestProfile.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
-import io.quarkus.test.junit.QuarkusTestProfile;
-
-public class RedisOffsetTestProfile implements QuarkusTestProfile {
-
- @Override
- public List testResources() {
- return Arrays.asList(new TestResourceEntry(PostgresTestResourceLifecycleManager.class));
- }
-
- @Override
- public Map getConfigOverrides() {
- Map config = new HashMap();
- config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
- config.put("debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore");
- return config;
- }
-
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamIT.java
deleted file mode 100644
index 05aa062b..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamIT.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.Map;
-
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import io.quarkus.test.junit.QuarkusIntegrationTest;
-import io.quarkus.test.junit.TestProfile;
-
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Jedis;
-
-/**
- * Integration tests for secured Redis
- *
- * @author Oren Elias
- */
-@Disabled
-@QuarkusIntegrationTest
-@TestProfile(RedisSSLStreamTestProfile.class)
-public class RedisSSLStreamIT {
-
- /**
- * Verifies that all the records of a PostgreSQL table are streamed to Redis
- */
- @Test
- public void testRedisStream() throws Exception {
- HostAndPort address = HostAndPort.from(RedisSSLTestResourceLifecycleManager.getRedisContainerAddress());
- Jedis jedis = new Jedis(address.getHost(), address.getPort(), true);
- final int MESSAGE_COUNT = 4;
- final String STREAM_NAME = "testc.inventory.customers";
- final String HASH_NAME = "metadata:debezium:offsets";
-
- TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT);
-
- Long streamLength = jedis.xlen(STREAM_NAME);
- assertTrue("Redis Basic Stream Test Failed", streamLength == MESSAGE_COUNT);
-
- // wait until the offsets are re-written
- TestUtils.awaitHashSizeGte(jedis, HASH_NAME, 1);
-
- Map redisOffsets = jedis.hgetAll(HASH_NAME);
- assertTrue(redisOffsets.size() > 0);
-
- jedis.close();
- }
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamTestProfile.java
deleted file mode 100644
index 05f0a2db..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamTestProfile.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import java.net.URL;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
-import io.quarkus.test.junit.QuarkusTestProfile;
-
-public class RedisSSLStreamTestProfile implements QuarkusTestProfile {
-
- @Override
- public List testResources() {
- return Arrays.asList(
- new TestResourceEntry(PostgresTestResourceLifecycleManager.class),
- new TestResourceEntry(RedisSSLTestResourceLifecycleManager.class));
- }
-
- public Map getConfigOverrides() {
-
- Map config = new HashMap();
- URL keyStoreFile = RedisSSLStreamTestProfile.class.getClassLoader().getResource("ssl/client-keystore.p12");
- URL trustStoreFile = RedisSSLStreamTestProfile.class.getClassLoader().getResource("ssl/client-truststore.p12");
-
- config.put("javax.net.ssl.keyStore", keyStoreFile.getPath());
- config.put("javax.net.ssl.trustStore", trustStoreFile.getPath());
- config.put("javax.net.ssl.keyStorePassword", "secret");
- config.put("javax.net.ssl.trustStorePassword", "secret");
- config.put("debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore");
- config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
- return config;
- }
-
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLTestResourceLifecycleManager.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLTestResourceLifecycleManager.java
deleted file mode 100644
index fa514e56..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLTestResourceLifecycleManager.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.GenericContainer;
-
-import io.debezium.server.TestConfigSource;
-import io.debezium.util.Testing;
-import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
-
-public class RedisSSLTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
-
- public static final int REDIS_PORT = 6379;
- public static final String REDIS_IMAGE = "redis";
-
- private static final AtomicBoolean running = new AtomicBoolean(false);
- private static final GenericContainer> container = new GenericContainer<>(REDIS_IMAGE)
- .withClasspathResourceMapping("ssl", "/etc/certificates", BindMode.READ_ONLY)
- .withCommand(
- "redis-server --tls-port 6379 --port 0 --tls-cert-file /etc/certificates/redis.crt --tls-key-file /etc/certificates/redis.key --tls-ca-cert-file /etc/certificates/ca.crt")
- .withExposedPorts(REDIS_PORT);
-
- private static synchronized void start(boolean ignored) {
- if (!running.get()) {
- container.start();
- TestUtils.waitBoolean(() -> container.getLogs().contains(RedisTestResourceLifecycleManager.READY_MESSAGE));
- running.set(true);
- }
- }
-
- @Override
- public Map start() {
- start(true);
- Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
- Testing.Files.createTestingFile(TestConfigSource.OFFSET_STORE_PATH);
-
- Map params = new ConcurrentHashMap<>();
- params.put("debezium.sink.type", "redis");
- params.put("debezium.source.offset.storage.redis.address", RedisSSLTestResourceLifecycleManager.getRedisContainerAddress());
- params.put("debezium.source.offset.storage.redis.ssl.enabled", "true");
- params.put("debezium.sink.redis.address", RedisSSLTestResourceLifecycleManager.getRedisContainerAddress());
- params.put("debezium.sink.redis.ssl.enabled", "true");
- params.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
- params.put("debezium.source.offset.flush.interval.ms", "0");
- params.put("debezium.source.topic.prefix", "testc");
- params.put("debezium.source.schema.include.list", "inventory");
- params.put("debezium.source.table.include.list", "inventory.customers,inventory.redis_test,inventory.redis_test2");
-
- return params;
- }
-
- @Override
- public void stop() {
- try {
- container.stop();
- }
- catch (Exception e) {
- // ignored
- }
- running.set(false);
- }
-
- public static void pause() {
- container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec();
- }
-
- public static void unpause() {
- container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec();
- }
-
- public static String getRedisContainerAddress() {
- start(true);
-
- return String.format("%s:%d", container.getContainerIpAddress(), container.getFirstMappedPort());
- }
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryIT.java
deleted file mode 100644
index 689a2e15..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryIT.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.List;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junitpioneer.jupiter.RetryingTest;
-
-import io.debezium.config.Configuration;
-import io.debezium.connector.mysql.MySqlConnectorConfig;
-import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
-import io.debezium.doc.FixFor;
-import io.debezium.relational.history.AbstractSchemaHistoryTest;
-import io.debezium.relational.history.SchemaHistory;
-import io.debezium.relational.history.SchemaHistoryMetrics;
-import io.debezium.testing.testcontainers.MySqlTestResourceLifecycleManager;
-import io.debezium.util.Testing;
-import io.quarkus.test.common.QuarkusTestResource;
-import io.quarkus.test.junit.QuarkusIntegrationTest;
-import io.quarkus.test.junit.TestProfile;
-
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.StreamEntryID;
-import redis.clients.jedis.resps.StreamEntry;
-
-/**
- * Integration test that verifies reading and writing database schema history from Redis key value store
- *
- * @author Oren Elias
- */
-@Disabled
-@QuarkusIntegrationTest
-@TestProfile(RedisSchemaHistoryTestProfile.class)
-@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
-public class RedisSchemaHistoryIT extends AbstractSchemaHistoryTest {
-
- private static final String STREAM_NAME = "metadata:debezium:schema_history";
- private static final int INIT_HISTORY_SIZE = 16; // Initial number of entries in the schema history stream.
-
- protected static Jedis jedis;
-
- @Override
- @BeforeEach
- public void beforeEach() {
- super.beforeEach();
- }
-
- @Override
- protected SchemaHistory createHistory() {
- SchemaHistory history = new RedisSchemaHistory();
-
- history.configure(Configuration.create()
- .with("schema.history.internal.redis.address", HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()))
- .build(), null, SchemaHistoryMetrics.NOOP, true);
- history.start();
- return history;
- }
-
- @Test
- @FixFor("DBZ-4771")
- public void testSchemaHistoryIsSaved() {
- jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
- TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, INIT_HISTORY_SIZE + 1);
-
- final List entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null);
- assertEquals(INIT_HISTORY_SIZE + 1, entries.size());
- assertTrue(entries.stream().anyMatch(item -> item.getFields().get("schema").contains("CREATE TABLE `customers`")));
- }
-
- @Test
- @FixFor("DBZ-4771")
- public void shouldRecordChangesAndRecoverToVariousPoints() {
- super.shouldRecordChangesAndRecoverToVariousPoints();
- }
-
- /**
- * Test retry mechanism when encountering Redis connectivity issues:
- * 1. Make Redis unavailable while the server is up
- * 2. Create a new table named redis_test in MySQL
- * 3. Bring Redis up again and make sure the database schema has been written successfully
- */
- @RetryingTest(3)
- @FixFor("DBZ-4509")
- public void testRedisConnectionRetry() throws Exception {
- Testing.Print.enable();
-
- Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
- // wait until the db schema history is written for the first time
- TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, 1);
-
- // pause container
- Testing.print("Pausing container");
- RedisTestResourceLifecycleManager.pause();
-
- final AbstractConnectorConnection connection = getMySqlConnection();
- connection.connect();
- Testing.print("Creating new redis_test table and inserting 5 records to it");
- connection.execute("CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)");
- connection.close();
-
- Testing.print("Sleeping for 2 seconds to flush records");
- Thread.sleep(2000);
- Testing.print("Unpausing container");
- RedisTestResourceLifecycleManager.unpause();
-
- // wait until the db schema history is written for the first time
- TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, INIT_HISTORY_SIZE + 1);
-
- final List entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null);
-
- assertEquals(INIT_HISTORY_SIZE + 1, entries.size());
- assertTrue(entries.get(INIT_HISTORY_SIZE).getFields().get("schema").contains("redis_test"));
- }
-
- private AbstractConnectorConnection getMySqlConnection() {
- final Configuration config = Configuration.create()
- .with("database.user", MySqlTestResourceLifecycleManager.PRIVILEGED_USER)
- .with("database.password", MySqlTestResourceLifecycleManager.PRIVILEGED_PASSWORD)
- .with("database.dbname", MySqlTestResourceLifecycleManager.DBNAME)
- .with("database.hostname", MySqlTestResourceLifecycleManager.HOST)
- .with("database.port", MySqlTestResourceLifecycleManager.getContainer().getMappedPort(MySqlTestResourceLifecycleManager.PORT))
- .build();
- return new MySqlConnectorConfig(config).getConnectorAdapter().createConnection(config);
- }
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java
deleted file mode 100644
index e11813fb..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import io.debezium.testing.testcontainers.MySqlTestResourceLifecycleManager;
-import io.debezium.util.Testing;
-import io.quarkus.test.junit.QuarkusTestProfile;
-
-public class RedisSchemaHistoryTestProfile implements QuarkusTestProfile {
- public static final String OFFSETS_FILE = "file-connector-offsets.txt";
- public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath();
- public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
-
- @Override
- public List testResources() {
- return Arrays.asList(new TestResourceEntry(MySqlTestResourceLifecycleManager.class));
- }
-
- public Map getConfigOverrides() {
- Map config = new HashMap();
- config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
- config.put("debezium.source.schema.history.internal", "io.debezium.server.redis.RedisSchemaHistory");
- config.put("debezium.source.database.server.id", "12345");
- return config;
- }
-
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamIT.java
deleted file mode 100644
index f72e1c42..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamIT.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.Map;
-
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import io.debezium.connector.postgresql.connection.PostgresConnection;
-import io.debezium.doc.FixFor;
-import io.debezium.util.Testing;
-import io.quarkus.test.common.QuarkusTestResource;
-import io.quarkus.test.junit.QuarkusIntegrationTest;
-import io.quarkus.test.junit.TestProfile;
-
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.StreamEntryID;
-import redis.clients.jedis.resps.StreamEntry;
-
-/**
- * Integration tests that verify basic reading from PostgreSQL database and writing to Redis stream
- * and retry mechanism in case of connectivity issues or OOM in Redis
- *
- * @author M Sazzadul Hoque
- * @author Yossi Shirizli
- */
-@Disabled
-@QuarkusIntegrationTest
-@TestProfile(RedisStreamTestProfile.class)
-@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
-public class RedisStreamIT {
-
- /**
- * Verifies that all the records of a PostgreSQL table are streamed to Redis
- */
- @Test
- public void testRedisStream() throws Exception {
- Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
- final int MESSAGE_COUNT = 4;
- final String STREAM_NAME = "testc.inventory.customers";
-
- TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT);
-
- Long streamLength = jedis.xlen(STREAM_NAME);
- assertTrue("Expected stream length of " + MESSAGE_COUNT, streamLength == MESSAGE_COUNT);
-
- final List entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null);
- for (StreamEntry entry : entries) {
- Map map = entry.getFields();
- assertTrue("Expected map of size 1", map.size() == 1);
- Map.Entry mapEntry = map.entrySet().iterator().next();
- assertTrue("Expected json like key starting with {\"schema\":...", mapEntry.getKey().startsWith("{\"schema\":"));
- assertTrue("Expected json like value starting with {\"schema\":...", mapEntry.getValue().startsWith("{\"schema\":"));
- }
-
- jedis.close();
- }
-
- /**
- * Test retry mechanism when encountering Redis connectivity issues:
- * 1. Make Redis to be unavailable while the server is up
- * 2. Create a new table named redis_test in PostgreSQL and insert 5 records to it
- * 3. Bring Redis up again and make sure these records have been streamed successfully
- */
- @Test
- @FixFor("DBZ-4510")
- public void testRedisConnectionRetry() throws Exception {
- Testing.Print.enable();
-
- final int MESSAGE_COUNT = 5;
- final String STREAM_NAME = "testc.inventory.redis_test";
- Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
- Testing.print("Pausing container");
- RedisTestResourceLifecycleManager.pause();
-
- final PostgresConnection connection = TestUtils.getPostgresConnection();
- Testing.print("Creating new redis_test table and inserting 5 records to it");
- connection.execute(
- "CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)",
- "INSERT INTO inventory.redis_test VALUES (1)",
- "INSERT INTO inventory.redis_test VALUES (2)",
- "INSERT INTO inventory.redis_test VALUES (3)",
- "INSERT INTO inventory.redis_test VALUES (4)",
- "INSERT INTO inventory.redis_test VALUES (5)");
- connection.close();
-
- Testing.print("Sleeping for 3 seconds to simulate no connection errors");
- Thread.sleep(3000);
- Testing.print("Unpausing container");
- RedisTestResourceLifecycleManager.unpause();
- Thread.sleep(2000);
-
- Long streamLength = jedis.xlen(STREAM_NAME);
- Testing.print("Entries in " + STREAM_NAME + ":" + streamLength);
- jedis.close();
- assertTrue("Redis Connection Test Failed", streamLength == MESSAGE_COUNT);
- }
-
- /**
- * Test retry mechanism when encountering Redis Out of Memory:
- * 1. Simulate a Redis OOM by setting its max memory to 1M
- * 2. Create a new table named redis_test2 in PostgreSQL and insert 50 records to it
- * 3. Sleep for 1 second to simulate Redis OOM (stream does not contain 50 records)
- * 4. Unlimit memory and verify that all 50 records have been streamed
- */
- @Test
- @FixFor("DBZ-4510")
- public void testRedisOOMRetry() throws Exception {
- Testing.Print.enable();
-
- Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
- final String STREAM_NAME = "testc.inventory.redis_test2";
- final int TOTAL_RECORDS = 50;
-
- Testing.print("Setting Redis' maxmemory to 1M");
- jedis.configSet("maxmemory", "1M");
-
- PostgresConnection connection = TestUtils.getPostgresConnection();
- connection.execute("CREATE TABLE inventory.redis_test2 " +
- "(id VARCHAR(100) PRIMARY KEY, " +
- "first_name VARCHAR(100), " +
- "last_name VARCHAR(100))");
- connection.execute(String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) " +
- "SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", TOTAL_RECORDS));
- connection.commit();
-
- Thread.sleep(1000);
- Testing.print("Entries in " + STREAM_NAME + ":" + jedis.xlen(STREAM_NAME));
- assertTrue(jedis.xlen(STREAM_NAME) < TOTAL_RECORDS);
-
- Thread.sleep(1000);
- jedis.configSet("maxmemory", "0");
- TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, TOTAL_RECORDS);
-
- long streamLength = jedis.xlen(STREAM_NAME);
- assertTrue("Redis OOM Test Failed", streamLength == TOTAL_RECORDS);
- }
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdIT.java
deleted file mode 100644
index 1f7e4f9e..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdIT.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import static org.junit.Assert.assertTrue;
-
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import io.debezium.connector.postgresql.connection.PostgresConnection;
-import io.debezium.util.Testing;
-import io.quarkus.test.common.QuarkusTestResource;
-import io.quarkus.test.junit.QuarkusIntegrationTest;
-import io.quarkus.test.junit.TestProfile;
-
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Jedis;
-
-@Disabled
-@QuarkusIntegrationTest
-@TestProfile(RedisStreamMemoryThresholdTestProfile.class)
-@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
-public class RedisStreamMemoryThresholdIT {
-
- @Test
- public void testRedisMemoryThreshold() throws Exception {
- Testing.Print.enable();
-
- Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
- final String STREAM_NAME = "testc.inventory.redis_test2";
- final int TOTAL_RECORDS = 50;
-
- Testing.print("Setting Redis' maxmemory to 1M");
- jedis.configSet("maxmemory", "1M");
-
- PostgresConnection connection = TestUtils.getPostgresConnection();
- connection.execute("CREATE TABLE inventory.redis_test2 " +
- "(id VARCHAR(100) PRIMARY KEY, " +
- "first_name VARCHAR(100), " +
- "last_name VARCHAR(100))");
- connection.execute(String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) " +
- "SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", TOTAL_RECORDS));
- connection.commit();
-
- Thread.sleep(1000);
- Testing.print("Entries in " + STREAM_NAME + ":" + jedis.xlen(STREAM_NAME));
- assertTrue(jedis.xlen(STREAM_NAME) < TOTAL_RECORDS);
-
- Thread.sleep(1000);
- jedis.configSet("maxmemory", "0");
- TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, TOTAL_RECORDS);
-
- long streamLength = jedis.xlen(STREAM_NAME);
- assertTrue("Redis Memory Threshold Test Failed", streamLength == TOTAL_RECORDS);
- }
-
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java
deleted file mode 100644
index 7126ac9c..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import java.util.Map;
-
-public class RedisStreamMemoryThresholdTestProfile extends RedisStreamTestProfile {
-
- @Override
- public Map getConfigOverrides() {
- Map config = super.getConfigOverrides();
- config.put("debezium.sink.redis.memory.threshold.percentage", "75");
- return config;
- }
-
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageIT.java
deleted file mode 100644
index 5bc5f22f..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageIT.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.List;
-import java.util.Map;
-
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import io.debezium.util.Testing;
-import io.quarkus.test.common.QuarkusTestResource;
-import io.quarkus.test.junit.QuarkusIntegrationTest;
-import io.quarkus.test.junit.TestProfile;
-
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.StreamEntryID;
-import redis.clients.jedis.resps.StreamEntry;
-
-/**
- * Integration tests that verify basic reading from PostgreSQL database and writing to Redis stream
- *
- * @author ggaborg
- */
-@Disabled
-@QuarkusIntegrationTest
-@TestProfile(RedisStreamMessageTestProfile.class)
-@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
-public class RedisStreamMessageIT {
-
- /**
- * Verifies that all the records of a PostgreSQL table are streamed to Redis in extended message format
- */
- @Test
- public void testRedisStreamExtendedMessage() {
- Testing.Print.enable();
-
- Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
- final int MESSAGE_COUNT = 4;
- final String STREAM_NAME = "testc.inventory.customers";
-
- TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT);
-
- long streamLength = jedis.xlen(STREAM_NAME);
- assertEquals(MESSAGE_COUNT, streamLength, "Expected stream length of " + MESSAGE_COUNT);
-
- final List entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null);
- for (StreamEntry entry : entries) {
- Map map = entry.getFields();
- assertEquals(3, map.size(), "Expected map of size 3");
- assertTrue(map.get("key") != null && map.get("key").startsWith("{\"schema\":"), "Expected key's value starting with {\"schema\":...");
- assertTrue(map.get("value") != null && map.get("value").startsWith("{\"schema\":"), "Expected value's value starting with {\"schema\":...");
- assertTrue(map.containsKey("HEADERKEY"));
- }
-
- jedis.close();
- }
-
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageTestProfile.java
deleted file mode 100644
index 329b3414..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageTestProfile.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import java.util.Map;
-
-public class RedisStreamMessageTestProfile extends RedisStreamTestProfile {
-
- @Override
- public Map getConfigOverrides() {
- Map config = super.getConfigOverrides();
- config.put("debezium.sink.redis.message.format", "extended");
- return config;
- }
-
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java
deleted file mode 100644
index 2349d22d..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
-import io.debezium.util.Testing;
-import io.quarkus.test.junit.QuarkusTestProfile;
-
-public class RedisStreamTestProfile implements QuarkusTestProfile {
-
- public static final String OFFSETS_FILE = "file-connector-offsets.txt";
- public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath();
- public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
-
- @Override
- public List testResources() {
- return Arrays.asList(new TestResourceEntry(PostgresTestResourceLifecycleManager.class));
- }
-
- public Map getConfigOverrides() {
- Map config = new HashMap();
- config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
- config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
- config.put("debezium.sink.redis.memory.threshold.percentage", "0");
- return config;
- }
-
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisTestResourceLifecycleManager.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisTestResourceLifecycleManager.java
deleted file mode 100644
index 4c775464..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisTestResourceLifecycleManager.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.testcontainers.containers.GenericContainer;
-
-import io.debezium.server.TestConfigSource;
-import io.debezium.util.Testing;
-import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
-
-public class RedisTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
-
- static final String READY_MESSAGE = "Ready to accept connections";
- public static final int REDIS_PORT = 6379;
- public static final String REDIS_IMAGE = "redis";
-
- private static final AtomicBoolean running = new AtomicBoolean(false);
- private static final GenericContainer> container = new GenericContainer<>(REDIS_IMAGE)
- .withExposedPorts(REDIS_PORT);
-
- private static synchronized void start(boolean ignored) {
- if (!running.get()) {
- container.start();
- TestUtils.waitBoolean(() -> container.getLogs().contains(READY_MESSAGE));
- running.set(true);
- }
- }
-
- @Override
- public Map start() {
- start(true);
- Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
- Testing.Files.createTestingFile(TestConfigSource.OFFSET_STORE_PATH);
-
- Map params = new ConcurrentHashMap<>();
- params.put("debezium.sink.type", "redis");
- params.put("debezium.sink.redis.address", RedisTestResourceLifecycleManager.getRedisContainerAddress());
- params.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
- params.put("debezium.source.offset.flush.interval.ms", "0");
- params.put("debezium.source.topic.prefix", "testc");
- params.put("debezium.source.schema.include.list", "inventory");
- params.put("debezium.source.table.include.list", "inventory.customers,inventory.redis_test,inventory.redis_test2");
- params.put("debezium.transforms", "addheader");
- params.put("debezium.transforms.addheader.type", "org.apache.kafka.connect.transforms.InsertHeader");
- params.put("debezium.transforms.addheader.header", "headerKey");
- params.put("debezium.transforms.addheader.value.literal", "headerValue");
-
- return params;
- }
-
- @Override
- public void stop() {
- try {
- container.stop();
- }
- catch (Exception e) {
- // ignored
- }
- running.set(false);
- }
-
- public static void pause() {
- container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec();
- }
-
- public static void unpause() {
- container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec();
- }
-
- public static String getRedisContainerAddress() {
- start(true);
-
- return String.format("%s:%d", container.getContainerIpAddress(), container.getMappedPort(REDIS_PORT));
- }
-}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestConstants.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestConstants.java
similarity index 96%
rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestConstants.java
rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestConstants.java
index f26dbf46..ac8bd3d5 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestConstants.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestConstants.java
@@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.server.redis.wip;
+package io.debezium.server.redis;
public class TestConstants {
private TestConstants() {
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersRedisTestBase.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersRedisTestBase.java
similarity index 78%
rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersRedisTestBase.java
rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersRedisTestBase.java
index cbffdfd1..7962aa30 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersRedisTestBase.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersRedisTestBase.java
@@ -3,16 +3,16 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.server.redis.wip;
+package io.debezium.server.redis;
-import static io.debezium.server.redis.wip.TestConstants.LOCALHOST;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_PORT;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_PASSWORD;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_USER;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_ROOT_PASSWORD;
-import static io.debezium.server.redis.wip.TestConstants.REDIS_IMAGE;
-import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;
-import static io.debezium.server.redis.wip.TestProperties.DEBEZIUM_SERVER_IMAGE;
+import static io.debezium.server.redis.TestConstants.LOCALHOST;
+import static io.debezium.server.redis.TestConstants.MYSQL_PORT;
+import static io.debezium.server.redis.TestConstants.MYSQL_PRIVILEGED_PASSWORD;
+import static io.debezium.server.redis.TestConstants.MYSQL_PRIVILEGED_USER;
+import static io.debezium.server.redis.TestConstants.MYSQL_ROOT_PASSWORD;
+import static io.debezium.server.redis.TestConstants.REDIS_IMAGE;
+import static io.debezium.server.redis.TestConstants.REDIS_PORT;
+import static io.debezium.server.redis.TestProperties.DEBEZIUM_SERVER_IMAGE;
import java.time.Duration;
import java.util.List;
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersSslStreamIT.java
similarity index 80%
rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java
rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersSslStreamIT.java
index fb0134f9..ea17d623 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersSslStreamIT.java
@@ -3,15 +3,15 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.server.redis.wip;
+package io.debezium.server.redis;
-import static io.debezium.server.redis.wip.TestConstants.INITIAL_CUSTOMER_COUNT;
-import static io.debezium.server.redis.wip.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE;
-import static io.debezium.server.redis.wip.TestConstants.LOCALHOST;
-import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT;
-import static io.debezium.server.redis.wip.TestConstants.REDIS_SSL_PORT;
-import static io.debezium.server.redis.wip.TestUtils.insertCustomerToMySql;
-import static io.debezium.server.redis.wip.TestUtils.waitForStreamLength;
+import static io.debezium.server.redis.TestConstants.INITIAL_CUSTOMER_COUNT;
+import static io.debezium.server.redis.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE;
+import static io.debezium.server.redis.TestConstants.LOCALHOST;
+import static io.debezium.server.redis.TestConstants.REDIS_PORT;
+import static io.debezium.server.redis.TestConstants.REDIS_SSL_PORT;
+import static io.debezium.server.redis.TestUtils.insertCustomerToMySql;
+import static io.debezium.server.redis.TestUtils.waitForStreamLength;
import java.io.IOException;
@@ -20,8 +20,6 @@
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
-import io.debezium.server.redis.TestUtils;
-
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersStreamIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersStreamIT.java
similarity index 94%
rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersStreamIT.java
rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersStreamIT.java
index af39e5f0..5f2356b9 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersStreamIT.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersStreamIT.java
@@ -3,14 +3,14 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.server.redis.wip;
-
-import static io.debezium.server.redis.wip.TestConstants.INITIAL_CUSTOMER_COUNT;
-import static io.debezium.server.redis.wip.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE;
-import static io.debezium.server.redis.wip.TestUtils.awaitHashSizeGte;
-import static io.debezium.server.redis.wip.TestUtils.getMySqlConnection;
-import static io.debezium.server.redis.wip.TestUtils.insertCustomerToMySql;
-import static io.debezium.server.redis.wip.TestUtils.waitForStreamLength;
+package io.debezium.server.redis;
+
+import static io.debezium.server.redis.TestConstants.INITIAL_CUSTOMER_COUNT;
+import static io.debezium.server.redis.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE;
+import static io.debezium.server.redis.TestUtils.awaitHashSizeGte;
+import static io.debezium.server.redis.TestUtils.getMySqlConnection;
+import static io.debezium.server.redis.TestUtils.insertCustomerToMySql;
+import static io.debezium.server.redis.TestUtils.waitForStreamLength;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestProperties.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestProperties.java
similarity index 70%
rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestProperties.java
rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestProperties.java
index 341eaf26..2cab56e8 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestProperties.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestProperties.java
@@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.server.redis.wip;
+package io.debezium.server.redis;
public final class TestProperties {
private TestProperties() {
@@ -12,5 +12,6 @@ private TestProperties() {
public static final String DEBEZIUM_VERSION = System.getProperty("test.version.debezium");
public static final String DEBEZIUM_SERVER_IMAGE_GROUP = System.getProperty("test.server.image.group");
- public static final String DEBEZIUM_SERVER_IMAGE = DEBEZIUM_SERVER_IMAGE_GROUP + "/debezium-server-redis:" + DEBEZIUM_VERSION;
+ private static final String DEBEZIUM_SERVER_IMAGE_NAME = System.getProperty("test.server.image.name");
+ public static final String DEBEZIUM_SERVER_IMAGE = DEBEZIUM_SERVER_IMAGE_GROUP + "/" + DEBEZIUM_SERVER_IMAGE_NAME + ":" + DEBEZIUM_VERSION;
}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/TestUtils.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestUtils.java
index aec9a6d1..d395f29f 100644
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/TestUtils.java
+++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestUtils.java
@@ -5,45 +5,54 @@
*/
package io.debezium.server.redis;
-import java.time.Duration;
-import java.util.function.Supplier;
+import static io.debezium.server.redis.TestConstants.LOCALHOST;
+import static io.debezium.server.redis.TestConstants.MYSQL_DATABASE;
+import static io.debezium.server.redis.TestConstants.MYSQL_PORT;
+import static io.debezium.server.redis.TestConstants.MYSQL_PRIVILEGED_PASSWORD;
+import static io.debezium.server.redis.TestConstants.MYSQL_PRIVILEGED_USER;
+import static io.debezium.server.redis.TestConstants.MYSQL_ROOT_PASSWORD;
+import static org.awaitility.Awaitility.await;
-import org.awaitility.Awaitility;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
-import io.debezium.connector.postgresql.connection.PostgresConnection;
-import io.debezium.jdbc.JdbcConfiguration;
-import io.debezium.server.TestConfigSource;
-import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.strategy.mysql.MySqlConnection;
+import io.debezium.connector.mysql.strategy.mysql.MySqlConnectionConfiguration;
import redis.clients.jedis.Jedis;
public class TestUtils {
- private TestUtils() {
+ public static void insertCustomerToMySql(DebeziumTestContainerWrapper container, String firstName, String lastName, String email)
+ throws IOException, InterruptedException {
+ container.execInContainer("mysql",
+ "-u", MYSQL_PRIVILEGED_USER,
+ "-p" + MYSQL_PRIVILEGED_PASSWORD,
+ "-D", MYSQL_DATABASE,
+ "-e", "INSERT INTO customers VALUES (default,'" + firstName + "','" + lastName + "','" + email + "')");
}
- public static PostgresConnection getPostgresConnection() {
- return new PostgresConnection(JdbcConfiguration.create()
- .with("user", PostgresTestResourceLifecycleManager.POSTGRES_USER)
- .with("password", PostgresTestResourceLifecycleManager.POSTGRES_PASSWORD)
- .with("dbname", PostgresTestResourceLifecycleManager.POSTGRES_DBNAME)
- .with("hostname", PostgresTestResourceLifecycleManager.POSTGRES_HOST)
- .with("port", PostgresTestResourceLifecycleManager.getContainer().getMappedPort(PostgresTestResourceLifecycleManager.POSTGRES_PORT))
- .build(), "Debezium Redis Test");
+ public static MySqlConnection getMySqlConnection(DebeziumTestContainerWrapper container) {
+ return new MySqlConnection(new MySqlConnectionConfiguration(Configuration.create()
+ .with("database.user", "root")
+ .with("database.password", MYSQL_ROOT_PASSWORD)
+ .with("database.dbname", MYSQL_DATABASE)
+ .with("database.hostname", LOCALHOST)
+ .with("database.port", container.getMappedPort(MYSQL_PORT))
+ .build()), null);
}
- public static void awaitStreamLengthGte(Jedis jedis, String streamName, int expectedLength) {
- waitBoolean(() -> jedis.xlen(streamName) >= expectedLength);
+ public static void waitForStreamLength(Jedis jedis, String streamName, int expectedLength) {
+ await()
+ .atMost(60, TimeUnit.SECONDS)
+ .until(() -> jedis.xlen(streamName) == expectedLength);
}
public static void awaitHashSizeGte(Jedis jedis, String hashName, int expectedSize) {
- waitBoolean(() -> jedis.hgetAll(hashName).size() >= expectedSize);
- }
-
- public static void waitBoolean(Supplier bool) {
- Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
- return Boolean.TRUE.equals(bool.get());
- });
+ await()
+ .atMost(60, TimeUnit.SECONDS)
+ .until(() -> jedis.hgetAll(hashName).size() >= expectedSize);
}
}
diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java
deleted file mode 100644
index 5aa2526b..00000000
--- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.server.redis.wip;
-
-import static io.debezium.server.redis.wip.TestConstants.LOCALHOST;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_DATABASE;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_PORT;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_PASSWORD;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_USER;
-import static io.debezium.server.redis.wip.TestConstants.MYSQL_ROOT_PASSWORD;
-import static org.awaitility.Awaitility.await;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import io.debezium.config.Configuration;
-import io.debezium.connector.mysql.strategy.mysql.MySqlConnection;
-import io.debezium.connector.mysql.strategy.mysql.MySqlConnectionConfiguration;
-
-import redis.clients.jedis.Jedis;
-
-public class TestUtils {
-
- public static void insertCustomerToMySql(DebeziumTestContainerWrapper container, String firstName, String lastName, String email)
- throws IOException, InterruptedException {
- container.execInContainer("mysql",
- "-u", MYSQL_PRIVILEGED_USER,
- "-p" + MYSQL_PRIVILEGED_PASSWORD,
- "-D", MYSQL_DATABASE,
- "-e", "INSERT INTO customers VALUES (default,'" + firstName + "','" + lastName + "','" + email + "')");
- }
-
- public static MySqlConnection getMySqlConnection(DebeziumTestContainerWrapper container) {
- return new MySqlConnection(new MySqlConnectionConfiguration(Configuration.create()
- .with("database.user", "root")
- .with("database.password", MYSQL_ROOT_PASSWORD)
- .with("database.dbname", MYSQL_DATABASE)
- .with("database.hostname", LOCALHOST)
- .with("database.port", container.getMappedPort(MYSQL_PORT))
- .build()), null);
- }
-
- public static void waitForStreamLength(Jedis jedis, String streamName, int expectedLength) {
- await()
- .atMost(60, TimeUnit.SECONDS)
- .until(() -> jedis.xlen(streamName) == expectedLength);
- }
-
- public static void awaitHashSizeGte(Jedis jedis, String hashName, int expectedSize) {
- await()
- .atMost(60, TimeUnit.SECONDS)
- .until(() -> jedis.hgetAll(hashName).size() >= expectedSize);
- }
-
-}
diff --git a/pom.xml b/pom.xml
index 1fac7523..346a5003 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,8 +23,6 @@
${project.version}
- true
- ${artifactId}